PolarSPARC

Exploring Kafka Streams :: Part 2


Bhaskar S 11/21/2021


Overview

In Part 1, we setup the Kafka Streams environment and dipped our toes to explore two simple examples in Java.

In this part of the series, we will start to dig in a little deeper into Kafka Streams.

Kafka Streams Concepts

Stream Tasks and Threads

A Kafka Streams topology is nothing more than a directed acyclic graph (DAG) with a Source processor, a set of Stream processors (that perform the various data transformations), and a terminal Sink processor.

Each data event consumed from a Kafka topic partition is processed by a Stream Task. Each stream task is actually executed by a Stream Thread.

A stream task is the smallest atomic unit of work in a Kafka Streams application that processes the topology. The number of partitions in a Kafka topic controls the number of stream tasks. For example, if a Kafka topic has 2 partitions, the Kafka Streams application will have 2 stream tasks.

The following illustration depicts the concept of partitions, tasks, and threads in a Kafka Streams application:


Tasks and Threads
Figure.1

In other words, the maximum parallelism in a Kafka Streams application depends on the number of partitions in the source topic(s) the source processor is consuming from. The number partitions in the source topic(s) in-turn determines the number of stream tasks.

The following are 3 ways to maximize throughput of a Kafka Streams application consuming data events from N partitions:

Hands-on with Kafka Streams

We will update the Java utility class KafkaConsumerConfig from the Common module located in the directory $HOME/java/KafkaStreams/Common to add a new convenience method as shown below:


Listing.1
/*
 * Name:   Kafka Consumer Configuration
 * Author: Bhaskar S
 * Date:   11/10/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.kstreams;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public final class KafkaConsumerConfig {
    public static Properties kafkaConfigurationOne(String appId) {
        Properties config = new Properties();

        config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:20001");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        return config;
    }

    public static Properties kafkaConfigurationTwo(String appId, int numThr) {
        Properties config = kafkaConfigurationOne(appId);

        config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThr);

        return config;
    }

    private KafkaConsumerConfig() {}
}

The property StreamsConfig.NUM_STREAM_THREADS_CONFIG allows one to configure the number of threads used for processing the data events from the different partitions of the input Kafka topic.

Since we modified the Common module, we need to once again compile and deploy the Common module so that the other Java modules can use it. To do that, open a terminal window and run the following commands:

$ $HOME/java/KafkaStreams/Common

$ mvn clean install

Second Application

In the Second module, we will demonstrate a STATELESS Kafka Streams application to consume data events from the multi-partition Kafka topic coffee-flavors to demonstrate the concept of stream tasks and stream threads. Each event will be in the form user:flavor, where 'user' is the key and 'flavor' is the value.

To setup the Java directory structure for the Second application, execute the following commands:

$ cd $HOME/java/KafkaStreams

$ mkdir -p $HOME/java/KafkaStreams/Second

$ mkdir -p Second/src/main/java Second/src/main/resources Second/target

$ mkdir -p Second/src/main/java/com/polarsparc/kstreams

$ cd $HOME/java/KafkaStreams/Second


The following is the listing for the Maven project file pom.xml that will be used:


pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <artifactId>KafkaStreams</artifactId>
        <groupId>com.polarsparc.kstreams</groupId>
        <version>1.0</version>
    </parent>

    <artifactId>Second</artifactId>
    <version>1.0</version>

    <dependencies>
        <dependency>
            <artifactId>Common</artifactId>
            <groupId>com.polarsparc.kstreams</groupId>
            <version>1.0</version>
        </dependency>
    </dependencies>
</project>

We need to modify the <modules> section in the parent pom.xml to include the Second module as shown below:


pom.xml (parent)
<modules>
  <module>Common</module>
  <module>First</module>
  <module>Second</module>
</modules>

The following is the Java based STATELESS Kafka Streams application that consumes and processes events from the Kafka topic coffee-flavors:


Listing.2
/*
 * Name:   Coffee Flavors (Stateless)
 * Author: Bhaskar S
 * Date:   11/20/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.kstreams;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Set;

public class CoffeeFlavorsStateless {
    private static final String COFFEE_FLAVORS = "coffee-flavors";

    public static void main(String[] args) {
        if (args.length != 1) {
            System.out.printf("Usage: java %s <1 or 2>\n", CoffeeFlavorsStateless.class.getName());
            System.exit(1);
        }

        int numThreads = 1;
        if (args[0].equals("2")) {
            numThreads = 2;
        }

        Logger log = LoggerFactory.getLogger(CoffeeFlavorsStateless.class.getName());

        Set<String> flavorSet = Set.copyOf(Arrays.asList("caramel", "hazelnut", "mocha", "peppermint"));

        log.info(String.format("---> Num of stream threads: %d", numThreads));

        StreamsConfig config = new StreamsConfig(KafkaConsumerConfig.kafkaConfigurationTwo(
                "coffee-flavor-1", numThreads));

        Serde<String> stringSerde = Serdes.String();

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> stream = builder.stream(COFFEE_FLAVORS, Consumed.with(stringSerde, stringSerde));

        stream.peek((user, flavor) -> log.info(String.format("---> [Start] User: %s, Flavor: %s", user, flavor)))
                .map((user, flavor) -> KeyValue.pair(user.toLowerCase(), flavor.toLowerCase()))
                .filter((user, flavor) -> flavorSet.contains(flavor))
                .map((user, flavor) -> KeyValue.pair(flavor, user))
                .foreach((flavor, user) -> log.info(String.format("---> [Final] %s - %s", flavor, user)));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

The code from Listing.2 above needs some explanation:

The class org.apache.kafka.streams.StreamsConfig allows one to encapsulate the various configuration options for the Kafka Streams application.

The interface org.apache.kafka.common.serialization.Serde<T> is an abstraction for the serialization and the deserialization of an object of type T.

The class org.apache.kafka.streams.kstream.Consumed allows one to configure parameters related to Kafka Streams such as the key and value serdes.

The DSL method peek(ACTOR) allows one to apply the given ACTOR function on each of the incoming data events.

Before we proceed further, let us take a peek at the current state of Kafka data directory. Open a terminal window and execute the following command:

ls -l $HOME/kafka/data/

The following should be the typical output:

Output.1

-rw-r--r-- 1 polarsparc polarsparc    0 Nov 14 14:39 cleaner-offset-checkpoint
-rw-r--r-- 1 polarsparc polarsparc    4 Nov 14 14:41 log-start-offset-checkpoint
-rw-r--r-- 1 polarsparc polarsparc   88 Nov 14 14:39 meta.properties
-rw-r--r-- 1 polarsparc polarsparc    4 Nov 14 14:41 recovery-point-offset-checkpoint
-rw-r--r-- 1 polarsparc polarsparc   21 Nov 14 14:41 replication-offset-checkpoint
drwxr-xr-x 2 polarsparc polarsparc 4096 Nov 14 14:41 survey-event-0

Now, we need to create the Kafka topic coffee-flavors with 2 partitions using docker by executing the following command:

docker run --rm --net=host confluentinc/cp-kafka:7.0.0 kafka-topics --create --topic coffee-flavors --partitions 2 --replication-factor 1 --if-not-exists --bootstrap-server localhost:20001

The following should be the typical output:

Output.2

Created topic coffee-flavors.

Once again, let us take a peek at the current state of Kafka data directory by executing the following command:

ls -l $HOME/kafka/data/

The following should be the typical output:

Output.3

-rw-r--r-- 1 polarsparc polarsparc    0 Nov 20 15:21 cleaner-offset-checkpoint
drwxr-xr-x 2 polarsparc polarsparc 4096 Nov 20 14:48 coffee-flavors-0
drwxr-xr-x 2 polarsparc polarsparc 4096 Nov 20 14:48 coffee-flavors-1
-rw-r--r-- 1 polarsparc polarsparc    4 Nov 20 14:48 log-start-offset-checkpoint
-rw-r--r-- 1 polarsparc polarsparc   88 Nov 20 14:39 meta.properties
-rw-r--r-- 1 polarsparc polarsparc   59 Nov 20 14:48 recovery-point-offset-checkpoint
-rw-r--r-- 1 polarsparc polarsparc   59 Nov 20 14:48 replication-offset-checkpoint
drwxr-xr-x 2 polarsparc polarsparc 4096 Nov 14 14:39 survey-event-0

Notice the creation of 2 new sub-directories - coffee-flavors-0 and coffee-flavors-1.

Each Kafka topic (along with the specified number of partitions) will map to a sub-directory under the specified Kafka data directory. In our example, we specified 2 partitions for the Kafka topic coffee-flavors and hence the 2 sub-directories. Each sub-directory will have the format: [topic_name]-[partition_number].

To display the details of the Kafka topic coffee-flavors using docker, execute the following command:

docker run --rm --net=host confluentinc/cp-kafka:7.0.0 kafka-topics --bootstrap-server localhost:20001 --describe --topic coffee-flavors

The following should be the typical output:

Output.4

Topic: coffee-flavors	TopicId: lnu40VHKQcCLkAVU_jYgTQ	PartitionCount: 2	ReplicationFactor: 1	Configs: 
        Topic: coffee-flavors	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
        Topic: coffee-flavors	Partition: 1	Leader: 1	Replicas: 1	Isr: 1

Now is time to test the code from Listing.2. To do that, open a terminal window and execute the following commands:

$ $HOME/java/KafkaStreams/Second

$ mvn clean compile

$ mvn exec:java -Dexec.mainClass=com.polarsparc.kstreams.CoffeeFlavorsStateless -Dexec.args="1"

The following would be the typical output:

Output.5

[com.polarsparc.kstreams.CoffeeFlavorsStateless.main()] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> Num of stream threads: 1
[com.polarsparc.kstreams.CoffeeFlavorsStateless.main()] INFO org.apache.kafka.streams.StreamsConfig - StreamsConfig values: 
  acceptable.recovery.lag = 10000
  application.id = coffee-flavor-1
  application.server = 
  bootstrap.servers = [localhost:20001]
  buffered.records.per.partition = 1000
  built.in.metrics.version = latest
  cache.max.bytes.buffering = 10485760
  client.id = 
  commit.interval.ms = 1000
... SNIP ...
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] Starting
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] State transition from CREATED to STARTING
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1-consumer, groupId=coffee-flavor-1] Subscribed to topic(s): coffee-flavors
[kafka-producer-network-thread | coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1-producer] INFO org.apache.kafka.clients.Metadata - [Producer clientId=coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1-producer] Cluster ID: l1aQqd95S_eYrvs07bW0MQ
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] Processed 0 total records, ran 0 punctuators, and committed 0 total tasks since the last update
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1-consumer, groupId=coffee-flavor-1] Cluster ID: l1aQqd95S_eYrvs07bW0MQ
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1-consumer, groupId=coffee-flavor-1] Discovered group coordinator localhost:20001 (id: 2147483646 rack: null)
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1-consumer, groupId=coffee-flavor-1] (Re-)joining group
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1-consumer, groupId=coffee-flavor-1] Request joining group due to: need to re-join with the given member-id
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1-consumer, groupId=coffee-flavor-1] (Re-)joining group
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1-consumer, groupId=coffee-flavor-1] Successfully joined group with generation Generation{generationId=1, memberId='coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1-consumer-548ed399-6a56-46dd-a41d-c90471b94d1c', protocol='stream'}
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor - stream-thread [coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1-consumer] All members participating in this rebalance: 
b6dff517-c409-4e50-9da9-195debedc9e3: [coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1-consumer-548ed399-6a56-46dd-a41d-c90471b94d1c].
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor - Decided on assignment: {b6dff517-c409-4e50-9da9-195debedc9e3=[activeTasks: ([0_0, 0_1]) standbyTasks: ([]) prevActiveTasks: ([]) prevStandbyTasks: ([]) changelogOffsetTotalsByTask: ([]) taskLagTotals: ([]) capacity: 1 assigned: 2]} with no followup probing rebalance.
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor - stream-thread [coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1-consumer] Assigned tasks [0_1, 0_0] including stateful [] to clients as: 
b6dff517-c409-4e50-9da9-195debedc9e3=[activeTasks: ([0_0, 0_1]) standbyTasks: ([])].
... SNIP ...
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1-consumer, groupId=coffee-flavor-1] Updating assignment with
  Assigned partitions:                       [coffee-flavors-0, coffee-flavors-1]
  Current owned partitions:                  []
  Added partitions (assigned - owned):       [coffee-flavors-0, coffee-flavors-1]
  Revoked partitions (owned - assigned):     []
... SNIP ...
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] Restoration took 51 ms for all tasks [0_0, 0_1]
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3] State transition from REBALANCING to RUNNING
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1-consumer, groupId=coffee-flavor-1] Requesting the log end offset for coffee-flavors-0 in order to compute lag
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1-consumer, groupId=coffee-flavor-1] Requesting the log end offset for coffee-flavors-1 in order to compute lag
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] Processed 0 total records, ran 0 punctuators, and committed 0 total tasks since the last update

Notice the name of the thread in the log lines - they all end with the suffix StreamThread-1 indicating there is one stream thread.

Next, look at the line that ends with [activeTasks: ([0_0, 0_1]) standbyTasks: ([])]. This indicates there are 2 stream tasks (each corresponding to a topic partition) in the Kafka Streams application, which matches what we indicated in the concepts section above.

Finally, look at the line with Assigned partitions: [coffee-flavors-0, coffee-flavors-1]. This implies that the single stream thread is executing both the stream tasks that corresponds to the 2 topic partitions.

We need to publish some events to the Kafka topic coffee-flavors. Open a terminal window and run the Kafka console publisher using the following command:

$ docker run -it --rm --net=host confluentinc/cp-kafka:7.0.0 kafka-console-producer --bootstrap-server localhost:20001 --property key.separator=, --property parse.key=true --request-required-acks 1 --topic coffee-flavors

The prompt will change to >.

Enter the following events:

>Alice,Caramel

>Bob,Mocha

>Charlie,Hazelnut

>Eva,Peppermint

>Frank,Mocha

>George,Caramel

>Harry,Mocha

The following would be the additional output in the terminal running the application from Listing.2 above:

Output.6

[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] Processed 0 total records, ran 0 punctuators, and committed 0 total tasks since the last update
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> [Start] User: Alice, Flavor: Caramel
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> [Final] caramel - alice
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> [Start] User: Bob, Flavor: Mocha
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> [Final] mocha - bob
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> [Start] User: Charlie, Flavor: Hazelnut
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> [Final] hazelnut - charlie
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> [Start] User: Eva, Flavor: Peppermint
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> [Final] peppermint - eva
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] Processed 4 total records, ran 0 punctuators, and committed 4 total tasks since the last update
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> [Start] User: Frank, Flavor: Mocha
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> [Final] mocha - frank
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> [Start] User: George, Flavor: Caramel
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> [Final] caramel - george
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> [Start] User: Harry, Flavor: Mocha
[coffee-flavor-1-b6dff517-c409-4e50-9da9-195debedc9e3-StreamThread-1] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> [Final] mocha - harry

Now, stop the application from Listing.2 above that is running in the terminal by pressing CTRL-C.

Given that the recent run of our application from Listing.2 above consumed all the messages (data events) from the specific Kafka topic, we need to reset the topic offset (corresponding to our application id) back to the beginning so thet we can re-run our application and consume the same messages again. To do that using docker, execute the following command:

docker run --rm --net=host confluentinc/cp-kafka:7.0.0 kafka-streams-application-reset --bootstrap-servers localhost:20001 --application-id coffee-flavor-1 --input-topics coffee-flavors --to-earliest

The following should be the typical output:

Output.7

Reset-offsets for input topics [coffee-flavors]
Following input topics offsets will be reset to (for consumer group coffee-flavor-1)
Topic: coffee-flavors Partition: 0 Offset: 0
Topic: coffee-flavors Partition: 1 Offset: 0
Done.
Deleting inferred internal topics []
Done.

To re-run the code from Listing.2, execute the following commands:

$ $HOME/java/KafkaStreams/Second

$ mvn exec:java -Dexec.mainClass=com.polarsparc.kstreams.CoffeeFlavorsStateless -Dexec.args="2"

The following would be the typical output:

Output.8

[com.polarsparc.kstreams.CoffeeFlavorsStateless.main()] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> Num of stream threads: 2
[com.polarsparc.kstreams.CoffeeFlavorsStateless.main()] INFO org.apache.kafka.streams.StreamsConfig - StreamsConfig values: 
  acceptable.recovery.lag = 10000
  application.id = coffee-flavor-1
  application.server = 
  bootstrap.servers = [localhost:20001]
  buffered.records.per.partition = 1000
  built.in.metrics.version = latest
  cache.max.bytes.buffering = 10485760
  client.id = 
  commit.interval.ms = 1000
... SNIP ...
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] Starting
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2] Starting
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] State transition from CREATED to STARTING
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2] State transition from CREATED to STARTING
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1-consumer, groupId=coffee-flavor-1] Subscribed to topic(s): coffee-flavors
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2-consumer, groupId=coffee-flavor-1] Subscribed to topic(s): coffee-flavors
[kafka-producer-network-thread | coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2-producer] INFO org.apache.kafka.clients.Metadata - [Producer clientId=coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2-producer] Cluster ID: l1aQqd95S_eYrvs07bW0MQ
[kafka-producer-network-thread | coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1-producer] INFO org.apache.kafka.clients.Metadata - [Producer clientId=coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1-producer] Cluster ID: l1aQqd95S_eYrvs07bW0MQ
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1-consumer, groupId=coffee-flavor-1] Cluster ID: l1aQqd95S_eYrvs07bW0MQ
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2-consumer, groupId=coffee-flavor-1] Cluster ID: l1aQqd95S_eYrvs07bW0MQ
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1-consumer, groupId=coffee-flavor-1] Discovered group coordinator localhost:20001 (id: 2147483646 rack: null)
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2-consumer, groupId=coffee-flavor-1] Discovered group coordinator localhost:20001 (id: 2147483646 rack: null)
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2-consumer, groupId=coffee-flavor-1] (Re-)joining group
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1-consumer, groupId=coffee-flavor-1] (Re-)joining group
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2] Processed 0 total records, ran 0 punctuators, and committed 0 total tasks since the last update
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] Processed 0 total records, ran 0 punctuators, and committed 0 total tasks since the last update
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1-consumer, groupId=coffee-flavor-1] Request joining group due to: need to re-join with the given member-id
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2-consumer, groupId=coffee-flavor-1] Request joining group due to: need to re-join with the given member-id
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1-consumer, groupId=coffee-flavor-1] (Re-)joining group
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2-consumer, groupId=coffee-flavor-1] (Re-)joining group
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1-consumer, groupId=coffee-flavor-1] Successfully joined group with generation Generation{generationId=8, memberId='coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1-consumer-1ebb54b4-89aa-450f-be82-a556601a0461', protocol='stream'}
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor - stream-thread [coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1-consumer] All members participating in this rebalance: 
941a845d-b16e-4f2a-90f8-38868aaa16c8: [coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1-consumer-1ebb54b4-89aa-450f-be82-a556601a0461].
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor - Decided on assignment: {941a845d-b16e-4f2a-90f8-38868aaa16c8=[activeTasks: ([0_0, 0_1]) standbyTasks: ([]) prevActiveTasks: ([]) prevStandbyTasks: ([]) changelogOffsetTotalsByTask: ([]) taskLagTotals: ([]) capacity: 1 assigned: 2]} with no followup probing rebalance.
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor - stream-thread [coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1-consumer] Assigned tasks [0_1, 0_0] including stateful [] to clients as: 
941a845d-b16e-4f2a-90f8-38868aaa16c8=[activeTasks: ([0_0, 0_1]) standbyTasks: ([])].
... SNIP ...
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2-consumer, groupId=coffee-flavor-1] Updating assignment with
  Assigned partitions:                       [coffee-flavors-1]
  Current owned partitions:                  []
  Added partitions (assigned - owned):       [coffee-flavors-1]
  Revoked partitions (owned - assigned):     []
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1-consumer, groupId=coffee-flavor-1] Updating assignment with
  Assigned partitions:                       [coffee-flavors-0]
  Current owned partitions:                  []
  Added partitions (assigned - owned):       [coffee-flavors-0]
  Revoked partitions (owned - assigned):     []
... SNIP ...
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2] INFO org.apache.kafka.streams.processor.internals.TaskManager - stream-thread [coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2] Handle new assignment with:
  New active tasks: [0_1]
  New standby tasks: []
  Existing active tasks: []
  Existing standby tasks: []
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.TaskManager - stream-thread [coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] Handle new assignment with:
  New active tasks: [0_0]
  New standby tasks: []
  Existing active tasks: []
  Existing standby tasks: []
... SNIP ...
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2] INFO org.apache.kafka.streams.processor.internals.StreamTask - stream-thread [coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2] task [0_1] Restored and ready to run
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamTask - stream-thread [coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] task [0_0] Restored and ready to run
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2] Restoration took 31 ms for all tasks [0_1]
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] Restoration took 31 ms for all tasks [0_0]
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2] State transition from PARTITIONS_ASSIGNED to RUNNING
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8] State transition from REBALANCING to RUNNING
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2-consumer, groupId=coffee-flavor-1] Requesting the log end offset for coffee-flavors-1 in order to compute lag
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1-consumer, groupId=coffee-flavor-1] Requesting the log end offset for coffee-flavors-0 in order to compute lag
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> [Start] User: Charlie, Flavor: Hazelnut
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> [Start] User: Alice, Flavor: Caramel
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> [Final] hazelnut - charlie
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> [Final] caramel - alice
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> [Start] User: Eva, Flavor: Peppermint
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> [Start] User: Bob, Flavor: Mocha
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> [Final] peppermint - eva
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> [Final] mocha - bob
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> [Start] User: George, Flavor: Caramel
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> [Start] User: Frank, Flavor: Mocha
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> [Final] caramel - george
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-2] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> [Final] mocha - frank
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> [Start] User: Harry, Flavor: Mocha
[coffee-flavor-1-941a845d-b16e-4f2a-90f8-38868aaa16c8-StreamThread-1] INFO com.polarsparc.kstreams.CoffeeFlavorsStateless - ---> [Final] mocha - harry

Notice the name of the threads in the log lines - they all end with the suffix StreamThread-1 and StreamThread-2 respectively, indicating there are two stream threads in the application.

Next, look at the line Assigned partitions: [coffee-flavors-1] - we see it assigned to the stream thread StreamThread-2. Similarly, look at the line Assigned partitions: [coffee-flavors-0] - it is assigned to the stream thread StreamThread-1. This is related to the concept of Rebalancing. The process of adding and/or removing topic partitions to the application consumer(s) is called Rebalancing. The assignment of partitions to consumer(s) is dynamic in nature. When consumer(s) are added to the same application id, some of the current partition assignments are taken from currently active consumer(s) and redistributed to the newly added consumer(s).

Finally, look at the line with task [0_1] Restored and ready to run - we see the associated task corresponding with the stream thread StreamThread-2. Similarly, look at the line with task [0_0] Restored and ready to run - we see the associated task corresponding with the stream thread StreamThread-1.


References

Exploring Kafka Streams :: Part 1

Kafka Streams Developer Guide



© PolarSPARC