Kafka Streams in Scala with Schema Registry

In this example, I'll use the new Scala API released in Kafka 2.0, together with Confluent's schema registry and Avro4s, to convert a GenericRecord into a case class. I couldn't find anything on the net when working on this, so I might as well provide an example here. Further, Circe and Avro4s are also great for creating JSON schema files programmatically.

Working with Confluent's schema registry solves problems like having type-safe Kafka topics, where to store schemas in a polyglot environment, how to keep track of versions, and how to evolve schemas over time. Further, you get automatic schemas in Kafka connectors or KSQL. If you don't need these advantages, you might as well just use Array[Byte] as the value type and serialize with Avro4s or avrohugger.

Circe and Avro4s to Create Schemas

Avro4s is also great for generating your Avro schema as JSON. With a small hack (wrapping the schema string in a case class and letting Circe encode it), it's easy peasy:

import java.io.{File, PrintWriter}
import io.circe.generic.auto._
import io.circe.syntax._
import com.sksamuel.avro4s.AvroSchema

case class Block(hash: String, number: Long)
case class AvroSchemaImport(schema: String)

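// Positional argument: Schema.toString(boolean) is a Java method
val inner = AvroSchema[Block].toString(false)
// Wrap the schema string as {"schema": "..."}, the payload format the registry expects
val schema = AvroSchemaImport(inner).asJson.noSpaces
val writer = new PrintWriter(new File("block.json"))
try writer.write(schema) finally writer.close()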

You can load these files into the registry using the corresponding API calls.
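As a rough sketch of what such an API call could look like, the snippet below posts a payload like the contents of block.json to the registry's REST endpoint for registering a schema version, using only the JDK's HTTP client. The object name RegisterSchema and its helper functions are made up for this illustration, and the subject name assumes the default naming of "<topic>-value" for record values; adapt both to your setup.

```scala
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets

// Hypothetical helper, not part of any library.
object RegisterSchema {
  // Assumes the default subject naming: "<topic>-key" or "<topic>-value".
  def subjectFor(topic: String, isKey: Boolean): String =
    if (isKey) s"$topic-key" else s"$topic-value"

  // REST endpoint that registers a new schema version under a subject.
  def versionsUrl(registryUrl: String, subject: String): String =
    s"${registryUrl.stripSuffix("/")}/subjects/$subject/versions"

  // POSTs the payload ({"schema": "..."}) and returns the HTTP status code.
  def register(registryUrl: String, subject: String, payload: String): Int = {
    val conn = new URL(versionsUrl(registryUrl, subject))
      .openConnection().asInstanceOf[HttpURLConnection]
    try {
      conn.setRequestMethod("POST")
      conn.setDoOutput(true)
      conn.setRequestProperty("Content-Type", "application/vnd.schemaregistry.v1+json")
      val out = conn.getOutputStream
      try out.write(payload.getBytes(StandardCharsets.UTF_8)) finally out.close()
      conn.getResponseCode
    } finally conn.disconnect()
  }
}
```

With a registry running on localhost, a call might look like RegisterSchema.register("http://localhost:8081", RegisterSchema.subjectFor("blocks", isKey = false), payload), where payload is the string written to block.json above.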

Add Dependencies

So, let's have a look at the crucial dependencies. In case you want to also create schemas, add "io.circe" %% "circe-generic" % "0.9.3" or newer as well.

resolvers += "confluent.io" at "https://packages.confluent.io/maven/"
libraryDependencies ++= 
    Seq("org.apache.kafka" %% "kafka-streams-scala" % "2.0.0",
        "io.confluent" % "kafka-streams-avro-serde" % "5.0.0",
        "com.sksamuel.avro4s" %% "avro4s-core" % "1.9.0")

Kafka Streams Example (using Scala API in Kafka 2.0)

When I searched on the net for a proper setup of a Kafka Streams application with a schema registry using Avro the Scala way, I couldn't find anything. One quirk of integrating the GenericRecord is the need to manually specify the implicit Serde[GenericRecord] value, which is needed for all the implicit conversions that make it possible to write beautifully uncluttered code. Here's the code skeleton most people might want to use in some version or another:

import java.util.Properties
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._

import com.company.Schemas.Block
import com.sksamuel.avro4s.{FromRecord, RecordFormat, ToRecord}
import com.typesafe.scalalogging.LazyLogging
import io.confluent.kafka.streams.serdes.avro.{GenericAvroDeserializer, GenericAvroSerializer}

import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream.KStream

object Avro {
  type SchemaRegistryUrl = String

  def stream[T: ToRecord : FromRecord]
    (builder: StreamsBuilder, topic: String)
    (implicit url: SchemaRegistryUrl): KStream[String, T] = {

    val config = Map("schema.registry.url" -> url).asJava

    // Implicit serdes that ImplicitConversions needs to read the topic
    implicit val stringSerde: Serde[String] = Serdes.String()
    implicit val genericAvroSerde: Serde[GenericRecord] = Serdes.serdeFrom({
      val ser = new GenericAvroSerializer()
      // Positional argument, since configure is a Java method
      ser.configure(config, false) // false = used for record values, not keys
      ser
    }, {
      val de = new GenericAvroDeserializer()
      de.configure(config, false) // false = used for record values, not keys
      de
    })

    // Read GenericRecord values and convert each one into the case class T
    builder
      .stream[String, GenericRecord](topic)
      .mapValues(v => RecordFormat[T].from(v))
  }
}

object Main extends App with LazyLogging {

  val config = new Properties()
  config.put(StreamsConfig.APPLICATION_ID_CONFIG, "application_id")
  config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092")

  implicit val url: Avro.SchemaRegistryUrl = "http://localhost:8081"

  val builder = new StreamsBuilder()
  val blocks  = Avro.stream[Block](builder, "blocks")

  val streams = new KafkaStreams(builder.build(), config)
  streams.start()

  // Close the streams application cleanly on JVM shutdown
  sys.ShutdownHookThread {
    streams.close(10, TimeUnit.SECONDS)
  }
}
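To see why the implicit Serde[GenericRecord] has to be in scope, here is a toy model of the mechanism. It does not use the Kafka classes: Serde, Consumed, Record and stream below are simplified stand-ins invented for this sketch, and only the shape of consumedFromSerde mirrors what ImplicitConversions provides. The point is that stream only compiles when the compiler can find a serde for both the key and the value type.

```scala
object ImplicitSerdeSketch {
  // Toy stand-ins, NOT the Kafka classes: just enough to show the resolution.
  trait Serde[T] { def name: String }
  final case class Consumed[K, V](keySerde: Serde[K], valueSerde: Serde[V])

  // Same idea as in ImplicitConversions: derive a Consumed from two implicit serdes.
  implicit def consumedFromSerde[K, V](implicit k: Serde[K], v: Serde[V]): Consumed[K, V] =
    Consumed(k, v)

  // Like StreamsBuilder.stream, this needs an implicit Consumed[K, V].
  def stream[K, V](topic: String)(implicit c: Consumed[K, V]): String =
    s"$topic: ${c.keySerde.name} -> ${c.valueSerde.name}"

  // Stand-in for GenericRecord.
  final case class Record(fields: Map[String, Any])

  implicit val stringSerde: Serde[String] = new Serde[String] { val name = "string" }
  // Without this value the call below does not compile,
  // because nothing else can supply a Serde[Record].
  implicit val recordSerde: Serde[Record] = new Serde[Record] { val name = "generic-avro" }

  val described: String = stream[String, Record]("blocks")
}
```

In the real code the compiler performs the same search for Serde[String] and Serde[GenericRecord], which is why both are declared as implicits inside Avro.stream.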

Kafka Streams Example (using Lightbend's Scala API)

In the first version of this article I was using Lightbend's Scala API. Matthias Sax of Confluent let me know that it was integrated in Kafka 2.0, so I updated the code (see example above). However, I guess it also makes sense to keep the old version for reference (which might help you in some situations).

import java.util.Properties
import java.util.concurrent.TimeUnit
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.Serdes

import com.company.schemas.Block
import com.lightbend.kafka.scala.streams.StreamsBuilderS
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
import com.sksamuel.avro4s.{FromRecord, RecordFormat, ToRecord}

object Main extends App with LazyLogging {
  val config = new Properties()
  config.put(StreamsConfig.APPLICATION_ID_CONFIG, "application_id")
  config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092")
  config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
      Serdes.String().getClass.getName)
  config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
      classOf[GenericAvroSerde].getCanonicalName)
  config.put("schema.registry.url", "http://schemaregistry:8081")

  val builder = new StreamsBuilderS()
  builder
    .stream[String, GenericRecord]("blocks")
    .mapValues(v => RecordFormat[Block].from(v))

  val streams = new KafkaStreams(builder.build(), config)
  streams.start()

  sys.ShutdownHookThread {
    streams.close(10, TimeUnit.SECONDS)
  }
}

And remember, keep the types safe!

