Akka Typed Actors: Exploring the receiver pattern

In the previous article we looked at some of the basic features provided by Akka Typed. In this article and the next one we'll look a bit closer at some more features and do that by looking at the two different patterns provided by Akka Typed: the Receiver and the Receptionist pattern. If you're new to Akka Typed, it might be a good idea to first read the previous article, since that'll give you a bit of an introduction into Akka Typed. So for this article in our series on akka-typed we'll look at the Receiver pattern.

As always, you can find the code for this example in a Github Gist: https://gist.github.com/josdirksen/77e59d236c637d46ab32

The receiver pattern

In the Akka Typed distribution there is a package call akka.typed.patterns. In this package there are two different patterns the Receiver pattern and the Receptionist pattern. Why these two patterns were important enough to add to the distribution I don't really know to be honest, but they do provide a nice way to introduce some more concepts and ideas behind Akka Typed.

So let's look into the Receiver pattern and we'll do the Receptionist pattern in the next article. To understand what the Receiver pattern doen, lets just look at the messages that we can send to it:

 /**
   * Retrieve one message from the Receiver, waiting at most for the given duration.
   */
  final case class GetOne[T](timeout: FiniteDuration)(val replyTo: ActorRef[GetOneResult[T]]) extends Command[T]
  /**
   * Retrieve all messages from the Receiver that it has queued after the given
   * duration has elapsed.
   */
  final case class GetAll[T](timeout: FiniteDuration)(val replyTo: ActorRef[GetAllResult[T]]) extends Command[T]
  /**
   * Retrieve the external address of this Receiver (i.e. the side at which it
   * takes in the messages of type T.
   */
  final case class ExternalAddress[T](replyTo: ActorRef[ActorRef[T]]) extends Command[T]

As you can see from these messages what a Receiver does is that it queues messages of type T, and provides additional commands to either get one or more of those messages, while waiting a specific time. To use a receiver we need to get the ExternalAddress, so that we can send messages of type T to it. And from an other actor we can send get GetOne and GetAll messages to see whether there are any messages waiting in the receiver.

For our example we're going to create the following actors:

  • A producer which sends messages of type T to the receiver.
  • A consumer which can retrieve messages of type T from this receiver.
  • A root actor, which runs this scenario.

We'll start with the producer, which looks like this:

In this object we define the messages that can be sent to the actor, and the behavior. The registerReceiverMsgIn message provides the actor with the destination it should send messages to (more on this later), and the addHelloWorldMsg tells the behavior what message to send to the address provided by the registerReceiverMsgIn message. If you look at this behavior you can see that we use a Full[T] behavior. For this behavior we have to provide matchers for all the messages and signals, and as an added bonus we also get access to the actor ctx. In its initial state this behavior only responds to registerReceiverMsgIn messages. When it receives such a message it does two things:

  1. It defines a function which we can use to schedule a message, we we also directly call, to schedule a message being sent in half a second.
  2. It defines our new behavior. This new behavior can process the messages sent by the scheduleMessage function. When it receives that message, it sends the content to the provided messageConsumer (the Receiver), and calls the schedule message again. To keep sending messages every 500 ms.

So when we sent the initial registerReceiverMessage, it will result in an actor that sends a new message to the receiver every 500 ms. Now lets look at the other side: the consumer.

For the consumer we've also wrapped everything in an object, which looks like this:

In this object we defines a single behavior, which also switches its implementation after receiving the first message. The first message in this case is called registerReceiverCmdIn. With this message we get access to the actorRef (of the Receiver) that we need to send the GetAll and getOne messages to. After we've switched behavior, we process our own custom GetAllMessages message, which will trigger a GetAll message being sent to the Receiver. Since our own behavior isn't typed for the kind of responses received from the Receiver, we use an adapter (ctx.spawnAdapter). This adapter will receive the response from the Receiver and print out the messages.

The final message part is an actor which initiates this behavior:

Nothing to special here. We create the various actors in this scenario and use the ctx.spawnAdapter to get the external address of the receiver, which we pass to the producerActor. Next we pass the address of the receiver actor to the consumer. Now we call the GetAllMessages on the consumer address which gets the messages from the receiver and prints them out.

So summarising the steps that will be executed in this example:

  1. We create a root actor that will run this scenario.
  2. From this root actor we create the three actors: receiver, consumer and producer.
  3. Next we get the externalAddress from the receiver (the address to which we sent messages of type T) and using an adapter pass this to the producer.
  4. The producer, on receiving this message, switches behavior and starts sending messages to the passed in address.
  5. The root actor, in the meantime, passes the address of the Receiver to the consumer.
  6. The consumer ,when it receives this messages, changes behavior and now waits for messages of the type GetAllMessages.
  7. The root actor will now send a GetAllMessages to the consumer.
  8. When the consumer receives this messages it will use an adapter to send a GetAll message to the receiver. When the adapter receive a response it prints out the number of messages received, and handles further processing to the consumer by sending a PrintMessage for each received message from the receiver.

And the result of this scenario looks like this:

Scenario1: Started, now lets start up a number of child actors to do our stuff
Scenario1: Get all the messages
Consumer: Switching behavior
Consumer: requesting all messages
Producer: Switching behavior
Producer: Adding new 'Hello(hello @ 1446277162929)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277163454)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277163969)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: Received 3 messages
Consumer: Printing messages: Vector(Hello(hello @ 1446277162929), Hello(hello @ 1446277163454), Hello(hello @ 1446277163969))
  Hello(hello @ 1446277162929)
  Hello(hello @ 1446277163454)
  Hello(hello @ 1446277163969)
Producer: Adding new 'Hello(hello @ 1446277164488)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277165008)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: requesting all messages
Producer: Adding new 'Hello(hello @ 1446277165529)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277166049)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277166569)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277167089)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: Received 6 messages
Consumer: Printing messages: Vector(Hello(hello @ 1446277164488), Hello(hello @ 1446277165008), Hello(hello @ 1446277165529), Hello(hello @ 1446277166049), Hello(hello @ 1446277166569), Hello(hello @ 1446277167089))
  Hello(hello @ 1446277164488)
  Hello(hello @ 1446277165008)
  Hello(hello @ 1446277165529)
  Hello(hello @ 1446277166049)
  Hello(hello @ 1446277166569)
  Hello(hello @ 1446277167089)
Producer: Adding new 'Hello(hello @ 1446277167607)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277168129)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277168650)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277169169)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277169690)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277170210)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: requesting all messages
Producer: Adding new 'Hello(hello @ 1446277170729)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277171249)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277171769)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277172289)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: Received 10 messages
Consumer: Printing messages: Vector(Hello(hello @ 1446277167607), Hello(hello @ 1446277168129), Hello(hello @ 1446277168650), Hello(hello @ 1446277169169), Hello(hello @ 1446277169690), Hello(hello @ 1446277170210), Hello(hello @ 1446277170729), Hello(hello @ 1446277171249), Hello(hello @ 1446277171769), Hello(hello @ 1446277172289))
  Hello(hello @ 1446277167607)
  Hello(hello @ 1446277168129)
  Hello(hello @ 1446277168650)
  Hello(hello @ 1446277169169)
  Hello(hello @ 1446277169690)
  Hello(hello @ 1446277170210)
  Hello(hello @ 1446277170729)
  Hello(hello @ 1446277171249)
  Hello(hello @ 1446277171769)
  Hello(hello @ 1446277172289)
Producer: Adding new 'Hello(hello @ 1446277172808)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277173328)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277173849)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277174369)' to receiver: Actor[akka://Root/user/receiver#1097367365]

Cool right! As you can see from the message sequence, our producer sends messages to the receiver which queues them up. Next we have a consumer which requests all the messages that have been received so far and prints them out.

That's it for this article on Akka-Typed, in the next one we'll look at the Receptionist pattern also present in Akka-Typed.