There are two popular Docker images for Kafka that I have come across:
I chose these over the Confluent Platform images because they're more vanilla and don't bundle the extra components that Confluent Platform includes.
You can run both the `bitnami/kafka` and `wurstmeister/kafka` images locally using the `docker-compose` config below; I'll duplicate it with the name of each image inserted:
- `bitnami/kafka`:

  ```yaml
  version: "3"
  services:
    zookeeper:
      image: 'bitnami/zookeeper:latest'
      ports:
        - '2181:2181'
      environment:
        - ALLOW_ANONYMOUS_LOGIN=yes
    kafka:
      image: 'bitnami/kafka:latest'
      ports:
        - '9092:9092'
      environment:
        - KAFKA_BROKER_ID=1
        - KAFKA_LISTENERS=PLAINTEXT://:9092
        - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
        - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
        - ALLOW_PLAINTEXT_LISTENER=yes
      depends_on:
        - zookeeper
  ```
  The Bitnami image is well documented and is where I pulled this nice `docker-compose` config from.
- `wurstmeister/kafka`:

  ```yaml
  version: "3"
  services:
    zookeeper:
      image: 'wurstmeister/zookeeper:latest'
      ports:
        - '2181:2181'
      environment:
        - ALLOW_ANONYMOUS_LOGIN=yes
    kafka:
      image: 'wurstmeister/kafka:latest'
      ports:
        - '9092:9092'
      environment:
        - KAFKA_BROKER_ID=1
        - KAFKA_LISTENERS=PLAINTEXT://:9092
        - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
        - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
        - ALLOW_PLAINTEXT_LISTENER=yes
      depends_on:
        - zookeeper
  ```
It is important to have `KAFKA_ADVERTISED_LISTENERS` set, or you won't be able to connect to Kafka from an external application. More information on this topic can be found here; I found it extremely useful when I got this wrong myself.
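If you need containers on the Compose network and applications on your host to connect at the same time, one common pattern is to declare two listeners and advertise each on a different address. Here is a rough sketch of what the kafka service's environment could look like; the INTERNAL/EXTERNAL listener names and port 29092 are my own illustrative choices, and the exact variable names can vary between image versions, so check your image's docs:

```yaml
environment:
  - KAFKA_BROKER_ID=1
  # Listen on two ports: one for other containers, one published to the host
  - KAFKA_LISTENERS=INTERNAL://:29092,EXTERNAL://:9092
  # Containers resolve the service name; the host connects via 127.0.0.1
  - KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka:29092,EXTERNAL://127.0.0.1:9092
  - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
  - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
  - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
  - ALLOW_PLAINTEXT_LISTENER=yes
```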
Once you have started the Kafka and Zookeeper containers, you’re good to go. You can start connecting to Kafka either directly or from an application.
- Directly, via Kafka’s consumer and producer scripts:

  ```sh
  # Create topic
  docker exec -it kafka_kafka_1 kafka-topics.sh --create --bootstrap-server kafka:9092 --topic my-topic

  # Create events
  docker exec -it kafka_kafka_1 kafka-console-producer.sh --bootstrap-server kafka:9092 --topic my-topic

  # Read events
  docker exec -it kafka_kafka_1 kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic my-topic --from-beginning
  ```
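  The `kafka_kafka_1` name here is just what Compose generated for me (project directory, service name, and an index); newer Compose versions join these with hyphens instead of underscores, so run `docker ps` to see the exact container name on your machine.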
- From an application (in Kotlin):

  ```kotlin
  import java.time.Duration
  import java.time.LocalDateTime
  import java.util.Properties
  import kotlin.concurrent.fixedRateTimer
  import org.apache.kafka.clients.consumer.Consumer
  import org.apache.kafka.clients.consumer.ConsumerRecords
  import org.apache.kafka.clients.consumer.KafkaConsumer
  import org.apache.kafka.clients.producer.KafkaProducer
  import org.apache.kafka.clients.producer.Producer
  import org.apache.kafka.clients.producer.ProducerRecord

  fun createProducer(): Producer<String, String> {
      val props = Properties()
      props["bootstrap.servers"] = "localhost:9092"
      props["acks"] = "all"
      props["retries"] = 0
      props["linger.ms"] = 1
      props["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
      props["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
      return KafkaProducer(props)
  }

  fun Producer<String, String>.produceMessages(topic: String) {
      // Send a message every two seconds on a background timer
      fixedRateTimer(daemon = true, period = Duration.ofSeconds(2).toMillis()) {
          val time = LocalDateTime.now()
          val message = ProducerRecord(
              topic,                                   // topic
              time.toString(),                         // key
              "Message sent at ${LocalDateTime.now()}" // value
          )
          send(message)
      }
  }

  fun createConsumer(): Consumer<String, String> {
      val props = Properties()
      props.setProperty("bootstrap.servers", "localhost:9092")
      props.setProperty("group.id", "test")
      props.setProperty("enable.auto.commit", "true")
      props.setProperty("auto.commit.interval.ms", "1000")
      props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      return KafkaConsumer(props)
  }

  fun Consumer<String, String>.consumeMessages(topic: String) {
      subscribe(listOf(topic))
      while (true) {
          val messages: ConsumerRecords<String, String> = poll(Duration.ofMillis(5000))
          for (message in messages) {
              println("Consumer reading message: ${message.value()}")
          }
          commitAsync()
      }
  }
  ```
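To tie these together, a minimal entry point could start the producer's timer and then block in the consumer loop; the topic name here is just the one from the console example above:

```kotlin
fun main() {
    val topic = "my-topic"

    // Start producing a message every two seconds on a daemon timer
    createProducer().produceMessages(topic)

    // Block the main thread, printing each message as it arrives
    createConsumer().consumeMessages(topic)
}
```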