Stateful Streaming in Spark and Kafka Streams

This article is about aggregates in stateful stream processing in general. I write about the differences between Apache Spark and Apache Kafka Streams along concrete code examples. Further, I list the requirements which we might like to see covered by a stream processing framework.

TLDR: This article is about aggregates in stateful stream processing. It covers two concrete examples in Apache Spark (using the Streaming API with mapWithState) and Apache Kafka (using the high-level DSL in Streams). Spark Streaming and Kafka Streams differ much. Therefore the reader can get to know both approaches and can decide which fits best. I discuss what features for real-time aggregates besides reliability and scaling requirements we would like to see covered (e.g. to be queryable, re-processable, versionable, composable, unlimited updates, providing retention times, downsampling and having end-to-end guarantees). In the end, I will discuss the example code, also along those requirements.

Overview

I will start this article by giving an overview of streaming frameworks. After, I will discuss requirements on aggregates. I would like to present an example in Spark using Spark Streaming's mapWithState and one example in Kafka Streams using the high-level DSL. Spark as a cluster computation framework relying on HDFS and external databases such as Cassandra or HBase is very different from Kafka Streams, a topology-based deployment-agnostic processing library, which heavily relies on the distributed log system Kafka and a key-value store (e.g. RocksDB). Spark Streaming and Kafka Streams differ much. Therefore the reader can get to know both approaches and can decide which fits best.

For a complete example application in Kafka Streams, check out my previous article Processing Tweets with Kafka Streams. For the examples some experience with Spark and Kafka is needed, I will refer to introduction articles.

Streaming Analytics

Data-driven decision making in companies is becoming essential to stay competitive (adoption rates nearly tripled from 11 - 30% between 2005 and 2010, see this article). Analytics are valuable for both business, business partners or customers. Data warehousing, data mining and learning from archived data is essential to measure KPIs, gain insights and to plan a strategy to reach the companies goal. With a market size of $5.1 billion in 2015, it is forecasted to $53.7 billion by 2017, see here. However, in recent years, analytics have been transitioning from offline (batch) to online (streaming) approaches.

Streaming analytics is extensively used in a wide variety of domains such as healthcare, e-commerce, financial services, telecommunications, energy and utilities, manufacturing, government, and transportation. This paper summarizes growth forecasts in the listed fields.

Streaming Frameworks

For a conceptual introduction into stream processing, I like to refer to the introduction articles Streaming 101 and Streaming 102. A list of vast streaming framework can be found here. The following table shows Apache streaming frameworks. The original article where I got this useful comparison from is: An Overview of Apache Streaming Technologies.

Streaming Frameworks

The key differences of streaming frameworks are, when they process events (event-at-a-time or (micro-)batches, how the architecture looks like (cluster framework, library or distributed workers), what processing guarantees they have, e.g. at-least-once (duplicates possible), exactly-once processing and small latency or near-zero latency. When it comes to stateful streaming, it is also important how and where the state is stored and for how long. General problems like scalability, elasticity, fault tolerancy, high-availability and sub-problems like state migration, versioning of data / computation and reprocessing are also important to compare. All in all, as usual it highly depends on the requirements. As I wrote in the beginning, I would like to focus on aggregates with Apache Spark Streaming and Kafka Streams. But first, let's define what an aggregate is and what features we would like to have on aggregates.

Aggregates

A streaming computation could be simple as a shell command, a web request, a database insert, any arbitrary execution, or output data in form of an updated aggregate, a stream of transformed events, new data triggered by input events, new data dependent on external lookups in other tables (e.g. machine learning models) and so forth. The use case of maintaining a realtime aggregate is however the most interesting one since it needs to store a state somehow, somewhere.

For example, an aggregate could be the total amount of viewers of a page in a certain time window. This aggregate depends on events fired in this event time window. Desired features of such an aggregate, besides reliability, scalability and fault tolerancy, would be: to be queryable (like a database, give me the current state of the aggregate), reprocessable (if the aggregated was computed in the wrong way), versionable (we have multiple versions of the aggregate since the computation changed), composable (one aggregate depends on another aggregate), unlimited updates (no watermarking needed, you should be able to always update an aggregate, some use case exceptions later on), sending updates (changes) of aggregates downstream (for consumers, e.g. analytics UI), providing retention times (when is this aggregate obsolete or not useful anymore), downsampling (different resolutions of an aggregate value) and having end-to-end guarantees (is this event used for the current aggregate?).

In the next sections I will give the two concrete examples and will then refer to those features some of them rely on design decisions of the overall architecture and some directly depend on the streaming framework.

Aggregates in Spark Streaming

Even though Spark 2.0.0 introduced Structured Streaming, in which you should be able to perform SQL queries on infinite data frames representing streams, Structured Streaming is an ALPHA RELEASE, and the APIs are still experimental. Many important features are not available yet, e.g. joins between streams, the distinct operation and the update output mode for aggregate changes. Therefore, for general purpose stream processing, I highly recommend to stick to the Spark Streaming API, which before 1.6 supplied the updateStateByKey DStream function, and since 1.6 the mapWithState DStream function. See Faster Stateful Stream Processing in Apache Spark's Streaming for an official comparison. As another reference, I also like to refer to the online book Mastering Apache Spark, chapter Working with State using Stateful Operations.

Don't Use UpdateStateByKey

The big caveat of using updateStateByKey is, that on each new value the transformation iterates over the entire state store, regardless whether a new value for each key has been consumed or not, see here. Smaller caveats are: no timeout mechanism (if you need it) and it will only return what is stored. In Spark 1.6, mapWithState was introduced, which has a built-in timeout mechanism, partial updates, arbitrary return types and an initial state. A well-written comparison can be found here: Exploring Stateful Streaming with Apache Spark. Okay, let me show a simple example employing mapWithState: Maintaining a distinct set of strings per key.

mapWithState

mapWithState has the following function signature: mapWithState[StateType, MappedType](spec: StateSpec[K, V, StateType, MappedType]). K, V are the types of the input stream, the StateType the type of the state stored and MappedType the returning stream type. The StateSpec can be defined by a function. In the following example, let us assume an input stream of type DStream[(SiteId, String)] representing page view events partitioned by SiteId. The string can represent a user id.

def stateSpecFunc(batchTime: Time, key: SiteId, value: Option[String],
                   state: State[List[String]]): Option[(SiteId, Int)] = {
  val v = value.get
  if (state.exists()) {
    val currentSet = state.get()
    if (currentSet.contains(v)) {
      None
    } else {
      state.update(currentSet.++(List(v)))
      Some(key, state.get.size)
    }
  } else {
    state.update(List(v))
    Some(key, state.get.size)
  }
}
val spec = StateSpec.function(stateSpecFunc)

As you can see, we have to do the state initialization and retrieval manually. As discussed, we can return an arbitrary DStream with mapWithState. We now use this trackStateFunc in our processing graph.

val stateSpec = StateSpec.function(trackStateFunc _).numPartitions(2)
val stateStream = input.mapWithState(stateSpec)
val stateSnapshotStream = stateStream.stateSnapshots().map {
  state => (state._1, state._2.size)
}
stateSnapshotStream.print()

ssc.checkpoint("file:/tmp/")
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate

Features

Let's re-visit the desired features of aggregates which we would like to have. First, the aggregate should be queryable. The state is stored virtually in an RDD and physically spread over the cluster worker nodes. Besides there is no official API to query the state. Therefore we need to store the aggregates somewhere downstream e.g. database. We can reprocess the data if we stored the original events (for instance in a Kafka log) and reprocess by event time. For event time processing we need to time-bucket events manually by a groupByKey operation. If reprocessing is a common use case, the iteration is an important variable when it comes to storing and querying. Versioning in the database and in the query API needs to manually implemented. Since we can return whatever data type in the state function, we can compose aggregates.

Let's discuss unlimited updates to an aggregate. So, if you deal with aggregates which are temporary and will be closed (e.g. an event window, any events after a certain watermark will be discarded for that window, even if the events lies in that window), you can push the final aggregate downstream and save into e.g. a database. In the case when the aggregate is not temporary, the RDD will grow, and gets eventually spilled onto disk. Retention times and downsampling needs to be done manually. Exactly-once processing guarantees can be achieved with extra effort, see here. As already discussed, both functions mapWithState and updateByStateKey the state is implemented using the RDD concept, therefore checkpointing for recovery in case of failure is also supported by the state (in the file system as well as HDFS).

Towards Structured Streaming

Tathagata Das mentions in his talk Apache Spark 2.0: A Deep Dive Into Structured Streaming some pain points which led to the development of structured streaming, including not having the concept of event-time processing, using the same engine for batch and streaming, and the difficulty of reasoning about end-to-end guarantees. Interestingly, he also mentions the table stream duality, originally described here by Jay Kreps. This concept inspired the Kafka people to fundamentally rethink stream processing.

Aggregates in Kafka Streams

The latest version 0.10 of Kafka introduces Kafka Streams. It is a deployment-agnostic stream processing library with event-at-a-time (not micro-batch) semantics written in Java. It scales via partitioning and tasks, is fault-tolerant and has an at-least-once guarantee when it comes to processing records. The framework tries to focus on simplicity in the semantics. Especially the 'Tables and Streams are Dual' concept is very interesting, see Jay Kreps or Guozhang Wang explaining it. To give you a short example:

A stream can be aggregated into a table, e.g. an aggregation of a stream in a time window can be seen as a table for a time span in a local state. The change log contains every update to that table. So you can transform the windowed aggregate to a stream of changes, which in term can be used in other aggregates or even joins. If you have not already read it, I highly recommend reading the introduction article.

Without further introduction let us examine the example. In this example, we do same processing: Maintaining a set of strings by key. The processing is defined in the high-level DSL in the latest Kafka 0.10.1, see the documentation.

class SetInitializer[V] extends Initializer[Set[V]] {
  override def apply() = Set[V]()
}

class DistinctAggregator[K, V] extends Aggregator[K, V, Set[V]] {
  override def apply(k: K, v: V, t: Set[V]) = t + v
}

builder
  .stream(Serde[String], Serde[String], topic)
  .groupByKey(Serdes.String(), Serdes.String())
  .aggregate(
    new SetInitializer[String],
    new DistinctAggregator[String, String],
    Serde[List[String]],
    "store-name")
  .toStream
  .to(Serde[String], Serde[Set[String], output-topic)

The return type of the aggregation is a KTable reference. Since we are interested in the change stream, we transform the KTable back to a change stream. The operation after is the most interesting part, also check this example of Guozhang Wang. Aggregate() on a KStream can be used to store an aggregate state. One aggregate per key is maintained. The aggregate itself must be defined by the user with a custom Initializer and an Aggregator. You might also want to check out other articles. Here are the official Confluent Scala and Java examples. The documentation for these examples can be found here. Another good resource is the Kafka papers and presentations wiki page.

Conclusion

If you want to start using it seriously, I recommend you to read the referenced articles. If you have any questions or suggestions on my article, feel free to send me an email. See the contact page for contact information.

Resources