Using Monix with Kafka, Avro and Schema Registry

In this brief tech article, I'll show how to add a serializer and deserializer to monix-kafka to work with Confluent's schema registry. This is important for out-of-the-box KSQL and Kafka Connect compatibility when integrating Monix microservices.

Working with Confluent's schema registry solves problems like where to store schemas in a polyglot environment, how to keep track of versions and how to evolve schemas over time. Registered schemas for your Kafka topics are also required when using Kafka connectors or KSQL. If you don't need those, you might as well just use a type of Array[Byte] and serialize with Avro4s or avrohugger.
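
To make the Array[Byte] route concrete, here's a minimal sketch assuming you use avro4s (the case class and names are illustrative): avro4s derives the schema and converts the case class into a GenericRecord, and plain Avro writes the bytes.

    import java.io.ByteArrayOutputStream
    import com.sksamuel.avro4s.{AvroSchema, RecordFormat}
    import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
    import org.apache.avro.io.EncoderFactory

    case class Measurement(id: String, value: Double)

    val schema = AvroSchema[Measurement]
    val record = RecordFormat[Measurement].to(Measurement("sensor-1", 42.0))

    // Write the GenericRecord as raw Avro binary, without any registry involved
    val baos = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(baos, null)
    new GenericDatumWriter[GenericRecord](schema).write(record, encoder)
    encoder.flush()
    val bytes: Array[Byte] = baos.toByteArray // feed this to a KafkaProducer[Array[Byte], Array[Byte]]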

So what's Monix? Monix is a high-performance Scala / Scala.js library for composing asynchronous, event-based programs. It builds on concepts like Scalaz's Task and provides out-of-the-box compatibility with the Reactive Streams protocol. The developers behind Monix also created monix-kafka, a wrapper that makes Kafka easy to integrate into Monix-based services. In addition, a consumer can be turned into an Observable; see also this comparison with Akka Actors, Akka Streams and FS2.
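
To give a flavor of that, here's a minimal sketch of consuming a raw topic as an Observable, assuming Monix 3.x; the broker address, group id and topic name are placeholders:

    import monix.execution.Scheduler.Implicits.global
    import monix.kafka.{KafkaConsumerConfig, KafkaConsumerObservable}

    val consumerCfg = KafkaConsumerConfig.default.copy(
      bootstrapServers = List("localhost:9092"),
      groupId = "example-group"
    )

    // Each element of the Observable is a plain Kafka ConsumerRecord
    KafkaConsumerObservable[Array[Byte], Array[Byte]](consumerCfg, List("example-topic"))
      .take(10)
      .foreachL(record => println(s"received ${record.value().length} bytes"))
      .runToFuture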

Add Dependencies

When using monix-kafka, you instantiate your Kafka producer by passing a configuration and type parameters describing your record's key and value types:

    val producerCfg = KafkaProducerConfig.default.copy(bootstrapServers = brokers.toList)
    private val producer = KafkaProducer[Array[Byte], Array[Byte]](producerCfg, scheduler)

The type for both key and value is the natively supported Array[Byte]. However, let's serialize with Confluent's Avro serializer instead. For this we first need to add the missing link: a new resolver URL and the serializer dependency. Be sure to pick the version matching the schema registry you're running.

    resolvers += "confluent.io" at "http://packages.confluent.io/maven/"
    libraryDependencies += "io.confluent" % "kafka-avro-serializer" % "5.0.0"

Extend Monix Kafka

Thanks to implicits, we can easily extend the capabilities of Monix's Kafka producer and consumer. Let's create an object with functions that build implicit MonixSerializer and MonixDeserializer values from a configuration map and a boolean parameter indicating whether the record key is being handled (needed by Confluent's Kafka Avro serializer). Through the configuration we can now pass the schema registry URL.

    import io.confluent.kafka.serializers.{KafkaAvroDeserializer, KafkaAvroSerializer}
    import monix.kafka.{Serializer => MonixSerializer}
    import monix.kafka.{Deserializer => MonixDeserializer}
    import scala.collection.JavaConverters._

    object AvroSerializer {
      // Wraps Confluent's KafkaAvroSerializer in a Monix Serializer;
      // isKey tells Confluent's serializer whether it handles the record key or value
      def serializer(cfg: Map[String, String], isKey: Boolean): MonixSerializer[Object] =
        MonixSerializer[Object](
          className = "io.confluent.kafka.serializers.KafkaAvroSerializer",
          classType = classOf[KafkaAvroSerializer],
          constructor = _ => {
            val serializer = new KafkaAvroSerializer()
            serializer.configure(cfg.asJava, isKey)
            serializer
          }
        )

      // The counterpart, wrapping Confluent's KafkaAvroDeserializer
      def deserializer(cfg: Map[String, String], isKey: Boolean): MonixDeserializer[Object] =
        MonixDeserializer[Object](
          className = "io.confluent.kafka.serializers.KafkaAvroDeserializer",
          classType = classOf[KafkaAvroDeserializer],
          constructor = _ => {
            val deserializer = new KafkaAvroDeserializer()
            deserializer.configure(cfg.asJava, isKey)
            deserializer
          }
        )
    }

Monix Kafka Producer

Let's use these implicits now to instantiate a Kafka producer that serializes to Confluent's Avro format and uses the schema registry to look up the schema for a specific topic, which is great for type safety in Kafka topics. Here's what the rest of the code looks like. Using Avro for the record key works analogously, and a sketch of the consumer side follows the producer code below.

    import com.sksamuel.avro4s.RecordFormat
    import org.apache.kafka.clients.producer.ProducerRecord

    case class RecordValue(someInt: Int)

    // Tell the serializer where to find the schema registry
    val serCfg = Map("schema.registry.url" -> "http://schemaregistry:8081")
    implicit val serializer: MonixSerializer[Object] = AvroSerializer.serializer(serCfg, isKey = false)

    // avro4s' RecordFormat converts the case class to an Avro GenericRecord and back
    val format = RecordFormat[RecordValue]

    val producerCfg = KafkaProducerConfig.default.copy(bootstrapServers = brokers.toList)
    val producer = KafkaProducer[String, Object](producerCfg, scheduler)

    val recordVal = format.to(RecordValue(1))
    val record = new ProducerRecord[String, Object](topic, 0, "key", recordVal)

    val task = producer.send(record) // a lazy Task; nothing is sent until it runs
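
For completeness, here's a hedged sketch of the consumer side, reusing serCfg, format and the AvroSerializer object from above (the group id is an assumption). Confluent's deserializer yields GenericRecord values, which format.from maps back into the case class:

    import org.apache.avro.generic.GenericRecord
    import monix.kafka.{KafkaConsumerConfig, KafkaConsumerObservable}

    implicit val deserializer: MonixDeserializer[Object] =
      AvroSerializer.deserializer(serCfg, isKey = false)

    val consumerCfg = KafkaConsumerConfig.default.copy(
      bootstrapServers = brokers.toList,
      groupId = "example-group"
    )

    // Streams RecordValue instances straight off the topic
    val values = KafkaConsumerObservable[String, Object](consumerCfg, List(topic))
      .map(rec => format.from(rec.value().asInstanceOf[GenericRecord]))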