How to use kafka with testcontainers in golang applications

Franklin Lindemberg Guimarães
5 min read · Sep 20, 2020


Testcontainers is an incredible library that allows us to start docker containers programmatically. It was initially developed for Java, but it has already been ported to many other languages: Golang, Python, NodeJS, .NET, Scala, Rust and maybe others that I'm not aware of.

How does testcontainers work?

In a nutshell, when you start a container using testcontainers, the library is responsible for downloading the docker image, starting the container and waiting for it to be ready (according to the wait strategy you specify). Furthermore, when your application stops, testcontainers also takes care of tearing down all the containers, which is also a huge benefit!

I've been using testcontainers mainly for integration/acceptance tests where I want some real external dependencies (redis, sql database, kafka). It works as follows:

  1. Before the tests, testcontainers starts all the containers needed
  2. The tests run
  3. When the tests finish, testcontainers takes care of stopping all instances

This allows us to run these external dependencies (and also the tests) seamlessly, since the only requirement on the host machine is a docker daemon (it even works in jenkins!)

How to use kafka with testcontainers?

Using kafka with testcontainers is not straightforward, for the following reasons:

Zookeeper dependency

Kafka depends on zookeeper to run, so we also need to configure a zookeeper instance beforehand and set up kafka with the right zookeeper parameters.

Although it's possible to run zookeeper in the same container as kafka, I prefer to run it in a separate container, which means we need to configure zookeeper with testcontainers as well.

The Chicken or the egg dilemma

After your container is started, your application needs to communicate with it. This communication is generally done through the container's port mapped on the host network. And here is the caveat: testcontainers doesn't allow us to choose that host port beforehand (which avoids failures when a port is already in use). We can only specify which container port we want to expose, and testcontainers finds an available host port for it. The only way to get that host port is after the container has started, by calling a specific method (MappedPort in testcontainers-go).

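You might be wondering why this is a problem for setting up kafka with testcontainers. Briefly: kafka clients first connect to a bootstrap address, but then use the addresses the broker advertises (KAFKA_ADVERTISED_LISTENERS) for all subsequent requests. So, to access kafka from the host network, the advertised address must contain the host's mapped port. But to get the host's mapped port, the container needs to be running.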

This looks like the chicken or the egg dilemma (which came first?), but fortunately we can solve it with the following steps:

  1. Start the kafka container (without starting the kafka process inside it)
  2. Get the host's mapped port and set up the kafka configuration inside the container
  3. Start kafka

By doing this we will be able to run kafka with the right configuration and access it from the host network!

Solution

I'm going to explain how we can do this in golang! If you want the quick answer you can check this file.

Note: for java there's already a KafkaContainer class that implements all this logic (with an embedded zookeeper).
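The snippets in the steps below use several constants that the post doesn't define. Here is one hypothetical set of values: the network name and the image tags are assumptions, the Confluent images are inferred from the /etc/confluent/docker scripts used in step 5, and the port split (9093 for the host-facing listener, 9092 for the internal one) follows, as far as I know, the convention of the Java KafkaContainer.

```go
package main

import "fmt"

// Hypothetical values: the post does not show these constants, and the
// image tags in particular are an assumption.
const (
	CLUSTER_NETWORK_NAME = "kafka-cluster"
	ZOOKEEPER_IMAGE      = "confluentinc/cp-zookeeper:5.5.2"
	KAFKA_IMAGE          = "confluentinc/cp-kafka:5.5.2"

	// Bare port numbers (no "/tcp" suffix), because they are also
	// interpolated into environment variables like ZOOKEEPER_CLIENT_PORT.
	ZOOKEEPER_PORT    = "2181"
	KAFKA_CLIENT_PORT = "9093" // PLAINTEXT listener, exposed to the host
	KAFKA_BROKER_PORT = "9092" // BROKER listener, used inside the docker network
)

func main() {
	fmt.Println(ZOOKEEPER_IMAGE, KAFKA_IMAGE, CLUSTER_NETWORK_NAME)
	// Value used for KAFKA_ZOOKEEPER_CONNECT in step 3.
	fmt.Println("zookeeper:" + ZOOKEEPER_PORT)
}
```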

1. Create docker network

network, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{
	NetworkRequest: testcontainers.NetworkRequest{Name: CLUSTER_NETWORK_NAME},
})
if err != nil {
	return err
}

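We first create a docker network so that the zookeeper and kafka containers share the same network. This simplifies communication between them: on a user-defined docker network, containers can reach each other by their network aliases (zookeeper and kafka, defined in the next steps).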

2. Create zookeeper container

zookeeperContainer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
	ContainerRequest: testcontainers.ContainerRequest{
		Image:        ZOOKEEPER_IMAGE,
		ExposedPorts: []string{ZOOKEEPER_PORT},
		Env:          map[string]string{"ZOOKEEPER_CLIENT_PORT": ZOOKEEPER_PORT, "ZOOKEEPER_TICK_TIME": "2000"},
		// GenericNetwork returns a Network interface, so reuse the name the network was created with
		Networks:       []string{CLUSTER_NETWORK_NAME},
		NetworkAliases: map[string][]string{CLUSTER_NETWORK_NAME: {"zookeeper"}},
	},
})
if err != nil {
	return err
}

This creates the zookeeper container without starting it yet. The main configurations are:

  • ExposedPorts: the container port that we want testcontainers to expose on the host network
  • Env: sets the container's environment variables. The main one is ZOOKEEPER_CLIENT_PORT (set to ZOOKEEPER_PORT), which defines the port zookeeper listens on.
  • Networks: the network this container should join (the one created in step 1)
  • NetworkAliases: the hostname (zookeeper) that other containers on that network use to reach it

3. Create kafka container

kafkaContainer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
	ContainerRequest: testcontainers.ContainerRequest{
		Image:        KAFKA_IMAGE,
		ExposedPorts: []string{KAFKA_CLIENT_PORT},
		Env: map[string]string{
			"KAFKA_BROKER_ID":                        "1",
			"KAFKA_ZOOKEEPER_CONNECT":                "zookeeper:" + ZOOKEEPER_PORT,
			"KAFKA_LISTENERS":                        "PLAINTEXT://0.0.0.0:" + KAFKA_CLIENT_PORT + ",BROKER://0.0.0.0:" + KAFKA_BROKER_PORT,
			"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP":   "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT",
			"KAFKA_INTER_BROKER_LISTENER_NAME":       "BROKER",
			"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1",
		},
		Networks:       []string{CLUSTER_NETWORK_NAME},
		NetworkAliases: map[string][]string{CLUSTER_NETWORK_NAME: {"kafka"}},
		// kafka itself is not launched yet: the command polls every 0.1s until
		// /testcontainers_start.sh exists, then runs it (see step 5)
		Cmd: []string{"sh", "-c", "while [ ! -f /testcontainers_start.sh ]; do sleep 0.1; done; /testcontainers_start.sh"},
	},
})
if err != nil {
	return err
}

This creates the kafka container (but don’t start it yet). The main configurations are:

  • ExposedPorts: the container port that we want testcontainers to expose on the host network (KAFKA_CLIENT_PORT).
  • Env: sets the container's environment variables. The most important are KAFKA_ZOOKEEPER_CONNECT (the zookeeper address, using its network alias) and KAFKA_LISTENERS, which binds one port per listener: PLAINTEXT on KAFKA_CLIENT_PORT, used by clients connecting from the host, and BROKER on KAFKA_BROKER_PORT, used for inter-broker traffic inside the docker network (see KAFKA_INTER_BROKER_LISTENER_NAME).
  • Networks: the network this container should join (the one created in step 1)
  • Cmd: overrides the command run when the container starts. Instead of launching kafka, it polls every 0.1 seconds until /testcontainers_start.sh exists and then runs it.

4. Start zookeeper and kafka containers

kc.zookeeperContainer.Start(ctx) 
kc.kafkaContainer.Start(ctx)

In this step we just use the testcontainers interface to start both containers.

5. Start kafka

When the kafka container started in the previous step, it ran the command defined when we created it, which waits for the script /testcontainers_start.sh and then runs it. Since this script doesn't exist yet, the command keeps polling for it.

This is how we create the /testcontainers_start.sh script and copy it into the container:

kafkaStartFile, err := os.CreateTemp("", "testcontainers_start.sh")
if err != nil {
	panic(err)
}
defer os.Remove(kafkaStartFile.Name())

// KAFKA_ADVERTISED_LISTENERS must use the host port mapped to KAFKA_CLIENT_PORT,
// which is only known now that the container is running
exposedHost := kc.GetKafkaHost()
kafkaStartFile.WriteString("#!/bin/bash\n")
kafkaStartFile.WriteString("export KAFKA_ADVERTISED_LISTENERS='PLAINTEXT://" + exposedHost + ",BROKER://kafka:" + KAFKA_BROKER_PORT + "'\n")
kafkaStartFile.WriteString(". /etc/confluent/docker/bash-config\n")
kafkaStartFile.WriteString("/etc/confluent/docker/configure\n")
kafkaStartFile.WriteString("/etc/confluent/docker/launch\n")
kafkaStartFile.Close()
// copy to the absolute path the container's start command is polling for
err = kc.kafkaContainer.CopyFileToContainer(ctx, kafkaStartFile.Name(), "/testcontainers_start.sh", 0700)

This code does the following:

  1. Gets kafka's exposed address (host and mapped port) on the host network
  2. Sets the KAFKA_ADVERTISED_LISTENERS environment variable with the address obtained in the previous step
  3. Adds the commands that configure and start kafka
  4. Copies the file into the running kafka container

Once the script is copied into the kafka container, the start-up command defined for it finds the script and runs it, starting kafka as expected.

How to use it

If you want to see a test that uses kafka with testcontainers, I prepared this test, which uses everything we discussed so far. It produces messages into a real kafka instance and then consumes them.

Wrap up

In this post we discussed what testcontainers is and how we can use it to spin up a real kafka instance (alongside zookeeper) for a test scenario.

If you have any comments, questions, or ideas on how we could use testcontainers, please leave a message and I'll be happy to get back to you!
