Exploring Apache Kafka - Part 2


Bhaskar S 03/24/2018


Overview

In Part-1 of this series, we set up Apache Kafka using Docker and demonstrated the messaging semantics using the provided command-line tools for the Producer and the Consumer.

In this article, we will leverage the Java Producer API and the Java Consumer API to implement a Producer and a Consumer to demonstrate the messaging semantics.

Setup

Ensure that the directories logs, kafka/data, and zk/data under /home/alice are all cleaned up for a fresh start.

Create the directories called classes, lib, and src under /home/alice by executing the following command:

mkdir -p classes lib src

Since we will be implementing the code for the Producer and the Consumer in Java (for demonstration purposes), we need to download some dependent JARs. Download and copy the following JAR files to the directory /home/alice/lib:

commons-beanutils-1.9.3.jar
commons-configuration2-2.2.jar
commons-lang3-3.6.jar
commons-logging-1.2.jar
kafka-clients-1.0.0.jar
slf4j-api-1.7.25.jar
slf4j-simple-1.7.25.jar

Before continuing further, ensure that the ZooKeeper and the Kafka Broker instances are up and running.

Hands-on with Kafka Producer/Consumer using Java

The following are the contents of the properties file called kafka_producer.properties:

kafka_producer.properties
#
# Kafka producer properties
#

bootstrap.servers = localhost:20001
key.serializer = org.apache.kafka.common.serialization.StringSerializer
value.serializer = org.apache.kafka.common.serialization.StringSerializer
acks = 1
producer.topic = MyTestMessages

The following Java class called MyKafkaProducerApp implements our Producer:

MyKafkaProducerApp.java
/*
 * Name:        MyKafkaProducerApp
 * 
 * Description: A simple Kafka message producer that publishes messages to a specified topic
 * 
 */

package com.polarsparc.Kafka;

import java.io.File;
import java.util.Properties;

import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.builder.fluent.Configurations;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka Producer
 *
 */
public class MyKafkaProducerApp 
{
    public static void main( String[] args )
    {
        if (args.length != 2) {
            System.out.printf("Usage: java %s <count> <prefix>\n", MyKafkaProducerApp.class.getName());
            System.exit(1);
        }
        
        final String PROPERTIES_FILE = "kafka_producer.properties";
        
        Logger logger = LoggerFactory.getLogger(MyKafkaProducerApp.class);
        
        /* Determine the count of messages to publish to Kafka */
        int count = 0;
        try {
            count = Integer.parseInt(args[0]);
            if (count <= 0) {
                count = 10;
            }
        }
        catch (Exception ex) {
            logger.warn("Invalid <count> value - using default 10");
            count = 10;
        }
        
        logger.info("Message count = " + count);
        
        Configuration config = null;
        
        Configurations configs = new Configurations();
        try {
            config = configs.properties(new File(PROPERTIES_FILE));
        }
        catch (Exception ex) {
            logger.error("Error loading " + PROPERTIES_FILE, ex);
            System.exit(1);
        }
        
        /* Initialize the properties from kafka_producer.properties */
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                config.getString(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                config.getString(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                config.getString(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
        props.put(ProducerConfig.ACKS_CONFIG,
                config.getString(ProducerConfig.ACKS_CONFIG));
        
        /* Create an instance of a kafka producer */
        Producer<String, String> producer = new KafkaProducer<>(props);
        
        for (int i = 0; i < count; i++) {
            /* Create an instance of ProducerRecord using the Topic name, the key, and the message content */
            ProducerRecord<String, String> record = new ProducerRecord<>(config.getString("producer.topic"), 
                    String.valueOf(i), args[1] + " - " + (i+1));
            
            /* Send the data record to kafka */
            try {
                producer.send(record).get();
            }
            catch (Exception ex) {
                logger.error("Error sending message ["+i+"]", ex);
            }
        }
        
        /* Close the kafka producer */
        producer.close();
    }
}

Let us now explain some of the classes and methods used in the MyKafkaProducerApp code shown above.

The class org.apache.commons.configuration2.builder.fluent.Configurations is a utility class used for creating configuration objects. It simplifies reading configuration data from different sources. In our example, we read a properties file to create a configuration object.

The interface org.apache.commons.configuration2.Configuration provides a common abstraction for accessing and manipulating the configuration object.
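
The following is a minimal sketch of both of these in action, reading a key with a fallback default (the default value DefaultTopic is hypothetical and not from our properties file; the imports are the same as in the MyKafkaProducerApp code above):

/* A minimal sketch: load the properties file and read a key with a fallback default
   (note that Configurations.properties() throws a checked ConfigurationException) */
try {
    Configuration config = new Configurations().properties(new File("kafka_producer.properties"));
    String topic = config.getString("producer.topic", "DefaultTopic");
    System.out.println("Topic = " + topic);
}
catch (Exception ex) {
    ex.printStackTrace();
}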

The interface org.apache.kafka.clients.producer.Producer provides a common abstraction for a Kafka producer.

The class org.apache.kafka.clients.producer.ProducerConfig encapsulates the various Kafka producer configuration options as java.lang.String constants.

The property ProducerConfig.BOOTSTRAP_SERVERS_CONFIG allows one to specify a comma-separated list of Kafka brokers as host:port pairs. In our case, we have one Kafka broker at localhost:20001.

The property ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG allows one to specify the serializer class for the key associated with the published message. In our case, we are using a java.lang.String based key and hence use org.apache.kafka.common.serialization.StringSerializer.

The property ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG allows one to specify the serializer class for the data value associated with the published message. In our case, we are using a java.lang.String based data value and hence use org.apache.kafka.common.serialization.StringSerializer.

The property ProducerConfig.ACKS_CONFIG allows one to control when a sent message is considered published. In our case, a value of 1 indicates an acknowledgement from one Kafka broker (the partition leader), as summarized in the sketch below.
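
The following hedged sketch (not part of our demo code, which reads the value from the properties file) summarizes the three supported acks values:

/* acks = "0"   : fire-and-forget; the producer does not wait for any acknowledgement
   acks = "1"   : the partition leader broker must have written the record (the default)
   acks = "all" : all in-sync replicas must have written the record (strongest durability) */
props.put(ProducerConfig.ACKS_CONFIG, "all");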

The class org.apache.kafka.clients.producer.ProducerRecord represents a data record that is sent to the Kafka broker for persistence and distribution. It consists of a topic name, an associated key, and a data value. In our article, this is what is referred to as the message published by a Kafka producer.

The class org.apache.kafka.clients.producer.KafkaProducer is the Kafka publishing client for sending data records (also referred to as messages) to the Kafka broker. The send() method is asynchronous and returns immediately with a java.util.concurrent.Future object. Invoking get() on the Future blocks until the send request completes.
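
If blocking on the Future for every message is undesirable, the send() method also accepts an org.apache.kafka.clients.producer.Callback that is invoked once the send completes. A minimal sketch, reusing the producer, record, and logger from the code above:

/* Non-blocking send: the callback fires once the broker acknowledges (or rejects) the record */
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        logger.error("Error sending message", exception);
    }
    else {
        logger.info("Published to partition " + metadata.partition() + " at offset " + metadata.offset());
    }
});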

To run our simple Kafka message producer to publish 10 messages, execute the java command as shown below:

java -cp ./classes:./lib/commons-beanutils-1.9.3.jar:./lib/commons-configuration2-2.2.jar:./lib/commons-lang3-3.6.jar:./lib/commons-logging-1.2.jar:./lib/kafka-clients-1.0.0.jar:./lib/slf4j-api-1.7.25.jar:./lib/slf4j-simple-1.7.25.jar com.polarsparc.Kafka.MyKafkaProducerApp 10 "Java Producer Msgs"

The following should be the typical output:

Output.1

[main] INFO com.polarsparc.Kafka.MyKafkaProducerApp - Message count = 10
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : aaa7af6d4a11b29d
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.

To verify the test messages have been published to the Kafka Topic called MyTestMessages, execute the following docker command:

docker run -it --rm --net=host confluentinc/cp-kafka:4.0.0-3 kafka-console-consumer --bootstrap-server localhost:20001 --topic MyTestMessages --group mycmdtest --from-beginning

The following should be the typical output:

Output.2

Java Producer Msgs - 1
Java Producer Msgs - 2
Java Producer Msgs - 3
Java Producer Msgs - 4
Java Producer Msgs - 5
Java Producer Msgs - 6
Java Producer Msgs - 7
Java Producer Msgs - 8
Java Producer Msgs - 9
Java Producer Msgs - 10

The following are the contents of the properties file called kafka_consumer.properties:

kafka_consumer.properties
#
# Kafka consumer properties
#

bootstrap.servers = localhost:20001
key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
value.deserializer = org.apache.kafka.common.serialization.StringDeserializer
consumer.topic = MyTestMessages
group.id = mytestgroup
auto.offset.reset = earliest
enable.auto.commit = true
consumer.start.beginning = true

The following Java class called MyKafkaConsumerApp implements our Consumer:

MyKafkaConsumerApp.java
/*
 * Name:        MyKafkaConsumerApp
 * 
 * Description: A simple Kafka message consumer that consumes messages from a specified topic
 * 
 */

package com.polarsparc.Kafka;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.builder.fluent.Configurations;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka Consumer
 *
 */
public class MyKafkaConsumerApp {
    public static void main(String[] args) {
        final String PROPERTIES_FILE = "kafka_consumer.properties";
        
        Logger logger = LoggerFactory.getLogger(MyKafkaConsumerApp.class);
        
        Configuration config = null;
        
        Configurations configs = new Configurations();
        try {
            config = configs.properties(new File(PROPERTIES_FILE));
        }
        catch (Exception ex) {
            logger.error("Error loading " + PROPERTIES_FILE, ex);
            System.exit(1);
        }
        
        /* Initialize the properties from kafka_consumer.properties */
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                config.getString(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                config.getString(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                config.getString(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
        props.put(ConsumerConfig.GROUP_ID_CONFIG,
                config.getString(ConsumerConfig.GROUP_ID_CONFIG));
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
        
        /* Create an instance of a kafka consumer */
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        
        /* Initialize the list of topic(s) to subscribe to */
        List<String> topics = new ArrayList<>();
        topics.add(config.getString("consumer.topic"));
        
        /* Subscribe to the desired topics */
        consumer.subscribe(topics);
        
        try {
            for (int cnt = 0; cnt < 10; cnt++) {
                /* Poll for a set of ConsumerRecord instances */
                ConsumerRecords<String, String> records = consumer.poll(1000);
                
                /* Display the messages */
                records.forEach(rec -> {
                    logger.info("Partition: " + rec.partition() + ", Offset: " + rec.offset() + ", Value: " + rec.value());
                });
            }
        }
        catch (Exception ex) {
            logger.error("Error receiving message(s)", ex);
        }
        
        consumer.close();
    }
}

Let us now explain some of the classes and methods used in the MyKafkaConsumerApp code shown above.

The interface org.apache.kafka.clients.consumer.Consumer provides a common abstraction for a Kafka consumer.

The class org.apache.kafka.clients.consumer.ConsumerConfig encapsulates the various Kafka consumer configuration options as java.lang.String constants.

The property ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG allows one to specify a comma-separated list of Kafka brokers as host:port pairs. In our case, we have one Kafka broker at localhost:20001.

The property ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG allows one to specify the deserializer class for the key associated with the messages to be consumed. In our case, we are using a java.lang.String based key and hence use org.apache.kafka.common.serialization.StringDeserializer.

The property ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG allows one to specify the deserializer class for the data value associated with the messages to be consumed. In our case, we are using a java.lang.String based data value and hence use org.apache.kafka.common.serialization.StringDeserializer.

The property ConsumerConfig.GROUP_ID_CONFIG allows one to specify the consumer group name this Kafka consumer belongs to. In our case, we are using the name mytestgroup.

The property ConsumerConfig.AUTO_OFFSET_RESET_CONFIG allows one to specify the action to take when the Kafka consumer is started for the very first time (there are no committed offsets for the consumer group). In our case, we have specified a value of earliest, indicating we want to consume messages from the beginning.

The class org.apache.kafka.clients.consumer.ConsumerRecords represents a collection of data records received from the Kafka broker for every poll by the Kafka consumer.

The class org.apache.kafka.clients.consumer.KafkaConsumer is the Kafka subscribing client for receiving data records (also referred to as messages) from the Kafka broker. The poll(interval) method fetches data records from the Kafka partition(s) of the specified topic(s) in batches. If no data records arrive, it returns (with an empty collection) after the specified interval, in milliseconds. By default (enable.auto.commit = true), the consumed offsets for the partition(s) of the specified topic(s) are periodically committed to the Kafka broker in the background (every auto.commit.interval.ms, which defaults to 5 seconds) as poll(interval) is invoked.
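
Note that our demo consumer polls a fixed 10 times and then exits. A long-running consumer would typically poll in an endless loop and be shut down from another thread (for example, a JVM shutdown hook) by calling wakeup() on the consumer, which makes a blocked poll() throw a WakeupException. A minimal sketch of that pattern, assuming the same consumer and logger as in the code above:

try {
    while (true) {
        /* Blocks for up to 1000 ms waiting for data records */
        ConsumerRecords<String, String> records = consumer.poll(1000);
        records.forEach(rec -> logger.info("Value: " + rec.value()));
    }
}
catch (org.apache.kafka.common.errors.WakeupException wex) {
    /* Expected when consumer.wakeup() is invoked from another thread - proceed to close */
}
finally {
    consumer.close();
}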

To run our simple Kafka message consumer, execute the java command as shown below:

java -cp ./classes:./lib/commons-beanutils-1.9.3.jar:./lib/commons-configuration2-2.2.jar:./lib/commons-lang3-3.6.jar:./lib/commons-logging-1.2.jar:./lib/kafka-clients-1.0.0.jar:./lib/slf4j-api-1.7.25.jar:./lib/slf4j-simple-1.7.25.jar com.polarsparc.Kafka.MyKafkaConsumerApp

The following should be the typical output:

Output.3

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : aaa7af6d4a11b29d
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=mytestgroup] Discovered coordinator localhost:20001 (id: 2147483646 rack: null)
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=mytestgroup] Revoking previously assigned partitions []
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=mytestgroup] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=mytestgroup] Successfully joined group with generation 1
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=mytestgroup] Setting newly assigned partitions [MyTestMessages-0]
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp - Partition: 0, Offset: 0, Value: Java Producer Msgs - 1
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp - Partition: 0, Offset: 1, Value: Java Producer Msgs - 2
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp - Partition: 0, Offset: 2, Value: Java Producer Msgs - 3
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp - Partition: 0, Offset: 3, Value: Java Producer Msgs - 4
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp - Partition: 0, Offset: 4, Value: Java Producer Msgs - 5
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp - Partition: 0, Offset: 5, Value: Java Producer Msgs - 6
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp - Partition: 0, Offset: 6, Value: Java Producer Msgs - 7
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp - Partition: 0, Offset: 7, Value: Java Producer Msgs - 8
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp - Partition: 0, Offset: 8, Value: Java Producer Msgs - 9
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp - Partition: 0, Offset: 9, Value: Java Producer Msgs - 10

If we re-run our simple Kafka message consumer, we will not see any messages (not even the ones from the very beginning). This is because the current committed offset for our consumer group is at 10 from the previous run.

To verify the current offset for our consumer group called mytestgroup, execute the following docker command:

docker run --rm --net=host confluentinc/cp-kafka:4.0.0-3 kafka-consumer-groups --bootstrap-server localhost:20001 --describe --group mytestgroup

The following should be the typical output:

Output.4

Note: This will not show information about old Zookeeper-based consumers.

Consumer group 'mytestgroup' has no active members.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
MyTestMessages                 0          10              10              0          -                                                 -                              -

To be able to consume all the messages again (from the very beginning), we need to add logic that resets the offset of the Kafka topic partition(s) to the beginning (offset 0).

The following Java class called MyKafkaConsumerApp2 implements our Consumer with the ability to consume messages from the beginning of the Kafka topic(s):

MyKafkaConsumerApp2.java
/*
 * Name:        MyKafkaConsumerApp2
 * 
 * Description: A simple Kafka message consumer that consumes messages from a specified topic
 *              with the ability to rewind offset(s)
 * 
 */

package com.polarsparc.Kafka;

import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.builder.fluent.Configurations;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka Consumer
 *
 */
public class MyKafkaConsumerApp2 {
    public static void main(String[] args) {
        final String PROPERTIES_FILE = "kafka_consumer.properties";
        
        Logger logger = LoggerFactory.getLogger(MyKafkaConsumerApp2.class);
        
        Configuration config = null;
        
        Configurations configs = new Configurations();
        try {
            config = configs.properties(new File(PROPERTIES_FILE));
        }
        catch (Exception ex) {
            logger.error("Error loading " + PROPERTIES_FILE, ex);
            System.exit(1);
        }
        
        /* Determine if start from beginning */
        boolean beginning = false;
        try {
            beginning = Boolean.parseBoolean(config.getString("consumer.start.beginning"));
        }
        catch (Exception ex) {
            logger.warn("Invalid <consumer.start.beginning> value - using default false");
            beginning = false;
        }
        
        logger.info("Consumer start from beginning = " + beginning);
        
        /* Initialize the properties from kafka_consumer.properties */
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                config.getString(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                config.getString(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                config.getString(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
        props.put(ConsumerConfig.GROUP_ID_CONFIG,
                config.getString(ConsumerConfig.GROUP_ID_CONFIG));
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
        
        Set<Integer> partitionSet = new HashSet<>();
        
        /* Create an instance of a kafka consumer */
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        
        /* Initialize the list of topic(s) to subscribe to */
        List<String> topics = new ArrayList<>();
        topics.add(config.getString("consumer.topic"));
        
        logger.info("Subscribed topics: " + topics);
        
        /* Subscribe to the desired topics with callback */
        consumer.subscribe(topics, new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                partitions.forEach(par -> {
                    logger.info("REVOKED -> Topic: " + par.topic() + ", Partition: " + par.partition());
                    
                    partitionSet.remove(par.partition());
                });
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                partitions.forEach(par -> {
                    logger.info("ASSIGNED -> Topic: " + par.topic() + ", Partition: " + par.partition());
                    
                    partitionSet.add(par.partition());
                });
            }
        });
        
        try {
            boolean once = true;
            
            for (int cnt = 0; cnt < 10; cnt++) {
                /* Poll for a set of ConsumerRecord instances */
                ConsumerRecords<String, String> records = consumer.poll(1000);
                
                /* The partition assignment happens lazily on the first poll(); only after
                   that can we seek the assigned partition(s) back to the beginning */
                if (beginning) {
                    if (once) {
                        String topic = config.getString("consumer.topic");
                        
                        partitionSet.forEach(par -> {
                            /* Set the topic partition offset to beginning */
                            consumer.seek(new TopicPartition(topic, par), 0);
                        });
                        
                        once = false;
                    }
                }
                
                /* Display the messages */
                records.forEach(rec -> {
                    logger.info("Partition: " + rec.partition() + ", Offset: " + rec.offset() + ", Value: " + rec.value());
                });
            }
        }
        catch (Exception ex) {
            logger.error("Error receiving message(s)", ex);
        }
        
        consumer.close();
    }
}

Let us now explain some of the classes and methods used in the MyKafkaConsumerApp2 code shown above.

The class org.apache.kafka.common.TopicPartition is a holder that wraps the Kafka topic name and a Kafka topic partition number.
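
As an aside, a Kafka consumer can bypass consumer group management entirely and pin itself to specific partitions by passing a collection of TopicPartition instances to assign() instead of calling subscribe(). A minimal sketch, using the topic name and partition number from our example:

/* Manually assign partition 0 of the topic MyTestMessages to this consumer;
   no consumer group rebalancing (and hence no ConsumerRebalanceListener) applies */
consumer.assign(java.util.Collections.singletonList(new TopicPartition("MyTestMessages", 0)));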

The interface org.apache.kafka.clients.consumer.ConsumerRebalanceListener provides a callback mechanism through which the Kafka consumer is notified of the assignment and revocation of the various Kafka topic(s) and their partition(s). Since Kafka manages the consumer group membership, a partition assignment/revocation (a rebalance) is triggered whenever the set of consumers in a group changes (new consumers join or existing consumers exit).

The seek(TopicPartition, offset) method allows one to manually reset the consumer's position for a given Kafka topic partition; the next poll() will fetch data records starting at that offset.
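
The Kafka consumer also exposes a convenience method seekToBeginning(Collection<TopicPartition>) that rewinds to the earliest available offset (which may be greater than 0 once log retention has deleted older segments). A minimal sketch, assuming the same consumer, topic, and partitionSet as in the code above:

/* Rewind all the assigned partition(s) of the topic to their earliest available offset */
List<TopicPartition> toRewind = new ArrayList<>();
partitionSet.forEach(par -> toRewind.add(new TopicPartition(topic, par)));
consumer.seekToBeginning(toRewind);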

To run our simple Kafka message consumer (with the ability to start from the beginning), execute the java command as shown below:

java -cp ./classes:./lib/commons-beanutils-1.9.3.jar:./lib/commons-configuration2-2.2.jar:./lib/commons-lang3-3.6.jar:./lib/commons-logging-1.2.jar:./lib/kafka-clients-1.0.0.jar:./lib/slf4j-api-1.7.25.jar:./lib/slf4j-simple-1.7.25.jar com.polarsparc.Kafka.MyKafkaConsumerApp2

The following should be the typical output:

Output.5

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : aaa7af6d4a11b29d
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp2 - Subscribed topics: [MyTestMessages]
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=mytestgroup] Discovered coordinator localhost:20001 (id: 2147483646 rack: null)
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=mytestgroup] Revoking previously assigned partitions []
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=mytestgroup] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=mytestgroup] Successfully joined group with generation 5
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=mytestgroup] Setting newly assigned partitions [MyTestMessages-0]
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp2 - ASSIGNED -> Topic: MyTestMessages, Partition: 0
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp2 - Partition: 0, Offset: 0, Value: Java Producer Msgs - 1
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp2 - Partition: 0, Offset: 1, Value: Java Producer Msgs - 2
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp2 - Partition: 0, Offset: 2, Value: Java Producer Msgs - 3
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp2 - Partition: 0, Offset: 3, Value: Java Producer Msgs - 4
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp2 - Partition: 0, Offset: 4, Value: Java Producer Msgs - 5
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp2 - Partition: 0, Offset: 5, Value: Java Producer Msgs - 6
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp2 - Partition: 0, Offset: 6, Value: Java Producer Msgs - 7
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp2 - Partition: 0, Offset: 7, Value: Java Producer Msgs - 8
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp2 - Partition: 0, Offset: 8, Value: Java Producer Msgs - 9
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp2 - Partition: 0, Offset: 9, Value: Java Producer Msgs - 10

As indicated earlier, by default, Kafka implicitly commits offsets on behalf of consumers as poll() is invoked. In some use-cases, it may be desirable for the consumer application to explicitly commit offsets only after it has successfully processed the data records.

The following Java class called MyKafkaConsumerApp3 implements our Consumer with the ability to explicitly commit message offsets:

MyKafkaConsumerApp3.java
/*
 * Name:        MyKafkaConsumerApp3
 * 
 * Description: A simple Kafka message consumer that consumes messages from a specified topic
 *              with the ability to explicitly commit offset(s)
 * 
 */

package com.polarsparc.Kafka;

import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.builder.fluent.Configurations;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka Consumer
 *
 */
public class MyKafkaConsumerApp3 {
    public static void main(String[] args) {
        final String PROPERTIES_FILE = "kafka_consumer.properties";
        
        Logger logger = LoggerFactory.getLogger(MyKafkaConsumerApp3.class);
        
        Configuration config = null;
        
        Configurations configs = new Configurations();
        try {
            config = configs.properties(new File(PROPERTIES_FILE));
        }
        catch (Exception ex) {
            logger.error("Error loading " + PROPERTIES_FILE, ex);
            System.exit(1);
        }
        
        /* Determine if start from beginning */
        boolean beginning = false;
        try {
            beginning = Boolean.parseBoolean(config.getString("consumer.start.beginning"));
        }
        catch (Exception ex) {
            logger.warn("Invalid <consumer.start.beginning> value - using default false");
            beginning = false;
        }
        
        logger.info("Consumer start from beginning = " + beginning);
        
        /* Initialize the properties from kafka_consumer.properties */
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                config.getString(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                config.getString(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                config.getString(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
        props.put(ConsumerConfig.GROUP_ID_CONFIG,
                config.getString(ConsumerConfig.GROUP_ID_CONFIG));
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
                config.getString(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
        
        Set<Integer> partitionSet = new HashSet<>();
        
        /* Create an instance of a kafka consumer */
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        
        /* Initialize the list of topic(s) to subscribe to */
        List<String> topics = new ArrayList<>();
        topics.add(config.getString("consumer.topic"));
        
        logger.info("Subscribed topics: " + topics);
        
        /* Subscribe to the desired topics with callback */
        consumer.subscribe(topics, new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                partitions.forEach(par -> {
                    logger.info("REVOKED -> Topic: " + par.topic() + ", Partition: " + par.partition());
                    
                    partitionSet.remove(par.partition());
                });
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                partitions.forEach(par -> {
                    logger.info("ASSIGNED -> Topic: " + par.topic() + ", Partition: " + par.partition());
                    
                    partitionSet.add(par.partition());
                });
            }
        });
        
        try {
            boolean once = true;
            
            for (int cnt = 0; cnt < 10; cnt++) {
                /* Poll for a set of ConsumerRecord instances */
                ConsumerRecords<String, String> records = consumer.poll(1000);
                
                /* The partition assignment happens lazily on the first poll(); only after
                   that can we seek the assigned partition(s) back to the beginning */
                if (beginning) {
                    if (once) {
                        String topic = config.getString("consumer.topic");
                        
                        partitionSet.forEach(par -> {
                            /* Set the topic partition offset to beginning */
                            consumer.seek(new TopicPartition(topic, par), 0);
                        });
                        
                        once = false;
                    }
                }
                
                /* Display the messages */
                records.forEach(rec -> {
                    logger.info("Partition: " + rec.partition() + ", Offset: " + rec.offset() + ", Value: " + rec.value());
                });
                
                /* Explicitly commit offset(s) to Kafka - Synchronous blocking call */
                consumer.commitSync();
            }
        }
        catch (Exception ex) {
            logger.error("Error receiving message(s)", ex);
        }
        
        consumer.close();
    }
}

Let us now explain some of the classes and methods used in the MyKafkaConsumerApp3 code shown above.

Setting the property ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false allows one to turn off Kafka's default behavior of implicitly committing offsets on behalf of the consumer from within poll().

The commitSync() method allows one to explicitly commit the offset(s) of the data records returned by the last poll() for the subscribed Kafka topic(s) and partition(s). This is a synchronous blocking call that retries until the commit succeeds or an unrecoverable error occurs.
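
The consumer also provides a non-blocking variant, commitAsync(), which optionally accepts an org.apache.kafka.clients.consumer.OffsetCommitCallback that is invoked when the commit completes. A minimal sketch, reusing the consumer and logger from the code above:

/* Non-blocking commit of the offset(s) from the last poll(); the callback reports failures */
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        logger.error("Commit failed for offsets " + offsets, exception);
    }
});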

Run the simple Kafka message producer to publish 10 more messages by executing the java command as shown below:

java -cp ./classes:./lib/commons-beanutils-1.9.3.jar:./lib/commons-configuration2-2.2.jar:./lib/commons-lang3-3.6.jar:./lib/commons-logging-1.2.jar:./lib/kafka-clients-1.0.0.jar:./lib/slf4j-api-1.7.25.jar:./lib/slf4j-simple-1.7.25.jar com.polarsparc.Kafka.MyKafkaProducerApp 10 "Java Producer Msgs (2nd)"

Modify the value of the property enable.auto.commit from true to false (and, since we now want to continue from the committed offset, the property consumer.start.beginning from true to false) in the properties file kafka_consumer.properties.

Now run our simple Kafka message consumer (with the ability to explicitly commit offsets) by executing the java command as shown below:

java -cp ./classes:./lib/commons-beanutils-1.9.3.jar:./lib/commons-configuration2-2.2.jar:./lib/commons-lang3-3.6.jar:./lib/commons-logging-1.2.jar:./lib/kafka-clients-1.0.0.jar:./lib/slf4j-api-1.7.25.jar:./lib/slf4j-simple-1.7.25.jar com.polarsparc.Kafka.MyKafkaConsumerApp3

The following should be the typical output:

Output.6

[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp3 - Consumer start from beginning = false
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : aaa7af6d4a11b29d
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp3 - Subscribed topics: [MyTestMessages]
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=mytestgroup] Discovered coordinator localhost:20001 (id: 2147483646 rack: null)
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=mytestgroup] Revoking previously assigned partitions []
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=mytestgroup] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=mytestgroup] Successfully joined group with generation 11
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=mytestgroup] Setting newly assigned partitions [MyTestMessages-0]
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp3 - ASSIGNED -> Topic: MyTestMessages, Partition: 0
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp3 - Partition: 0, Offset: 10, Value: Java Producer Msgs (2nd) - 1
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp3 - Partition: 0, Offset: 11, Value: Java Producer Msgs (2nd) - 2
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp3 - Partition: 0, Offset: 12, Value: Java Producer Msgs (2nd) - 3
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp3 - Partition: 0, Offset: 13, Value: Java Producer Msgs (2nd) - 4
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp3 - Partition: 0, Offset: 14, Value: Java Producer Msgs (2nd) - 5
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp3 - Partition: 0, Offset: 15, Value: Java Producer Msgs (2nd) - 6
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp3 - Partition: 0, Offset: 16, Value: Java Producer Msgs (2nd) - 7
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp3 - Partition: 0, Offset: 17, Value: Java Producer Msgs (2nd) - 8
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp3 - Partition: 0, Offset: 18, Value: Java Producer Msgs (2nd) - 9
[main] INFO com.polarsparc.Kafka.MyKafkaConsumerApp3 - Partition: 0, Offset: 19, Value: Java Producer Msgs (2nd) - 10

References

Exploring Apache Kafka - Part 1

Confluent Apache Kafka

Introduction to Docker

Exploring Apache ZooKeeper :: Part-1

Exploring Apache ZooKeeper :: Part-2

Exploring Apache ZooKeeper :: Part-3