PolarSPARC

Exploring Kafka Streams :: Part 6


Bhaskar S 12/11/2021


Overview

In Part 5 of this series, we explored the different windowing schemes available in Kafka Streams for use in aggregations.

In the previous part, we displayed the systems current time (in milliseconds) in the log output to guesstimate if an event belonged within a time window (for a chosen window type). Ideally, we should have displayed the timestamp of the data event (or record) for this purpose. How do we get access to the data event (or record) timestamp ???

Next, from Part 3 of the series, we saw how the aggregation operations (such as count(), aggregate(), reduce(), etc) leverage the state store to save the aggregated value. By default, they use a disk based persistent key-value store. Can you instead use other types of key-value store, such as an in-memory store ???

In this part of the series, we will demonstrate the same three Windowing techniques as we saw in the previous part of the series, but using the data event (or record) timestamp and using an in-memory state store.

Kafka Streams Concepts

Timestamp

The time window for aggregation depends on the data event (or record) timestamp. The default type of the timestamp for a data event (or record) in Kafka is event-time, which is the time when the Kafka producer sent the event to the Kafka broker. The other timestamp type in Kafka is the ingestion-time, which is the time when the Kafka broker received the data event (or record).

In order to extract the timestamp from the data event (or record), one has to implement a custom timestamp extractor class and point to it using the property default.timestamp.extractor. By default, this property is set to the class org.apache.kafka.streams.processor.FailOnInvalidTimestamp.

In-memory State Store

From Part 3, we learnt that the default state store used in Kafka Streams is a high-performance, disk based key-value store called RocksDB. One can change this default state store to use an in-memory key-value store.

Hands-on with Kafka Streams

We will update the Java utility class KafkaConsumerConfig from the Common module located in the directory $HOME/java/KafkaStreams/Common to add a new convenience method as shown below:


Listing.1
/*
 * Name:   Kafka Consumer Configuration
 * Author: Bhaskar S
 * Date:   11/10/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.kstreams;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public final class KafkaConsumerConfig {
    public static Properties kafkaConfigurationOne(String appId) {
        Properties config = new Properties();

        config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:20001");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        return config;
    }

    public static Properties kafkaConfigurationTwo(String appId, int numThr) {
        Properties config = kafkaConfigurationOne(appId);

        config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThr);
        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        config.put(StreamsConfig.STATE_DIR_CONFIG, "/home/polarsparc/kafka/state");

        return config;
    }

    public static Properties kafkaConfigurationThree(String appId, int numThr, long totMemSz) {
        Properties config = kafkaConfigurationTwo(appId, numThr);

        config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, totMemSz);

        return config;
    }

    public static Properties kafkaConfigurationFour(String appId, int numThr, Serde<?> serde) {
      Properties config = kafkaConfigurationTwo(appId, numThr);

      config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, serde.getClass());

      return config;
  }

  public static Properties kafkaConfigurationFive(String appId, int numThr, Serde<?> serde, Class<?> clazz) {
      Properties config = kafkaConfigurationFour(appId, numThr, serde);

      config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, clazz);

      return config;
  }

    private KafkaConsumerConfig() {}
}

The property StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG allows one to configure the data event (or record) timestamp extractor.

Since we modified the Common module, we need to once again compile and deploy the Common module so that the other Java modules can use it. To do that, open a terminal window and run the following commands:

$ $HOME/java/KafkaStreams/Common

$ mvn clean install

Fifth Application

In the Fifth module, we will simulate a data event stream, where each event contains a fictitious product key (as a string) and how many quantities were sold (as an integer) as value. We will demonstrate a STATEFUL Kafka Streams application that will consume the stream of data events from the specified Kafka topic and aggregate the total quantity of product sold using the three windowing techniques - tumbling, hopping, and sliding.

To setup the Java directory structure for the Fifth application, execute the following commands:

$ cd $HOME/java/KafkaStreams

$ mkdir -p $HOME/java/KafkaStreams/Fifth

$ mkdir -p Fifth/src/main/java Fifth/src/main/resources Fifth/target

$ mkdir -p Fifth/src/main/java/com/polarsparc/kstreams

$ mkdir -p Fifth/src/main/java/com/polarsparc/kstreams/model

$ mkdir -p Fifth/src/main/java/com/polarsparc/kstreams/publisher

$ cd $HOME/java/KafkaStreams/Fifth


The following is the listing for the Maven project file pom.xml that will be used:


pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <artifactId>KafkaStreams</artifactId>
        <groupId>com.polarsparc.kstreams</groupId>
        <version>1.0</version>
    </parent>

    <artifactId>Fifth</artifactId>
    <version>1.0</version>

    <dependencies>
        <dependency>
            <artifactId>Common</artifactId>
            <groupId>com.polarsparc.kstreams</groupId>
            <version>1.0</version>
        </dependency>
    </dependencies>
</project>

We need to modify the <modules> section in the parent pom.xml to include the Fifth module as shown below:


pom.xml (parent)
<modules>
  <module>Common</module>
  <module>First</module>
  <module>Second</module>
  <module>Third</module>
  <module>Fourth</module>
  <module>Fifth</module>
</modules>

The following is the Java POJO that encapsulates the product/quantity alert with a field to hold the data event (or record) timestamp:


Listing.2
/*
 * Name:   Encapsulation of the product, quantity, and timestamp values
 * Author: Bhaskar S
 * Date:   12/10/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.kstreams.model;

public class QtyEvent {
    private String product;
    private int quantity;
    private long timestamp;

    public QtyEvent() {
        this.product = null;
        this.quantity = 0;
        this.timestamp = 0;
    }

    public QtyEvent(String p, int q) {
        this.product = p;
        this.quantity = q;
        this.timestamp = 0;
    }

    public String getProduct() {
        return product;
    }

    public void setProduct(String p) {
        this.product = p;
    }

    public int getQuantity() {
        return quantity;
    }

    public void setQuantity(int q) {
        this.quantity = q;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long t) {
        this.timestamp = t;
    }

    public String toString() {
        return String.format("{%s, %d, %d}", product, quantity, timestamp);
    }
}

We will need a Kafka producer that will generate the stream of data events. The following is the Java Kafka publisher:


Listing.3
/*
 * Name:   Stream Data Event Generator for QtyEvent
 * Author: Bhaskar S
 * Date:   12/10/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.kstreams.publisher;

import com.polarsparc.kstreams.model.QtyEvent;
import com.polarsparc.kstreams.serde.JsonSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Random;

public class StreamQtyEventGenerator {
    private final static int MAX_EVENTS = 10;
    private final static int MAX_GAP = 3000; // 3 seconds = 3000 ms

    private static final Logger log = LoggerFactory.getLogger(StreamQtyEventGenerator.class.getName());

    private final static Random random = new Random(1001);

    private final static List<String> keysList = Arrays.asList("A", "M", "S", "B", "N", "T");
    private final static List<Integer> valuesList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);

    private static KafkaProducer<String, QtyEvent> createEventProducer() {
        Properties config = new Properties();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:20001");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
        return new KafkaProducer<>(config);
    }

    private static void generateDataEvent(boolean flag, String topic, Producer<String, QtyEvent> producer) {
        log.info(String.format("   ---> Topic: %s", topic));

        int cnt = random.nextInt(MAX_EVENTS);
        int gap = random.nextInt(MAX_GAP);

        log.info(String.format("   ---> Events Count: %d", cnt));

        for (int i = 1; i <= cnt; i++) {
            int ki = random.nextInt(keysList.size());
            int vi = random.nextInt(valuesList.size());

            String key = keysList.get(ki);
            Integer value = valuesList.get(vi);

            QtyEvent evt = new QtyEvent(key, value);

            log.info(String.format("   ---> [%d] Key: %s, Event: %s", i, key, evt));

            if (!flag) {
                try {
                    producer.send(new ProducerRecord<>(topic, key, evt)).get();
                }
                catch (Exception ex) {
                    log.error(ex.getMessage());
                }
            }
        }

        try {
            Thread.sleep(gap);
        }
        catch (Exception ignore)  {
        }

        log.info(String.format("   ---> Sleep Gap: %d", gap));
    }

    public static void main(String[] args) {
        if (args.length < 1) {
            System.out.printf("Usage: java %s <topic-name> [--dry-run]\n", StreamQtyEventGenerator.class.getName());
            System.exit(1);
        }

        boolean dryRun = args.length == 2 && args[1].equalsIgnoreCase("--dry-run");

        Producer<String, QtyEvent> producer = null;
        if (!dryRun) {
            producer = createEventProducer();
        }

        for (int i = 1; i <= 5; i++) {
            log.info(String.format("---------> Iteration: %d", i));

            generateDataEvent(dryRun, args[0], producer);
        }

        if (!dryRun) {
            producer.close();
        }
    }
}

In the Kafka producer, we use a fixed value as the random number generator seed to emit a predictable set of values. In each iteration, the Kafka producer will publish a set of random events and then sleep for a random time (that is less than 3 seconds) before continuing with the next iteration.

In addition, there is a --dry-run option to display the data events (without publishing to Kafka).

The following is the custom timestamp extractor class that will extract the data event (or Kafka record) timestamp and set a field in the value object:


Listing.4
/*
 * Name:   QtyEvent timestamp extractor
 * Author: Bhaskar S
 * Date:   12/10/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.kstreams;

import com.polarsparc.kstreams.model.QtyEvent;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class QtyEventTimeExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> consumerRecord, long prevTimestamp) {
        long ts = consumerRecord.timestamp();

        QtyEvent evt = (QtyEvent) consumerRecord.value();
        evt.setTimestamp(ts);

        return ts;
    }
}

The following is the Java based STATEFUL Kafka Streams application that consumes and processes events from the specific Kafka topic based on the input argument (that controls the type of windowing technique):


Listing.5
/*
 * Name:   Sum quantity values using various Windowing options
 * Author: Bhaskar S
 * Date:   12/10/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.kstreams;

import com.polarsparc.kstreams.model.QtyEvent;
import com.polarsparc.kstreams.serde.JsonDeserializer;
import com.polarsparc.kstreams.serde.JsonSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.Stores;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;

public class WindowedQtySum {
    private static void usage() {
        System.out.printf("Usage: java %s <TM | HP | SL>\n", WindowedQtySum.class.getName());
        System.exit(1);
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            usage();
        }

        String topicName = switch (args[0]) {
            case "TM" -> "tumbling-events";
            case "HP" -> "hopping-events";
            case "SL" -> "sliding-events";
            default -> null;
        };

        if (topicName == null) {
            usage();
        }

        Logger log = LoggerFactory.getLogger(WindowedQtySum.class.getName());

        log.info(String.format("---> Event type: %s", topicName));

        StreamsConfig config = new StreamsConfig(KafkaConsumerConfig.kafkaConfigurationFive(topicName, 1,
                Serdes.Integer(), QtyEventTimeExtractor.class));

        JsonSerializer<QtyEvent> jsonSer = new JsonSerializer<>();
        JsonDeserializer<QtyEvent> jsonDe = new JsonDeserializer<>(QtyEvent.class);

        Serde<String> stringSerde = Serdes.String();
        Serde<QtyEvent> jsonSerde = Serdes.serdeFrom(jsonSer, jsonDe);

        // Window duration
        Duration windowSz = Duration.ofSeconds(5);

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, QtyEvent> stream = builder.stream(topicName, Consumed.with(stringSerde, jsonSerde));

        KGroupedStream<String, Integer> groupedStream = stream
                .peek((key, event) -> log.info(String.format("---> [%d] >> Key: %s, Event: %s",
                        System.currentTimeMillis(), key, event)))
                .mapValues(QtyEvent::getQuantity)
                .groupByKey();

        TimeWindowedKStream<String, Integer> windowedStream = switch (args[0]) {
            default -> { // TM is the default
                TimeWindows tumblingWindow = TimeWindows.ofSizeWithNoGrace(windowSz);
                tumblingWindow.advanceBy(windowSz); // *IMPORTANT* for tumbling window size = advance
                yield groupedStream.windowedBy(tumblingWindow);
            }
            case "HP" -> {
                Duration advanceSz = Duration.ofSeconds(2);
                TimeWindows hoppingWindow = TimeWindows.ofSizeWithNoGrace(windowSz);
                hoppingWindow.advanceBy(advanceSz);
                yield groupedStream.windowedBy(hoppingWindow);
            }
            case "SL" -> {
                Duration graceSz = Duration.ofMillis(500); // Grace period
                SlidingWindows slidingWindow = SlidingWindows.ofTimeDifferenceAndGrace(windowSz, graceSz);
                yield groupedStream.windowedBy(slidingWindow);
            }
        };
        windowedStream.reduce(Integer::sum, Materialized.as(Stores.inMemoryWindowStore(topicName,
                        windowSz, windowSz, false)))
                .toStream()
                .peek((winKey, sum) -> log.info(String.format("---> [%d] >> Window: %s, Key: %s, Sum: %d",
                        System.currentTimeMillis(), winKey.window().toString(), winKey.key(), sum)));

        Topology topology = builder.build();

        log.info(String.format("---> %s", topology.describe().toString()));

        KafkaStreams streams = new KafkaStreams(topology, config);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

The code from Listing.5 above needs some explanation:

Notice that we call the method KafkaConsumerConfig.kafkaConfigurationFive(...) to indicate the timestamp extractor class com.polarsparc.kstreams.QtyEventTimeExtractor to use. Before the data event reaches the first DSL call peek(ACTION), the timestamp from the Kafka message being consumed is extracted and set in the corresponding QtyEvent object instance.

The class org.apache.kafka.streams.state.Stores is the factory object for creating the various state stores, such as the persistent key-value store or the in-memory key-value store.

The static method Stores.inMemoryWindowStore(NAME, RETENTION, WINDOW, DUPLICATES) returns an instance of in-memory key-value store that is identified by NAME, maintains history of aggregated values for upto RETENTION period, uses a time window period of length WINDOW, and the last parameter DUPLICATES indicates whether or not duplicate keys are allowed in the time window period.

The static method Materialized.as(STORE) indicates which specified state STORE is to be used to save the aggregated values.

Now, we need to reset the Kafka topics tumbling-events, hopping-events, and sliding-events corresponding to the three windowing techniques we want to demonstrate. To do that, execute the following commands:

docker run --rm --net=host confluentinc/cp-kafka:7.0.0 kafka-topics --delete --topic tumbling-events --bootstrap-server localhost:20001

docker run --rm --net=host confluentinc/cp-kafka:7.0.0 kafka-topics --create --topic tumbling-events --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server localhost:20001

docker run --rm --net=host confluentinc/cp-kafka:7.0.0 kafka-topics --delete --topic hopping-events --bootstrap-server localhost:20001

docker run --rm --net=host confluentinc/cp-kafka:7.0.0 kafka-topics --create --topic hopping-events --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server localhost:20001

docker run --rm --net=host confluentinc/cp-kafka:7.0.0 kafka-topics --delete --topic sliding-events --bootstrap-server localhost:20001

docker run --rm --net=host confluentinc/cp-kafka:7.0.0 kafka-topics --create --topic sliding-events --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server localhost:20001

Now is time to compile the code for module Fifth. To do that, open a terminal window and execute the following commands:

$ $HOME/java/KafkaStreams/Fifth

$ mvn clean compile

To run the code from Listing.5 using the tumbling window technique, open a terminal window and execute the following commands:

$ $HOME/java/KafkaStreams/Fifth

$ mvn exec:java -Dexec.mainClass=com.polarsparc.kstreams.WindowedQtySum -Dexec.args="TM"

The following should be the typical output:

Output.1

... SNIP ...
[com.polarsparc.kstreams.WindowedQtySum.main()] INFO com.polarsparc.kstreams.WindowedQtySum - ---> Topologies:
    Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [tumbling-events])
      --> KSTREAM-PEEK-0000000001
    Processor: KSTREAM-PEEK-0000000001 (stores: [])
      --> KSTREAM-MAPVALUES-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
      --> KSTREAM-REDUCE-0000000003
      <-- KSTREAM-PEEK-0000000001
    Processor: KSTREAM-REDUCE-0000000003 (stores: [tumbling-events])
      --> KTABLE-TOSTREAM-0000000004
      <-- KSTREAM-MAPVALUES-0000000002
    Processor: KTABLE-TOSTREAM-0000000004 (stores: [])
      --> KSTREAM-PEEK-0000000005
      <-- KSTREAM-REDUCE-0000000003
    Processor: KSTREAM-PEEK-0000000005 (stores: [])
      --> none
      <-- KTABLE-TOSTREAM-0000000004
... SNIP ...
[tumbling-events-50ad9467-8ee5-4fbb-8948-09beea5b7f4b-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [tumbling-events-50ad9467-8ee5-4fbb-8948-09beea5b7f4b] State transition from REBALANCING to RUNNING
[tumbling-events-50ad9467-8ee5-4fbb-8948-09beea5b7f4b-StreamThread-1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=tumbling-events-50ad9467-8ee5-4fbb-8948-09beea5b7f4b-StreamThread-1-consumer, groupId=tumbling-events] Requesting the log end offset for tumbling-events-0 in order to compute lag

Notice the name of the store - it is tumbling-events as specified when creating the store. Also, we will see there are *NO* directories/files created under /home/polarsparc/kafka/state.

To run the producer code from Listing.3 to publish to the desired Kafka topic tumbling-events, open a terminal window and execute the following commands:

$ $HOME/java/KafkaStreams/Fifth

$ mvn exec:java -Dexec.mainClass=com.polarsparc.kstreams.publisher.StreamQtyEventGenerator -Dexec.args="tumbling-events"

The following would be the additional output in the terminal running the application from Listing.5 above:

Output.2

[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253510860] >> Key: A, Event: {A, 3, 1639253510712}
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253510866] >> Key: B, Event: {B, 7, 1639253510815}
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253510866] >> Key: A, Event: {A, 5, 1639253510820}
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253511289] >> Window: Window{startMs=1639253510000, endMs=1639253515000}, Key: B, Sum: 7
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253511290] >> Window: Window{startMs=1639253510000, endMs=1639253515000}, Key: A, Sum: 8
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253513058] >> Key: T, Event: {T, 8, 1639253513051}
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253513062] >> Key: B, Event: {B, 9, 1639253513056}
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253513066] >> Key: M, Event: {M, 3, 1639253513061}
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253513069] >> Key: M, Event: {M, 4, 1639253513065}
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253513372] >> Window: Window{startMs=1639253510000, endMs=1639253515000}, Key: T, Sum: 8
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253513373] >> Window: Window{startMs=1639253510000, endMs=1639253515000}, Key: B, Sum: 16
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253513375] >> Window: Window{startMs=1639253510000, endMs=1639253515000}, Key: M, Sum: 7
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253513422] >> Key: M, Event: {M, 3, 1639253513415}
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253513425] >> Key: A, Event: {A, 5, 1639253513420}
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253513427] >> Key: T, Event: {T, 3, 1639253513423}
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253513430] >> Key: M, Event: {M, 8, 1639253513426}
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253513433] >> Key: A, Event: {A, 4, 1639253513429}
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253513438] >> Key: S, Event: {S, 8, 1639253513432}
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253513440] >> Key: T, Event: {T, 2, 1639253513437}
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253513444] >> Key: T, Event: {T, 2, 1639253513439}
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253513448] >> Key: S, Event: {S, 3, 1639253513442}
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253514454] >> Window: Window{startMs=1639253510000, endMs=1639253515000}, Key: M, Sum: 18
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253514455] >> Window: Window{startMs=1639253510000, endMs=1639253515000}, Key: A, Sum: 17
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253514457] >> Window: Window{startMs=1639253510000, endMs=1639253515000}, Key: T, Sum: 15
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253514458] >> Window: Window{startMs=1639253510000, endMs=1639253515000}, Key: S, Sum: 11
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253515565] >> Key: S, Event: {S, 3, 1639253515559}
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253515568] >> Key: T, Event: {T, 8, 1639253515563}
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253515572] >> Key: T, Event: {T, 1, 1639253515567}
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253515575] >> Key: A, Event: {A, 6, 1639253515571}
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253515986] >> Key: A, Event: {A, 8, 1639253515979}
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253515989] >> Key: T, Event: {T, 8, 1639253515984}
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253515993] >> Key: B, Event: {B, 7, 1639253515988}
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253515997] >> Key: T, Event: {T, 4, 1639253515991}
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253516000] >> Key: B, Event: {B, 8, 1639253515996}
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253516503] >> Window: Window{startMs=1639253515000, endMs=1639253520000}, Key: S, Sum: 3
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253516504] >> Window: Window{startMs=1639253515000, endMs=1639253520000}, Key: A, Sum: 14
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253516505] >> Window: Window{startMs=1639253515000, endMs=1639253520000}, Key: T, Sum: 21
[tumbling-events-4a05503a-f106-4895-b827-837be00bdfd0-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253516506] >> Window: Window{startMs=1639253515000, endMs=1639253520000}, Key: B, Sum: 15

Analyzing the lines from the Output.2 above we can see the tumbling window aggregation working as expected.

Next, to run the code from Listing.5 using the hopping window technique, open a terminal window and execute the following commands:

$ $HOME/java/KafkaStreams/Fifth

$ mvn exec:java -Dexec.mainClass=com.polarsparc.kstreams.WindowedQtySum -Dexec.args="HP"

We will not show the output as it will be similar in nature to Output.5 above (except will indicate hopping-events).

Now, to re-run the producer code from Listing.3 to publish to the Kafka topic hopping-events, open a terminal window and execute the following commands:

$ $HOME/java/KafkaStreams/Fifth

$ mvn exec:java -Dexec.mainClass=com.polarsparc.kstreams.publisher.StreamQtyEventGenerator -Dexec.args="hopping-events"

The following would be the additional output in the terminal running the application from Listing.5 above (in the hopping window mode):

Output.3

[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253848339] >> Key: A, Event: {A, 3, 1639253848236}
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253848344] >> Key: B, Event: {B, 7, 1639253848293}
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253848344] >> Key: A, Event: {A, 5, 1639253848297}
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253849167] >> Window: Window{startMs=1639253845000, endMs=1639253850000}, Key: B, Sum: 7
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253849168] >> Window: Window{startMs=1639253845000, endMs=1639253850000}, Key: A, Sum: 8
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253850533] >> Key: T, Event: {T, 8, 1639253850528}
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253850538] >> Key: B, Event: {B, 9, 1639253850533}
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253850541] >> Key: M, Event: {M, 3, 1639253850537}
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253850545] >> Key: M, Event: {M, 4, 1639253850541}
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253850896] >> Key: M, Event: {M, 3, 1639253850890}
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253850899] >> Key: A, Event: {A, 5, 1639253850894}
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253850903] >> Key: T, Event: {T, 3, 1639253850898}
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253850905] >> Key: M, Event: {M, 8, 1639253850901}
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253850907] >> Key: A, Event: {A, 4, 1639253850904}
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253850911] >> Key: S, Event: {S, 8, 1639253850907}
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253850913] >> Key: T, Event: {T, 2, 1639253850910}
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253850916] >> Key: T, Event: {T, 2, 1639253850913}
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253850921] >> Key: S, Event: {S, 3, 1639253850916}
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253851224] >> Window: Window{startMs=1639253850000, endMs=1639253855000}, Key: B, Sum: 9
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253851225] >> Window: Window{startMs=1639253850000, endMs=1639253855000}, Key: M, Sum: 18
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253851226] >> Window: Window{startMs=1639253850000, endMs=1639253855000}, Key: A, Sum: 9
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253851227] >> Window: Window{startMs=1639253850000, endMs=1639253855000}, Key: T, Sum: 15
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253851228] >> Window: Window{startMs=1639253850000, endMs=1639253855000}, Key: S, Sum: 11
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253853038] >> Key: S, Event: {S, 3, 1639253853032}
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253853042] >> Key: T, Event: {T, 8, 1639253853036}
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253853046] >> Key: T, Event: {T, 1, 1639253853040}
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253853048] >> Key: A, Event: {A, 6, 1639253853043}
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253853250] >> Window: Window{startMs=1639253850000, endMs=1639253855000}, Key: S, Sum: 14
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253853251] >> Window: Window{startMs=1639253850000, endMs=1639253855000}, Key: T, Sum: 24
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253853252] >> Window: Window{startMs=1639253850000, endMs=1639253855000}, Key: A, Sum: 15
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253853455] >> Key: A, Event: {A, 8, 1639253853450}
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253853459] >> Key: T, Event: {T, 8, 1639253853454}
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253853463] >> Key: B, Event: {B, 7, 1639253853458}
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253853466] >> Key: T, Event: {T, 4, 1639253853462}
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253853469] >> Key: B, Event: {B, 8, 1639253853465}
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253854272] >> Window: Window{startMs=1639253850000, endMs=1639253855000}, Key: A, Sum: 23
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253854273] >> Window: Window{startMs=1639253850000, endMs=1639253855000}, Key: T, Sum: 36
[hopping-events-5da14dc3-6953-4fca-86c4-e99e3dedad35-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639253854274] >> Window: Window{startMs=1639253850000, endMs=1639253855000}, Key: B, Sum: 24

Analyzing the lines from the Output.3 above we can see the hopping window aggregation working as expected.

Finally, to run the code from Listing.5 using the sliding window technique, open a terminal window and execute the following commands:

$ $HOME/java/KafkaStreams/Fifth

$ mvn exec:java -Dexec.mainClass=com.polarsparc.kstreams.WindowedQtySum -Dexec.args="SL"

Now, to re-run the producer code from Listing.3 to publish to the Kafka topic sliding-events, open a terminal window and execute the following commands:

$ $HOME/java/KafkaStreams/Fifth

$ mvn exec:java -Dexec.mainClass=com.polarsparc.kstreams.publisher.StreamQtyEventGenerator -Dexec.args="sliding-events"

The following would be the additional output in the terminal running the application from Listing.5 above (in the sliding window mode):

Output.4

[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254129409] >> Key: A, Event: {A, 3, 1639254129300}
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254129421] >> Key: B, Event: {B, 7, 1639254129368}
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254129424] >> Key: A, Event: {A, 5, 1639254129372}
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254129746] >> Window: Window{startMs=1639254124300, endMs=1639254129300}, Key: A, Sum: 3
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254129747] >> Window: Window{startMs=1639254124368, endMs=1639254129368}, Key: B, Sum: 7
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254129748] >> Window: Window{startMs=1639254129301, endMs=1639254134301}, Key: A, Sum: 5
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254129749] >> Window: Window{startMs=1639254124372, endMs=1639254129372}, Key: A, Sum: 8
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254131609] >> Key: T, Event: {T, 8, 1639254131603}
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254131613] >> Key: B, Event: {B, 9, 1639254131608}
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254131618] >> Key: M, Event: {M, 3, 1639254131612}
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254131621] >> Key: M, Event: {M, 4, 1639254131617}
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254131824] >> Window: Window{startMs=1639254126603, endMs=1639254131603}, Key: T, Sum: 8
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254131825] >> Window: Window{startMs=1639254129369, endMs=1639254134369}, Key: B, Sum: 9
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254131826] >> Window: Window{startMs=1639254126608, endMs=1639254131608}, Key: B, Sum: 16
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254131826] >> Window: Window{startMs=1639254126612, endMs=1639254131612}, Key: M, Sum: 3
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254131827] >> Window: Window{startMs=1639254131613, endMs=1639254136613}, Key: M, Sum: 4
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254131828] >> Window: Window{startMs=1639254126617, endMs=1639254131617}, Key: M, Sum: 7
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254131974] >> Key: M, Event: {M, 3, 1639254131969}
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254131979] >> Key: A, Event: {A, 5, 1639254131973}
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254131984] >> Key: T, Event: {T, 3, 1639254131980}
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254131988] >> Key: M, Event: {M, 8, 1639254131984}
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254131991] >> Key: A, Event: {A, 4, 1639254131987}
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254131994] >> Key: S, Event: {S, 8, 1639254131990}
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254131998] >> Key: T, Event: {T, 2, 1639254131994}
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254132001] >> Key: T, Event: {T, 2, 1639254131997}
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254132007] >> Key: S, Event: {S, 3, 1639254132001}
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254132911] >> Window: Window{startMs=1639254126969, endMs=1639254131969}, Key: M, Sum: 10
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254132912] >> Window: Window{startMs=1639254126973, endMs=1639254131973}, Key: A, Sum: 13
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254132913] >> Window: Window{startMs=1639254126980, endMs=1639254131980}, Key: T, Sum: 11
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254132914] >> Window: Window{startMs=1639254131618, endMs=1639254136618}, Key: M, Sum: 11
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254132916] >> Window: Window{startMs=1639254131613, endMs=1639254136613}, Key: M, Sum: 15
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254132916] >> Window: Window{startMs=1639254131970, endMs=1639254136970}, Key: M, Sum: 8
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254132917] >> Window: Window{startMs=1639254126984, endMs=1639254131984}, Key: M, Sum: 18
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254132918] >> Window: Window{startMs=1639254129373, endMs=1639254134373}, Key: A, Sum: 9
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254132918] >> Window: Window{startMs=1639254129301, endMs=1639254134301}, Key: A, Sum: 14
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254132919] >> Window: Window{startMs=1639254131974, endMs=1639254136974}, Key: A, Sum: 4
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254132919] >> Window: Window{startMs=1639254126987, endMs=1639254131987}, Key: A, Sum: 17
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254132920] >> Window: Window{startMs=1639254126990, endMs=1639254131990}, Key: S, Sum: 8
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254132921] >> Window: Window{startMs=1639254126994, endMs=1639254131994}, Key: T, Sum: 13
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254132921] >> Window: Window{startMs=1639254131981, endMs=1639254136981}, Key: T, Sum: 4
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254132922] >> Window: Window{startMs=1639254131604, endMs=1639254136604}, Key: T, Sum: 7
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254132922] >> Window: Window{startMs=1639254131995, endMs=1639254136995}, Key: T, Sum: 2
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254132923] >> Window: Window{startMs=1639254126997, endMs=1639254131997}, Key: T, Sum: 15
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254132923] >> Window: Window{startMs=1639254131991, endMs=1639254136991}, Key: S, Sum: 3
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254132924] >> Window: Window{startMs=1639254127001, endMs=1639254132001}, Key: S, Sum: 11
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134123] >> Key: S, Event: {S, 3, 1639254134117}
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134128] >> Key: T, Event: {T, 8, 1639254134122}
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134132] >> Key: T, Event: {T, 1, 1639254134125}
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134135] >> Key: A, Event: {A, 6, 1639254134129}
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134543] >> Key: A, Event: {A, 8, 1639254134538}
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134547] >> Key: T, Event: {T, 8, 1639254134542}
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134551] >> Key: B, Event: {B, 7, 1639254134546}
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134553] >> Key: T, Event: {T, 4, 1639254134549}
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134557] >> Key: B, Event: {B, 8, 1639254134553}
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134960] >> Window: Window{startMs=1639254131991, endMs=1639254136991}, Key: S, Sum: 6
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134961] >> Window: Window{startMs=1639254132002, endMs=1639254137002}, Key: S, Sum: 3
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134962] >> Window: Window{startMs=1639254129117, endMs=1639254134117}, Key: S, Sum: 14
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134963] >> Window: Window{startMs=1639254129122, endMs=1639254134122}, Key: T, Sum: 23
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134964] >> Window: Window{startMs=1639254129125, endMs=1639254134125}, Key: T, Sum: 24
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134965] >> Window: Window{startMs=1639254129373, endMs=1639254134373}, Key: A, Sum: 15
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134966] >> Window: Window{startMs=1639254129301, endMs=1639254134301}, Key: A, Sum: 20
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134967] >> Window: Window{startMs=1639254129129, endMs=1639254134129}, Key: A, Sum: 23
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134968] >> Window: Window{startMs=1639254131988, endMs=1639254136988}, Key: A, Sum: 14
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134968] >> Window: Window{startMs=1639254131974, endMs=1639254136974}, Key: A, Sum: 18
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134969] >> Window: Window{startMs=1639254134130, endMs=1639254139130}, Key: A, Sum: 8
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134969] >> Window: Window{startMs=1639254129538, endMs=1639254134538}, Key: A, Sum: 23
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134970] >> Window: Window{startMs=1639254129542, endMs=1639254134542}, Key: T, Sum: 32
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134970] >> Window: Window{startMs=1639254129546, endMs=1639254134546}, Key: B, Sum: 16
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134971] >> Window: Window{startMs=1639254134126, endMs=1639254139126}, Key: T, Sum: 12
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134971] >> Window: Window{startMs=1639254134123, endMs=1639254139123}, Key: T, Sum: 13
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134972] >> Window: Window{startMs=1639254131998, endMs=1639254136998}, Key: T, Sum: 21
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134972] >> Window: Window{startMs=1639254131995, endMs=1639254136995}, Key: T, Sum: 23
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134973] >> Window: Window{startMs=1639254131981, endMs=1639254136981}, Key: T, Sum: 25
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134973] >> Window: Window{startMs=1639254131604, endMs=1639254136604}, Key: T, Sum: 28
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134974] >> Window: Window{startMs=1639254134543, endMs=1639254139543}, Key: T, Sum: 4
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134974] >> Window: Window{startMs=1639254129549, endMs=1639254134549}, Key: T, Sum: 36
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134975] >> Window: Window{startMs=1639254131609, endMs=1639254136609}, Key: B, Sum: 15
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134975] >> Window: Window{startMs=1639254134547, endMs=1639254139547}, Key: B, Sum: 8
[sliding-events-c6ae6aee-11ca-483c-8e12-432416340ace-StreamThread-1] INFO com.polarsparc.kstreams.WindowedQtySum - ---> [1639254134976] >> Window: Window{startMs=1639254129553, endMs=1639254134553}, Key: B, Sum: 24

Analyzing the lines from the Output.4 above we can see the sliding window aggregation working as expected.


References

Exploring Kafka Streams :: Part 5

Exploring Kafka Streams :: Part 4

Exploring Kafka Streams :: Part 3

Exploring Kafka Streams :: Part 2

Exploring Kafka Streams :: Part 1

Kafka Streams Developer Guide



© PolarSPARC