Streaming Recommender with Spark

In this article, I explain how I implemented a distributed streaming article recommender in Scala with Akka, RabbitMQ, Cassandra and Apache Spark. If you don't know these technologies, the following explanations will be difficult to follow. I therefore recommend reading the introductions on the linked sites first.

Akka is a framework for highly concurrent, distributed, and resilient message-driven applications. RabbitMQ is a message broker. Cassandra is a distributed NoSQL database providing high availability. Last but not least, Spark is a framework for distributed computation that can run on scalable, highly available cluster managers such as Mesos.

I have tried to keep the article as short and informative as possible. It is still quite long, so let me start with an outline. First, I will explain the setting and give some numbers. Second, I will explain why the initial monolithic design failed. Afterwards, I will describe the current service-oriented design, including the functionality of all involved components. Furthermore, I will go into the details of the model storage and of fault tolerance in the system in general. Finally, I will conclude and present possible future efforts for further improving the system.

Production System Facts

The recommender recommends articles based on the similarity of features extracted from their full text, for instance word2vec word vectors. The current architecture runs successfully in a production environment with thousands of items distributed among isolated sets (called domains below). About 6000 similarities are calculated each second. The recommender calculates the similarities of all possible article pairs. While this approach runs well in production, the design has to improve continuously, since it has to cope with an increasing throughput of item create and delete events, which also implies a changing model size.
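
To make the all-pairs computation concrete, here is a minimal sketch in plain Python (the production system is written in Scala on Spark). It assumes cosine similarity over dense feature vectors. The article does not fix the similarity measure, and the names `cosine_similarity` and `all_pair_similarities` are hypothetical.

```python
import math
from itertools import combinations


def cosine_similarity(a, b):
    """Cosine of the angle between two equally sized vectors (0.0 if one is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def all_pair_similarities(features):
    """Score every unordered pair of articles in one domain.

    `features` maps an article id to its feature vector (assumed layout).
    """
    return {
        (id_a, id_b): cosine_similarity(features[id_a], features[id_b])
        for id_a, id_b in combinations(sorted(features), 2)
    }


# Toy 2-dimensional vectors standing in for word2vec-style features.
domain_features = {"a1": [1.0, 0.0], "a2": [0.0, 1.0], "a3": [1.0, 1.0]}
scores = all_pair_similarities(domain_features)
```

For n articles in a domain this produces n(n-1)/2 scores, so the work grows quadratically with the size of a domain.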

Prototype

At the company where I work, we are transitioning to Spark-based recommenders, but when we started we had no experience with Spark and no experts in the field on the team. Even though the project has great documentation, best practices for certain use cases have not yet been established. So I began with an exploratory prototype to understand how I could use Spark to scale the similarity calculation. I started with a monolithic prototype: a standalone Spark Streaming driver application.

A Better Approach

The main and innovative idea of Spark is the distributed computation of a processing flow graph. Without going too much into detail, the driver coordinates the processing tasks, which are parallelized and balanced among different executors. Building a model and maintaining it (for instance, doing I/O with a database) would be possible in the Spark driver, but it is a very bad idea for the following reasons:

Blocking in the driver, e.g. due to I/O, would block the whole streaming process for an indeterminate time. Furthermore, one would have to tune the memory limit of the driver based not only on the computation tasks but also on the I/O usage. For these reasons, I decided to use Spark Streaming only for its intended purpose, streaming computation, and to implement a separate service for model maintenance.

Components

The main components of the architecture are (1) the model service, a Scala service implemented with Akka, and (2) the Spark driver application. The model service is the entry point for incoming item create and delete events. It also does preprocessing such as stop word filtering and language detection. Its core functionality is the management of the model stored in the Cassandra cluster. The Spark driver application receives processing tasks from the model service, does the distributed calculation in the cluster, and sends the results back to the model service.

The components communicate over the message broker RabbitMQ. I chose it because it was part of the existing infrastructure. Besides, it has a nifty UI and supports message acknowledgement, which is useful for fault tolerance in the model service. For ingesting item events into the system, however, Kafka's log would be very beneficial, because it would allow replaying past item events to retrain the recommender.

Functionality of the Model Service

When item create events arrive, the model service will (1) group the new item create events of a domain within a certain time window, (2) fetch the feature data of all articles of this domain from Cassandra and (3) send the processing task with all required data to the Spark driver application. Keep in mind that the article recommendations are calculated in isolated domains, so all three steps can be parallelized at the domain level.
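
The three steps can be sketched as follows, in plain Python rather than the Scala and Akka code of the actual service. The event layout, the fixed-size windows and the names `group_by_domain_and_window`, `build_task` and `fetch_features` are assumptions made for illustration. `fetch_features` stands in for the Cassandra read.

```python
from collections import defaultdict


def group_by_domain_and_window(events, window_seconds):
    """Step 1: bucket create events by (domain, start of time window).

    Each event is a (timestamp_seconds, domain, article_id) tuple; this
    layout is an assumption made for the sketch.
    """
    batches = defaultdict(list)
    for timestamp, domain, article_id in events:
        window_start = int(timestamp // window_seconds) * window_seconds
        batches[(domain, window_start)].append(article_id)
    return dict(batches)


def build_task(domain, new_article_ids, fetch_features):
    """Steps 2 and 3: fetch the domain's features and assemble one task.

    `fetch_features(domain)` is a placeholder for the Cassandra query and
    returns a dict of article id -> feature vector.
    """
    return {
        "domain": domain,
        "new_article_ids": list(new_article_ids),
        "existing_features": fetch_features(domain),
    }


events = [
    (0.5, "news", "n1"),
    (3.0, "news", "n2"),
    (4.0, "sports", "s1"),
    (12.0, "news", "n3"),
]
batches = group_by_domain_and_window(events, window_seconds=10)
task = build_task("news", batches[("news", 0)], lambda domain: {"n0": [0.1, 0.9]})
```

Because every batch is keyed by its domain, batches of different domains never depend on each other and can be processed in parallel.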

When it sends the task to the Spark driver, the model service sets this domain to a waiting state. It has to wait until the model has been updated with the new information before it can send another task request. Keep in mind that only this domain is blocked, and only for the time between publishing the task and finishing the model update.
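
A minimal Python sketch of such a per-domain waiting state is shown below. The class name `DomainGate` is hypothetical. Buffering the events that arrive while a domain is waiting is an assumption of this sketch, since the article does not describe how those events are held back.

```python
class DomainGate:
    """Allows at most one in-flight task per domain and buffers the rest."""

    def __init__(self):
        self._waiting = set()  # domains with a task currently at the Spark driver
        self._pending = {}     # domain -> article ids that arrived while waiting

    def submit(self, domain, article_ids, send):
        """Send a task if the domain is idle, otherwise buffer the ids."""
        if domain in self._waiting:
            self._pending.setdefault(domain, []).extend(article_ids)
            return False
        self._waiting.add(domain)
        send(domain, list(article_ids))
        return True

    def model_updated(self, domain, send):
        """Release the domain after the model update and flush its buffer."""
        self._waiting.discard(domain)
        buffered = self._pending.pop(domain, [])
        if buffered:
            self.submit(domain, buffered, send)


sent = []


def record(domain, ids):
    """Stand-in for publishing a task to the message broker."""
    sent.append((domain, ids))


gate = DomainGate()
gate.submit("news", ["n1"], record)    # sent immediately
gate.submit("news", ["n2"], record)    # buffered, "news" is waiting
gate.submit("sports", ["s1"], record)  # other domains are unaffected
gate.model_updated("news", record)     # releases "news" and sends the buffered ids
```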

Functionality of the Spark Driver

The Spark driver runs on a cluster and receives the computation tasks via RabbitMQ. In the concrete implementation, it does feature extraction as well as the computation of similarities between the features of new articles and the features of existing articles in the model (which were fetched from Cassandra by the model service). The result is sent back to the model service via RabbitMQ.
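
The following sketch shows the shape of such a task in plain, single-process Python. In production this work is distributed by Spark. It assumes that features are already extracted, that the score is the dot product of length-normalized vectors, and that only pairs involving a new article are scored. The names `normalize`, `dot` and `process_task` are hypothetical.

```python
import math


def normalize(vector):
    """Scale a vector to unit length; all-zero vectors are returned unchanged."""
    norm = math.sqrt(sum(x * x for x in vector))
    return [x / norm for x in vector] if norm else list(vector)


def dot(a, b):
    """Dot product of two equally sized vectors."""
    return sum(x * y for x, y in zip(a, b))


def process_task(existing_features, new_features):
    """Score only the pairs that involve at least one new article."""
    existing = {k: normalize(v) for k, v in existing_features.items()}
    new = {k: normalize(v) for k, v in new_features.items()}
    new_ids = sorted(new)
    results = []
    for i, new_id in enumerate(new_ids):
        # New article against every article already in the model.
        for old_id in sorted(existing):
            results.append((new_id, old_id, dot(new[new_id], existing[old_id])))
        # New article against the remaining new articles, each pair once.
        for other_id in new_ids[i + 1:]:
            results.append((new_id, other_id, dot(new[new_id], new[other_id])))
    return results


results = process_task(
    existing_features={"a1": [1.0, 0.0], "a2": [0.0, 2.0]},
    new_features={"b1": [3.0, 0.0], "b2": [0.0, 1.0]},
)
```

With m new and n existing articles, this yields m*n + m(m-1)/2 scores instead of recomputing every pair in the domain.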

Model Storage

In this section, I want to go into the details of the model storage. Since the storage is only used to store data, a simple key-value store like Redis would be sufficient. But it was clear that this solution would either not scale in the future or require further work.

So, at this point, I decided to use the widely used Cassandra database for a couple of reasons: (1) A Cassandra cluster scales with new nodes, and data is distributed evenly when the data model is defined well. (2) Cassandra has been designed for fast writes, and for fast reads with the help of built-in caching. (3) It supports synchronous and asynchronous (not waiting for Cassandra to accept) queries. (4) Last but not least, the Cassandra cluster nodes are highly available. It is important to mention that this scalability comes with the downside of eventual consistency.

Fault Tolerance

The model service is built upon the Akka framework, which is a huge help when developing fault-tolerant applications. Akka is a framework for distributed and fault-tolerant actor systems. If you are not familiar with Akka or actor-based systems, I would like to refer you to the official introduction.

Akka handles unexpected errors in the following way: if an exception is thrown in an actor, the actor is simply restarted, which means it will lose its state. If the actor is stateless, the message is just discarded and the actor continues working. If the actor had state, you should handle this situation with a custom failure protocol or by escalating to a severe exception that halts the whole application. Of course, the latter should be avoided.
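
The restart behaviour described above can be imitated in a few lines of plain Python. This is not the Akka API. `CountingActor` and `RestartingSupervisor` are hypothetical names that only illustrate why a restart drops the failing message and the in-memory state.

```python
class CountingActor:
    """A stateful toy actor that counts the messages it has processed."""

    def __init__(self):
        self.processed = 0

    def receive(self, message):
        if message == "boom":
            raise ValueError("unexpected message")
        self.processed += 1


class RestartingSupervisor:
    """Runs one actor and replaces it with a fresh instance on any exception."""

    def __init__(self, actor_factory):
        self._factory = actor_factory
        self.actor = actor_factory()
        self.restarts = 0

    def tell(self, message):
        try:
            self.actor.receive(message)
        except Exception:
            # The failing message is dropped and the in-memory state is lost.
            self.actor = self._factory()
            self.restarts += 1


supervisor = RestartingSupervisor(CountingActor)
for message in ["a", "b", "boom", "c"]:
    supervisor.tell(message)
```

After the loop the actor has been restarted once and its counter only reflects the message processed after the restart, which is exactly the state loss a stateful actor has to guard against.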

Restart, Replication and Timeouts

As a last resort, the cluster manager, e.g. Mesos, will restart the model service or the Spark driver. Furthermore, the model service and the Spark driver can be replicated to handle hardware outages. In this case, multiple instances balance the load (sharded by domain).

In the concrete implementation, I also added a duration supervisor, which acts as a timeout observer for computation tasks sent to the Spark driver. Consequently, an outage of the Spark driver will not leave the model service blocked while it waits for results.
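
One way to picture such a timeout observer is the following Python sketch. The class and its method names are hypothetical, the timeout value is arbitrary, and the current time is passed in explicitly to keep the example deterministic. The real component is an Akka actor in the model service.

```python
class DurationSupervisor:
    """Tracks the deadline of the in-flight task of each domain."""

    def __init__(self, timeout_seconds):
        self._timeout = timeout_seconds
        self._deadlines = {}  # domain -> time at which its task counts as lost

    def task_sent(self, domain, now):
        self._deadlines[domain] = now + self._timeout

    def result_received(self, domain):
        self._deadlines.pop(domain, None)

    def expire(self, now):
        """Return and forget every domain whose task is overdue."""
        overdue = sorted(d for d, deadline in self._deadlines.items() if deadline <= now)
        for domain in overdue:
            del self._deadlines[domain]
        return overdue


watch = DurationSupervisor(timeout_seconds=30)
watch.task_sent("news", now=0)
watch.task_sent("sports", now=10)
watch.result_received("sports")       # answered in time
expired_early = watch.expire(now=20)  # nothing is overdue yet
expired_late = watch.expire(now=31)   # "news" never got an answer
```

Domains returned by `expire` can then be released from their waiting state so that a new task can be sent.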

Conclusion and Future Efforts

As I stated in the beginning, I wanted to keep the article as short and informative as possible. I hope that this article gave you some insights into how a streaming recommender with Spark can be deployed. Needless to say, there is much room for improvement, for instance:

Since the model service can easily keep up with the throughput of item events and model data for now, there is no need to scale this part of the architecture yet. Nevertheless, two options are possible: (1) sharding model services by domain group, or (2) scaling the instances of remote Akka actors across the cluster. Both options can be seen as possible future efforts. If you have questions regarding the implementation, feel free to write me an email.

