Kafka

Using Protobuf with Apache Kafka and without Schema Registry

Introduction

Solution

Message Production

Message Consumption

Implementation

Model definition

// proto2 is required here because the message uses the `required` field rule,
// which was removed in proto3.
syntax = "proto2";

package com.franklin.samples.kafka.models;

// All messages are nested inside a single generated class `Models`
// (i.e. Models.User) under the java_package below.
option java_outer_classname = "Models";
option java_package = "com.franklin.samples.kafka";

// Payload exchanged over Kafka as raw serialized bytes (no Schema Registry).
// NOTE(review): `required` is discouraged even in proto2 — it makes the field
// impossible to remove later without breaking wire compatibility. Consider
// `optional` for new fields.
message User {
required string name = 1;
optional int32 age = 2;
}

Plugin configuration

<!-- Generates Java sources from the .proto files at build time, so no
     hand-written model code and no Schema Registry are needed. -->
<plugin>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar-maven-plugin</artifactId>
<version>${deps.protobuf.version}</version>
<executions>
<execution>
<!-- Run before compilation so the generated classes are on the compile path. -->
<phase>generate-sources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<!-- Attach both input and generated proto sources to the build. -->
<addProtoSources>all</addProtoSources>
<!-- Where the .proto definitions live. -->
<inputDirectories>
<directory>${basedir}/src/main/resources/protobuf</directory>
</inputDirectories>
<!-- Emit Java into target/ so generated code is never committed. -->
<outputTargets>
<outputTarget>
<type>java</type>
<outputDirectory>${basedir}/target/generated-sources/protobuf</outputDirectory>
</outputTarget>
</outputTargets>
</configuration>
</execution>
</executions>
</plugin>

Spring Boot configuration

# Kafka is configured with plain byte-array (de)serializers: Protobuf
# encoding/decoding is done in application code, so no Schema Registry
# or custom serde classes are needed.
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      auto-offset-reset: earliest
      # Fixed: was `group_id` — Spring Boot's canonical property name is
      # kebab-case `group-id`, consistent with the other keys in this file.
      group-id: group-id
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer

Kafka consumer

/**
 * Kafka listener that receives raw Protobuf bytes and parses them into a
 * {@code Models.User}. A latch lets a test thread block until one message
 * has been fully consumed.
 */
public class Consumer {

    private final Logger logger = LoggerFactory.getLogger(Consumer.class);

    // final: the latch identity never changes after construction.
    private final CountDownLatch latch = new CountDownLatch(1);

    // volatile: written on the listener thread, read from the test thread
    // after await() — guarantees visibility across threads.
    private volatile Models.User payload = null;

    /**
     * Parses the received bytes and releases the latch.
     *
     * Bug fix: the original counted down BEFORE assigning {@code payload},
     * so a thread returning from {@code getLatch().await()} could observe a
     * null payload. The assignment now happens first.
     *
     * @param message serialized {@code Models.User} bytes
     * @throws IOException if the bytes are not a valid User message
     */
    @KafkaListener(topics = "${app.topic}")
    public void consume(byte[] message) throws IOException {
        payload = Models.User.parseFrom(message);
        latch.countDown();

        logger.info("Consumed message [{}]", payload);
    }

    /** @return latch that reaches zero once a message has been consumed */
    public CountDownLatch getLatch() {
        return latch;
    }

    /** @return the last parsed message, or null if none consumed yet */
    public Models.User getPayload() {
        return payload;
    }
}

Kafka producer

/**
 * Serializes a {@code Models.User} to Protobuf bytes and publishes it to a
 * Kafka topic via a byte-array {@code KafkaTemplate}.
 */
public class Producer {

    private static final Logger logger = LoggerFactory.getLogger(Producer.class);

    // Constructor injection (instead of the original field @Autowired):
    // the dependency becomes final and the class is testable without a
    // Spring context. Spring wires the single constructor automatically.
    private final KafkaTemplate<String, byte[]> kafkaTemplate;

    public Producer(final KafkaTemplate<String, byte[]> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Publishes the given user message to {@code topic} as raw Protobuf bytes.
     *
     * @param topic   destination Kafka topic
     * @param message user payload to serialize and send
     */
    public void send(final String topic, final Models.User message) {
        logger.info("Producing message [{}]", message);

        kafkaTemplate.send(topic, message.toByteArray());
    }
}

Testing the solution

@Test
public void whenUserMessageIsProduced_thenUserMessageIsConsumed() throws Exception
{
    // Build the payload we expect to round-trip through Kafka.
    final Models.User expected = Models.User.newBuilder()
            .setName("Jonh")
            .setAge(33)
            .build();

    producer.send(topic, expected);

    // Block (up to 10s) until the consumer has processed one message,
    // then verify the latch actually fired and the payload round-tripped.
    final CountDownLatch latch = consumer.getLatch();
    latch.await(10000, TimeUnit.MILLISECONDS);

    assertThat(latch.getCount(), equalTo(0L));
    assertThat(consumer.getPayload(), equalTo(expected));
}

Conclusion

Senior Software Engineer @ LogMeIn