Processing Tweets with Kafka Streams

In this article, I'll present an example application which retrieves tweets from Twitter's Streaming API and computes word counts in tumbling windows with the new stream processing library Kafka Streams. The final dockerized project can be found here. Scroll down for a short demo screencast.

Stream Processing

This article is a practical application example rather than an introduction to stream processing, a discussion of why it matters, or a summary of Kafka Streams. I will only explain the topics which are essential to the example. For an introduction to stream processing, I recommend Tyler Akidau's Streaming 101 and Streaming 102. Also, check this list for an overview of stream processing frameworks and libraries.

Readers familiar with Scala should be able to follow the code easily. The example consists of an ingestion service, which retrieves tweets from Twitter, and an aggregation service, which uses Kafka Streams to aggregate word counts in tumbling time windows. The example application can be found in this repository. If you want to see it in action, here's a short screencast.

Kafka Streams

Apache Kafka is often used for ingesting raw events into a backend. It is a high-throughput, distributed, publish-subscribe messaging system that implements the brilliant concept of logs as the backbone of distributed systems, see this blog post. The latest version of Kafka, 0.10, introduces Kafka Streams, which takes a different angle on stream processing: instead of a cluster-computation framework it is a flexible, deployment-agnostic library that focuses on simple semantics. Especially the 'Tables and Streams are Dual' concept is very interesting, see Jay Kreps or Guozhang Wang explaining it. To give you a short example:

A stream can be aggregated into a table, e.g. the aggregation of a stream in a time window can be seen as a table for that time span in a local state. The change log contains every update to that table, so you can transform the windowed aggregate back into a stream of changes, which in turn can be used in other aggregations or even joins. If you have not already read it, I highly recommend the introduction article. Back? Let's continue with an overview of how the services are integrated.

Overview

At the time of writing, Kafka 0.10.0.1 is the latest stable version. Note that the Streams API is marked as unstable and may change in later versions. The example application consists of two services written in Scala, an ingestion service (code) and an aggregation service (code). The ingestion service subscribes to the Twitter Streaming API and receives fresh tweets filtered by a list of terms. Each raw tweet is sent to the Kafka topic 'tweets' as JSON. The aggregation service retrieves the raw tweets, parses them, and aggregates word counts in tumbling time windows, see the code here. Kafka Streams uses an embedded RocksDB for maintaining local state. Every change to the aggregate is propagated to the topic 'aggregate'.

Both services share the same SBT project and end up in the same fat jar including all dependencies, which allows us to easily share code in this small example project. Both applications access the application.conf at runtime via the Settings object, see code. I wrote a small build script that compiles the services, builds the Docker images, and runs the containers.
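
As an illustration, a shared configuration object based on Typesafe Config could look roughly like the following sketch; the configuration keys are made up for this example and do not necessarily match the repository's application.conf:

import com.typesafe.config.ConfigFactory

// Rough sketch of a shared Settings object; the keys below are illustrative
// and not necessarily those used in the actual project.
object Settings {
  private val config = ConfigFactory.load() // reads application.conf from the classpath

  val bootstrapServers: String = config.getString("kafka.bootstrap-servers")
  val rawTopic: String         = config.getString("kafka.raw-topic")
  val aggregationTopic: String = config.getString("kafka.aggregation-topic")
}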

Ingestion Service

For accessing the Twitter Streaming API we need OAuth credentials, namely the consumer key, consumer secret, token, and token secret. You can generate those by adding an application in Twitter's Application Management. For accessing the API, I am using the HTTP-based Hosebird Java client library, see code and for reference the SBT dependency. In both services, Jackson's Scala object mapper is used to serialize and deserialize JSON objects, see code.
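
To give a rough idea of the wiring, the following sketch shows how the Hosebird client can feed raw tweets into a Kafka producer. It is not the repository's exact code: the filter terms, broker address, and credentials are placeholders, while the real service reads them from application.conf.

import java.util.Properties
import java.util.concurrent.LinkedBlockingQueue

import com.twitter.hbc.ClientBuilder
import com.twitter.hbc.core.{Constants, HttpHosts}
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint
import com.twitter.hbc.core.processor.StringDelimitedProcessor
import com.twitter.hbc.httpclient.auth.OAuth1
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.collection.JavaConverters._

object IngestionSketch extends App {
  // queue the Hosebird client writes raw JSON tweets into
  val queue = new LinkedBlockingQueue[String](10000)

  val endpoint = new StatusesFilterEndpoint()
  endpoint.trackTerms(List("kafka", "streams").asJava) // placeholder filter terms

  // placeholder OAuth credentials, generated in Twitter's Application Management
  val auth = new OAuth1("consumerKey", "consumerSecret", "token", "tokenSecret")

  val client = new ClientBuilder()
    .hosts(new HttpHosts(Constants.STREAM_HOST))
    .endpoint(endpoint)
    .authentication(auth)
    .processor(new StringDelimitedProcessor(queue))
    .build()
  client.connect()

  val producerProps = new Properties()
  producerProps.put("bootstrap.servers", "kafka:9092") // placeholder broker address
  producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](producerProps)

  // forward every raw JSON tweet to the 'tweets' topic
  while (!client.isDone) {
    val rawTweet = queue.take()
    producer.send(new ProducerRecord[String, String]("tweets", rawTweet))
  }
}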

Aggregation Service

Let's now examine the aggregation service, whose main purpose is aggregating word counts in windows. The processing is defined in the high-level DSL, see the documentation. The following excerpt shows the main part; view the full code here.

val out = builder.stream(new JSONSerde[TweetKey], 
  new JSONSerde[Tweet], Settings.rawTopic)

  .mapValues(new ValueMapper[Tweet, Array[String]] {
    override def apply(value: Tweet): Array[String] = 
      value.text
        .toLowerCase
        .split(" ")
        .map { word => word.trim }
  })
  .aggregateByKey(
    new WordHistogramInitializer(),
    new WordHistogramAggregator(),
    TimeWindows.of("WORD_HISTOGRAM", (10.minutes).inMillis),
    new JSONSerde[TweetKey],
    new JSONSerde[Map[String, Int]])
  .toStream
  .map {
    new KeyValueMapper[Windowed[TweetKey], Map[String, Int], 
      KeyValue[TweetKey, WindowedWordHistogram]] {

      override def apply(key: Windowed[TweetKey], 
        value: Map[String, Int]) = {

        new KeyValue(key.key(), 
        WindowedWordHistogram(key.window.start(), key.window.end(), value))
      }
    }
  }

out.print()
out.to(new JSONSerde[TweetKey], new JSONSerde[WindowedWordHistogram], 
  Settings.aggregationTopic)

In the excerpt, we subscribe to a Kafka topic and obtain a KStream. For this, we need to specify JSON serdes for the TweetKey as well as for the Tweet case class. Transformations such as the mapValues call used here don't require state, so the output stays a KStream (in contrast to the aggregateByKey operation discussed below). In this transformation, the text is converted to lower case, split into words, and the resulting words are trimmed.
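
For completeness, here is a minimal sketch of how such a topology is wired into a running KafkaStreams instance with the 0.10.0 API; the application id and the broker and ZooKeeper addresses are illustrative placeholders.

import java.util.Properties

import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.kstream.KStreamBuilder

// illustrative settings; the real service takes them from application.conf
val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "tweet-aggregation")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092")
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper:2181") // still required in 0.10.0

val builder = new KStreamBuilder()
// ... the topology from the excerpt above is defined on this builder ...

val streams = new KafkaStreams(builder, props)
streams.start()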

AggregateByKey

The next operation is the most interesting part; check also this example by Guozhang Wang. aggregateByKey on a KStream can be used to maintain aggregate state, with one aggregate per key. You can aggregate in windows or over all time; in this example we aggregate over tumbling 10-minute windows. The aggregate itself must be defined by the user with a custom Initializer and an Aggregator. If you want to use a timestamp other than the ingestion time, you can define your own TimestampExtractor.
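
As a hedged illustration, a custom extractor for the Kafka 0.10.0 interface could use the tweet's own creation time instead; the createdAt field is an assumption here and may not exist on the actual Tweet case class.

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.streams.processor.TimestampExtractor

// Sketch of a custom TimestampExtractor (Kafka 0.10.0 interface). It assumes the
// deserialized record value is a Tweet carrying a createdAt field in epoch
// milliseconds; the repository's Tweet case class may differ.
class TweetTimestampExtractor extends TimestampExtractor {
  override def extract(record: ConsumerRecord[AnyRef, AnyRef]): Long =
    record.value() match {
      case tweet: Tweet => tweet.createdAt    // event time taken from the tweet itself
      case _            => record.timestamp() // fall back to the record's own timestamp
    }
}

// registered via the streams configuration, e.g.:
// props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, classOf[TweetTimestampExtractor].getName)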

As you might have gathered already, there is local state in the Kafka Streams application, which is persisted in the out-of-the-box embedded RocksDB or in a specified external key-value store. Since we may aggregate complex structures, we need to specify serializers and deserializers for the local store as well. For the sake of simplicity, I defined a JSON Serde for the purposes of our application, see the code here. Please note that, since the window state is kept and maintained in the local store without a retention limit, late-arriving records can still be handled and added to their window.
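
A generic JSON Serde based on Jackson's Scala module could look roughly like the following sketch; it is simplified and not necessarily identical to the JSONSerde used in the repository.

import java.util

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}

import scala.reflect.ClassTag

// Simplified sketch of a generic JSON Serde; the actual JSONSerde in the
// repository may handle configuration and errors differently.
class JSONSerde[T >: Null](implicit tag: ClassTag[T])
  extends Serde[T] with Serializer[T] with Deserializer[T] {

  private val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)

  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
  override def close(): Unit = ()

  override def serializer(): Serializer[T] = this
  override def deserializer(): Deserializer[T] = this

  // write the value as UTF-8 encoded JSON bytes
  override def serialize(topic: String, data: T): Array[Byte] =
    if (data == null) null else mapper.writeValueAsBytes(data)

  // read JSON bytes back into the target class
  override def deserialize(topic: String, bytes: Array[Byte]): T =
    if (bytes == null) null
    else mapper.readValue(bytes, tag.runtimeClass.asInstanceOf[Class[T]])
}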

Initializer and Aggregator

The aggregation returns a KTable. Since we are interested in the change stream, we transform the KTable back into a stream and define a final key-value mapping to a windowed word histogram case class. The output is printed and sent to the aggregation topic. I will now explain the other interesting part: the WordHistogramInitializer and the WordHistogramAggregator.

case class WindowedWordHistogram(start: Long, end: Long, 
  histogram: Map[String, Int])

object AggregationApp extends App {

  class WordHistogramInitializer extends Initializer[Map[String, Int]] {
    override def apply(): Map[String, Int] = Map()
  }

  class WordHistogramAggregator 
    extends Aggregator[TweetKey, Array[String], Map[String, Int]] {

    override def apply(aggKey: TweetKey, value: Array[String], 
                       aggregate: Map[String, Int]) = {

      // within-list frequencies
      val frequencies = value
        .groupBy[String] { word => word }
        .map { case (word, instances) => (word -> instances.length) }

      // add the within-list frequencies to the global word counts;
      // a word not yet in the aggregate starts from its in-tweet count
      val updates = frequencies.map { case (word, count) =>
        word -> (aggregate.getOrElse(word, 0) + count)
      }

      aggregate ++ updates
    }
  }

For storing the word counts, all we need is a Map[String, Int], which is empty on initialization. The aggregator method takes three parameters: the aggregation key, which is the key of the incoming KStream, the new value for that key, and the current aggregate; its return value is the new aggregate. Note that the TweetKey is constant in this example, so every tweet arrives as a new value for the same key. In the aggregation, we compute the word counts within the tweet and add them to our map of all word counts.

Running it in Docker

For running Kafka 0.10, we can use wurstmeister's off-the-shelf Docker image. Furthermore, I set up both services as individual Docker containers; both extend the lightweight Alpine Linux base image with Java. The docker-compose file uses version 2 syntax, so you need at least Docker Compose 1.6.0 and Docker Engine 1.10.0. For convenience, I added a small build script for building the applications, building the Docker images, and running the environment.

Conclusion

You might also want to check out other articles. I couldn't find any Scala examples besides the official one, but Bill Bejeck wrote three articles explaining different parts of Kafka Streams in Java: one on the Processor API and one on the KStreams API. He also integrates Kafka Streams with machine learning tasks in the article Machine Learning with Kafka Streams. Yuto Kawamura explains in Applying Kafka Streams for internal message delivery pipeline how Kafka Streams is employed in the LINE backend.

Other features which in my opinion underline Kafka Streams' elegant design are fault tolerance and reprocessing based on the Kafka log, see Fault Tolerance / State Migrations and Reprocessing. As an outlook, I want to refer to this presentation, which shows the Confluent Control Center, part of the paid Confluent subscription, for e.g. end-to-end message delivery guarantees.

As I mentioned in the beginning, this article aims to be a beginner-level example of stream processing with Kafka Streams, and many topics have not even been mentioned. If you want to use it seriously, I really recommend reading the referenced articles. If you have any questions or suggestions about the article, feel free to send me an email. See the contact page for contact information.

EDIT: I added more references, especially to talk excerpts. Also, I added the Kafka version used in the example.
