Building highly concurrent and scalable applications with Akka

Posted by Natalia Zelenskaya on

Overview

At Grubhub, we always keep an eye on new technologies, analyzing and evaluating their suitability for building tools that keep our hungry diners happy. Users have little patience for software that doesn’t respond immediately: in fact, only 16% of users will try a failing application more than twice. Working software that responds quickly is even more vital for our users, who are often on our website or mobile app because they’re hungry or looking for a favorite meal.

Resilient and fast applications are central to delivering a great user experience. But building applications that stay resilient and fast under high traffic volumes is a challenge for any engineering team.

For example, let’s consider the scenario of confirming an order with a restaurant. Once a user places an order at Grubhub, the restaurant must confirm that they can actually prepare the food and, ideally, give the user a time estimate for delivery or pickup. What might seem a trivial data flow is, in reality, a complex system with different triggers, notifications, and means of communication. Such systems are challenging to test, deploy, and maintain because of their complex structure, and this same complexity means we’re dealing with multiple potential points of failure.

To make the Grubhub system resilient, we must keep all components isolated, so that a failure in one does not cascade and eventually pull the entire system down. Of course, isolated system components must still interact with each other. The best means of ensuring interaction between isolated components is asynchronous message passing. Why? Because it ensures loose coupling and establishes boundaries between different parts of the system. Asynchronous message passing also keeps applications fast and responsive, as it provides non-blocking communication while allowing more efficient use of system resources.

With these architectural principles in mind, we decided to try out Akka, a toolkit for asynchronous logic flow built on the actor model. Akka provides scalable, real-time data processing, which is a good fit for large, complex ordering systems like Grubhub’s. The actor is the core primitive of Akka. An actor is a lightweight but powerful object that gives you a simple, high-level abstraction for distribution, concurrency, and parallelism. You can instantiate several million actors per GB of heap memory. Each actor can receive asynchronous messages, make local decisions, create more actors, send more messages, and determine how to respond to the next message received.
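To make the mailbox idea concrete, here is a minimal sketch in plain Scala. It is not Akka and not our production code: ToyCounter and its method names are made up for illustration. It only shows the two properties described above: senders enqueue a message and move on, and the receiver handles one message at a time, so its private state needs no locks.

```scala
import java.util.concurrent.{Callable, Executors}

// A toy stand-in for an actor. This is NOT Akka: it only models the mailbox idea.
// Messages are queued on a single-threaded executor, so they are handled one
// at a time and `count` needs no locks.
final class ToyCounter {
  private val mailbox = Executors.newSingleThreadExecutor()
  private var count = 0 // read and written only on the mailbox thread

  // Fire and forget: enqueue the message and return immediately.
  def tell(message: String): Unit =
    mailbox.execute(new Runnable {
      def run(): Unit = message match {
        case "increment" => count += 1
        case _           => () // ignore unknown messages
      }
    })

  // Blocking read, used only to inspect the result; it queues behind earlier messages.
  def currentCount(): Int =
    mailbox.submit(new Callable[Int] { def call(): Int = count }).get()

  // Stop the mailbox thread so the JVM can exit.
  def shutdown(): Unit = mailbox.shutdown()
}
```

Several threads can call tell() concurrently without coordinating with each other. A real Akka actor adds supervision, location transparency, and far cheaper scheduling than one thread per mailbox.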

Akka-based systems can easily scale up, thanks to a lightweight concurrency model, and they can also scale out by adding more cluster nodes to host Akka actors. Moreover, because Akka is a unified runtime and programming model, we don’t need to involve external services in our Akka ecosystem. For example, we can easily avoid using message queues or enterprise service buses, which bring more complexity to the platform and add more cost.

Akka-based system design

Now that we’ve explored Akka basics, let’s get back to our “simple” use case: Grubhub wants to confirm a user’s order with a restaurant. We can break this task into three major components:

  1. For our order confirmation flow, we’re going to use an Akka-FSM (Finite State Machine).
  2. For interaction with external components, we’ll use Akka HTTP.
  3. And finally, for scalability and cloud deployment, we’re going to use Akka-Clustering.

 

 

All message exchanges occur in an asynchronous “fire and forget” fashion, which gives us a non-blocking flow. Each component is scalable and can have multiple instances in the cluster.

Actors in our system design

  • Order FSM Akka actor: Manages the state of a single order. At each state, the order can be persisted to the DB or S3 for fault tolerance. The Frontend Dispatcher Akka actor creates this actor when it receives a new confirmation request from the Order service. The Order FSM Akka actor releases its system resources once the order reaches its final state or the actor’s lifetime exceeds its allotted time interval.
  • Frontend Dispatcher Akka actor: Creates new Order FSM Akka actors in response to Order service requests. This actor also channels asynchronous, peer-to-peer messages through the system, connecting the phone service with the required instance of an Order FSM Akka actor.
  • Public REST API (Akka HTTP): Our REST API is based on Akka HTTP and functions as an HTTP-based integration layer that connects the external world with the internal actor system.
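The dispatcher’s routing role can be sketched as a “get or create” table keyed by order number. The model below is plain Scala, not Akka, and DispatcherModel and OrderHandler are hypothetical names standing in for the Frontend Dispatcher and Order FSM actors. In Akka classic, a similar lookup could be built on context.child(name) and context.actorOf(props, name), but treat that as one possible approach rather than a description of our implementation.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an Order FSM actor: it just records what it receives.
final class OrderHandler(val orderNumber: String) {
  val received: mutable.Buffer[String] = mutable.Buffer.empty[String]
  def handle(message: String): Unit = { received += message; () }
}

// Plain-Scala model of the dispatcher's routing table (not Akka code).
final class DispatcherModel {
  private val handlers = mutable.Map.empty[String, OrderHandler]

  // Route a message to the handler for this order, creating it on first use.
  def route(orderNumber: String, message: String): OrderHandler = {
    val handler = handlers.getOrElseUpdate(orderNumber, new OrderHandler(orderNumber))
    handler.handle(message)
    handler
  }

  // Forget a finished order so its handler can be garbage-collected.
  def release(orderNumber: String): Unit = { handlers.remove(orderNumber); () }

  def activeOrders: Int = handlers.size
}
```

Every message for the same order number reaches the same handler instance, which is what lets each order keep its own state.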

Since we now have a global overview of the Grubhub system and its actors, let’s take a look at details of implementation, including code samples.

Order confirmation Akka-FSM

The Grubhub order confirmation flow can be modeled as a finite state machine. Each event in the order confirmation flow moves the order from one state to the next:

 

 

Defining Data, Events, States and FSM with Akka

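// Config is assumed to be Typesafe Config, based on the getString/getDuration calls used later
import akka.actor.{Actor, ActorLogging, FSM}
import com.typesafe.config.Config
import scala.concurrent.duration.FiniteDuration

// Data carried alongside the current state
sealed trait FsmOrderData
case object NoOrder extends FsmOrderData
case class HandlingOrder(orderNumber: String) extends FsmOrderData

// Events that drive the state transitions
sealed trait OrderConfirmationEvent
case class CallStarted(orderNumber: String) extends OrderConfirmationEvent
case class PinEntered(pin: Short, attempt: Short) extends OrderConfirmationEvent
case class OrderEstimated(estimated: FiniteDuration) extends OrderConfirmationEvent
case object NoAnswer extends OrderConfirmationEvent
case object TakePoisonPill extends OrderConfirmationEvent

// States of the order confirmation flow
sealed trait FsmOrderState
case object Waiting extends FsmOrderState
case object Started extends FsmOrderState
case object PinAccepted extends FsmOrderState

// The FSM actor; its body is built up in the snippets that follow
class FsmOrderConfirmation(config: Config) extends Actor with FSM[FsmOrderState, FsmOrderData] with ActorLogging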

FsmOrderData is the data associated with a state. Once the order confirmation flow is initialized, this data contains an order number.
OrderConfirmationEvent is an event that triggers the transition from one state to another.
TakePoisonPill is a special internal event that fires automatically after a specific timeout to stop the actor and release its resources.
FsmOrderState is a state in the order confirmation flow.
FsmOrderConfirmation is the FSM Akka actor, which is initialized with configuration settings at construction time.

Initializing FSM

import java.util.concurrent.TimeUnit
import context.dispatcher // execution context required by the scheduler

// Settings are read from the configuration when the actor is constructed
val expectedPin: Short = config.getString("services.banana-phone.order-pin").toShort
val pinRetries: Short = config.getString("services.banana-phone.pin-retries").toShort
val retryCallInterval: FiniteDuration = FiniteDuration(config.getDuration("services.banana-phone.retry-call-interval").getSeconds, TimeUnit.SECONDS)
val poisonPillTimeout: FiniteDuration = FiniteDuration(config.getDuration("services.banana-phone.take-poison-timeout").getSeconds, TimeUnit.SECONDS)

// Initial state and data
startWith(Waiting, NoOrder)
// Deliver TakePoisonPill to this actor once the timeout elapses
context.system.scheduler.scheduleOnce(poisonPillTimeout, self, TakePoisonPill)

At initialization, we read the following configuration settings:
expectedPin — The expected confirmation pin number.
pinRetries — The number of permitted attempts to enter the correct pin.
retryCallInterval — The time interval that must pass before the system can retry a call after a “hangUp”.
poisonPillTimeout — The time interval after which an actor stops further attempts to call and communicate.

These settings will be used in the FSM logic.
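One detail of the duration settings is worth a small sketch. Converting through getSeconds keeps only whole seconds, so an interval such as 500 milliseconds would become zero. The helper names below are ours for illustration only. The first mirrors the conversion used at initialization, and the second is a possible alternative that keeps sub-second precision.

```scala
import java.time.{Duration => JDuration}
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration

object DurationConversion {
  // Mirrors the conversion used at initialization: whole seconds only.
  def viaSeconds(d: JDuration): FiniteDuration =
    FiniteDuration(d.getSeconds, TimeUnit.SECONDS)

  // Possible alternative: convert through nanoseconds to keep sub-second precision.
  def viaNanos(d: JDuration): FiniteDuration =
    FiniteDuration(d.toNanos, TimeUnit.NANOSECONDS)
}
```

If all intervals are configured in whole seconds, both versions behave the same, so this only matters when someone later configures a shorter interval.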

We initialize the actor’s state with “Waiting” and its data with “NoOrder”. Then we schedule a “TakePoisonPill” event to fire after the given time interval. This event stops the Akka actor regardless of its current state and releases its system resources.

Handling events in FSM

when(Waiting) {
  case Event(CallStarted(orderNumber), NoOrder) =>
    sender() ! askForPin(pinRetries)
    goto(Started) using HandlingOrder(orderNumber)
}

when(Started) {
  case Event(PinEntered(pin, remainingAttempts), _) =>
    if (pin == expectedPin) {
      sender() ! askForEstimation()
      goto(PinAccepted)
    } else if (remainingAttempts > 0) {
      sender() ! askForPin(remainingAttempts)
      stay()
    } else {
      // Out of attempts: end the call without a confirmation
      sender() ! HangUp(confirmed = false, estimation = None)
      stop()
    }

  case Event(NoAnswer, HandlingOrder(orderNumber)) =>
    // Schedule another call, then stop this actor
    retryCall(orderNumber)
    stop()
}

when(PinAccepted) {
  case Event(OrderEstimated(estimation), _) =>
    sender() ! HangUp(confirmed = true, estimation = Some(estimation))
    stop()

  case Event(NoAnswer, _) =>
    // The pin was already accepted, so the order counts as confirmed
    sender() ! HangUp(confirmed = true, estimation = None)
    stop()
}

Each when() block defines how the actor handles events in one state of the order. A state handles a specific set of events, and every handler ends by telling the FSM what happens next: goto() another state, stay() in the current one, or stop() the actor.

Let’s take a closer look with the following code sample:

when(Waiting) {
  case Event(CallStarted(orderNumber), NoOrder) =>
    sender() ! askForPin(pinRetries)
    goto(Started) using HandlingOrder(orderNumber)
}

Here, if the actor is in the “Waiting” state, receives a “CallStarted” event carrying an order number, and the data for the current state is “NoOrder”, we do the following:

  1. Reply to the sender of the “CallStarted” event, which in our case is the Frontend Dispatcher Akka actor, with an asynchronous “Ask-For-Pin” message (“!” means fire and forget).
  2. Transfer the actor’s state to “Started” and set the data for that state to “HandlingOrder” with the order number.

All communication between actors happens asynchronously, but modification of the actor’s state is thread-safe because an actor processes one message at a time.
The stop() function stops the Akka actor and releases its resources.
The stay() function keeps the actor in its current state.
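The transition table can also be written as a pure function, which is handy for unit-testing the flow without starting an actor system. The sketch below is a simplified model under our own assumptions, not the FSM actor itself: replies are plain strings, Stopped is an extra state that stands in for stop(), and the retry is returned immediately rather than scheduled.

```scala
import scala.concurrent.duration.FiniteDuration

object OrderFsmModel {
  sealed trait State
  case object Waiting extends State
  case object Started extends State
  case object PinAccepted extends State
  case object Stopped extends State // models stop(); not one of the actor's states

  sealed trait Event
  final case class CallStarted(orderNumber: String) extends Event
  final case class PinEntered(pin: Short, remainingAttempts: Short) extends Event
  final case class OrderEstimated(estimated: FiniteDuration) extends Event
  case object NoAnswer extends Event

  // Next state plus an optional reply (a plain string to keep the sketch small).
  final case class Step(next: State, reply: Option[String])

  def transition(state: State, event: Event, expectedPin: Short): Step =
    (state, event) match {
      case (Waiting, CallStarted(_))                           => Step(Started, Some("ask-pin"))
      case (Started, PinEntered(pin, _)) if pin == expectedPin => Step(PinAccepted, Some("ask-estimation"))
      case (Started, PinEntered(_, left)) if left > 0          => Step(Started, Some("ask-pin"))
      case (Started, PinEntered(_, _))                         => Step(Stopped, Some("hang-up:unconfirmed"))
      case (Started, NoAnswer)                                 => Step(Stopped, Some("retry-call"))
      case (PinAccepted, OrderEstimated(_))                    => Step(Stopped, Some("hang-up:confirmed"))
      case (PinAccepted, NoAnswer)                             => Step(Stopped, Some("hang-up:confirmed"))
      case (other, _)                                          => Step(other, None) // unhandled: stay put
    }

  // Feed a sequence of events through the model, starting from Waiting.
  def run(events: List[Event], expectedPin: Short): State =
    events.foldLeft(Waiting: State)((s, e) => transition(s, e, expectedPin).next)
}
```

Because the function has no side effects, each row of the table can be checked with a one-line assertion.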

We also need to add a handler for our special “TakePoisonPill” event:

whenUnhandled {
  case Event(TakePoisonPill, _) =>
    log.info("Oops, wrong pill...")
    stop()
}

Any event not handled by the when() block for the current state is passed to the whenUnhandled() block. In principle, the “TakePoisonPill” event could also be used to notify supervising actors of a failure and to persist the current state to storage.
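Conceptually, this fallback behaves like chaining two partial functions with orElse: the state’s handler is tried first, and the catch-all is consulted only if the first one is not defined for the event. The sketch below shows the idea in plain Scala with made-up names and string results. It is an analogy, not how Akka is implemented internally.

```scala
object UnhandledFallback {
  sealed trait Event
  case object NoAnswer extends Event
  case object TakePoisonPill extends Event
  case object Unknown extends Event

  // Handler for one state: defined only for the events that state cares about.
  val whenStarted: PartialFunction[Event, String] = {
    case NoAnswer => "retry-call"
  }

  // Catch-all, consulted only when the state handler is not defined for the event.
  val whenUnhandled: PartialFunction[Event, String] = {
    case TakePoisonPill => "stop"
  }

  // The state handler wins; the catch-all is the fallback.
  val handle: PartialFunction[Event, String] = whenStarted.orElse(whenUnhandled)
}
```

Keeping “TakePoisonPill” in the catch-all means we write its handler once instead of repeating it in every state.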
To retry a call, we’ll send a retry-call message to the Frontend Dispatcher Akka actor after a specific timeout.
Here’s how we implement this:

private def retryCall(orderNumber: String): Unit = {
  // sender() is evaluated here, while the current message is still being handled
  context.system.scheduler.scheduleOnce(retryCallInterval, sender(), RetryCall(orderNumber))
}

private def askForPin(remainingAttempts: Short): Ask = Ask("pin", remainingAttempts)

private def askForEstimation(): Ask = Ask("estimation", 0)
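The snippets above use three message types, Ask, HangUp, and RetryCall, whose definitions are not shown. The shapes below are inferred purely from how the messages are constructed, so treat the field names and the BananaPhoneMessages wrapper as assumptions rather than the real definitions.

```scala
import scala.concurrent.duration.FiniteDuration

// Shapes inferred from the call sites; field names are assumptions.
object BananaPhoneMessages {
  // Prompt for input: what to ask for ("pin" or "estimation") and how many attempts remain.
  final case class Ask(what: String, remainingAttempts: Short)

  // End the call, reporting whether the order was confirmed and the estimate, if any.
  final case class HangUp(confirmed: Boolean, estimation: Option[FiniteDuration])

  // Ask the dispatcher to place another call for this order.
  final case class RetryCall(orderNumber: String)
}
```

Immutable case classes are a natural fit for actor messages because they can be shared between threads safely.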

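The final touch is to put this line at the end of the Order FSM Akka actor. It performs the transition into the initial state set by startWith() and sets up timers, if required: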

initialize()

This concludes our look at some of the code samples we’ve implemented for the asynchronous FSM for order confirmation flow.