Using Protobuf with Apache Kafka and without Schema Registry

Franklin Lindemberg Guimarães
4 min read · Feb 14, 2021

Introduction

There are many resources talking about how to use Protobuf on Kafka, but most of them require Schema Registry. What if you want to leverage Protobuf on your application but don't want to use Schema Registry? Well, then this post is exactly for you!

I'll assume that you are already familiar with Protobuf and the advantages of using it with Kafka. If that's not the case, you can check these links:

Solution

The idea behind using Protobuf with Kafka is pretty simple! We configure Kafka with byte-array SerDes and add one extra step when producing and consuming:

Message Production

  • Use org.apache.kafka.common.serialization.ByteArraySerializer
  • Just before producing the message to Kafka, call .toByteArray() on the generated Protobuf object. This gives you the serialized Protobuf message as a byte[], which is exactly what the serializer we just set (org.apache.kafka.common.serialization.ByteArraySerializer) expects.

Message Consumption

  • Use org.apache.kafka.common.serialization.ByteArrayDeserializer
  • Just after consuming the message, call .parseFrom(byte[] messageBytes), a static method on the generated Protobuf class. You then have the message parsed into the Protobuf object you need.

Another solution is to implement your own SerDes for your Protobuf messages. But in this case you might need to create one SerDes for every message type you have. I decided to go with the byte[] one since it's more generic.
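If you do go the custom SerDes route, one way to avoid writing a class per message type is to pass the type-specific parse step in as a function. The sketch below only shows that shape in plain Java: ValueDeserializer is a hypothetical stand-in for Kafka's Deserializer<T> interface, forParser is a made-up helper name, and a UTF-8 string parser stands in for a generated Protobuf class.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class GenericSerdeSketch {

    // Hypothetical stand-in with the same shape as the deserialize method
    // of Kafka's Deserializer<T>; not the real Kafka interface.
    public interface ValueDeserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    // One generic factory covers every message type: only the parser changes.
    // For Protobuf, the parser would be a small wrapper around the generated
    // parseFrom(byte[]) method that handles its checked exception.
    public static <T> ValueDeserializer<T> forParser(Function<byte[], T> parser) {
        // Pass null values through untouched instead of handing them to the parser.
        return (topic, data) -> data == null ? null : parser.apply(data);
    }

    public static void main(String[] args) {
        // A UTF-8 string parser stands in for a generated Protobuf parser here.
        ValueDeserializer<String> deserializer =
                forParser(bytes -> new String(bytes, StandardCharsets.UTF_8));
        byte[] value = "Jonh".getBytes(StandardCharsets.UTF_8);
        System.out.println(deserializer.deserialize("users", value));
    }
}
```

With the real Kafka interface, the same idea would mean one class implementing Deserializer<T> that takes the parser as a constructor argument.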

Implementation

Model definition

Let's say we have a User model with a required name property and an optional age property. The proto model definition can be found below.

syntax = "proto2";

package com.franklin.samples.kafka.models;

option java_outer_classname = "Models";
option java_package = "com.franklin.samples.kafka";

message User {
  required string name = 1;
  optional int32 age = 2;
}
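To make it concrete what ends up in the Kafka record value, here is a hand-rolled sketch of the Protobuf wire encoding for this User message, in plain Java with no Protobuf dependency. UserWireSketch and encodeUser are made-up names for illustration, and the sketch assumes both fields are set. In the real application the generated class does this for you via .toByteArray().

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class UserWireSketch {

    // Writes a base-128 varint, the integer encoding used on the Protobuf wire.
    static void writeVarint(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80)); // low 7 bits, continuation bit set
            value >>>= 7;
        }
        out.write((int) value);
    }

    // Encodes User{name, age}: each field is a tag (field number << 3 | wire type)
    // followed by its value. Field names never appear in the bytes.
    public static byte[] encodeUser(String name, int age) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
        writeVarint(out, (1 << 3) | 2);      // field 1 (name), wire type 2: length-delimited
        writeVarint(out, nameBytes.length);  // length prefix
        out.write(nameBytes, 0, nameBytes.length);
        writeVarint(out, (2 << 3) | 0);      // field 2 (age), wire type 0: varint
        writeVarint(out, age);               // int32 is sign-extended to 64 bits
        return out.toByteArray();
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : encodeUser("Jonh", 33)) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim()); // prints "0a 04 4a 6f 6e 68 10 21"
    }
}
```

Since the bytes carry only field numbers and values, producer and consumer both need the same .proto definition to make sense of them. That is the job Schema Registry would otherwise do.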

Plugin configuration

After defining the .proto file, we need to generate the Java class from it in order to use it in our application. With Maven, this can be done using the protoc-jar-maven-plugin. This plugin is interesting because it can generate code for multiple languages (Java, Go, Python, C#, etc.). For our scenario we will generate only Java. Below is the configuration we used.

<plugin>
  <groupId>com.github.os72</groupId>
  <artifactId>protoc-jar-maven-plugin</artifactId>
  <version>${deps.protobuf.version}</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>run</goal>
      </goals>
      <configuration>
        <addProtoSources>all</addProtoSources>
        <inputDirectories>
          <directory>${basedir}/src/main/resources/protobuf</directory>
        </inputDirectories>
        <outputTargets>
          <outputTarget>
            <type>java</type>
            <outputDirectory>${basedir}/target/generated-sources/protobuf</outputDirectory>
          </outputTarget>
        </outputTargets>
      </configuration>
    </execution>
  </executions>
</plugin>

Spring Boot configuration

The next step is to set up Kafka in our Spring application configuration. This can be done using the configuration below. Notice that the consumer value-deserializer is set to org.apache.kafka.common.serialization.ByteArrayDeserializer and the producer value-serializer is set to org.apache.kafka.common.serialization.ByteArraySerializer, while the keys stay plain strings.

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      auto-offset-reset: earliest
      group_id: group-id
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
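If you are not using Spring Boot, the same settings map onto the plain Kafka client property names. A minimal sketch, assuming you build the property maps yourself: the class and method names (KafkaPropsSketch, producerProps, consumerProps) are made up for illustration, while the keys are the standard Kafka client configuration names.

```java
import java.util.HashMap;
import java.util.Map;

public class KafkaPropsSketch {

    // Producer settings: String keys, raw byte[] values (the serialized Protobuf message).
    public static Map<String, Object> producerProps(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return props;
    }

    // Consumer settings mirroring the YAML: String keys, byte[] values to be parsed later.
    public static Map<String, Object> consumerProps(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps("localhost:9092").get("value.serializer"));
        System.out.println(consumerProps("localhost:9092", "group-id").get("value.deserializer"));
    }
}
```

Maps like these can be passed to the KafkaProducer and KafkaConsumer constructors from the kafka-clients library.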

Kafka consumer

The consumer definition can be found below. Notice that just after consuming the message byte[], we parse it to the protobuf object using Models.User.parseFrom(message). The full code can be found here.

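import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;

public class Consumer {

    private final Logger logger = LoggerFactory.getLogger(Consumer.class);

    private CountDownLatch latch = new CountDownLatch(1);
    private Models.User payload = null;

    @KafkaListener(topics = "${app.topic}")
    public void consume(byte[] message) throws IOException {
        // Parse first, then release the latch, so a thread waiting on the
        // latch never observes a null payload.
        payload = Models.User.parseFrom(message);
        latch.countDown();

        logger.info("Consumed message [{}]", payload);
    }

    public CountDownLatch getLatch() {
        return latch;
    }

    public Models.User getPayload() {
        return payload;
    }
}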

Kafka producer

The producer definition can be found below. Notice that just before producing the message, we serialize it to byte[] using message.toByteArray(). The full code can be found here.

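import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;

public class Producer {

    private static final Logger logger = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate<String, byte[]> kafkaTemplate;

    public void send(final String topic, final Models.User message) {
        logger.info("Producing message [{}]", message);

        // Serialize the Protobuf object to byte[] to match ByteArraySerializer.
        kafkaTemplate.send(topic, message.toByteArray());
    }
}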

Testing the solution

I created a small test so we can see all of this in action. The test uses Spring's embedded Kafka (we could also change it to use Testcontainers or connect to an external Kafka). It produces a given Models.User message, waits at most 10s for it to be consumed, and then validates that the consumed message is equal to the Models.User message that was produced. The full code can be found here.

@Test
public void whenUserMessageIsProduced_thenUserMessageIsConsumed() throws Exception
{
    final Models.User userMessage = Models.User.newBuilder()
        .setName("Jonh")
        .setAge(33)
        .build();

    producer.send(topic, userMessage);

    consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
    assertThat(consumer.getLatch().getCount(), equalTo(0L));
    assertThat(consumer.getPayload(), equalTo(userMessage));
}
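The latch-based wait used by this test can be tried out without Kafka at all. Below is a minimal plain-Java sketch of the pattern. LatchHandoffSketch, onMessage and awaitPayload are made-up names, and a String stands in for Models.User. The point it illustrates is the ordering: the payload is stored before the latch is released, so the waiting thread sees it once await returns.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchHandoffSketch {

    private final CountDownLatch latch = new CountDownLatch(1);
    private String payload;

    // Plays the role of the Kafka listener callback.
    public void onMessage(String message) {
        payload = message;   // store the result first...
        latch.countDown();   // ...then release whoever is waiting
    }

    // Waits up to the timeout; returns the payload, or null if nothing arrived in time.
    public String awaitPayload(long timeout, TimeUnit unit) {
        try {
            // await returns false on timeout; on success the write made before
            // countDown() is visible to this thread.
            return latch.await(timeout, unit) ? payload : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        LatchHandoffSketch sketch = new LatchHandoffSketch();
        // A separate thread stands in for the consumer thread.
        new Thread(() -> sketch.onMessage("Jonh")).start();
        System.out.println(sketch.awaitPayload(10, TimeUnit.SECONDS));
    }
}
```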

Conclusion

The main goal of this article was to show how we could use Protobuf with Kafka in a very simple way!

If you wanna check the complete code, it can be found in this repo.

Please let me know if you have any comments, suggestions or ideas!
