PolarSPARC

Exploring Kafka Streams :: Part 5


Bhaskar S 12/05/2021


Overview

In Part 4 of this series, we explored how to serialize and deserialize custom data values in Kafka Streams.

In this part of the series, we will explore grouping of data events using the concept of Windowing in Kafka Streams.

Kafka Streams Concepts

Windowing

The concept of Windowing allows one to group data events (records) together for a given key (same key) within a given window boundary for stateful aggregation operations such as, count(), aggregate(), reduce() etc. One point to keep in mind - the windowing techniques are based on the data events timestamp.

For certain analytics use-cases, it makes sense to look at events within a period of time window to make real-time decisions such as, fraud detection, inventory management, etc.

Kafka Streams supports the following windowing types:

Hands-on with Kafka Streams

Fourth Application

In the Fourth module, we will simulate a data event stream, where each event contains a fictitious product key (as a string) and how many quantities were sold (as an integer) as value. We will demonstrate a STATEFUL Kafka Streams application that will consume the stream of data events from the specified Kafka topic and aggregate the total quantity of product sold using the three windowing techniques - tumbling, hopping, and sliding.

To setup the Java directory structure for the Fourth application, execute the following commands:

$ cd $HOME/java/KafkaStreams

$ mkdir -p $HOME/java/KafkaStreams/Fourth

$ mkdir -p Fourth/src/main/java Fourth/src/main/resources Fourth/target

$ mkdir -p Fourth/src/main/java/com/polarsparc/kstreams

$ cd $HOME/java/KafkaStreams/Fourth


The following is the listing for the Maven project file pom.xml that will be used:


pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <artifactId>KafkaStreams</artifactId>
        <groupId>com.polarsparc.kstreams</groupId>
        <version>1.0</version>
    </parent>

    <artifactId>Fourth</artifactId>
    <version>1.0</version>

    <dependencies>
        <dependency>
            <artifactId>Common</artifactId>
            <groupId>com.polarsparc.kstreams</groupId>
            <version>1.0</version>
        </dependency>
    </dependencies>
</project>

We need to modify the <modules> section in the parent pom.xml to include the Fourth module as shown below:


pom.xml (parent)
<modules>
  <module>Common</module>
  <module>First</module>
  <module>Second</module>
  <module>Third</module>
  <module>Fourth</module>
</modules>

We will need a Kafka producer that will generate the stream of data events. The following is the Java Kafka publisher:


Listing.1
/*
 * Name:   Stream Data Event Generator
 * Author: Bhaskar S
 * Date:   12/04/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.kstreams;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Random;

public class StreamEventGenerator {
    private final static int MAX_EVENTS = 10;
    private final static int MAX_GAP = 3000; // 3 seconds = 3000 ms

    private static final Logger log = LoggerFactory.getLogger(StreamEventGenerator.class.getName());

    private final static Random random = new Random(1001);

    private final static List<String> keysList = Arrays.asList("A", "M", "S", "B", "N", "T");
    private final static List<Integer> valuesList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);

    private static KafkaProducer<String, Integer> createEventProducer() {
        Properties config = new Properties();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:20001");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        return new KafkaProducer<>(config);
    }

    private static void generateDataEvent(boolean flag, String topic, Producer<String, Integer> producer) {
        log.info(String.format("   ---> Topic: %s", topic));

        int cnt = random.nextInt(MAX_EVENTS);
        int gap = random.nextInt(MAX_GAP);

        log.info(String.format("   ---> Events Count: %d", cnt));

        for (int i = 1; i <= cnt; i++) {
            int ki = random.nextInt(keysList.size());
            int vi = random.nextInt(valuesList.size());

            String key = keysList.get(ki);
            Integer value = valuesList.get(vi);

            log.info(String.format("   ---> [%d] Key: %s, Value: %d", i, key, value));

            if (!flag) {
                try {
                    producer.send(new ProducerRecord<>(topic, key, value)).get();
                }
                catch (Exception ex) {
                    log.error(ex.getMessage());
                }
            }
        }

        try {
            Thread.sleep(gap);
        }
        catch (Exception ignore)  {
        }

        log.info(String.format("   ---> Sleep Gap: %d", gap));
    }

    public static void main(String[] args) {
        if (args.length < 1) {
            System.out.printf("Usage: java %s <topic-name> [--dry-run]\n", StreamEventGenerator.class.getName());
            System.exit(1);
        }

        boolean dryRun = args.length == 2 && args[1].equalsIgnoreCase("--dry-run");

        Producer<String, Integer> producer = null;
        if (!dryRun) {
            producer = createEventProducer();
        }

        for (int i = 1; i <= 5; i++) {
            log.info(String.format("---------> Iteration: %d", i));

            generateDataEvent(dryRun, args[0], producer);
        }

        if (!dryRun) {
            producer.close();
        }
    }
}

The way the Kafka producer will works is - in each iteration, it will publish a set of random events and then sleep for a random time (that is less than 3 seconds) before continuing with the next iteration.

Also, we have fixed the random number generator seed to generate a predictable set of events.

In addition, there is a --dry-run option to display the data events.

To compile and test the code from Listing.1 in the --dry-run mode, open a terminal window and execute the following commands:

$ $HOME/java/KafkaStreams/Fourth

$ mvn clean compile

$ mvn exec:java -Dexec.mainClass=com.polarsparc.kstreams.StreamEventGenerator -Dexec.args="test --dry-run"

The following should be the typical output:

Output.1

[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---------> Iteration: 1
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Topic: test
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Events Count: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [1] Key: A, Value: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [2] Key: B, Value: 7
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [3] Key: A, Value: 5
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Sleep Gap: 2223
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---------> Iteration: 2
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Topic: test
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Events Count: 4
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [1] Key: T, Value: 8
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [2] Key: B, Value: 9
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [3] Key: M, Value: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [4] Key: M, Value: 4
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Sleep Gap: 343
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---------> Iteration: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Topic: test
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Events Count: 9
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [1] Key: M, Value: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [2] Key: A, Value: 5
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [3] Key: T, Value: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [4] Key: M, Value: 8
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [5] Key: A, Value: 4
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [6] Key: S, Value: 8
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [7] Key: T, Value: 2
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [8] Key: T, Value: 2
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [9] Key: S, Value: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Sleep Gap: 2110
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---------> Iteration: 4
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Topic: test
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Events Count: 4
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [1] Key: S, Value: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [2] Key: T, Value: 8
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [3] Key: T, Value: 1
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [4] Key: A, Value: 6
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Sleep Gap: 402
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---------> Iteration: 5
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Topic: test
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Events Count: 5
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [1] Key: A, Value: 8
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [2] Key: T, Value: 8
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [3] Key: B, Value: 7
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [4] Key: T, Value: 4
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [5] Key: B, Value: 8
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Sleep Gap: 2585

The following is the Java based STATEFUL Kafka Streams application that consumes and processes events from the specific Kafka topic based on the input argument (that controls the type of windowing technique):


Listing.2
/*
 * Name:   Sum values using various Windowing options
 * Author: Bhaskar S
 * Date:   12/04/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.kstreams;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;

public class WindowedSum {
    private static void usage() {
        System.out.printf("Usage: java %s <TM | HP | SL>\n", WindowedSum.class.getName());
        System.exit(1);
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            usage();
        }

        String topicName = switch (args[0]) {
            case "TM" -> "tumbling-events";
            case "HP" -> "hopping-events";
            case "SL" -> "sliding-events";
            default -> null;
        };

        if (topicName == null) {
            usage();
        }

        Logger log = LoggerFactory.getLogger(WindowedSum.class.getName());

        log.info(String.format("---> Event type: %s", topicName));

        StreamsConfig config = new StreamsConfig(KafkaConsumerConfig.kafkaConfigurationTwo(topicName, 1));

        Serde<String> stringSerde = Serdes.String();
        Serde<Integer> integerSerde = Serdes.Integer();

        // Window duration
        Duration windowSz = Duration.ofSeconds(5);

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, Integer> stream = builder.stream(topicName, Consumed.with(stringSerde, integerSerde));

        KGroupedStream<String, Integer> groupedStream = stream
                .peek((key, value) -> log.info(String.format("---> [%d] >> Key: %s, Value: %d",
                        System.currentTimeMillis(), key, value)))
                .groupByKey();

        TimeWindowedKStream<String, Integer> windowedStream = switch (args[0]) {
            default -> { // TM is the default
                TimeWindows tumblingWindow = TimeWindows.ofSizeWithNoGrace(windowSz);
                tumblingWindow.advanceBy(windowSz); // *IMPORTANT* for tumbling window size = advance
                yield groupedStream.windowedBy(tumblingWindow);
            }
            case "HP" -> {
                Duration advanceSz = Duration.ofSeconds(2);
                TimeWindows hoppingWindow = TimeWindows.ofSizeWithNoGrace(windowSz);
                hoppingWindow.advanceBy(advanceSz);
                yield groupedStream.windowedBy(hoppingWindow);
            }
            case "SL" -> {
                Duration graceSz = Duration.ofMillis(500); // Grace period
                SlidingWindows slidingWindow = SlidingWindows.ofTimeDifferenceAndGrace(windowSz, graceSz);
                yield groupedStream.windowedBy(slidingWindow);
            }
        };
        windowedStream.reduce(Integer::sum)
                .toStream()
                .peek((winKey, sum) -> log.info(String.format("---> [%d] >> Window: %s, Key: %s, Sum: %d",
                        System.currentTimeMillis(), winKey.window().toString(), winKey.key(), sum)));

        Topology topology = builder.build();

        log.info(String.format("---> %s", topology.describe().toString()));

        KafkaStreams streams = new KafkaStreams(topology, config);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

The code from Listing.2 above needs some explanation:

The interface class org.apache.kafka.streams.kstream.TimeWindowedKStream<K, V> is an abstraction of a windowed data event stream of the key-value pairs. It is basically an intermediate representation of a KStream that can used for performing windowed aggregation operations, such as, count(), aggregate(), reduce(), etc.

The class org.apache.kafka.streams.kstream.TimeWindows encapsulates a timed window that is of a fixed duration (size) and shifts forward at fixed interval (advance). When advance < size, we get a hopping window, and when advance = size, we get a tumbling window.

The static method TimeWindows.ofSizeWithNoGrace(SIZE) returns a fixed length window of SIZE.

The method advanceBy(SIZE) on an instance of TimeWindows instance, sets the advance (or hop) SIZE.

The class org.apache.kafka.streams.kstream.SlidingWindows encapsulates a sliding window that is of a fixed duration (size) and is adjusted based on the incoming data event's timestamp. It will only include data events that lie in the time duration plus the grace period.

The static method SlidingWindows.ofTimeDifferenceAndGrace(SIZE, GRACE) returns a fixed length sliding window of SIZE with a grace period of GRACE.

The DSL method reduce(AGGREGATOR) allows one to aggregate each incoming data events that are grouped together by a key using the specified AGGREGATOR function.

Now, we need to create the Kafka topics tumbling-events, hopping-events, and sliding-events corresponding to the three windowing techniques we want to demonstrate.

To create the Kafka topic tumbling-events with a single partition using docker, execute the following command:

docker run --rm --net=host confluentinc/cp-kafka:7.0.0 kafka-topics --create --topic tumbling-events --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server localhost:20001

The following should be the typical output:

Output.2

Created topic tumbling-events.

To create the Kafka topic hopping-events with a single partition using docker, execute the following command:

docker run --rm --net=host confluentinc/cp-kafka:7.0.0 kafka-topics --create --topic hopping-events --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server localhost:20001

The following should be the typical output:

Output.3

Created topic hopping-events.

Finally, to create the Kafka topic sliding-events with a single partition using docker, execute the following command:

docker run --rm --net=host confluentinc/cp-kafka:7.0.0 kafka-topics --create --topic sliding-events --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server localhost:20001

The following should be the typical output:

Output.4

Created topic sliding-events.

Now is time to compile the code from Listing.1 and Listing.2. To do that, open a terminal window and execute the following commands:

$ $HOME/java/KafkaStreams/Fourth

$ mvn clean compile

Now, it is time to test the code from Listing.2 using the tumbling window technique. To do that, open a terminal window and execute the following commands:

$ $HOME/java/KafkaStreams/Fourth

$ mvn exec:java -Dexec.mainClass=com.polarsparc.kstreams.WindowedSum -Dexec.args="TM"

The following should be the typical output:

Output.5

[com.polarsparc.kstreams.WindowedSum.main()] INFO com.polarsparc.kstreams.WindowedSum - ---> Event type: tumbling-events
... SNIP ...
[com.polarsparc.kstreams.WindowedSum.main()] INFO com.polarsparc.kstreams.WindowedSum - ---> Topologies:
    Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [tumbling-events])
      --> KSTREAM-PEEK-0000000001
    Processor: KSTREAM-PEEK-0000000001 (stores: [])
      --> KSTREAM-REDUCE-0000000003
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-REDUCE-0000000003 (stores: [KSTREAM-REDUCE-STATE-STORE-0000000002])
      --> KTABLE-TOSTREAM-0000000004
      <-- KSTREAM-PEEK-0000000001
    Processor: KTABLE-TOSTREAM-0000000004 (stores: [])
      --> KSTREAM-PEEK-0000000005
      <-- KSTREAM-REDUCE-0000000003
    Processor: KSTREAM-PEEK-0000000005 (stores: [])
      --> none
      <-- KTABLE-TOSTREAM-0000000004
... SNIP ...
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e] State transition from REBALANCING to RUNNING
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1-consumer, groupId=tumbling-events] Requesting the log end offset for tumbling-events-0 in order to compute lag

Now, it is time to run the producer code from Listing.1 to publish to the Kafka topic tumbling-events. To do that, open a terminal window and execute the following commands:

$ $HOME/java/KafkaStreams/Fourth

$ mvn exec:java -Dexec.mainClass=com.polarsparc.kstreams.StreamEventGenerator -Dexec.args="tumbling-events"

The following should be the typical output:

Output.6

... SNIP ...
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.0.0
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1638723341187
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---------> Iteration: 1
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Topic: tumbling-events
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Events Count: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [1] Key: A, Value: 3
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: yS_GW5KbTBqUJVfzsjJn0A
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [2] Key: B, Value: 7
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [3] Key: A, Value: 5
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Sleep Gap: 2223
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---------> Iteration: 2
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Topic: tumbling-events
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Events Count: 4
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [1] Key: T, Value: 8
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [2] Key: B, Value: 9
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [3] Key: M, Value: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [4] Key: M, Value: 4
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Sleep Gap: 343
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---------> Iteration: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Topic: tumbling-events
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Events Count: 9
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [1] Key: M, Value: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [2] Key: A, Value: 5
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [3] Key: T, Value: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [4] Key: M, Value: 8
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [5] Key: A, Value: 4
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [6] Key: S, Value: 8
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [7] Key: T, Value: 2
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [8] Key: T, Value: 2
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [9] Key: S, Value: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Sleep Gap: 2110
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---------> Iteration: 4
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Topic: tumbling-events
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Events Count: 4
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [1] Key: S, Value: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [2] Key: T, Value: 8
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [3] Key: T, Value: 1
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [4] Key: A, Value: 6
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Sleep Gap: 402
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---------> Iteration: 5
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Topic: tumbling-events
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Events Count: 5
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [1] Key: A, Value: 8
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [2] Key: T, Value: 8
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [3] Key: B, Value: 7
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [4] Key: T, Value: 4
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> [5] Key: B, Value: 8
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator -    ---> Sleep Gap: 2585
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregistered

The following would be the additional output in the terminal running the application from Listing.2 above:

Output.7

[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723341535] >> Key: A, Value: 3
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723341541] >> Key: B, Value: 7
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723341542] >> Key: A, Value: 5
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO org.apache.kafka.streams.state.internals.RocksDBTimestampedStore - Opening store KSTREAM-REDUCE-STATE-STORE-0000000002.1638723300000 in regular mode
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723341996] >> Window: Window{startMs=1638723340000, endMs=1638723345000}, Key: B, Sum: 7
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723341997] >> Window: Window{startMs=1638723340000, endMs=1638723345000}, Key: A, Sum: 8
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723343762] >> Key: T, Value: 8
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723343766] >> Key: B, Value: 9
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723343771] >> Key: M, Value: 3
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723343775] >> Key: M, Value: 4
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723344077] >> Window: Window{startMs=1638723340000, endMs=1638723345000}, Key: T, Sum: 8
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723344078] >> Window: Window{startMs=1638723340000, endMs=1638723345000}, Key: B, Sum: 16
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723344079] >> Window: Window{startMs=1638723340000, endMs=1638723345000}, Key: M, Sum: 7
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723344127] >> Key: M, Value: 3
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723344131] >> Key: A, Value: 5
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723344133] >> Key: T, Value: 3
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723344136] >> Key: M, Value: 8
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723344138] >> Key: A, Value: 4
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723344141] >> Key: S, Value: 8
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723344145] >> Key: T, Value: 2
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723344147] >> Key: T, Value: 2
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723344151] >> Key: S, Value: 3
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723345155] >> Window: Window{startMs=1638723340000, endMs=1638723345000}, Key: M, Sum: 18
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723345156] >> Window: Window{startMs=1638723340000, endMs=1638723345000}, Key: A, Sum: 17
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723345157] >> Window: Window{startMs=1638723340000, endMs=1638723345000}, Key: T, Sum: 15
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723345158] >> Window: Window{startMs=1638723340000, endMs=1638723345000}, Key: S, Sum: 11
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723346268] >> Key: S, Value: 3
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723346271] >> Key: T, Value: 8
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723346275] >> Key: T, Value: 1
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723346277] >> Key: A, Value: 6
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723346686] >> Key: A, Value: 8
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723346690] >> Key: T, Value: 8
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723346694] >> Key: B, Value: 7
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723346697] >> Key: T, Value: 4
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723346699] >> Key: B, Value: 8
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723347202] >> Window: Window{startMs=1638723345000, endMs=1638723350000}, Key: S, Sum: 3
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723347203] >> Window: Window{startMs=1638723345000, endMs=1638723350000}, Key: A, Sum: 14
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723347204] >> Window: Window{startMs=1638723345000, endMs=1638723350000}, Key: T, Sum: 21
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723347205] >> Window: Window{startMs=1638723345000, endMs=1638723350000}, Key: B, Sum: 15

Analyzing the lines from the Output.7 above we can see the tumbling window aggregation working as expected.

Moving on, it is time to test the code from Listing.2 using the hopping window technique. To do that, open a terminal window and execute the following commands:

$ $HOME/java/KafkaStreams/Fourth

$ mvn exec:java -Dexec.mainClass=com.polarsparc.kstreams.WindowedSum -Dexec.args="HP"

We will not show the output as it will be similar in nature to Output.5 above (except will indicate hopping-events).

Next, it is time to run the producer code from Listing.1 to publish to the Kafka topic hopping-events. To do that, open a terminal window and execute the following commands:

$ $HOME/java/KafkaStreams/Fourth

$ mvn exec:java -Dexec.mainClass=com.polarsparc.kstreams.StreamEventGenerator -Dexec.args="hopping-events"

We will not show the output as it will be similar to the Output.6 above.

The following would be the additional output in the terminal running the application from Listing.2 above (in the hopping window mode):

Output.8

[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723624318] >> Key: A, Value: 3
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723624323] >> Key: B, Value: 7
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723624324] >> Key: A, Value: 5
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO org.apache.kafka.streams.state.internals.RocksDBTimestampedStore - Opening store KSTREAM-REDUCE-STATE-STORE-0000000002.1638723600000 in regular mode
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723624978] >> Window: Window{startMs=1638723620000, endMs=1638723625000}, Key: B, Sum: 7
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723624980] >> Window: Window{startMs=1638723620000, endMs=1638723625000}, Key: A, Sum: 8
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723626545] >> Key: T, Value: 8
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723626549] >> Key: B, Value: 9
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723626552] >> Key: M, Value: 3
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723626555] >> Key: M, Value: 4
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723626904] >> Key: M, Value: 3
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723626907] >> Key: A, Value: 5
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723626911] >> Key: T, Value: 3
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723626914] >> Key: M, Value: 8
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723626916] >> Key: A, Value: 4
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723626919] >> Key: S, Value: 8
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723626922] >> Key: T, Value: 2
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723626924] >> Key: T, Value: 2
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723626927] >> Key: S, Value: 3
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723627030] >> Window: Window{startMs=1638723625000, endMs=1638723630000}, Key: B, Sum: 9
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723627031] >> Window: Window{startMs=1638723625000, endMs=1638723630000}, Key: M, Sum: 18
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723627032] >> Window: Window{startMs=1638723625000, endMs=1638723630000}, Key: A, Sum: 9
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723627033] >> Window: Window{startMs=1638723625000, endMs=1638723630000}, Key: T, Sum: 15
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723627034] >> Window: Window{startMs=1638723625000, endMs=1638723630000}, Key: S, Sum: 11
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723629043] >> Key: S, Value: 3
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723629046] >> Key: T, Value: 8
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723629048] >> Window: Window{startMs=1638723625000, endMs=1638723630000}, Key: S, Sum: 14
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723629049] >> Window: Window{startMs=1638723625000, endMs=1638723630000}, Key: T, Sum: 23
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723629054] >> Key: T, Value: 1
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723629056] >> Key: A, Value: 6
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723629460] >> Key: A, Value: 8
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723629464] >> Key: T, Value: 8
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723629467] >> Key: B, Value: 7
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723629470] >> Key: T, Value: 4
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723629473] >> Key: B, Value: 8
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723630075] >> Window: Window{startMs=1638723625000, endMs=1638723630000}, Key: A, Sum: 23
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723630076] >> Window: Window{startMs=1638723625000, endMs=1638723630000}, Key: T, Sum: 36
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723630077] >> Window: Window{startMs=1638723625000, endMs=1638723630000}, Key: B, Sum: 24

Analyzing the lines from the Output.8 above we can see the hopping window aggregation working as expected.

Finally, it is time to test the code from Listing.2 using the sliding window technique. To do that, open a terminal window and execute the following commands:

$ $HOME/java/KafkaStreams/Fourth

$ mvn exec:java -Dexec.mainClass=com.polarsparc.kstreams.WindowedSum -Dexec.args="SL"

We will not show the output as it will be similar in nature to Output.5 above (except will indicate sliding-events).

Next, it is time to run the producer code from Listing.1 to publish to the Kafka topic sliding-events. To do that, open a terminal window and execute the following commands:

$ $HOME/java/KafkaStreams/Fourth

$ mvn exec:java -Dexec.mainClass=com.polarsparc.kstreams.StreamEventGenerator -Dexec.args="sliding-events"

We will not show the output as it will be similar to the Output.6 above.

The following would be the additional output in the terminal running the application from Listing.2 above (in the sliding window mode):

Output.9

[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723890568] >> Key: A, Value: 3
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723890576] >> Key: B, Value: 7
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723890577] >> Key: A, Value: 5
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO org.apache.kafka.streams.state.internals.RocksDBTimestampedStore - Opening store KSTREAM-REDUCE-STATE-STORE-0000000002.1638723840000 in regular mode
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723891033] >> Window: Window{startMs=1638723885524, endMs=1638723890524}, Key: A, Sum: 3
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723891034] >> Window: Window{startMs=1638723885558, endMs=1638723890558}, Key: B, Sum: 7
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723891035] >> Window: Window{startMs=1638723890525, endMs=1638723895525}, Key: A, Sum: 5
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723891035] >> Window: Window{startMs=1638723885562, endMs=1638723890562}, Key: A, Sum: 8
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723892797] >> Key: T, Value: 8
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723892801] >> Key: B, Value: 9
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723892804] >> Key: M, Value: 3
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723892808] >> Key: M, Value: 4
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893111] >> Window: Window{startMs=1638723887792, endMs=1638723892792}, Key: T, Sum: 8
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893112] >> Window: Window{startMs=1638723890559, endMs=1638723895559}, Key: B, Sum: 9
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893113] >> Window: Window{startMs=1638723887796, endMs=1638723892796}, Key: B, Sum: 16
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893114] >> Window: Window{startMs=1638723887800, endMs=1638723892800}, Key: M, Sum: 3
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893116] >> Window: Window{startMs=1638723892801, endMs=1638723897801}, Key: M, Sum: 4
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893116] >> Window: Window{startMs=1638723887804, endMs=1638723892804}, Key: M, Sum: 7
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893159] >> Key: M, Value: 3
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893162] >> Key: A, Value: 5
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893166] >> Key: T, Value: 3
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893169] >> Key: M, Value: 8
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893171] >> Key: A, Value: 4
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893175] >> Key: S, Value: 8
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893178] >> Key: T, Value: 2
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893181] >> Key: T, Value: 2
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893184] >> Key: S, Value: 3
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894188] >> Window: Window{startMs=1638723888155, endMs=1638723893155}, Key: M, Sum: 10
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894189] >> Window: Window{startMs=1638723888158, endMs=1638723893158}, Key: A, Sum: 13
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894190] >> Window: Window{startMs=1638723888162, endMs=1638723893162}, Key: T, Sum: 11
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894191] >> Window: Window{startMs=1638723892805, endMs=1638723897805}, Key: M, Sum: 11
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894192] >> Window: Window{startMs=1638723892801, endMs=1638723897801}, Key: M, Sum: 15
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894193] >> Window: Window{startMs=1638723893156, endMs=1638723898156}, Key: M, Sum: 8
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894194] >> Window: Window{startMs=1638723888165, endMs=1638723893165}, Key: M, Sum: 18
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894195] >> Window: Window{startMs=1638723890563, endMs=1638723895563}, Key: A, Sum: 9
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894196] >> Window: Window{startMs=1638723890525, endMs=1638723895525}, Key: A, Sum: 14
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894197] >> Window: Window{startMs=1638723893159, endMs=1638723898159}, Key: A, Sum: 4
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894198] >> Window: Window{startMs=1638723888168, endMs=1638723893168}, Key: A, Sum: 17
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894199] >> Window: Window{startMs=1638723888171, endMs=1638723893171}, Key: S, Sum: 8
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894199] >> Window: Window{startMs=1638723888174, endMs=1638723893174}, Key: T, Sum: 13
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894200] >> Window: Window{startMs=1638723893163, endMs=1638723898163}, Key: T, Sum: 4
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894201] >> Window: Window{startMs=1638723892793, endMs=1638723897793}, Key: T, Sum: 7
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894201] >> Window: Window{startMs=1638723893175, endMs=1638723898175}, Key: T, Sum: 2
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894202] >> Window: Window{startMs=1638723888178, endMs=1638723893178}, Key: T, Sum: 15
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894202] >> Window: Window{startMs=1638723893172, endMs=1638723898172}, Key: S, Sum: 3
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894203] >> Window: Window{startMs=1638723888181, endMs=1638723893181}, Key: S, Sum: 11
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723895301] >> Key: S, Value: 3
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723895304] >> Key: T, Value: 8
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723895308] >> Key: T, Value: 1
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723895312] >> Key: A, Value: 6
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723895720] >> Key: A, Value: 8
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723895723] >> Key: T, Value: 8
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723895726] >> Key: B, Value: 7
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723895729] >> Key: T, Value: 4
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723895731] >> Key: B, Value: 8
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896234] >> Window: Window{startMs=1638723893172, endMs=1638723898172}, Key: S, Sum: 6
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896235] >> Window: Window{startMs=1638723893182, endMs=1638723898182}, Key: S, Sum: 3
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896236] >> Window: Window{startMs=1638723890296, endMs=1638723895296}, Key: S, Sum: 14
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896237] >> Window: Window{startMs=1638723890301, endMs=1638723895301}, Key: T, Sum: 23
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896238] >> Window: Window{startMs=1638723890304, endMs=1638723895304}, Key: T, Sum: 24
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896239] >> Window: Window{startMs=1638723890563, endMs=1638723895563}, Key: A, Sum: 15
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896240] >> Window: Window{startMs=1638723890525, endMs=1638723895525}, Key: A, Sum: 20
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896240] >> Window: Window{startMs=1638723890308, endMs=1638723895308}, Key: A, Sum: 23
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896241] >> Window: Window{startMs=1638723893169, endMs=1638723898169}, Key: A, Sum: 14
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896242] >> Window: Window{startMs=1638723893159, endMs=1638723898159}, Key: A, Sum: 18
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896242] >> Window: Window{startMs=1638723895309, endMs=1638723900309}, Key: A, Sum: 8
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896243] >> Window: Window{startMs=1638723890716, endMs=1638723895716}, Key: A, Sum: 23
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896243] >> Window: Window{startMs=1638723890719, endMs=1638723895719}, Key: T, Sum: 32
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896244] >> Window: Window{startMs=1638723890722, endMs=1638723895722}, Key: B, Sum: 16
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896244] >> Window: Window{startMs=1638723895305, endMs=1638723900305}, Key: T, Sum: 12
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896245] >> Window: Window{startMs=1638723895302, endMs=1638723900302}, Key: T, Sum: 13
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896245] >> Window: Window{startMs=1638723893179, endMs=1638723898179}, Key: T, Sum: 21
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896246] >> Window: Window{startMs=1638723893175, endMs=1638723898175}, Key: T, Sum: 23
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896246] >> Window: Window{startMs=1638723893163, endMs=1638723898163}, Key: T, Sum: 25
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896247] >> Window: Window{startMs=1638723892793, endMs=1638723897793}, Key: T, Sum: 28
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896247] >> Window: Window{startMs=1638723895720, endMs=1638723900720}, Key: T, Sum: 4
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896248] >> Window: Window{startMs=1638723890725, endMs=1638723895725}, Key: T, Sum: 36
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896248] >> Window: Window{startMs=1638723892797, endMs=1638723897797}, Key: B, Sum: 15
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896249] >> Window: Window{startMs=1638723895723, endMs=1638723900723}, Key: B, Sum: 8
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896249] >> Window: Window{startMs=1638723890728, endMs=1638723895728}, Key: B, Sum: 24

Analyzing the lines from the Output.9 above we can see the sliding window aggregation working as expected.


References

Introduction to Stream Analytics Windowing Functions

Exploring Kafka Streams :: Part 4

Exploring Kafka Streams :: Part 3

Exploring Kafka Streams :: Part 2

Exploring Kafka Streams :: Part 1

Kafka Streams Developer Guide



© PolarSPARC