ReactiveMongo with Akka, Scala and websockets


I was looking for a simple websocket server for one of my projects to test some stuff with ReactiveMongo. When looking around, though, I couldn’t really find a simple, basic implementation that didn’t pull in a complete framework. Finally I stumbled upon one of the Typesafe Activator projects: http://typesafe.com/activator/template/akka-spray-websocket. Even though the name implies that spray is required, it actually uses the websocket code from https://github.com/TooTallNate/Java-WebSocket, which provides a basic websocket implementation that is very simple to use.

So in this article I’ll show you how you can set up a very simple websocket server (without requiring additional frameworks), together with Akka and ReactiveMongo. The following screenshot shows what we’re aiming for:

[Screenshot: Screen Shot 2014-11-22 at 13.58.40.png]

In this screenshot you can see a simple websocket client that talks to our server. Our server has the following functionality:

  1. Anything the client sends is echoed back.
  2. Any document added to a specific (capped) collection in MongoDB is automatically pushed to all the listeners.

You can cut and paste all the code from this article, but it is probably easier to just get the code from git. You can find it on GitHub here: https://github.com/josdirksen/smartjava/tree/master/ws-akka

Getting started

The first thing we need to do is set up our workspace, so let’s start by looking at the sbt configuration:


organization  := "org.smartjava"

version       := "0.1"

scalaVersion  := "2.11.2"

scalacOptions := Seq("-unchecked", "-deprecation", "-encoding", "utf8")

libraryDependencies ++= {
  val akkaV = "2.3.6"
  Seq(
    "com.typesafe.akka"   %%  "akka-actor"    % akkaV,
    "org.java-websocket" % "Java-WebSocket" % "1.3.1-SNAPSHOT",
    "org.reactivemongo" %% "reactivemongo" % "0.10.5.0.akka23"
  )
}

resolvers ++= Seq("Code Envy" at "http://codenvycorp.com/repository/"
  ,"Typesafe" at "http://repo.typesafe.com/typesafe/releases/")

Nothing special here: we just specify our dependencies and add some resolvers so that sbt knows where to retrieve the dependencies from. Before we look at the code, let’s first look at the directory structure and the files of our project:


├── build.sbt
└── src
    └── main
        ├── resources
        │   ├── application.conf
        │   └── log4j2.xml
        └── scala
            ├── Boot.scala
            ├── DB.scala
            ├── WSActor.scala
            └── WSServer.scala

In the src/main/resources directory we store our configuration files and in src/main/scala we store all our Scala files. Let’s start by looking at the configuration files. For this project we use two:

The application.conf file contains our project’s configuration and looks like this:


akka {
  loglevel = "DEBUG"
}

mongo {
  db = "scala"
  collection = "rmongo"
  location = "localhost"
}

ws-server {
  port = 9999
}

As you can see, we just define the log level, where to find MongoDB (host, database and collection), and the port on which our websocket server listens. We also need a log4j2.xml file, since the reactivemongo library uses Log4j 2 for logging:

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
    </Appenders>
    <Loggers>
        <Root level="INFO">
            <AppenderRef ref="Console"/>
        </Root>
    </Loggers>
</Configuration>

So, with the boring stuff out of the way, let’s look at the Scala files.

Starting the websocket server and registering the paths

The Boot.scala file looks like this:

package org.smartjava

import akka.actor.{Props, ActorSystem}

/**
 * This class launches the system.
 */
object Boot extends App {
  // create the actor system
  implicit lazy val system = ActorSystem("ws-system")
  // set up the ReactiveMongo connection
  implicit lazy val db = new DB(Configuration.location, Configuration.dbname)

  // we'll use a simple actor which echoes everything it receives back to the client.
  // EchoActor.props only takes the DB; the DB class reads the collection name itself.
  val echo = system.actorOf(EchoActor.props(db), "echo")

  // define the websocket routing and start a websocket listener
  private val wsServer = new WSServer(Configuration.port)
  wsServer.forResource("/echo", Some(echo))
  wsServer.start

  // make sure the actor system and the websocket server are shut down
  // when the JVM is shut down
  sys.addShutdownHook({system.shutdown;wsServer.stop})
}

// load configuration from external file
object Configuration {
  import com.typesafe.config.ConfigFactory

  private val config = ConfigFactory.load
  config.checkValid(ConfigFactory.defaultReference)

  val port = config.getInt("ws-server.port")
  val dbname = config.getString("mongo.db")
  val collection = config.getString("mongo.collection")
  val location = config.getString("mongo.location")
}

In this source file we see two objects. The Configuration object allows us to easily access the configuration elements from the application.conf file and the Boot object will start our server. The comments in the code should pretty much explain what is happening, but let me point out the main things:

  1. We create an Akka actor system and a connection to our mongoDB instance.
  2. We define an actor which we can register to a specific websocket path.
  3. Then we create and start the websocket server and register a path to the actor we just created.
  4. Finally we register a shutdown hook, to clean everything up.

And that’s it. Now let’s look at the interesting part of the code. Next up is the WSServer.scala file.

Setting up a websocket server

In the WSServer.scala file we define the websocket server.


package org.smartjava

import akka.actor.{ActorSystem, ActorRef}
import java.net.InetSocketAddress
import org.java_websocket.WebSocket
import org.java_websocket.framing.CloseFrame
import org.java_websocket.handshake.ClientHandshake
import org.java_websocket.server.WebSocketServer
import scala.collection.mutable.Map
import akka.event.Logging

/**
 * The WSServer companion object defines the distinct messages this component can send
 */
object WSServer {
  sealed trait WSMessage
  case class Message(ws : WebSocket, msg : String) extends WSMessage
  case class Open(ws : WebSocket, hs : ClientHandshake) extends WSMessage
  case class Close(ws : WebSocket, code : Int, reason : String, external : Boolean)
    extends WSMessage
  case class Error(ws : WebSocket, ex : Exception) extends WSMessage
}

/**
 * Create a websocket server that listens on a specific port.
 *
 * @param port the TCP port the server listens on
 */
class WSServer(val port : Int)(implicit system : ActorSystem, db: DB ) 
                             extends WebSocketServer(new InetSocketAddress(port)) {

  // maps the path to a specific actor.
  private val reactors = Map[String, ActorRef]()
  // set up logging based on the implicitly passed-in actor system
  private val log = Logging.getLogger(system, this)

  // Call this function to bind an actor to a specific path. All incoming
  // connections to a specific path will be routed to that specific actor.
  final def forResource(descriptor : String, reactor : Option[ActorRef]) {
    log.debug("Registring actor:" + reactor + " to " + descriptor);
    reactor match {
      case Some(actor) => reactors += ((descriptor, actor))
      case None => reactors -= descriptor
    }
  }

  // onMessage is called when a websocket message is received.
  // In this method we check whether we can find an actor registered
  // for the path and forward the message to it.
  final override def onMessage(ws : WebSocket, msg : String) {

    if (null != ws) {
      reactors.get(ws.getResourceDescriptor) match {
        case Some(actor) => actor ! WSServer.Message(ws, msg)
        case None => ws.close(CloseFrame.REFUSE)
      }
    }
  }

  final override def onOpen(ws : WebSocket, hs : ClientHandshake) {
    log.debug("OnOpen called {} :: {}", ws, hs);
    if (null != ws) {
      reactors.get(ws.getResourceDescriptor) match {
        case Some(actor) => actor ! WSServer.Open(ws, hs)
        case None => ws.close(CloseFrame.REFUSE)
      }
    }
  }

  final override def onClose(ws : WebSocket, code : Int, reason : String, external : Boolean) {
    log.debug("Close called {} :: {} :: {} :: {}", ws, code, reason, external);
    if (null != ws) {
      reactors.get(ws.getResourceDescriptor) match {
        case Some(actor) => actor ! WSServer.Close(ws, code, reason, external)
        case None => ws.close(CloseFrame.REFUSE)
      }
    }
  }
  final override def onError(ws : WebSocket, ex : Exception) {
    log.debug("onError called {} :: {}", ws, ex);
    if (null != ws) {
      reactors.get(ws.getResourceDescriptor) match {
        case Some(actor) => actor ! WSServer.Error(ws, ex)
        case None => ws.close(CloseFrame.REFUSE)
      }
    }
  }
}

A large source file, but not difficult to understand. Let me explain the core concepts:

  1. We first define a number of messages as case classes. These are the messages that we send to our actors. They reflect the events our websocket server can receive from a client.
  2. The WSServer itself extends from the WebSocketServer provided by the org.java_websocket library.
  3. The WSServer defines one additional function called forResource. With this function we define which actor to call when we receive a message on our websocket server.
  4. Finally, we override the different on* methods, which are called when a specific event happens on our websocket server.
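
To make the routing idea concrete, here is a minimal sketch of the path registry on its own, without Akka or the websocket library. It is a simplified model and not the code from the project: the PathRegistry class, the Handler type and the dispatch method are hypothetical names, and a plain String => Unit function stands in for the actor.

```scala
import scala.collection.mutable

object PathRegistry {
  // Hypothetical stand-in for an actor: a function that receives the message text.
  type Handler = String => Unit
}

class PathRegistry {
  import PathRegistry.Handler

  // maps a path such as "/echo" to the handler registered for it
  private val handlers = mutable.Map[String, Handler]()

  // Some(handler) binds the path, None removes an existing binding.
  def forResource(path: String, handler: Option[Handler]): Unit = handler match {
    case Some(h) => handlers += (path -> h)
    case None    => handlers -= path
  }

  // Returns true when a handler was found and invoked, and false when
  // nothing is registered for the path.
  def dispatch(path: String, msg: String): Boolean = handlers.get(path) match {
    case Some(h) =>
      h(msg)
      true
    case None => false
  }
}
```

Registering a path with Some(handler) binds it and registering it with None removes the binding again, just like forResource in the WSServer. A false result from dispatch corresponds to the case where the real server refuses the connection with CloseFrame.REFUSE.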

Now let’s look at the echo functionality.

The akka echo actor

The echo actor has two roles in this scenario. First, it responds to each incoming message by sending the same message back. Second, for every connection that is opened it creates a child actor (a ListenActor) that handles the documents received from MongoDB.


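package org.smartjava

import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import org.java_websocket.WebSocket
import org.smartjava.WSServer._
import scala.collection.mutable

object EchoActor {

  // Messages sent by this actor to itself or to its child actors.
  sealed trait EchoMessage

  case class Unregister(ws : WebSocket) extends EchoMessage
  // Listen and StopListening carry no data, so case objects are enough
  case object Listen extends EchoMessage
  case object StopListening extends EchoMessage

  def props(db: DB): Props = Props(new EchoActor(db))
}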

/**
 * Actor that handles the websocket request
 */
class EchoActor(db: DB) extends Actor with ActorLogging {
  import EchoActor._

  val clients = mutable.ListBuffer[WebSocket]()
  val socketActorMapping = mutable.Map[WebSocket, ActorRef]()

  override def receive = {

    // receive the open request
    case Open(ws, hs) => {
      log.debug("Received open request. Start listening for {}", ws)
      clients += ws

      // create the child actor that handles the db listening
      val targetActor = context.actorOf(ListenActor.props(ws, db))

      socketActorMapping(ws) = targetActor
      targetActor ! Listen
    }

    // receive the close request
    case Close(ws, code, reason, ext) => {
      log.debug("Received close request. Unregistering actor for url {}", ws.getResourceDescriptor)

      // send a message to self to unregister
      self ! Unregister(ws)
      // stop the child actor, if one was registered for this socket
      socketActorMapping.remove(ws).foreach(_ ! StopListening)
    }

    // receives an error message
    case Error(ws, ex) => self ! Unregister(ws)

    // receives a text message
    case Message(ws, msg) => {
      log.debug("url {} received msg '{}'", ws.getResourceDescriptor, msg)
      ws.send("You send:" + msg);
    }

    // unregister the websocket listener
    case Unregister(ws) => {
      if (null != ws) {
        log.debug("unregister monitor")
        clients -= ws
      }
    }
  }
}
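
One detail of the close handling that is easy to get wrong is the lookup in socketActorMapping: indexing a mutable.Map with a key that is not present throws a NoSuchElementException, whereas remove returns an Option. The following sketch shows the difference using only the standard library. The CloseBookkeeping object is a hypothetical name, and the String keys and values are stand-ins for the WebSocket and ActorRef types used in the actor.

```scala
import scala.collection.mutable
import scala.util.Try

object CloseBookkeeping {
  // Removes the listener registered for a socket and returns it, if any.
  // An unknown socket yields None instead of an exception.
  def unregister(mapping: mutable.Map[String, String], socket: String): Option[String] =
    mapping.remove(socket)

  // Direct indexing, wrapped in Try to show that it fails for unknown keys.
  def lookup(mapping: mutable.Map[String, String], socket: String): Try[String] =
    Try(mapping(socket))
}
```

With the Option returned by remove, a close event for a socket that was never registered can simply be ignored.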

The code of this actor should pretty much explain itself. With this actor and the code so far we’ve got a simple websocket server that uses an actor to handle messages. Before we look at the ListenActor, which is started when the EchoActor receives an “Open” message, let’s quickly look at how we connect to MongoDB from our DB class:


package org.smartjava;

import play.api.libs.iteratee.{Concurrent, Enumeratee, Iteratee}
import reactivemongo.api.collections.default.BSONCollection
import reactivemongo.api._
import reactivemongo.bson.BSONDocument
import scala.concurrent.ExecutionContext.Implicits.global

/**
 * Contains DB related functions.
 */
class DB(location:String, dbname:String)  {

  // get connection to the database
  val db: DefaultDB = createConnection(location, dbname)
  // create an enumerator that we use to broadcast received documents
  val (bcEnumerator, channel) = Concurrent.broadcast[BSONDocument]
  // assign the channel to the mongodb cursor enumerator
  val iteratee = createCursor(getCollection(Configuration.collection))
                    .enumerate()
                    .apply(Iteratee
                        .foreach({doc: BSONDocument => channel.push(doc)}));

  /**
   * Return a simple collection
   */
  private def getCollection(collection: String): BSONCollection = {
    db(collection)
  }

  /**
   * Create the connection
   */
  private def createConnection(location: String, dbname: String)  : DefaultDB = {
    // needed to connect to mongoDB.
    import scala.concurrent.ExecutionContext

    // gets an instance of the driver
    // (creates an actor system)
    val driver = new MongoDriver
    val connection = driver.connection(List(location))

    // Gets a reference to the database
    connection(dbname)
  }

  /**
   * Create the cursor
   */
  private def createCursor(collection: BSONCollection): Cursor[BSONDocument] = {
    import reactivemongo.bson._

    import scala.concurrent.ExecutionContext.Implicits.global

    // only match documents whose currentDate is at or after the moment
    // this cursor is created
    val query = BSONDocument(
      "currentDate" -> BSONDocument(
        "$gte" -> BSONDateTime(System.currentTimeMillis())
      ))

    // we enumerate over a capped collection, using a tailable cursor
    collection.find(query)
      .options(QueryOpts().tailable.awaitData)
      .cursor[BSONDocument]
  }

  /**
   * Simple function that registers a callback and a predicate on the
   * broadcasting enumerator
   */
  def listenToCollection(f: BSONDocument => Unit,
                         p: BSONDocument => Boolean ) = {

    val it = Iteratee.foreach(f)
    val itTransformed = Enumeratee.takeWhile[BSONDocument](p).transform(it);
    bcEnumerator.apply(itTransformed);
  }
}

Most of this code is fairly standard, but I’d like to point a couple of things out. At the beginning of this class we set up an iteratee like this:


  val db: DefaultDB = createConnection(location, dbname)
  val (bcEnumerator, channel) = Concurrent.broadcast[BSONDocument]
  val iteratee = createCursor(getCollection(Configuration.collection))
                    .enumerate()
                    .apply(Iteratee
                        .foreach({doc: BSONDocument => channel.push(doc)}));

Here we first create a broadcast enumerator using the Concurrent.broadcast function. This enumerator pushes every element that is fed into the channel to multiple consumers (iteratees). Next we apply an iteratee to the enumerator provided by our ReactiveMongo cursor; it uses the channel we just created to pass each document on to any iteratee that is connected to the bcEnumerator. We connect iteratees to the bcEnumerator in the listenToCollection function:


  def listenToCollection(f: BSONDocument => Unit,
                         p: BSONDocument => Boolean ) = {

    val it = Iteratee.foreach(f)
    val itTransformed = Enumeratee.takeWhile[BSONDocument](p).transform(it);
    bcEnumerator.apply(itTransformed);
  }

In this function we pass in a function and a predicate. The function is executed whenever a matching document is added to the collection, and the predicate determines when to stop sending documents to the iteratee: as soon as it returns false, the iteratee receives nothing more.
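
The combination of broadcasting and a stop predicate can be modelled in a few lines of plain Scala. This is only a sketch of the idea under simplifying assumptions: it does not use the Play iteratee API, and the Broadcaster class with its listen, push and listenerCount methods is a hypothetical name introduced for illustration.

```scala
// Plain-Scala model of a broadcast with a per-listener stop predicate.
class Broadcaster[A] {
  // each listener is a callback plus the predicate that keeps it subscribed
  private var listeners = List.empty[(A => Unit, A => Boolean)]

  def listen(f: A => Unit, p: A => Boolean): Unit = synchronized {
    listeners = listeners :+ ((f, p))
  }

  // Push one element to all listeners. A listener whose predicate returns
  // false is dropped before its callback runs, which mirrors takeWhile:
  // the element that fails the predicate is not delivered.
  def push(a: A): Unit = synchronized {
    listeners = listeners.filter { case (_, p) => p(a) }
    listeners.foreach { case (f, _) => f(a) }
  }

  def listenerCount: Int = synchronized(listeners.size)
}
```

A listener registered with a predicate that reads a flag keeps receiving elements until the flag is switched off. This is the same mechanism the ListenActor uses with its predicateResult variable.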

The only missing part is the ListenActor.

ListenActor which responds to messages from Mongo

The following code shows the actor responsible for responding to messages from MongoDB. When it receives a Listen message it registers itself using the listenToCollection function. Whenever a document is passed in from MongoDB it sends a message to itself, to further propagate it to the websocket.

import akka.actor.{Actor, ActorLogging, PoisonPill, Props}
import org.java_websocket.WebSocket
import reactivemongo.bson.BSONDocument

object ListenActor {
  case class ReceiveUpdate(msg: String)
  def props(ws: WebSocket, db: DB): Props = Props(new ListenActor(ws, db))
}

class ListenActor(ws: WebSocket, db: DB) extends Actor with ActorLogging {
  import EchoActor._
  import ListenActor._

  // read by the iteratee callback on another thread, hence volatile
  @volatile var predicateResult = true

  override def receive = {
    case Listen => {

      log.info("{} , {}", ws, db)

      // function to call when we receive a document from ReactiveMongo;
      // we pass this to the DB cursor
      val func = (doc: BSONDocument) => {
        self ! ReceiveUpdate(BSONDocument.pretty(doc))
      }

      // the predicate that determines how long we want to retrieve stuff;
      // we keep listening while predicateResult is true.
      val predicate = (_: BSONDocument) => predicateResult
      db.listenToCollection(func, predicate)
    }

    // when we receive an update we just send it over the websocket
    case ReceiveUpdate(msg) => {
      ws.send(msg)
    }

    case StopListening => {
      predicateResult = false

      // and kill ourselves
      self ! PoisonPill
    }
  }
}

Now that we’ve done all that, we can run this example. On startup you’ll see something like this:


[DEBUG] [11/22/2014 15:14:33.856] [main] [EventStream(akka://ws-system)] logger log1-Logging$DefaultLogger started
[DEBUG] [11/22/2014 15:14:33.857] [main] [EventStream(akka://ws-system)] Default Loggers started
[DEBUG] [11/22/2014 15:14:35.104] [main] [WSServer(akka://ws-system)] Registring actor:Some(Actor[akka://ws-system/user/echo#1509664759]) to /echo
15:14:35.211 [reactivemongo-akka.actor.default-dispatcher-5] INFO  reactivemongo.core.actors.MongoDBSystem - The node set is now available
15:14:35.214 [reactivemongo-akka.actor.default-dispatcher-5] INFO  reactivemongo.core.actors.MongoDBSystem - The primary is now available

Next, when we connect a websocket client, we see the following:

[Screenshot: Screen Shot 2014-11-22 at 15.15.26.png]


[DEBUG] [11/22/2014 15:15:18.957] [WebSocketWorker-32] [WSServer(akka://ws-system)] OnOpen called org.java_websocket.WebSocketImpl@3161f479 :: org.java_websocket.handshake.HandshakeImpl1Client@6d9a6e19
[DEBUG] [11/22/2014 15:15:18.965] [ws-system-akka.actor.default-dispatcher-2] [akka://ws-system/user/echo] Received open request. Start listening for  WARNING arguments left: 1
[INFO] [11/22/2014 15:15:18.973] [ws-system-akka.actor.default-dispatcher-5] [akka://ws-system/user/echo/$a] org.java_websocket.WebSocketImpl@3161f479 , org.smartjava.DB@73fd64

Now let’s insert a message into the MongoDB collection, which we created with the following command:


db.createCollection( "rmongo", { capped: true, size: 100000 } )

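And let’s insert a message. Note that it needs a currentDate field, because the cursor in DB.scala only matches documents whose currentDate is at or after the moment the server started: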


> db.rmongo.insert({"test": 1234567, "currentDate": new Date()})
WriteResult({ "nInserted" : 1 })

Which results in this in our websocket client:

[Screenshot: Screen Shot 2014-11-22 at 15.17.55_0.png]

If you’re interested in the source files look at the following directory in GitHub: https://github.com/josdirksen/smartjava/tree/master/ws-akka
