Building the Grubhub Delivery Job Scheduler

Posted by Kiel Loysen & Ovi Tisler

At some point in their careers, most software engineers have been asked to build features that need to run on a schedule. Sending a daily email, reporting on system health, and cleaning up stale data are a few examples of such automated tasks. Usually, these tasks can be implemented with Cron or by using a third-party scheduling tool like Obsidian. While these tools work well, they don’t have a good solution for high availability, such as running in multiple regions.

In this blog post, we’ll show how we created a solution that meets these requirements by implementing a stand-alone job scheduling microservice using Cassandra for global leader election.  This service will be solely focused on managing scheduled jobs and contacting other services to let them know when to do the work — but not how to do the work.

Requirements

In order to satisfy our scaling and availability requirements, we need to be able to:

  1. Run scheduled jobs in multiple data centers.
  2. Balance load between multiple instances of a service and scale horizontally.
  3. Centralize monitoring and disaster recovery.

Datacenter aware

Let’s tackle our first requirement — “Run scheduled jobs in multiple data centers.”

When we say “data center,” we’re referring to the different AWS regions where we host our system. Grubhub currently uses two AWS regions — one in the east and one in the west — and we wanted to design our system so it’s easy to add more if needed.  We also need to be able to run a job in any of our supported AWS regions seamlessly — and this is where Cassandra comes into play.  

Currently, our Cassandra datastore is the only layer of our architecture that can talk across data centers.  This ability will be a key part of the solution we are building in this post.  Our plan is to design a table in Cassandra that will be used to create a simple globally aware leader election process.  For instance, if we have 6 microservice nodes split across 2 data centers, the leader election will decide which node has the right to perform our job.  But before we can jump into writing the CQL statements for our tables, we should touch on a few important topics.  

Cassandra Query best practices

When working with Cassandra, we have plenty of ways to tune our queries for any given situation.  Every query can have its own consistency level that lets us optimize for accuracy of data or for performance.  Our specific use case requires the absolute highest accuracy of data, which we can achieve using:

  1. The QUORUM consistency level.
  2. Lightweight transactions (IF NOT EXISTS writes).

These types of queries are very expensive and are generally frowned upon unless absolutely necessary.  In this section, we'll touch on what makes them so expensive and why we need them despite the disadvantages they bring.

Quorum write

In a perfect world, all of our queries would run at the LOCAL_ONE consistency level so we could have incredibly fast response times.  We choose LOCAL_QUORUM when we need our data to be more consistent, because it requires a majority of the replicas in the local data center to agree on the latest state.

In our case of leader election, LOCAL_QUORUM would occasionally result in each data center electing its own leader.  Unfortunately, we need one leader elected across the entire Cassandra cluster, which can only be done with the QUORUM level.  QUORUM is less performant because a majority of replicas across the entire cluster, spanning all of our data centers, must agree on the latest state of the data.  Any QUORUM-level query will also fail if our data centers lose their connection to each other. This connection loss is a rare scenario, to be sure, but one we must consider as we design our solution.
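
The DAO code later in this post calls a getConsistency(isGlobal) helper whose body isn't shown. As a minimal sketch (our assumption, using the DataStax Java driver's ConsistencyLevel enum), it might simply pick between the two guarantees described above:

import com.datastax.driver.core.ConsistencyLevel;

// Hypothetical sketch of the helper: QUORUM needs a majority of replicas across
// the whole cluster, while LOCAL_QUORUM only needs a majority in the local
// data center.
private ConsistencyLevel getConsistency(boolean isGlobal) {
    return isGlobal ? ConsistencyLevel.QUORUM : ConsistencyLevel.LOCAL_QUORUM;
}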

Lightweight transactions

Lightweight transactions are what you reach for when strong consistency alone isn't enough.  They are quite complicated, but they essentially allow the node that receives the query to try to become the “leader” and write a valid record that all other nodes will honor as the truth (sounds familiar, doesn't it?).  Technically, our lightweight transactions run at another consistency level called SERIAL, which is essentially a multi-phase QUORUM read (ouch!).  This is another severe hit to the performance of our query, but hopefully we can find a way to live with it.
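
To make the mechanics concrete, here is a minimal, hypothetical sketch (assuming the DataStax Java driver 3.x; the prepared statement and parameter names are ours) of executing a conditional insert as a lightweight transaction. The serial consistency level governs the Paxos phase, and the driver's wasApplied() reports whether the condition held:

import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import java.util.UUID;

// Hypothetical sketch: insertIfNotExists is assumed to be a prepared
// INSERT ... IF NOT EXISTS statement; jobName and nodeId are illustrative.
boolean tryToLead(Session session, PreparedStatement insertIfNotExists,
                  String jobName, UUID nodeId) {
    Statement statement = insertIfNotExists.bind(jobName, nodeId)
            .setConsistencyLevel(ConsistencyLevel.QUORUM)        // the write itself
            .setSerialConsistencyLevel(ConsistencyLevel.SERIAL); // the Paxos phase
    // Cassandra returns an [applied] column for conditional writes; the driver
    // exposes it through wasApplied().
    return session.execute(statement).wasApplied();
}

This is the same pattern the leader election query below relies on.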

Now that we’ve touched on the core concepts our leader election query is based on, we can start to design our tables.

Simple leader election

We will create a table called `leader_election` that will have columns for job name and leader name:


CREATE TABLE IF NOT EXISTS leader_election (
    job_name text,
    leader_name uuid,
    PRIMARY KEY (job_name)
);

 

Each node will use the following query to try to write its own unique ID to this table:


INSERT INTO leader_election
    (job_name, leader_name)
    VALUES ('ATTN_CHECK', uuid())
    IF NOT EXISTS
    USING TTL 90;

Here is what that same query looks like using the DataStax Java driver:

LeaderElectionDao.java:

ResultSet resultSet = session.execute(getInsertElectionStatement()
        .bind(job, leaderName, getLeaderIp(), getTtl())
        .setConsistencyLevel(getConsistency(isGlobal))
        .setSerialConsistencyLevel(getSerialConsistency(isGlobal)));

Row row = resultSet.one();
return row.getBool(0); // did this node win the election?

The node that writes its record first will win the election and be cleared to execute the job.

In the code above, we're using a Time to Live (TTL) value for our records. The TTL allows our election to act as a “lease”: if a record for the same job is written while a lease is still active, it will be rejected.  Nodes in multiple data centers can attempt to write their lease record to the database — but our use of QUORUM will ensure only one of these nodes will “win” — i.e., be free to execute our job. We don't expect these leases to be long-lived or reused across multiple executions, but allowing for both scenarios may be useful to us down the road.
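
Tying it together, the calling side of the election might look roughly like the sketch below (our assumption; runElection is a hypothetical wrapper around the insert shown above that returns whether this node won):

import java.util.UUID;

// Hypothetical wiring around the election DAO: attempt the election and run the
// job only if this node won. The TTL on the lease record means losing nodes
// simply try again on a later cycle, once the lease has expired.
class JobRunner {
    private final LeaderElectionDao leaderElectionDao;
    private final UUID nodeId = UUID.randomUUID(); // this node's unique identity

    JobRunner(LeaderElectionDao leaderElectionDao) {
        this.leaderElectionDao = leaderElectionDao;
    }

    void runIfElected(String jobName, Runnable job) {
        if (leaderElectionDao.runElection(jobName, nodeId)) {
            job.run(); // we hold the lease for this job until the TTL expires
        }
        // otherwise another node holds the lease; skip this cycle
    }
}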

Distributing load

Typically, the microservice responsible for running the jobs is also actively used for normal API operations in the same domain. If our job has high processing demands, we're now effectively overloading our leader node, which jeopardizes both the job and the node's normal operations.

Here’s where another core requirement comes into play: we want to balance the load between multiple instances of a service and scale horizontally.

We have a way to divide up our work into segments that align perfectly with Grubhub’s business needs while ensuring the load remains balanced.  We call these divisions markets.  A market is a discrete, geographically bounded area made up of restaurants, delivery drivers, and staff we employ to support our operations in that area.  A market can start out as an entire city, and as business increases there, it can be further divided.  

The market structure gives us a rough equilibrium in the effort required by each instance of a service, and it fits neatly into our job model.  Rather than analyzing the health of every market in one job, we can easily divide the work into N executions.

Dividing each job into N executions, one per market, requires some updates to our leader election table:


CREATE TABLE IF NOT EXISTS leader_election_v2 (
    job_name text,
    group_id uuid,
    leader_name uuid,
    PRIMARY KEY ((job_name, group_id))
);

Now, instead of participating in one election, each node will participate in N elections and execute only the markets for which it won the lease. This requires a minor update to our Java code.

LeaderElectionDao.java:

ResultSet resultSet = session.execute(getInsertElectionStatement()
        .bind(job, groupId, leaderName, getLeaderIp(), getTtl())
        .setConsistencyLevel(getConsistency(isGlobal))
        .setSerialConsistencyLevel(getSerialConsistency(isGlobal)));

Row row = resultSet.one();
return row.getBool(0); // did this node win the election?

Now, we’ve spread the load evenly across all our nodes, reducing the impact on our system and also minimizing the total time it will take to run.
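
As an illustrative sketch (our assumption; runElection is the same hypothetical wrapper as before, now keyed by market group), each node loops over every market group, runs one election per group, and keeps only the markets it won:

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical sketch: one election per market group; the job is then executed
// only for the groups this node won. leaderElectionDao and nodeId are the same
// fields as in the earlier JobRunner sketch.
List<UUID> electMarkets(String jobName, List<UUID> marketGroupIds) {
    List<UUID> wonMarkets = new ArrayList<>();
    for (UUID groupId : marketGroupIds) {
        if (leaderElectionDao.runElection(jobName, groupId, nodeId)) {
            wonMarkets.add(groupId); // this node holds the lease for this market
        }
    }
    return wonMarkets; // run the job for exactly these markets
}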

Almost there: ensuring successful elections for every market

We're very close to our solution but still need to make some adjustments. As engineers who occasionally have to go on call — which keeps us motivated to build stable systems — we chose to play it safe and run our new election process in a pre-production environment with plenty of monitoring. After our latest change, we noticed that successful elections weren't being reported for all of our markets. Roughly 10% of markets per run were going leaderless and without execution. This is a major problem — we can design our process to adapt to blips in the system, but skipping 10% of our markets is much more than a blip.

After some intense investigation we discovered something interesting — while 10% of our markets were going leaderless, we still found the correct number of leader records written to our table. It turned out the 10% failure rate was caused by write timeouts: the lease record was written successfully, but the “winning” node never got the response telling it that it had won, so it never executed the job.

Let’s see if tweaking our code can help us resolve these failures:


// Attempt the election; because of write timeouts we no longer rely on this result.
session.execute(getInsertElectionStatement()
        .bind(job, groupId, leaderName, getLeaderIp(), getTtl())
        .setConsistencyLevel(getConsistency(isGlobal))
        .setSerialConsistencyLevel(getSerialConsistency(isGlobal)));

// Instead, read back the winner and compare it to our own ID.
ResultSet resultSet = session.execute(getSelectElectionWinnerStatement()
        .bind(job, groupId)
        .setConsistencyLevel(getConsistency(isGlobal)));

Row row = resultSet.one();
UUID leader = row.getUUID("leader_name");
return leader.equals(leaderName); // did this node win the election?

Previously, we were reading the result of the conditional insert to see if the row was applied. Now we're running a separate query after the fact to see if the leader column matches our node's universally unique identifier (UUID). Adding another quorum read to our process isn't ideal, but it works and fixes all of our election misses.

We’re now in a state where each node is getting the list of markets, running an election for each one, and then executing the job only for the markets where it holds the leader lease. This is a pretty good place to be most of the time, but we can still make improvements.

The messaging queue

When we introduced our market concept earlier, we mentioned that it allows for a rough size equilibrium between our different areas of business. This is true to an extent, but at some point, dividing markets up further — especially large, dense urban areas like New York City — delivers diminishing returns. This means we have a handful of heavy markets with the potential to stress our nodes. In isolation, these markets are processed just fine, but if a single node wins all or most of these heavy markets, we could have a problem on our hands. If that node ran out of CPU or memory, it would crash the execution of the jobs for all of our most important markets!

The solution for this involves taking advantage of a very prominent part of our infrastructure. Using Amazon SNS and SQS, we can set up a topic in each AWS data center and have all of the nodes in that data center read from the same queue. Now, our leader election process is a fight for the right to publish a message to the topic rather than for the right to run the job itself. Once the queue starts filling up with messages, the service nodes consume them one at a time and process the target markets. The nodes that get our big markets can focus on that work while the other nodes process the rest of the messages on the queue.
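
With the queue in place, “winning” an election only means winning the right to publish. A minimal sketch of that publish step, assuming the AWS SDK for Java v1 and a hypothetical per-data-center topic ARN and payload format, might look like this:

import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.PublishRequest;

// Hypothetical sketch: the election winner publishes one message per market it
// won; service nodes in the data center consume them from the shared SQS queue.
public class MarketPublisher {
    private final AmazonSNS sns = AmazonSNSClientBuilder.defaultClient();
    private final String topicArn; // the per-data-center job topic (assumed)

    public MarketPublisher(String topicArn) {
        this.topicArn = topicArn;
    }

    public void publishMarket(String jobName, String marketId) {
        String message = jobName + ":" + marketId; // illustrative payload format
        sns.publish(new PublishRequest(topicArn, message));
    }
}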

Standalone service

So far, we’ve implemented our solution in the microservice that will also be executing the job. This is fine, as long as we break out our code so it can be pulled in by another microservice that needs the behavior. In fact, this is what other teams at Grubhub do currently, and it’s working well for them.

Our team decided to take a different approach and create a new microservice focused exclusively on scheduling our background jobs. This service contains all the logic we've built so far and was easy to split out because we were already using AWS queues to notify our executing services to start processing. This approach was compelling to us for a number of reasons, especially because it covers another major requirement — centralized monitoring and disaster recovery.

Monitoring and disaster recovery

Dedicating one microservice to global leader election and task initiation allows us to closely monitor the process's health. We can make sure elections are happening at a healthy rate with minimal errors. This service isn't responsible for any other logic, so we don't need to worry about other code taxing its resources when our leader election needs them.

Our dedicated microservice also lets us handle the split-brain scenario mentioned earlier with relative ease. The process is admittedly a bit manual: if we're in a situation where QUORUM writes can't succeed, we choose the healthiest data center and flip a config setting. The nodes there fall back to LOCAL_QUORUM while we disable the nodes in the remaining data centers.
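
As a rough sketch of that fallback (our assumption; the config mechanism and setting names are hypothetical), the flag simply flips the election queries from QUORUM to LOCAL_QUORUM via the getConsistency helper sketched earlier, and nodes outside the chosen data center stop attempting elections:

// Hypothetical fallback wiring: "globalElectionsEnabled" and "primaryDataCenter"
// stand in for the config settings an operator flips during a cross-DC outage.
// When the flag is off, getConsistency(false) falls back to LOCAL_QUORUM and
// only nodes in the chosen data center keep attempting elections.
boolean shouldAttemptElections(boolean globalElectionsEnabled,
                               String localDataCenter, String primaryDataCenter) {
    return globalElectionsEnabled || localDataCenter.equals(primaryDataCenter);
}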

Other benefits to our approach

We've been ignoring the other major aspect of highly available task scheduling — the part that actually schedules when work kicks off. With this single service, we have a one-stop shop that enables us to do the following without any code changes:

  1. Create new jobs.
  2. Edit existing jobs.
  3. Disable jobs if something is in an unhealthy state.
  4. Manually run a job outside its normal schedule.

The ability to dynamically update the markets a job should run on is key to our business because we like to test new jobs in a few markets at a time. This helps us ensure each job is running as intended and has the desired impact. This design is also very modular and will allow us to replace our leader election implementation without disrupting other Grubhub services if we choose to do so.

Performance

Our biggest concern with this approach was overloading Cassandra with too many high-impact queries at once and taking down multiple services in the fallout. This service lets us control exactly how much runs at any given time through our use of the TTL and by keeping all job scheduling in one place. Currently, our service is set up so the nodes wake up every 2 minutes, perform their elections, and run any tasks for the markets they have “won.” Essentially, the maximum number of concurrent elections is capped at the number of markets our business has created.
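
For illustration only (a sketch under our assumptions; runCycle is a hypothetical method wrapping the election-and-execute logic sketched earlier), the wake-up cadence can be as simple as a scheduled executor:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the wake-up cycle: every 2 minutes, run the per-market
// elections and kick off the work this node won.
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> runCycle(), 0, 2, TimeUnit.MINUTES);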

Successful outcomes

After reviewing our approach, you may be asking, “Does this solution actually work?”

Yes, it does! Our Job Scheduling service has been working in production for roughly 6 months without a single outage. We have fewer than 10 jobs communicating with a variety of different microservices. In fact, every potential issue we’ve looked into has wound up being a problem with the consuming service, not our scheduler.