The documentation on monitoring Kafka Streams is a bit sparse, so I will shed some light on the interesting metrics to monitor when running Kafka Streams applications. In addition to the Kafka producer and consumer metrics, each Kafka Streams application has stream-metrics, stream-rocksdb-state-metrics, and stream-rocksdb-window-metrics.
In this article, I focus on the stream processing metrics only. I'll present a way to access the metrics using the command-line application jmxterm, and I'll give some insights into the semantics of these metrics, so you can use them for monitoring and alerting. For an introduction to monitoring, I highly recommend the Monitoring 101 in the resources section at the end, among other valuable resources.
JMX-exposed Stream Metrics
Let's dive into the metrics. Start jmxterm and open a connection to the local JVM running the Kafka Streams application. To connect, use the open command. The beans command will show you all exposed beans. In this article I will only cover the global stream metrics; the same metrics are defined for each processor. When using jmxterm, it is important to switch client-id and type in the bean name when querying. Otherwise, jmxterm responds with bean name invalid; see my problem description and Sachin Mittal's workaround on the helpful Kafka users mailing list.
    $>info -d kafka.streams -b kafka.streams:type=stream-metrics, client-id=app-fdddffde-5b10-44c8-82dc-3e343cdab829-StreamThread-1
    #mbean = kafka.streams:type=stream-metrics, client-id=app-fdddffde-5b10-44c8-82dc-3e343cdab829-StreamThread-1
    #class name = org.apache.kafka.common.metrics.JmxReporter$KafkaMbean
    # attributes
      %0   - commit-latency-avg (double, r)
      %1   - commit-latency-max (double, r)
      %2   - commit-rate (double, r)
      %3   - poll-latency-avg (double, r)
      %4   - poll-latency-max (double, r)
      %5   - poll-rate (double, r)
      %6   - process-latency-avg (double, r)
      %7   - process-latency-max (double, r)
      %8   - process-rate (double, r)
      %9   - punctuate-latency-avg (double, r)
      %10  - punctuate-latency-max (double, r)
      %11  - punctuate-rate (double, r)
      %12  - skipped-records-rate (double, r)
      %13  - task-closed-rate (double, r)
      %14  - task-created-rate (double, r)
The latencies are in milliseconds, the rates are per second. To get the value of an attribute, use the get command. For the original descriptions of these metrics, have a look at this code.
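If you prefer to read the same attributes programmatically instead of through jmxterm, the standard javax.management API can query the beans. The following is a minimal sketch, not part of Kafka Streams itself: the class name StreamMetricsReader and the method readAll are made up for illustration, and the sketch assumes the code runs inside the same JVM as the Kafka Streams application, so that the platform MBean server sees the beans.

```java
import java.lang.management.ManagementFactory;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

// Hypothetical helper for illustration; not part of Kafka or jmxterm.
final class StreamMetricsReader {

    // Matches the stream-metrics bean of every stream thread (any client-id).
    static final String STREAM_METRICS_PATTERN =
            "kafka.streams:type=stream-metrics,client-id=*";

    /** Reads every readable attribute of every MBean matching the pattern. */
    static Map<ObjectName, Map<String, Object>> readAll(
            MBeanServerConnection server, String pattern) throws Exception {
        Map<ObjectName, Map<String, Object>> result = new LinkedHashMap<>();
        for (ObjectName name : server.queryNames(new ObjectName(pattern), null)) {
            Map<String, Object> attributes = new LinkedHashMap<>();
            for (MBeanAttributeInfo info : server.getMBeanInfo(name).getAttributes()) {
                if (!info.isReadable()) {
                    continue;
                }
                try {
                    attributes.put(info.getName(),
                            server.getAttribute(name, info.getName()));
                } catch (Exception e) {
                    // Skip attributes that cannot be read instead of failing the whole scan.
                }
            }
            result.put(name, attributes);
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        // Assumption: we run in the same JVM as the Kafka Streams application.
        MBeanServerConnection server = ManagementFactory.getPlatformMBeanServer();
        readAll(server, STREAM_METRICS_PATTERN).forEach((bean, attributes) -> {
            System.out.println(bean);
            attributes.forEach((attr, value) ->
                    System.out.println("  " + attr + " = " + value));
        });
    }
}
```

Because the pattern uses a wildcard for client-id, the order of the key properties in the bean name does not matter here. Without a running Kafka Streams application the query simply returns no beans.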
To understand what these metrics are measuring, we need to know the answers to the following questions in Kafka jargon: What is a commit request? What is a poll? What is a process? What is a task? What is punctuate? And finally: What are skipped records?
- A commit, or commit request, is a request from a Kafka consumer to the broker to commit a processed offset, marking the latest offset the consumer has read.
- A poll, or long poll, is the process of fetching records from the broker to the consumer. If no new records are available, the call blocks until records arrive or the poll timeout expires. Metrics for polling are only updated if long polling is used in the consumer. They are, however, initialized with zero.
- A process is a function evaluation of a StreamTask in a StreamThread associated with a PartitionGroup; compare the documentation on the stream partition and thread model and the code.
- The punctuate method is part of the Low-Level Processor API. It is executed periodically based on elapsed time. When you use the High-Level Streams DSL, you don't have to bother with this metric, since it is not used. It is, however, initialized with zero.
- A record can be skipped at the point where it is added to the StreamTask record queue. According to the code documentation, a record is skipped when it has an invalid timestamp; see the StreamTask code and the StreamThread code. So this rate should be zero. If it is not, you might want to check your timestamps.
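The semantics above translate fairly directly into alerting rules. The following is a small sketch of such a check, under a few assumptions: the class StreamMetricsCheck and its method check are hypothetical names, the metric values have already been collected into a map keyed by attribute name, and the process latency limit is a threshold you pick yourself, not a value recommended by Kafka.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; thresholds are assumptions, not Kafka defaults.
final class StreamMetricsCheck {

    /**
     * Inspects one snapshot of stream-metrics values (attribute name -> value)
     * and returns a human-readable finding for every rule that is violated.
     */
    static List<String> check(Map<String, Double> metrics, double maxProcessLatencyMs) {
        List<String> findings = new ArrayList<>();

        // Skipped records indicate invalid timestamps, so any positive rate is suspicious.
        double skippedRate = metrics.getOrDefault("skipped-records-rate", 0.0);
        if (skippedRate > 0.0) {
            findings.add("skipped-records-rate is " + skippedRate
                    + " per second; check the record timestamps");
        }

        // Latencies are reported in milliseconds.
        double processLatencyMax = metrics.getOrDefault("process-latency-max", 0.0);
        if (processLatencyMax > maxProcessLatencyMs) {
            findings.add("process-latency-max is " + processLatencyMax
                    + " ms, above the limit of " + maxProcessLatencyMs + " ms");
        }
        return findings;
    }
}
```

A missing metric is treated as zero here, which mirrors the fact that unused metrics such as the punctuate ones are initialized with zero.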
Custom Metrics Reporter
You might want to push your metrics to, e.g., statsd or some SaaS to monitor and alert on anomalies or thresholds. There are some libraries available for Kafka, which might work for Kafka Streams metrics as well. Otherwise, you can fairly easily add your own custom metrics reporter by creating a class which implements the MetricsReporter interface and passing the class name to the metric.reporters parameter in the StreamsConfig.
If you found a mistake, or would like to add something, please feel free to send me a mail under contact.
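Registering the reporter is then only a matter of configuration. Here is a minimal sketch of the properties involved, using plain string keys so that it compiles without the Kafka libraries. The class StreamsMetricsConfig and the reporter class name com.example.StatsdMetricsReporter are hypothetical; replace the latter with the fully qualified name of your own MetricsReporter implementation.

```java
import java.util.Properties;

// Hypothetical helper for illustration; only the property keys are real Kafka settings.
final class StreamsMetricsConfig {

    /** Builds the properties for a Kafka Streams application with a custom metrics reporter. */
    static Properties withReporter(String applicationId,
                                   String bootstrapServers,
                                   String reporterClassName) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // Comma-separated list of classes implementing the MetricsReporter interface.
        props.put("metric.reporters", reporterClassName);
        return props;
    }

    public static void main(String[] args) {
        // Assumption: com.example.StatsdMetricsReporter is your own reporter class.
        Properties props = withReporter(
                "app", "localhost:9092", "com.example.StatsdMetricsReporter");
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The resulting Properties object is what you would hand to the StreamsConfig when creating the application.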
Resources
- Introducing Kafka Streams: Stream Processing Made Simple
- Collecting Kafka Performance Metrics
- Monitoring Kafka Performance Metrics
- Official Kafka Streams Documentation Monitoring
- Stream Processing with Kafka Streams by Hugo Picado
- Kafka Papers and Presentations Wiki