How to use kafka with testcontainers in golang applications

Testcontainers is an incredible library that allows us to scale up docker images programatically. It was developed initially for Java, but it's already ported to many other languages: Golang, Python, NodeJS, .NET, Scale, Rust and maybe others that I'm not aware.

How does testcontainers work?

In a nutshell, when you start a container using testcontainers, the library will be responsible to download the docker image, scale the container and also wait the container to be ready (by specifying a wait strategy). Furthermore, when your application stops, testcontainers will also take care of scaling all the containers down, which is a also a huge benefit!

  1. Run tests
  2. When tests finishes, testcontainers take care of scalling down all instances

How to use kafka with testcontainers?

Using kafka with testcontainers is not straightforward, given the following reasons:

Zookeeper dependency

Kafka depends on zookeeper to run, so we also need to configure the zookeeper instance beforehand and setup kafka with the right zookeeper parameters.

The Chicken or the egg dilemma

After your container is scaled up, your application will need to communicate with it. This communication generally is done using the container's port on the host netwkork .And here is the caveat, testcontainers doesn't allow us to define the container's exposed port on the host network beforehand (avoiding the case when the port is already in use). It's only possible to specify which container's port you want to expose, and it will take care of finding an available host port. The only way to get the exposed port on the host network is after the container has started, using a specific method.

  1. Get host's exposed port and setup kafka configuration in container
  2. Start kafka

Solution

I'm going to explain how we can do this in golang! If you want the quick answer you can check this file.

1. Create docker network

network, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{
NetworkRequest: testcontainers.NetworkRequest{Name: CLUSTER_NETWORK_NAME},
})

2. Create zookeeper container

zookeeperContainer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: ZOOKEEPER_IMAGE,
ExposedPorts: []string{ZOOKEEPER_PORT},
Env: map[string]string{"ZOOKEEPER_CLIENT_PORT": ZOOKEEPER_PORT, "ZOOKEEPER_TICK_TIME": "2000"},
Networks: []string{network.Name},
NetworkAliases: map[string][]string{network.Name: {"zookeeper"}},
},
})
  • Env: Sets the environmental variables for the container. The main variable is ZOOKEEPER_PORT, which defines the port that zookeeper will listen to.
  • Networks: specifies the network that this container should use (the one created in step 1)

3. Create kafka container

kafkaContainer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: KAFKA_IMAGE,
ExposedPorts: []string{KAFKA_CLIENT_PORT},
Env: map[string]string{
"KAFKA_BROKER_ID": "1",
"KAFKA_ZOOKEEPER_CONNECT": "zookeeper:" + ZOOKEEPER_PORT,
"KAFKA_LISTENERS": "PLAINTEXT://0.0.0.0:" + KAFKA_CLIENT_PORT + ",BROKER://0.0.0.0:" + KAFKA_BROKER_PORT,
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT",
"KAFKA_INTER_BROKER_LISTENER_NAME": "BROKER",
"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1",
},
Networks: []string{network.Name},
NetworkAliases: map[string][]string{network.Name: {"kafka"}},
// the container only starts when it finds and run /testcontainers_start.sh
Cmd: []string{"sh", "-c", "while [ ! -f /testcontainers_start.sh ]; do sleep 0.1; done; /testcontainers_start.sh"},
},
})
  • Env: Sets the environmental variables for the container. The most importants are: KAFKA_ZOOKEEPER_CONNECT (the zookeeper url), KAFKA_LISTENERS (defines each protocol's port. In this example, the PLAINTEXT protocol will be used by clients to connect to kafka)
  • Networks: specifies the network that this container should use (the one created in step 1)
  • Cmd: defines the command that should run when the container is started. In this example we want to run a custom bash script

4. Start zookeeper and kafka containers

kc.zookeeperContainer.Start(ctx) 
kc.kafkaContainer.Start(ctx)

5. Start kafka

When we started kafka container in the previous step, it will run the command defined when we created the kafka container, which is run the script /testcontainers_start.sh. Since this container doesn't exist yet, it will keep trying to run it.

kafkaStartFile, err := ioutil.TempFile("", "testcontainers_start.sh") if err != nil {  panic(err) } 
defer os.Remove(kafkaStartFile.Name())

// needs to set KAFKA_ADVERTISED_LISTENERS with the exposed kafka port
exposedHost := kc.GetKafkaHost() kafkaStartFile.WriteString("#!/bin/bash \n")
kafkaStartFile.WriteString("export KAFKA_ADVERTISED_LISTENERS='PLAINTEXT://" + exposedHost + ",BROKER://kafka:" + KAFKA_BROKER_PORT + "'\n")
kafkaStartFile.WriteString(". /etc/confluent/docker/bash-config \n")
kafkaStartFile.WriteString("/etc/confluent/docker/configure \n") kafkaStartFile.WriteString("/etc/confluent/docker/launch \n")
err = kc.kafkaContainer.CopyFileToContainer(ctx, kafkaStartFile.Name(), "testcontainers_start.sh", 0700)
  1. Set KAFKA_ADVERTISED_LISTENERS environmental variable with the port obtained in the previous step
  2. Set the commands to start kafka
  3. Transfer the file to the running kafka container

How to use it

If you want to check a test using kafka with testcontainers, I prepared this test that uses everything we discussed so far. It produces messages into a real kafka instance and then consumes them.

Wrap up

In this post we discussed what is testcontainers and how we can use it to scale a real kafka instance (alongside with zookeeper) to use it on a test scenario.

Senior Software Engineer @ LogMeIn

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store