Exploring Apache Kafka - Part 1


Bhaskar S 03/17/2018


Overview

Apache Kafka is an open source, low-latency, high-throughput, fault-tolerant, distributed, and durable data streaming platform.

The core Apache Kafka platform supports the following capabilities:

- Publish and subscribe to streams of messages, similar to a message queue or an enterprise messaging system
- Store streams of messages in a fault-tolerant, durable way
- Process streams of messages as they arrive

In this article, we will focus on the Producer and Consumer aspects of the Apache Kafka platform. This capability is akin to traditional enterprise messaging middleware: one set of applications sends data events at one end for persistence, while a different set of applications retrieves the desired data events at the other end.

However, the main differences compared to the traditional enterprise messaging middleware are:

- Messages are persisted on disk and retained for a configurable period of time, irrespective of whether they have been consumed
- Consumers pull messages and track their own position (Offset) in the log, which allows messages to be re-read and replayed
- Topics are partitioned (and replicated) across Brokers, which allows the reads and writes to scale horizontally

Terminology

The following is a high level architectural view of Apache Kafka:

[Figure: High-level architecture of Apache Kafka]

In this section, we will list and briefly describe some of the terms used in the context of Apache Kafka.

Topic - a category, with an associated name, for storing and distributing messages

Partition - an immutable log containing a sequence of messages associated with a Topic. A Topic can have one or more Partition(s) associated with it, in order to scale the reads and writes of the messages

Offset - a unique numeric sequence identifier associated with each message in a Partition of a Topic. As messages are appended to a Partition log, they are assigned an increasing sequence number, indicating their position in the log

Broker - the server that houses the Partition(s) associated with one or more Topic(s). Each Broker is assigned a unique identifier. A Partition associated with a Topic can be replicated across multiple Broker(s) for fault-tolerance and fail-over

Cluster - a group of one or more Broker(s) that can be hosted on one or more machine(s). Note that a machine can host more than one Broker

Producer - an application that generates messages for one or more Topic(s). Each message on a Topic is mapped to a Partition. In order to scale linearly and handle messages from multiple Producer(s), Topic(s) can have multiple Partition(s) spread across Broker(s)

Consumer - an application that retrieves messages from one or more Topic(s). The Offsets corresponding to messages that have been read and processed are saved and committed to a special Topic maintained by the Broker(s)

Consumer Group - a set of Consumer(s) that cooperate to retrieve messages from one or more Partition(s) associated with one or more Topic(s)

ZooKeeper - a distributed membership coordinator for the Broker(s). It stores the metadata about the Topic(s), such as the number of Partition(s), which Broker(s) house the Partition(s) associated with each Topic, etc.

Installation and Setup

The setup will be on an Ubuntu 16.04 LTS based Linux desktop.

Ensure Docker is installed and set up; else, refer to the article Introduction to Docker.

We will assume a hypothetical user alice with the home directory located at /home/alice, using a bash shell.

Create the directory structures logs, kafka/data, and zk/data under /home/alice by executing the following command from within /home/alice:

mkdir -p logs kafka/data zk/data

For our exploration, we will be downloading and using the official docker images confluentinc/cp-zookeeper:4.0.0-3 and confluentinc/cp-kafka:4.0.0-3.

To pull and download the docker image for confluentinc/cp-zookeeper:4.0.0-3, execute the following command:

docker pull confluentinc/cp-zookeeper:4.0.0-3

The following should be the typical output:

Output.1

4.0.0-3: Pulling from confluentinc/cp-zookeeper
f49cf87b52c1: Pull complete 
e84d5d3d4ecc: Pull complete 
77cefc0815fa: Pull complete 
42017afc871d: Pull complete 
198b9445dd77: Pull complete 
Digest: sha256:dcf3186f814e0ca90c3b6e18ac272dd3a30216febbb72d9bdd2a2e9cad826ae7
Status: Downloaded newer image for confluentinc/cp-zookeeper:4.0.0-3

Similarly, to pull and download the docker image for confluentinc/cp-kafka:4.0.0-3, execute the following command:

docker pull confluentinc/cp-kafka:4.0.0-3

The following should be the typical output:

Output.2

4.0.0-3: Pulling from confluentinc/cp-kafka
f49cf87b52c1: Already exists 
e84d5d3d4ecc: Already exists 
77cefc0815fa: Already exists 
6f5b44d773de: Pull complete 
59804b9a6eeb: Pull complete 
Digest: sha256:910589d606c6179ab7a0d6be5ce9015fce8f76d29db9cbe31721c9f14380fd68
Status: Downloaded newer image for confluentinc/cp-kafka:4.0.0-3

The first step is to start an instance of ZooKeeper. To launch the docker instance for ZooKeeper, execute the following command:

docker run -it --rm --name zk --net=host -u $(id -u $USER):$(id -g $USER) -v /home/alice/zk/data:/var/lib/zookeeper/data -v /home/alice/logs:/var/log/kafka -e ZOOKEEPER_CLIENT_PORT=10001 -e ZOOKEEPER_TICK_TIME=2000 confluentinc/cp-zookeeper:4.0.0-3

The following should be the typical output:

Output.3

[2018-03-17 18:01:26,734] INFO Reading configuration from: /etc/kafka/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2018-03-17 18:01:26,736] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2018-03-17 18:01:26,736] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2018-03-17 18:01:26,736] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2018-03-17 18:01:26,736] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2018-03-17 18:01:26,752] INFO Reading configuration from: /etc/kafka/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2018-03-17 18:01:26,752] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2018-03-17 18:01:26,759] INFO Server environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2018-03-17 18:01:26,759] INFO Server environment:host.name=ganga (org.apache.zookeeper.server.ZooKeeperServer)
[2018-03-17 18:01:26,759] INFO Server environment:java.version=1.8.0_102 (org.apache.zookeeper.server.ZooKeeperServer)
[2018-03-17 18:01:26,759] INFO Server environment:java.vendor=Azul Systems, Inc. (org.apache.zookeeper.server.ZooKeeperServer)
[2018-03-17 18:01:26,759] INFO Server environment:java.home=/usr/lib/jvm/zulu-8-amd64/jre (org.apache.zookeeper.server.ZooKeeperServer)
[2018-03-17 18:01:26,759] INFO Server environment:java.class.path=:/usr/bin/../share/java/kafka/jackson-mapper-asl-1.9.13.jar:/usr/bin/../share/java/kafka/jopt-simple-5.0.4.jar:/usr/bin/../share/java/kafka/kafka-log4j-appender-1.0.0-cp1.jar:/usr/bin/../share/java/kafka/kafka_2.11-1.0.0-cp1-javadoc.jar:/usr/bin/../share/java/kafka/commons-codec-1.9.jar:/usr/bin/../share/java/kafka/metrics-core-2.2.0.jar:/usr/bin/../share/java/kafka/commons-beanutils-1.8.3.jar:/usr/bin/../share/java/kafka/connect-api-1.0.0-cp1.jar:/usr/bin/../share/java/kafka/aopalliance-repackaged-2.5.0-b32.jar:/usr/bin/../share/java/kafka/paranamer-2.7.jar:/usr/bin/../share/java/kafka/slf4j-log4j12-1.7.25.jar:/usr/bin/../share/java/kafka/jackson-core-asl-1.9.13.jar:/usr/bin/../share/java/kafka/javassist-3.21.0-GA.jar:/usr/bin/../share/java/kafka/jetty-servlet-9.2.22.v20170606.jar:/usr/bin/../share/java/kafka/jetty-security-9.2.22.v20170606.jar:/usr/bin/../share/java/kafka/commons-compress-1.8.1.jar:/usr/bin/../share/java/kafka/slf4j-api-1.7.25.jar:/usr/bin/../share/java/kafka/kafka_2.11-1.0.0-cp1-test-sources.jar:/usr/bin/../share/java/kafka/reflections-0.9.11.jar:/usr/bin/../share/java/kafka/commons-collections-3.2.1.jar:/usr/bin/../share/java/kafka/httpclient-4.5.2.jar:/usr/bin/../share/java/kafka/guava-20.0.jar:/usr/bin/../share/java/kafka/jetty-server-9.2.22.v20170606.jar:/usr/bin/../share/java/kafka/jetty-util-9.2.22.v20170606.jar:/usr/bin/../share/java/kafka/commons-logging-1.2.jar:/usr/bin/../share/java/kafka/httpmime-4.5.2.jar:/usr/bin/../share/java/kafka/jackson-jaxrs-base-2.9.1.jar:/usr/bin/../share/java/kafka/javax.servlet-api-3.1.0.jar:/usr/bin/../share/java/kafka/jersey-common-2.25.1.jar:/usr/bin/../share/java/kafka/jersey-client-2.25.1.jar:/usr/bin/../share/java/kafka/maven-artifact-3.5.0.jar:/usr/bin/../share/java/kafka/jersey-container-servlet-core-2.25.1.jar:/usr/bin/../share/java/kafka/commons-validator-1.4.1.jar:/usr/bin/../share/java/kafka/jetty-http-9.2.22.v20170606.jar:/usr/bin/../share/java/kafka/connect-runtime-1.0.0-cp1.jar:/usr/bin/../share/java/kafka/javassist-3.20.0-GA.jar:/usr/bin/../share/java/kafka/javax.annotation-api-1.2.jar:/usr/bin/../share/java/kafka/jackson-jaxrs-json-provider-2.9.1.jar:/usr/bin/../share/java/kafka/kafka-streams-examples-1.0.0-cp1.jar:/usr/bin/../share/java/kafka/javax.inject-1.jar:/usr/bin/../share/java/kafka/kafka_2.11-1.0.0-cp1-test.jar:/usr/bin/../share/java/kafka/kafka.jar:/usr/bin/../share/java/kafka/osgi-resource-locator-1.0.1.jar:/usr/bin/../share/java/kafka/hk2-utils-2.5.0-b32.jar:/usr/bin/../share/java/kafka/validation-api-1.1.0.Final.jar:/usr/bin/../share/java/kafka/zookeeper-3.4.10.jar:/usr/bin/../share/java/kafka/support-metrics-common-4.0.0.jar:/usr/bin/../share/java/kafka/javax.ws.rs-api-2.0.1.jar:/usr/bin/../share/java/kafka/hk2-api-2.5.0-b32.jar:/usr/bin/../share/java/kafka/jackson-databind-2.9.1.jar:/usr/bin/../share/java/kafka/kafka-streams-1.0.0-cp1.jar:/usr/bin/../share/java/kafka/connect-transforms-1.0.0-cp1.jar:/usr/bin/../share/java/kafka/jersey-server-2.25.1.jar:/usr/bin/../share/java/kafka/commons-digester-1.8.1.jar:/usr/bin/../share/java/kafka/zkclient-0.10.jar:/usr/bin/../share/java/kafka/commons-lang3-3.5.jar:/usr/bin/../share/java/kafka/commons-lang3-3.1.jar:/usr/bin/../share/java/kafka/kafka-tools-1.0.0-cp1.jar:/usr/bin/../share/java/kafka/kafka-clients-1.0.0-cp1.jar:/usr/bin/../share/java/kafka/javax.inject-2.5.0-b32.jar:/usr/bin/../share/java/kafka/rocksdbjni-5.7.3.jar:/usr/bin/../share/java/kafka/jersey-container-servlet-2.25.1.jar:/usr/
bin/../share/java/kafka/connect-json-1.0.0-cp1.jar:/usr/bin/../share/java/kafka/jersey-media-jaxb-2.25.1.jar:/usr/bin/../share/java/kafka/log4j-1.2.17.jar:/usr/bin/../share/java/kafka/xz-1.5.jar:/usr/bin/../share/java/kafka/jetty-io-9.2.22.v20170606.jar:/usr/bin/../share/java/kafka/hk2-locator-2.5.0-b32.jar:/usr/bin/../share/java/kafka/plexus-utils-3.0.24.jar:/usr/bin/../share/java/kafka/jackson-module-jaxb-annotations-2.9.1.jar:/usr/bin/../share/java/kafka/lz4-java-1.4.jar:/usr/bin/../share/java/kafka/snappy-java-1.1.4.jar:/usr/bin/../share/java/kafka/scala-library-2.11.11.jar:/usr/bin/../share/java/kafka/kafka_2.11-1.0.0-cp1.jar:/usr/bin/../share/java/kafka/httpcore-4.4.4.jar:/usr/bin/../share/java/kafka/argparse4j-0.7.0.jar:/usr/bin/../share/java/kafka/jackson-annotations-2.9.1.jar:/usr/bin/../share/java/kafka/support-metrics-client-4.0.0.jar:/usr/bin/../share/java/kafka/kafka_2.11-1.0.0-cp1-sources.jar:/usr/bin/../share/java/kafka/jetty-servlets-9.2.22.v20170606.jar:/usr/bin/../share/java/kafka/connect-file-1.0.0-cp1.jar:/usr/bin/../share/java/kafka/jetty-continuation-9.2.22.v20170606.jar:/usr/bin/../share/java/kafka/jersey-guava-2.25.1.jar:/usr/bin/../share/java/kafka/kafka_2.11-1.0.0-cp1-scaladoc.jar:/usr/bin/../share/java/kafka/jackson-core-2.9.1.jar:/usr/bin/../share/java/kafka/avro-1.8.2.jar:/usr/bin/../share/java/confluent-support-metrics/*:/usr/share/java/confluent-support-metrics/* (org.apache.zookeeper.server.ZooKeeperServer)
[2018-03-17 18:01:26,759] INFO Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.server.ZooKeeperServer)
[2018-03-17 18:01:26,759] INFO Server environment:java.io.tmpdir=/tmp (org.apache.zookeeper.server.ZooKeeperServer)
[2018-03-17 18:01:26,759] INFO Server environment:java.compiler= (org.apache.zookeeper.server.ZooKeeperServer)
[2018-03-17 18:01:26,759] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
[2018-03-17 18:01:26,759] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2018-03-17 18:01:26,760] INFO Server environment:os.version=4.10.0-38-generic (org.apache.zookeeper.server.ZooKeeperServer)
[2018-03-17 18:01:26,760] INFO Server environment:user.name=? (org.apache.zookeeper.server.ZooKeeperServer)
[2018-03-17 18:01:26,760] INFO Server environment:user.home=? (org.apache.zookeeper.server.ZooKeeperServer)
[2018-03-17 18:01:26,760] INFO Server environment:user.dir=/ (org.apache.zookeeper.server.ZooKeeperServer)
[2018-03-17 18:01:26,764] INFO tickTime set to 2000 (org.apache.zookeeper.server.ZooKeeperServer)
[2018-03-17 18:01:26,765] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2018-03-17 18:01:26,765] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2018-03-17 18:01:26,771] INFO binding to port 0.0.0.0/0.0.0.0:10001 (org.apache.zookeeper.server.NIOServerCnxnFactory)

The parameter ZOOKEEPER_CLIENT_PORT with a value of 10001 specifies the network port on which ZooKeeper is listening for client connections.

The parameter ZOOKEEPER_TICK_TIME with a value of 2000 specifies the length of a single clock tick (measured in milliseconds) used by ZooKeeper to listen for heartbeats and manage timeouts.
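
Inside the container, these environment variables are rendered into the generated /etc/kafka/zookeeper.properties file (the file referenced in Output.3 above). A minimal standalone equivalent of that configuration, assuming the data volume mount we specified, would look like the following sketch:

# Hypothetical minimal zookeeper.properties matching our docker settings
clientPort=10001
tickTime=2000
dataDir=/var/lib/zookeeper/data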

To check if the docker instance for ZooKeeper is up and running, execute the telnet command on port 10001 as shown below:

telnet localhost 10001

The following should be the typical output:

Output.4

Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.

The telnet session now waits for input. Enter the four-letter command conf and press ENTER.

The following should be the typical output:

Output.5

clientPort=10001
dataDir=/var/lib/zookeeper/data/version-2
dataLogDir=/var/lib/zookeeper/log/version-2
tickTime=2000
maxClientCnxns=60
minSessionTimeout=4000
maxSessionTimeout=40000
serverId=0
Connection closed by foreign host.
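
As an aside, if telnet is not available on the desktop, the same ZooKeeper four-letter-word commands can be sent using netcat instead. A sketch:

echo conf | nc localhost 10001
echo stat | nc localhost 10001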

Execute the telnet command on port 10001 one more time as shown below:

telnet localhost 10001

This time enter the word stat and press ENTER.

The following should be the typical output:

Output.6

Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
stat
Zookeeper version: 3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT
Clients:
 /127.0.0.1:35680[0](queued=0,recved=1,sent=0)

Latency min/avg/max: 0/0/0
Received: 2
Sent: 1
Connections: 1
Outstanding: 0
Zxid: 0x0
Mode: standalone
Node count: 4
Connection closed by foreign host.

The next step is to start an instance of a Kafka Broker. To launch the docker instance of the Kafka Broker, execute the following command:

docker run -it --rm --name=kafka --net=host -u $(id -u $USER):$(id -g $USER) -v /home/alice/kafka/data:/var/lib/kafka/data -v /home/alice/logs:/var/log/kafka -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=localhost:10001 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:20001 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka:4.0.0-3

The following should be the typical output:

Output.7

[2018-03-17 18:44:43,956] INFO starting (kafka.server.KafkaServer)
[2018-03-17 18:44:43,957] INFO Connecting to zookeeper on localhost:10001 (kafka.server.KafkaServer)
[2018-03-17 18:44:43,967] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2018-03-17 18:44:43,971] INFO Client environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT (org.apache.zookeeper.ZooKeeper)
[2018-03-17 18:44:43,971] INFO Client environment:host.name=ganga (org.apache.zookeeper.ZooKeeper)
[2018-03-17 18:44:43,971] INFO Client environment:java.version=1.8.0_102 (org.apache.zookeeper.ZooKeeper)
[2018-03-17 18:44:43,971] INFO Client environment:java.vendor=Azul Systems, Inc. (org.apache.zookeeper.ZooKeeper)
[2018-03-17 18:44:43,971] INFO Client environment:java.home=/usr/lib/jvm/zulu-8-amd64/jre (org.apache.zookeeper.ZooKeeper)
[2018-03-17 18:44:43,971] INFO Client environment:java.class.path=:/usr/bin/../share/java/kafka/jackson-mapper-asl-1.9.13.jar:/usr/bin/../share/java/kafka/jopt-simple-5.0.4.jar:/usr/bin/../share/java/kafka/kafka-log4j-appender-1.0.0-cp1.jar:/usr/bin/../share/java/kafka/kafka_2.11-1.0.0-cp1-javadoc.jar:/usr/bin/../share/java/kafka/commons-codec-1.9.jar:/usr/bin/../share/java/kafka/metrics-core-2.2.0.jar:/usr/bin/../share/java/kafka/commons-beanutils-1.8.3.jar:/usr/bin/../share/java/kafka/connect-api-1.0.0-cp1.jar:/usr/bin/../share/java/kafka/aopalliance-repackaged-2.5.0-b32.jar:/usr/bin/../share/java/kafka/paranamer-2.7.jar:/usr/bin/../share/java/kafka/slf4j-log4j12-1.7.25.jar:/usr/bin/../share/java/kafka/jackson-core-asl-1.9.13.jar:/usr/bin/../share/java/kafka/javassist-3.21.0-GA.jar:/usr/bin/../share/java/kafka/jetty-servlet-9.2.22.v20170606.jar:/usr/bin/../share/java/kafka/jetty-security-9.2.22.v20170606.jar:/usr/bin/../share/java/kafka/commons-compress-1.8.1.jar:/usr/bin/../share/java/kafka/slf4j-api-1.7.25.jar:/usr/bin/../share/java/kafka/kafka_2.11-1.0.0-cp1-test-sources.jar:/usr/bin/../share/java/kafka/reflections-0.9.11.jar:/usr/bin/../share/java/kafka/commons-collections-3.2.1.jar:/usr/bin/../share/java/kafka/httpclient-4.5.2.jar:/usr/bin/../share/java/kafka/guava-20.0.jar:/usr/bin/../share/java/kafka/jetty-server-9.2.22.v20170606.jar:/usr/bin/../share/java/kafka/jetty-util-9.2.22.v20170606.jar:/usr/bin/../share/java/kafka/commons-logging-1.2.jar:/usr/bin/../share/java/kafka/httpmime-4.5.2.jar:/usr/bin/../share/java/kafka/jackson-jaxrs-base-2.9.1.jar:/usr/bin/../share/java/kafka/javax.servlet-api-3.1.0.jar:/usr/bin/../share/java/kafka/jersey-common-2.25.1.jar:/usr/bin/../share/java/kafka/jersey-client-2.25.1.jar:/usr/bin/../share/java/kafka/maven-artifact-3.5.0.jar:/usr/bin/../share/java/kafka/jersey-container-servlet-core-2.25.1.jar:/usr/bin/../share/java/kafka/commons-validator-1.4.1.jar:/usr/bin/../share/java/kafka/jetty-http-9.2.22.v20170606.jar:/usr/bin/../share/java/kafka/connect-runtime-1.0.0-cp1.jar:/usr/bin/../share/java/kafka/javassist-3.20.0-GA.jar:/usr/bin/../share/java/kafka/javax.annotation-api-1.2.jar:/usr/bin/../share/java/kafka/jackson-jaxrs-json-provider-2.9.1.jar:/usr/bin/../share/java/kafka/kafka-streams-examples-1.0.0-cp1.jar:/usr/bin/../share/java/kafka/javax.inject-1.jar:/usr/bin/../share/java/kafka/kafka_2.11-1.0.0-cp1-test.jar:/usr/bin/../share/java/kafka/kafka.jar:/usr/bin/../share/java/kafka/osgi-resource-locator-1.0.1.jar:/usr/bin/../share/java/kafka/hk2-utils-2.5.0-b32.jar:/usr/bin/../share/java/kafka/validation-api-1.1.0.Final.jar:/usr/bin/../share/java/kafka/zookeeper-3.4.10.jar:/usr/bin/../share/java/kafka/support-metrics-common-4.0.0.jar:/usr/bin/../share/java/kafka/javax.ws.rs-api-2.0.1.jar:/usr/bin/../share/java/kafka/hk2-api-2.5.0-b32.jar:/usr/bin/../share/java/kafka/jackson-databind-2.9.1.jar:/usr/bin/../share/java/kafka/kafka-streams-1.0.0-cp1.jar:/usr/bin/../share/java/kafka/connect-transforms-1.0.0-cp1.jar:/usr/bin/../share/java/kafka/jersey-server-2.25.1.jar:/usr/bin/../share/java/kafka/commons-digester-1.8.1.jar:/usr/bin/../share/java/kafka/zkclient-0.10.jar:/usr/bin/../share/java/kafka/commons-lang3-3.5.jar:/usr/bin/../share/java/kafka/commons-lang3-3.1.jar:/usr/bin/../share/java/kafka/kafka-tools-1.0.0-cp1.jar:/usr/bin/../share/java/kafka/kafka-clients-1.0.0-cp1.jar:/usr/bin/../share/java/kafka/javax.inject-2.5.0-b32.jar:/usr/bin/../share/java/kafka/rocksdbjni-5.7.3.jar:/usr/bin/../share/java/kafka/jersey-container-servlet-2.25.1.jar:/usr/
bin/../share/java/kafka/connect-json-1.0.0-cp1.jar:/usr/bin/../share/java/kafka/jersey-media-jaxb-2.25.1.jar:/usr/bin/../share/java/kafka/log4j-1.2.17.jar:/usr/bin/../share/java/kafka/xz-1.5.jar:/usr/bin/../share/java/kafka/jetty-io-9.2.22.v20170606.jar:/usr/bin/../share/java/kafka/hk2-locator-2.5.0-b32.jar:/usr/bin/../share/java/kafka/plexus-utils-3.0.24.jar:/usr/bin/../share/java/kafka/jackson-module-jaxb-annotations-2.9.1.jar:/usr/bin/../share/java/kafka/lz4-java-1.4.jar:/usr/bin/../share/java/kafka/snappy-java-1.1.4.jar:/usr/bin/../share/java/kafka/scala-library-2.11.11.jar:/usr/bin/../share/java/kafka/kafka_2.11-1.0.0-cp1.jar:/usr/bin/../share/java/kafka/httpcore-4.4.4.jar:/usr/bin/../share/java/kafka/argparse4j-0.7.0.jar:/usr/bin/../share/java/kafka/jackson-annotations-2.9.1.jar:/usr/bin/../share/java/kafka/support-metrics-client-4.0.0.jar:/usr/bin/../share/java/kafka/kafka_2.11-1.0.0-cp1-sources.jar:/usr/bin/../share/java/kafka/jetty-servlets-9.2.22.v20170606.jar:/usr/bin/../share/java/kafka/connect-file-1.0.0-cp1.jar:/usr/bin/../share/java/kafka/jetty-continuation-9.2.22.v20170606.jar:/usr/bin/../share/java/kafka/jersey-guava-2.25.1.jar:/usr/bin/../share/java/kafka/kafka_2.11-1.0.0-cp1-scaladoc.jar:/usr/bin/../share/java/kafka/jackson-core-2.9.1.jar:/usr/bin/../share/java/kafka/avro-1.8.2.jar:/usr/bin/../share/java/confluent-support-metrics/*:/usr/share/java/confluent-support-metrics/* (org.apache.zookeeper.ZooKeeper)
[2018-03-17 18:44:43,971] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2018-03-17 18:44:43,971] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2018-03-17 18:44:43,972] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2018-03-17 18:44:43,972] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2018-03-17 18:44:43,972] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2018-03-17 18:44:43,972] INFO Client environment:os.version=4.10.0-38-generic (org.apache.zookeeper.ZooKeeper)
[2018-03-17 18:44:43,972] INFO Client environment:user.name=? (org.apache.zookeeper.ZooKeeper)
[2018-03-17 18:44:43,972] INFO Client environment:user.home=? (org.apache.zookeeper.ZooKeeper)
[2018-03-17 18:44:43,972] INFO Client environment:user.dir=/ (org.apache.zookeeper.ZooKeeper)
[2018-03-17 18:44:43,972] INFO Initiating client connection, connectString=localhost:10001 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@2f7298b (org.apache.zookeeper.ZooKeeper)
[2018-03-17 18:44:43,981] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
[2018-03-17 18:44:43,986] INFO Opening socket connection to server localhost/127.0.0.1:10001. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2018-03-17 18:44:44,043] INFO Socket connection established to localhost/127.0.0.1:10001, initiating session (org.apache.zookeeper.ClientCnxn)
[2018-03-17 18:44:44,051] INFO Session establishment complete on server localhost/127.0.0.1:10001, sessionid = 0x162351f9bfe0001, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2018-03-17 18:44:44,053] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2018-03-17 18:44:44,401] INFO Cluster ID = Lo1wX0fFTBKtmyQR8pMV1A (kafka.server.KafkaServer)
[2018-03-17 18:44:44,412] WARN No meta.properties file under dir /var/lib/kafka/data/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2018-03-17 18:44:44,435] INFO [ThrottledRequestReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2018-03-17 18:44:44,435] INFO [ThrottledRequestReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2018-03-17 18:44:44,437] INFO [ThrottledRequestReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2018-03-17 18:44:44,466] INFO Loading logs. (kafka.log.LogManager)
[2018-03-17 18:44:44,472] INFO Logs loading complete in 6 ms. (kafka.log.LogManager)
[2018-03-17 18:44:44,514] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2018-03-17 18:44:44,515] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2018-03-17 18:44:44,517] INFO Starting the log cleaner (kafka.log.LogCleaner)
[2018-03-17 18:44:44,518] INFO [kafka-log-cleaner-thread-0]: Starting (kafka.log.LogCleaner)
[2018-03-17 18:44:44,733] INFO Awaiting socket connections on 0.0.0.0:20001. (kafka.network.Acceptor)
[2018-03-17 18:44:44,736] INFO [SocketServer brokerId=1] Started 1 acceptor threads (kafka.network.SocketServer)
[2018-03-17 18:44:44,748] INFO [ExpirationReaper-1-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2018-03-17 18:44:44,749] INFO [ExpirationReaper-1-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2018-03-17 18:44:44,750] INFO [ExpirationReaper-1-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2018-03-17 18:44:44,757] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
[2018-03-17 18:44:44,810] INFO [controller-event-thread]: Starting (kafka.controller.ControllerEventManager$ControllerEventThread)
[2018-03-17 18:44:44,812] INFO [ExpirationReaper-1-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2018-03-17 18:44:44,814] INFO [ExpirationReaper-1-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2018-03-17 18:44:44,815] INFO [ExpirationReaper-1-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2018-03-17 18:44:44,821] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2018-03-17 18:44:44,823] INFO [GroupCoordinator 1]: Starting up. (kafka.coordinator.group.GroupCoordinator)
[2018-03-17 18:44:44,824] INFO [GroupCoordinator 1]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
[2018-03-17 18:44:44,824] INFO [GroupMetadataManager brokerId=1] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2018-03-17 18:44:44,828] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2018-03-17 18:44:44,828] INFO [Controller id=1] 1 successfully elected as the controller (kafka.controller.KafkaController)
[2018-03-17 18:44:44,828] INFO [Controller id=1] Starting become controller state transition (kafka.controller.KafkaController)
[2018-03-17 18:44:44,852] INFO [ProducerId Manager 1]: Acquired new producerId block (brokerId:1,blockStartProducerId:0,blockEndProducerId:999) by writing to Zk with path version 1 (kafka.coordinator.transaction.ProducerIdManager)
[2018-03-17 18:44:44,859] INFO [Controller id=1] Incremented epoch to 1 (kafka.controller.KafkaController)
[2018-03-17 18:44:44,863] DEBUG [Controller id=1] Registering IsrChangeNotificationListener (kafka.controller.KafkaController)
[2018-03-17 18:44:44,871] DEBUG [Controller id=1] Registering logDirEventNotificationListener (kafka.controller.KafkaController)
[2018-03-17 18:44:44,875] INFO [TransactionCoordinator id=1] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2018-03-17 18:44:44,877] INFO [Transaction Marker Channel Manager 1]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2018-03-17 18:44:44,877] INFO [TransactionCoordinator id=1] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2018-03-17 18:44:44,890] INFO [Controller id=1] Partitions being reassigned: Map() (kafka.controller.KafkaController)
[2018-03-17 18:44:44,891] INFO [Controller id=1] Partitions already reassigned: Set() (kafka.controller.KafkaController)
[2018-03-17 18:44:44,892] INFO [Controller id=1] Resuming reassignment of partitions: Map() (kafka.controller.KafkaController)
[2018-03-17 18:44:44,893] INFO [Controller id=1] Currently active brokers in the cluster: Set() (kafka.controller.KafkaController)
[2018-03-17 18:44:44,894] INFO [Controller id=1] Currently shutting brokers in the cluster: Set() (kafka.controller.KafkaController)
[2018-03-17 18:44:44,894] INFO [Controller id=1] Current list of topics in the cluster: Set() (kafka.controller.KafkaController)
[2018-03-17 18:44:44,897] INFO [Controller id=1] List of topics to be deleted:  (kafka.controller.KafkaController)
[2018-03-17 18:44:44,897] INFO [Controller id=1] List of topics ineligible for deletion:  (kafka.controller.KafkaController)
[2018-03-17 18:44:44,910] INFO [ReplicaStateMachine controllerId=1] Started replica state machine with initial state -> Map() (kafka.controller.ReplicaStateMachine)
[2018-03-17 18:44:44,912] INFO [PartitionStateMachine controllerId=1] Started partition state machine with initial state -> Map() (kafka.controller.PartitionStateMachine)
[2018-03-17 18:44:44,912] INFO [Controller id=1] Ready to serve as the new controller with epoch 1 (kafka.controller.KafkaController)
[2018-03-17 18:44:44,915] INFO [Controller id=1] Partitions undergoing preferred replica election:  (kafka.controller.KafkaController)
[2018-03-17 18:44:44,915] INFO [Controller id=1] Partitions that completed preferred replica election:  (kafka.controller.KafkaController)
[2018-03-17 18:44:44,916] INFO [Controller id=1] Skipping preferred replica election for partitions due to topic deletion:  (kafka.controller.KafkaController)
[2018-03-17 18:44:44,916] INFO [Controller id=1] Resuming preferred replica election for partitions:  (kafka.controller.KafkaController)
[2018-03-17 18:44:44,917] INFO Creating /brokers/ids/1 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2018-03-17 18:44:44,917] INFO [Controller id=1] Starting preferred replica leader election for partitions  (kafka.controller.KafkaController)
[2018-03-17 18:44:44,919] INFO [PartitionStateMachine controllerId=1] Invoking state change to OnlinePartition for partitions  (kafka.controller.PartitionStateMachine)
[2018-03-17 18:44:44,933] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2018-03-17 18:44:44,934] INFO [Controller id=1] Starting the controller scheduler (kafka.controller.KafkaController)
[2018-03-17 18:44:44,935] INFO Registered broker 1 at path /brokers/ids/1 with addresses: EndPoint(localhost,20001,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)
[2018-03-17 18:44:44,937] WARN No meta.properties file under dir /var/lib/kafka/data/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2018-03-17 18:44:44,951] INFO [Controller id=1] Newly added brokers: 1, deleted brokers: , all live brokers: 1 (kafka.controller.KafkaController)
[2018-03-17 18:44:44,952] DEBUG [Channel manager on controller 1]: Controller 1 trying to connect to broker 1 (kafka.controller.ControllerChannelManager)
[2018-03-17 18:44:44,958] INFO [Controller-1-to-broker-1-send-thread]: Starting (kafka.controller.RequestSendThread)
[2018-03-17 18:44:44,959] INFO [Controller id=1] New broker startup callback for 1 (kafka.controller.KafkaController)
[2018-03-17 18:44:44,961] INFO Kafka version : 1.0.0-cp1 (org.apache.kafka.common.utils.AppInfoParser)
[2018-03-17 18:44:44,961] INFO Kafka commitId : ec61c5e93da662df (org.apache.kafka.common.utils.AppInfoParser)
[2018-03-17 18:44:44,962] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
[2018-03-17 18:44:44,967] INFO [Controller-1-to-broker-1-send-thread]: Controller 1 connected to localhost:20001 (id: 1 rack: null) for sending state change requests (kafka.controller.RequestSendThread)
[2018-03-17 18:44:44,992] TRACE [Controller id=1 epoch=1] Received response {error_code=0} for a request sent to broker localhost:20001 (id: 1 rack: null) (state.change.logger)
[2018-03-17 18:44:49,937] TRACE [Controller id=1] Checking need to trigger auto leader balancing (kafka.controller.KafkaController)
[2018-03-17 18:44:49,940] DEBUG [Controller id=1] Preferred replicas by broker Map() (kafka.controller.KafkaController)

The parameter KAFKA_BROKER_ID with a value of 1 specifies the unique identifier associated with a Kafka Broker.

The parameter KAFKA_ZOOKEEPER_CONNECT with a value of localhost:10001 specifies the host:port of the ZooKeeper instance to connect to.

The parameter KAFKA_ADVERTISED_LISTENERS with a value of PLAINTEXT://localhost:20001 specifies the host:port of the Kafka Broker. This is the host:port that Producer(s) and Consumer(s) use to connect to the Kafka Broker.

The parameter KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR with a value of 1 specifies the replication factor for the internal Topic in which Consumer Offsets are stored. A value of 1 is needed for a single-node Kafka Broker, since the default value of 3 cannot be satisfied with just one Broker.
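
The Confluent docker image translates each KAFKA_* environment variable into the corresponding entry of the Broker's server.properties (upper-case with underscores becomes lower-case with dots). A rough standalone equivalent of the above settings would be the following sketch:

# Hypothetical server.properties entries matching our docker settings
broker.id=1
zookeeper.connect=localhost:10001
advertised.listeners=PLAINTEXT://localhost:20001
offsets.topic.replication.factor=1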

Hands-on with Kafka Producer/Consumer using the Command-Line

In order to publish and subscribe to a few test messages, we need a Kafka Topic. To create a Kafka Topic called MyTestMessages with a single Partition, execute the following docker command:

docker run --rm --net=host confluentinc/cp-kafka:4.0.0-3 kafka-topics --create --topic MyTestMessages --partitions 1 --replication-factor 1 --if-not-exists --zookeeper localhost:10001

The following should be the typical output in the window that is running the Kafka Broker:

Output.8

[2018-03-17 19:18:56,998] INFO [Controller id=1] New topics: [Set(MyTestMessages)], deleted topics: [Set()], new partition replica assignment [Map(MyTestMessages-0 -> Vector(1))] (kafka.controller.KafkaController)
[2018-03-17 19:18:56,998] INFO [Controller id=1] New topic creation callback for MyTestMessages-0 (kafka.controller.KafkaController)
[2018-03-17 19:18:57,008] INFO [Controller id=1] New partition creation callback for MyTestMessages-0 (kafka.controller.KafkaController)
[2018-03-17 19:18:57,008] INFO [PartitionStateMachine controllerId=1] Invoking state change to NewPartition for partitions MyTestMessages-0 (kafka.controller.PartitionStateMachine)
[2018-03-17 19:18:57,009] TRACE [Controller id=1 epoch=1] Changed partition MyTestMessages-0 state from NonExistentPartition to NewPartition with assigned replicas 1 (state.change.logger)
[2018-03-17 19:18:57,011] INFO [ReplicaStateMachine controllerId=1] Invoking state change to NewReplica for replicas [Topic=MyTestMessages,Partition=0,Replica=1] (kafka.controller.ReplicaStateMachine)
[2018-03-17 19:18:57,016] TRACE [Controller id=1 epoch=1] Changed state of replica 1 for partition MyTestMessages-0 from NonExistentReplica to NewReplica (state.change.logger)
[2018-03-17 19:18:57,016] INFO [PartitionStateMachine controllerId=1] Invoking state change to OnlinePartition for partitions MyTestMessages-0 (kafka.controller.PartitionStateMachine)
[2018-03-17 19:18:57,018] DEBUG [PartitionStateMachine controllerId=1] Live assigned replicas for partition MyTestMessages-0 are: [List(1)] (kafka.controller.PartitionStateMachine)
[2018-03-17 19:18:57,021] DEBUG [PartitionStateMachine controllerId=1] Initializing leader and isr for partition MyTestMessages-0 to (Leader:1,ISR:1,LeaderEpoch:0,ControllerEpoch:1) (kafka.controller.PartitionStateMachine)
[2018-03-17 19:18:57,069] TRACE [Controller id=1 epoch=1] Changed partition MyTestMessages-0 from NewPartition to OnlinePartition with leader 1 (state.change.logger)
[2018-03-17 19:18:57,070] TRACE [Controller id=1 epoch=1] Sending become-leader LeaderAndIsr request PartitionState(controllerEpoch=1, leader=1, leaderEpoch=0, isr=1, zkVersion=0, replicas=1, isNew=true) to broker 1 for partition MyTestMessages-0 (state.change.logger)
[2018-03-17 19:18:57,073] TRACE [Controller id=1 epoch=1] Sending UpdateMetadata request PartitionState(controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[]) to brokers Set(1) for partition MyTestMessages-0 (state.change.logger)
[2018-03-17 19:18:57,073] INFO [ReplicaStateMachine controllerId=1] Invoking state change to OnlineReplica for replicas [Topic=MyTestMessages,Partition=0,Replica=1] (kafka.controller.ReplicaStateMachine)
[2018-03-17 19:18:57,074] TRACE [Controller id=1 epoch=1] Changed state of replica 1 for partition MyTestMessages-0 from NewReplica to OnlineReplica (state.change.logger)
[2018-03-17 19:18:57,074] INFO [Controller-1-to-broker-1-send-thread]: Controller 1 connected to localhost:20001 (id: 1 rack: null) for sending state change requests (kafka.controller.RequestSendThread)
[2018-03-17 19:18:57,078] TRACE [Broker id=1] Received LeaderAndIsr request PartitionState(controllerEpoch=1, leader=1, leaderEpoch=0, isr=1, zkVersion=0, replicas=1, isNew=true) correlation id 1 from controller 1 epoch 1 for partition MyTestMessages-0 (state.change.logger)
[2018-03-17 19:18:57,083] TRACE [Broker id=1] Handling LeaderAndIsr request correlationId 1 from controller 1 epoch 1 starting the become-leader transition for partition MyTestMessages-0 (state.change.logger)
[2018-03-17 19:18:57,085] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions MyTestMessages-0 (kafka.server.ReplicaFetcherManager)
[2018-03-17 19:18:57,110] INFO Loading producer state from offset 0 for partition MyTestMessages-0 with message format version 2 (kafka.log.Log)
[2018-03-17 19:18:57,115] INFO Completed load of log MyTestMessages-0 with 1 log segments, log start offset 0 and log end offset 0 in 16 ms (kafka.log.Log)
[2018-03-17 19:18:57,117] INFO Created log for partition [MyTestMessages,0] in /var/lib/kafka/data with properties {compression.type -> producer, message.format.version -> 1.0-IV0, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
[2018-03-17 19:18:57,117] INFO [Partition MyTestMessages-0 broker=1] No checkpointed highwatermark is found for partition MyTestMessages-0 (kafka.cluster.Partition)
[2018-03-17 19:18:57,119] INFO Replica loaded for partition MyTestMessages-0 with initial high watermark 0 (kafka.cluster.Replica)
[2018-03-17 19:18:57,120] INFO [Partition MyTestMessages-0 broker=1] MyTestMessages-0 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[2018-03-17 19:18:57,125] TRACE [Broker id=1] Stopped fetchers as part of become-leader request from controller 1 epoch 1 with correlation id 1 for partition MyTestMessages-0 (last update controller epoch 1) (state.change.logger)
[2018-03-17 19:18:57,126] TRACE [Broker id=1] Completed LeaderAndIsr request correlationId 1 from controller 1 epoch 1 for the become-leader transition for partition MyTestMessages-0 (state.change.logger)
[2018-03-17 19:18:57,130] TRACE [Controller id=1 epoch=1] Received response {error_code=0,partitions=[{topic=MyTestMessages,partition=0,error_code=0}]} for a request sent to broker localhost:20001 (id: 1 rack: null) (state.change.logger)
[2018-03-17 19:18:57,135] TRACE [Broker id=1] Cached leader info PartitionState(controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[]) for partition MyTestMessages-0 in response to UpdateMetadata request sent by controller 1 epoch 1 with correlation id 2 (state.change.logger)
[2018-03-17 19:18:57,136] TRACE [Controller id=1 epoch=1] Received response {error_code=0} for a request sent to broker localhost:20001 (id: 1 rack: null) (state.change.logger)
[2018-03-17 19:19:49,947] TRACE [Controller id=1] Checking need to trigger auto leader balancing (kafka.controller.KafkaController)
[2018-03-17 19:19:49,948] DEBUG [Controller id=1] Preferred replicas by broker Map(1 -> Map(MyTestMessages-0 -> Vector(1))) (kafka.controller.KafkaController)
[2018-03-17 19:19:49,950] DEBUG [Controller id=1] Topics not in preferred replica Map() (kafka.controller.KafkaController)
[2018-03-17 19:19:49,951] TRACE [Controller id=1] Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController)

To list all the currently defined Kafka Topic(s) on the Kafka Broker instance, execute the following docker command:

docker run --rm --net=host confluentinc/cp-kafka:4.0.0-3 kafka-topics --list --zookeeper localhost:10001

The following should be the typical output:

Output.9

MyTestMessages

To get details on the Kafka Topic called MyTestMessages from the running Kafka Broker instance, execute the following docker command:

docker run --rm --net=host confluentinc/cp-kafka:4.0.0-3 kafka-topics --describe --topic MyTestMessages --zookeeper localhost:10001

The following should be the typical output:

Output.10

Topic:MyTestMessages PartitionCount:1  ReplicationFactor:1 Configs:
  Topic: MyTestMessages Partition: 0  Leader: 1 Replicas: 1 Isr: 1

To publish some test messages to the Kafka Topic called MyTestMessages, execute the following docker command:

docker run -it --rm --net=host confluentinc/cp-kafka:4.0.0-3 kafka-console-producer --request-required-acks 1 --broker-list localhost:20001 --topic MyTestMessages

The above docker command will wait for input on the command-line with the prompt '>'. One can enter the following three test messages. To exit, press CTRL-C.

>Test Message - 1
>Test Message - 2
>Test Message - 3
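
As an alternative to typing the messages interactively, one could pipe them into the Producer, which then exits once the input is exhausted. A sketch (note the -i instead of -it, since no terminal is attached):

printf 'Test Message - 1\nTest Message - 2\nTest Message - 3\n' | docker run -i --rm --net=host confluentinc/cp-kafka:4.0.0-3 kafka-console-producer --request-required-acks 1 --broker-list localhost:20001 --topic MyTestMessages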

To subscribe to the test messages from the Kafka Topic called MyTestMessages, execute the following docker command:

docker run -it --rm --net=host confluentinc/cp-kafka:4.0.0-3 kafka-console-consumer --bootstrap-server localhost:20001 --topic MyTestMessages --group mycmdtest --from-beginning

The option --from-beginning indicates that we want to receive all the messages from the beginning of the log (Offset 0). The following should be the typical output:

Output.11

Test Message - 1
Test Message - 2
Test Message - 3

The above docker command will wait indefinitely for more future message(s). To exit, press CTRL-C.
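
Alternatively, to have the Consumer exit on its own after consuming a fixed number of messages, the kafka-console-consumer tool supports the --max-messages option. A sketch:

docker run -it --rm --net=host confluentinc/cp-kafka:4.0.0-3 kafka-console-consumer --bootstrap-server localhost:20001 --topic MyTestMessages --group mycmdtest --from-beginning --max-messages 3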

To display the current message offset for the above exited Consumer (belonging to the group mycmdtest), execute the following docker command:

docker run --rm --net=host confluentinc/cp-kafka:4.0.0-3 kafka-consumer-groups --bootstrap-server localhost:20001 --describe --group mycmdtest

The following should be the typical output:

Output.12

Note: This will not show information about old Zookeeper-based consumers.

Consumer group 'mycmdtest' has no active members.

TOPIC            PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
MyTestMessages   0          3               3               0    -            -     -
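
As an aside, the same kafka-consumer-groups tool can also rewind the committed Offsets of an inactive Consumer group. For example, the following sketch would reset the group mycmdtest back to the beginning of the Kafka Topic called MyTestMessages:

docker run --rm --net=host confluentinc/cp-kafka:4.0.0-3 kafka-consumer-groups --bootstrap-server localhost:20001 --group mycmdtest --reset-offsets --to-earliest --topic MyTestMessages --execute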

Now, let us create another Kafka Topic called MyPartitionedMsgs with 2 Partitions by executing the following docker command:

docker run --rm --net=host confluentinc/cp-kafka:4.0.0-3 kafka-topics --create --topic MyPartitionedMsgs --partitions 2 --replication-factor 1 --if-not-exists --zookeeper localhost:10001

Next, we will publish 5 test messages to the Kafka Topic called MyPartitionedMsgs by executing the following docker command:

docker run -it --rm --net=host confluentinc/cp-kafka:4.0.0-3 kafka-console-producer --request-required-acks 1 --broker-list localhost:20001 --topic MyPartitionedMsgs

The above docker command will wait for input on the command-line with the prompt '>'. One can enter the following five test messages and then press CTRL-C to exit.

>Partitioned Message - 1
>Partitioned Message - 2
>Partitioned Message - 3
>Partitioned Message - 4
>Partitioned Message - 5

Since no keys were specified for the 5 test messages, the messages will be distributed (using round-robin) between the 2 Partitions of the Kafka Topic called MyPartitionedMsgs.
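
To verify the spread of the messages, one could query the log-end Offset of each Partition using the GetOffsetShell tool bundled with Kafka (the output is of the form topic:partition:offset). A sketch:

docker run --rm --net=host confluentinc/cp-kafka:4.0.0-3 kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:20001 --topic MyPartitionedMsgs --time -1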

Next, we will start two Consumer instances to retrieve messages from the 2 Partitions of the Kafka Topic called MyPartitionedMsgs, by executing the following docker command in each of two separate terminal windows:

docker run -it --rm --net=host confluentinc/cp-kafka:4.0.0-3 kafka-console-consumer --bootstrap-server localhost:20001 --topic MyPartitionedMsgs --group myparttest --from-beginning

The following should be the typical output from the first Consumer:

Output.13

Partitioned Message - 2
Partitioned Message - 4

And, the following should be the typical output from the second Consumer:

Output.14

Partitioned Message - 1
Partitioned Message - 3
Partitioned Message - 5

To display the current message offsets for the above two Consumers (belonging to the group myparttest), execute the following docker command:

docker run --rm --net=host confluentinc/cp-kafka:4.0.0-3 kafka-consumer-groups --bootstrap-server localhost:20001 --describe --group myparttest

The following should be the typical output:

Output.15

Note: This will not show information about old Zookeeper-based consumers.


TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
MyPartitionedMsgs              0          2               2               0          consumer-1-8c3831de-0c78-42eb-a0e7-1fd95105ee13   /127.0.0.1                     consumer-1
MyPartitionedMsgs              1          3               3               0          consumer-1-af2ba99b-fa52-4d00-92b3-b6daad1eb303   /127.0.0.1

Starting a third Consumer in the same group would not have helped (it would have sat idle without consuming any messages), since the Kafka Topic called MyPartitionedMsgs has only 2 Partitions and each Partition is assigned to at most one Consumer within a Consumer Group.

In order to publish messages to the Kafka Topic called MyTestMessages with their associated keys, execute the following docker command:

docker run -it --rm --net=host confluentinc/cp-kafka:4.0.0-3 kafka-console-producer --request-required-acks 1 --broker-list localhost:20001 --property "parse.key=true" --property "key.separator=:" --topic MyTestMessages

The above docker command will wait for input on the command-line with the prompt '>'. One can enter the following test messages (preceded by their key and a colon) and then press CTRL-C to exit.

>1:Keyed Test Message - 1
>2:Keyed Test Message - 2
>3:Keyed Test Message - 3
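
Messages that share a key are hashed to the same Partition, which preserves their relative order. To display the keys alongside the messages on the Consumer side, one could pass the print.key and key.separator properties to the console Consumer. A sketch:

docker run -it --rm --net=host confluentinc/cp-kafka:4.0.0-3 kafka-console-consumer --bootstrap-server localhost:20001 --topic MyTestMessages --from-beginning --property print.key=true --property key.separator=: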

References

Confluent Apache Kafka

Introduction to Docker

Exploring Apache ZooKeeper :: Part-1

Exploring Apache ZooKeeper :: Part-2

Exploring Apache ZooKeeper :: Part-3