PolarSPARC

Exploring Kafka Streams :: Part 4


Bhaskar S 11/26/2021


Overview

In Part 3 of this series, we dug a little deeper into Kafka Streams and explored the concepts around the state store and the record cache.

In this part of the series, we will shift gears to explore serialization and deserialization of custom data objects with Kafka Streams.

Kafka Streams Concepts

Data Serialization and Deserialization

As we have seen thus far, every Kafka Streams application must specify the appropriate serializer and deserializer (serde) class for both the key and the data value. Until now, we only used primitive data types, such as String, Long, etc., for the Kafka keys as well as the data event values. Typical enterprise applications, however, involve the use of custom data objects. In those cases, one needs to be able to convert an instance of the custom object to an array of bytes and an array of bytes back to an instance of the custom object. The process of converting an object instance to an array of bytes is called Serialization, and the process of creating an instance of the custom object from an array of bytes is called Deserialization.
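To make the two directions concrete, the following is a minimal sketch (using Jackson, the JSON library employed later in this article, with a hypothetical Quote record that exists only for this illustration):

import com.fasterxml.jackson.databind.ObjectMapper;

public class SerdeRoundTrip {
    // Hypothetical data object used only for this illustration
    public record Quote(String symbol, double price) {}

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        // Serialization: object instance -> array of bytes
        byte[] bytes = mapper.writeValueAsBytes(new Quote("ANT", 10.50));

        // Deserialization: array of bytes -> object instance
        Quote quote = mapper.readValue(bytes, Quote.class);

        System.out.println(quote); // Quote[symbol=ANT, price=10.5]
    }
}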

In order to support a custom data object (or Java POJO), one must perform the following steps (each of which is demonstrated in the sections below):

- Implement the org.apache.kafka.common.serialization.Serializer interface to convert an instance of the custom object to an array of bytes
- Implement the org.apache.kafka.common.serialization.Deserializer interface to convert an array of bytes back to an instance of the custom object
- Create a Serde instance from the two implementations using the Serdes.serdeFrom(serializer, deserializer) factory method

Hands-on with Kafka Streams

Third Application

In the Third module, we will simulate the price fluctuation of a handful of fictitious crypto coins. The price alerts for these fictitious crypto coins will be encapsulated in a Java POJO, and a JSON representation of the POJO will be published to the Kafka topic crypto-price-alerts. We will demonstrate a STATEFUL Kafka Streams application that will consume the JSON data events from the Kafka topic, filter only those events whose price has DECREASED by more than 25%, and collect the lowest price event for each of the fictitious crypto coins.

We will make some changes to the Common module located in the directory $HOME/java/KafkaStreams/Common to include the custom serializer, deserializer, and a utility class.

To add an additional Java directory to the Common module, execute the following commands:

$ cd $HOME/java/KafkaStreams

$ mkdir -p $HOME/java/KafkaStreams/Common

$ mkdir -p Common/src/main/java/com/polarsparc/kstreams/serde


The following is the listing for the updated Maven project file pom.xml:


pom.xml (Common)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <artifactId>KafkaStreams</artifactId>
        <groupId>com.polarsparc.kstreams</groupId>
        <version>1.0</version>
    </parent>

    <artifactId>Common</artifactId>
    <version>1.0</version>

    <dependencies>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.13.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
            <version>2.13.0</version>
        </dependency>
    </dependencies>
</project>

The following is the Java utility class that creates and configures the Jackson ObjectMapper used for the JSON conversion:


Listing.1
/*
 * Name:   Common Utilities
 * Author: Bhaskar S
 * Date:   11/25/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.kstreams;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

public interface KafkaStreamsUtils {
    static ObjectMapper getObjectMapper() {
        final ObjectMapper mapper = new ObjectMapper();

        // Very important for handling LocalDateTime
        {
            mapper.registerModule(new JavaTimeModule());
            mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        }

        return mapper;
    }
}

### ATTENTION ###

Not registering the JavaTimeModule with the ObjectMapper will result in the following error when trying to convert the CryptoAlert class to JSON:
Java 8 date/time type `java.time.LocalDateTime` not supported by default: add Module "com.fasterxml.jackson.datatype:jackson-datatype-jsr310" to enable handling
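As a quick sanity check (a hypothetical snippet, not part of the modules built in this article), serializing a LocalDateTime value through the configured ObjectMapper should produce an ISO-8601 string, which matches the timestamp format seen in the outputs later in this article:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.polarsparc.kstreams.KafkaStreamsUtils;

import java.time.LocalDateTime;

public class TimeModuleCheck {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = KafkaStreamsUtils.getObjectMapper();

        // With JavaTimeModule registered and WRITE_DATES_AS_TIMESTAMPS disabled,
        // this prints "2021-11-25T20:37:21" instead of raising the error above
        System.out.println(mapper.writeValueAsString(LocalDateTime.of(2021, 11, 25, 20, 37, 21)));
    }
}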

The following is the custom Java serializer class that will serialize any Java object to JSON:


Listing.2
/*
 * Name:   JSON Serializer
 * Author: Bhaskar S
 * Date:   11/25/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.kstreams.serde;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.polarsparc.kstreams.KafkaStreamsUtils;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class JsonSerializer<T> implements Serializer<T> {
    private final ObjectMapper mapper = KafkaStreamsUtils.getObjectMapper();

    @Override
    public byte[] serialize(String s, T data) {
        if (data == null) {
            return null;
        }
        try {
            return mapper.writeValueAsBytes(data);
        }
        catch (Exception ex) {
            throw new SerializationException(ex.getMessage());
        }
    }
}

The following is the custom Java deserializer class that will deserialize JSON to the specified Java class object:


Listing.3
/*
 * Name:   JSON Deserializer
 * Author: Bhaskar S
 * Date:   11/25/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.kstreams.serde;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.polarsparc.kstreams.KafkaStreamsUtils;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class JsonDeserializer<T> implements Deserializer<T> {
    private final ObjectMapper mapper = KafkaStreamsUtils.getObjectMapper();

    private final Class<T> clazz;

    public JsonDeserializer(Class<T> clazz) {
        this.clazz = clazz;
    }

    @Override
    public T deserialize(String s, byte[] bytes) {
        if (bytes == null) {
            return null;
        }

        T data;
        try {
            data = mapper.readValue(bytes, clazz);
        }
        catch (Exception ex) {
            throw new SerializationException(ex.getMessage());
        }

        return data;
    }
}
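As a quick round-trip check (a hypothetical snippet, not part of the modules built in this article), the serializer and deserializer pair can be exercised directly:

import com.polarsparc.kstreams.serde.JsonDeserializer;
import com.polarsparc.kstreams.serde.JsonSerializer;

public class JsonSerdeCheck {
    // Hypothetical data object used only for this check
    public record Coin(String name, double price) {}

    public static void main(String[] args) {
        JsonSerializer<Coin> serializer = new JsonSerializer<>();
        JsonDeserializer<Coin> deserializer = new JsonDeserializer<>(Coin.class);

        // Kafka passes the topic name as the first argument; this serde ignores it
        byte[] bytes = serializer.serialize("some-topic", new Coin("ANT", 10.50));
        Coin coin = deserializer.deserialize("some-topic", bytes);

        System.out.println(coin); // Coin[name=ANT, price=10.5]
    }
}

Note that JsonSerializer has a no-argument constructor and hence can be specified by class name in the producer configuration (as we will see in Listing.6), whereas JsonDeserializer requires the target Class<T> at construction time and is therefore instantiated directly in the Kafka Streams application (as we will see in Listing.7).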

Now, it is time to set up the Java directory structure for the Third application. To do that, execute the following commands:

$ cd $HOME/java/KafkaStreams

$ mkdir -p $HOME/java/KafkaStreams/Third

$ mkdir -p Third/src/main/java Third/src/main/resources Third/target

$ mkdir -p Third/src/main/java/com/polarsparc/kstreams

$ mkdir -p Third/src/main/java/com/polarsparc/kstreams/common

$ mkdir -p Third/src/main/java/com/polarsparc/kstreams/model

$ mkdir -p Third/src/main/java/com/polarsparc/kstreams/publisher

$ cd $HOME/java/KafkaStreams/Third


The following is the listing for the Maven project file pom.xml that will be used:


pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <artifactId>KafkaStreams</artifactId>
        <groupId>com.polarsparc.kstreams</groupId>
        <version>1.0</version>
    </parent>

    <artifactId>Third</artifactId>
    <version>1.0</version>

    <dependencies>
        <dependency>
            <artifactId>Common</artifactId>
            <groupId>com.polarsparc.kstreams</groupId>
            <version>1.0</version>
        </dependency>
    </dependencies>
</project>

We need to modify the <modules> section in the parent pom.xml to include the Third module as shown below:


pom.xml (parent)
<modules>
  <module>Common</module>
  <module>First</module>
  <module>Second</module>
  <module>Third</module>
</modules>

The following is the Java POJO that encapsulates a price alert of the fictitious crypto coins:


Listing.4
/*
 * Name:   Crypto Alert
 * Author: Bhaskar S
 * Date:   11/24/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.kstreams.model;

import java.time.LocalDateTime;

public record CryptoAlert(String name, int change, boolean up, double price, LocalDateTime timestamp) {}

Notice that the POJO has been defined as a Java record, which is a special immutable class that encapsulates a fixed set of data fields. Under the hood, this generates a class with all the data fields marked final, accessor methods that match the field names, an all-arguments constructor, and the methods toString(), equals(), and hashCode().
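For instance (a small hypothetical snippet), constructing and accessing a CryptoAlert instance looks as follows:

import com.polarsparc.kstreams.model.CryptoAlert;

import java.time.LocalDateTime;

public class CryptoAlertDemo {
    public static void main(String[] args) {
        // The generated all-arguments (canonical) constructor
        CryptoAlert alert = new CryptoAlert("ANT", 40, false, 6.3, LocalDateTime.now());

        // The generated accessor methods match the field names: name(), not getName()
        System.out.println(alert.name() + " moved " + alert.change() + "%");

        // The generated toString() renders as CryptoAlert[name=ANT, change=40, ...]
        System.out.println(alert);
    }
}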

The following is the Java utility class related to the fictitious crypto coins:


Listing.5
/*
 * Name:   Crypto Utility
 * Author: Bhaskar S
 * Date:   11/24/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.kstreams.common;

import com.polarsparc.kstreams.model.CryptoAlert;

import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public interface Crypto {
    String CRYPTO_ALERTS_TOPIC = "crypto-price-alerts";

    List<String> CRYPTO_LIST = Arrays.asList("ANT", "BEE", "CAT", "DOG", "EEL");
    List<Double> CRYPTO_PRICE_LIST = Arrays.asList(10.50, 9.75, 12.25, 11.50, 8.00);
    List<Integer> PERCENT_LIST = Arrays.asList(5, 10, 15, 20, 25, 30, 35, 40, 45, 50);

    Random random = new Random(System.currentTimeMillis());

    static CryptoAlert generateNextCryptoAlert() {
        int cp = random.nextInt(Crypto.CRYPTO_LIST.size());
        int pp = random.nextInt(Crypto.PERCENT_LIST.size());

        double offset = Crypto.CRYPTO_PRICE_LIST.get(cp) * Crypto.PERCENT_LIST.get(pp) / 100;

        boolean up = true;
        double price = Crypto.CRYPTO_PRICE_LIST.get(cp);
        if (random.nextInt(100) % 2 != 0) {
            up = false;
            price -= offset;
        } else {
            price += offset;
        }

        return new CryptoAlert(Crypto.CRYPTO_LIST.get(cp),
                Crypto.PERCENT_LIST.get(pp),
                up,
                price,
                LocalDateTime.now());
    }
}

We will need a Kafka producer that will generate 10 random price alerts for the fictitious crypto coins. The following is the Java Kafka publisher:


Listing.6
/*
 * Name:   Crypto Alerts Publisher
 * Author: Bhaskar S
 * Date:   11/24/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.kstreams.publisher;

import com.polarsparc.kstreams.common.Crypto;
import com.polarsparc.kstreams.model.CryptoAlert;
import com.polarsparc.kstreams.serde.JsonSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class CryptoAlertsPublisher {
    private static final Logger log = LoggerFactory.getLogger(CryptoAlertsPublisher.class.getName());

    private static KafkaProducer<String, CryptoAlert> createCryptoAlertsProducer() {
        Properties config = new Properties();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:20001");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
        return new KafkaProducer<>(config);
    }

    public static void main(String[] args) {
        Producer<String, CryptoAlert> producer = createCryptoAlertsProducer();

        for (int i = 1; i <= 10; i++) {
            CryptoAlert alert = Crypto.generateNextCryptoAlert();
            try {
                log.info(String.format("---> [%02d] Crypto alert: %s", i, alert));

                ProducerRecord<String, CryptoAlert> record = new ProducerRecord<>(Crypto.CRYPTO_ALERTS_TOPIC,
                        alert.name(),
                        alert);

                producer.send(record).get();
            }
            catch (Exception ex) {
                log.error(ex.getMessage());
            }
        }

        producer.close();
    }
}

The following is the Java based STATEFUL Kafka Streams application that consumes and processes events from the Kafka topic crypto-price-alerts:


Listing.7
/*
 * Name:   Crypto Alerts Watcher (Stateful)
 * Author: Bhaskar S
 * Date:   11/25/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.kstreams;

import com.polarsparc.kstreams.common.Crypto;
import com.polarsparc.kstreams.model.CryptoAlert;
import com.polarsparc.kstreams.serde.JsonDeserializer;
import com.polarsparc.kstreams.serde.JsonSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CryptoAlertStateful {
    public static void main(String[] args) {
        Logger log = LoggerFactory.getLogger(CryptoAlertStateful.class.getName());

        StreamsConfig config = new StreamsConfig(KafkaConsumerConfig.kafkaConfigurationTwo(
                "crypto-alerts-watcher", 1));

        StreamsBuilder builder = new StreamsBuilder();

        JsonSerializer<CryptoAlert> cryptoAlertSer = new JsonSerializer<>();
        JsonDeserializer<CryptoAlert> cryptoAlertDe = new JsonDeserializer<>(CryptoAlert.class);

        Serde<String> stringSerde = Serdes.String();
        Serde<CryptoAlert> cryptoAlertSerde = Serdes.serdeFrom(cryptoAlertSer, cryptoAlertDe);

        KStream<String, CryptoAlert> stream = builder.stream(Crypto.CRYPTO_ALERTS_TOPIC,
                Consumed.with(stringSerde, cryptoAlertSerde));

        stream.peek((symbol, alert) -> log.info(String.format("---> [Start] Symbol: %s, Alert: %s",
                        symbol, alert.toString())))
                .filter((symbol, alert) -> !alert.up() && alert.change() > 25) // crypto is down greater than 25%
                .groupByKey()
                .aggregate(
                        () -> null, // Initializer
                        (symbol, alert, agg) -> {
                            CryptoAlert last = agg;
                            if (last == null) {
                                last = alert;
                            } else {
                                if (alert.change() >= last.change()) {
                                    last = alert;
                                }
                            }
                            return last;
                        }, // Aggregator
                        Materialized.with(stringSerde, cryptoAlertSerde) // Store
                )
                .toStream()
                .peek((symbol, alert) -> log.info(String.format("---> [Final] Symbol: %s, Alert: %s",
                        symbol, alert.toString())));

        Topology topology = builder.build();

        log.info(String.format("---> %s", topology.describe().toString()));

        KafkaStreams streams = new KafkaStreams(topology, config);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

The code from Listing.7 above needs some explanation:

The class org.apache.kafka.common.serialization.Serdes is a factory class for creating the appropriate Serde instances, each of which pairs a serializer with a deserializer.

The method Serdes.serdeFrom(serializer, deserializer) allows one to create an instance of org.apache.kafka.common.serialization.Serde from the specified serializer and deserializer instances.

The DSL method groupByKey() returns an instance of the class org.apache.kafka.streams.kstream.KGroupedStream<K, V>, which is an abstraction of a grouped stream of key-value data events. It is basically an intermediate representation of a KStream that can be used for performing aggregation operations, such as count(), aggregate(), reduce(), etc.

The DSL method aggregate(INITIALIZER, AGGREGATOR, STORE) allows one to perform a generic aggregation operation on each incoming data event, grouped together by key. Data events with either a NULL key or a NULL value are IGNORED. The specified INITIALIZER function is applied just ONCE per key, before the first data event for that key is processed. The specified AGGREGATOR function is applied to each of the incoming data events. The specified local STORE is used for persisting the results of the aggregation operation.
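To make the INITIALIZER, AGGREGATOR, and STORE roles concrete, the following hypothetical variant applies the same pattern to count the alerts seen per coin (treating the values as raw JSON strings to keep the sketch self-contained):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class AlertCountSketch {
    static void buildCountTopology(StreamsBuilder builder) {
        KStream<String, String> stream = builder.stream("crypto-price-alerts",
                Consumed.with(Serdes.String(), Serdes.String()));

        KTable<String, Long> counts = stream
                .groupByKey()
                .aggregate(
                        () -> 0L,                             // INITIALIZER: invoked once per key
                        (symbol, alert, count) -> count + 1L, // AGGREGATOR: invoked per data event
                        Materialized.with(Serdes.String(), Serdes.Long()) // STORE serdes
                );

        counts.toStream()
              .peek((symbol, count) -> System.out.printf("%s seen %d time(s)%n", symbol, count));
    }
}

(For a plain count, the built-in count() method on KGroupedStream would suffice; aggregate() is used here only to mirror the shape of Listing.7.)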

Now, we need to create the Kafka topic crypto-price-alerts with a single partition using docker by executing the following command:

docker run --rm --net=host confluentinc/cp-kafka:7.0.0 kafka-topics --create --topic crypto-price-alerts --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server localhost:20001

The following should be the typical output:

Output.1

Created topic crypto-price-alerts.

Now, it is time to compile the code from Listing.6 and Listing.7. To do that, open a terminal window and execute the following commands:

$ cd $HOME/java/KafkaStreams/Third

$ mvn clean compile

Now, it is time to test the code from Listing.7. To do that, open a terminal window and execute the following commands:

$ cd $HOME/java/KafkaStreams/Third

$ mvn exec:java -Dexec.mainClass=com.polarsparc.kstreams.CryptoAlertStateful

The following should be the typical output:

Output.2

[com.polarsparc.kstreams.CryptoAlertStateful.main()] INFO org.apache.kafka.streams.StreamsConfig - StreamsConfig values: 
  application.id = crypto-alerts-watcher
    ... SNIP ...
  commit.interval.ms = 1000
    ... SNIP ...
  state.dir = /home/polarsparc/kafka/state
    ... SNIP ...
[com.polarsparc.kstreams.CryptoAlertStateful.main()] INFO com.polarsparc.kstreams.CryptoAlertStateful - ---> Topologies:
    Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [crypto-price-alerts])
      --> KSTREAM-PEEK-0000000001
    Processor: KSTREAM-PEEK-0000000001 (stores: [])
      --> KSTREAM-FILTER-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-FILTER-0000000002 (stores: [])
      --> KSTREAM-AGGREGATE-0000000004
      <-- KSTREAM-PEEK-0000000001
    Processor: KSTREAM-AGGREGATE-0000000004 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000003])
      --> KTABLE-TOSTREAM-0000000005
      <-- KSTREAM-FILTER-0000000002
    Processor: KTABLE-TOSTREAM-0000000005 (stores: [])
      --> KSTREAM-PEEK-0000000006
      <-- KSTREAM-AGGREGATE-0000000004
    Processor: KSTREAM-PEEK-0000000006 (stores: [])
      --> none
      <-- KTABLE-TOSTREAM-0000000005
... SNIP ...
[com.polarsparc.kstreams.CryptoAlertStateful.main()] INFO org.apache.kafka.streams.KafkaStreams - stream-client [crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324] State transition from CREATED to REBALANCING
[crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1] Starting
[crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1] State transition from CREATED to STARTING
[crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1-consumer, groupId=crypto-alerts-watcher] Subscribed to topic(s): crypto-price-alerts
... SNIP ...
[crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1-consumer, groupId=crypto-alerts-watcher] Updating assignment with
  Assigned partitions:                       [crypto-price-alerts-0]
  Current owned partitions:                  []
  Added partitions (assigned - owned):       [crypto-price-alerts-0]
  Revoked partitions (owned - assigned):     []
... SNIP ...
[crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.TaskManager - stream-thread [crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1] Handle new assignment with:
  New active tasks: [0_0]
  New standby tasks: []
  Existing active tasks: []
  Existing standby tasks: []
... SNIP ...
[crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1] INFO org.apache.kafka.streams.state.internals.RocksDBTimestampedStore - Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000003 in regular mode
[crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.ProcessorStateManager - stream-thread [crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1] task [0_0] State store KSTREAM-AGGREGATE-STATE-STORE-0000000003 did not find checkpoint offset, hence would default to the 
... SNIP ...
[crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324] State transition from REBALANCING to RUNNING

Now, it is time to run the price alerts publisher code from Listing.6. To do that, open a terminal window and execute the following commands:

$ cd $HOME/java/KafkaStreams/Third

$ mvn exec:java -Dexec.mainClass=com.polarsparc.kstreams.publisher.CryptoAlertsPublisher

The following should be the typical output:

Output.3

... SNIP ...
[com.polarsparc.kstreams.publisher.CryptoAlertsPublisher.main()] INFO com.polarsparc.kstreams.publisher.CryptoAlertsPublisher - ---> [02] Crypto alert: {"name":"BEE","change":35,"up":false,"price":6.3375,"timestamp":"2021-11-25T20:37:21.241126208"}
[com.polarsparc.kstreams.publisher.CryptoAlertsPublisher.main()] INFO com.polarsparc.kstreams.publisher.CryptoAlertsPublisher - ---> [03] Crypto alert: {"name":"DOG","change":45,"up":true,"price":16.675,"timestamp":"2021-11-25T20:37:21.246422576"}
[com.polarsparc.kstreams.publisher.CryptoAlertsPublisher.main()] INFO com.polarsparc.kstreams.publisher.CryptoAlertsPublisher - ---> [04] Crypto alert: {"name":"EEL","change":40,"up":false,"price":4.8,"timestamp":"2021-11-25T20:37:21.251814042"}
[com.polarsparc.kstreams.publisher.CryptoAlertsPublisher.main()] INFO com.polarsparc.kstreams.publisher.CryptoAlertsPublisher - ---> [05] Crypto alert: {"name":"ANT","change":40,"up":false,"price":6.3,"timestamp":"2021-11-25T20:37:21.257238922"}
[com.polarsparc.kstreams.publisher.CryptoAlertsPublisher.main()] INFO com.polarsparc.kstreams.publisher.CryptoAlertsPublisher - ---> [06] Crypto alert: {"name":"EEL","change":5,"up":false,"price":7.6,"timestamp":"2021-11-25T20:37:21.262220395"}
[com.polarsparc.kstreams.publisher.CryptoAlertsPublisher.main()] INFO com.polarsparc.kstreams.publisher.CryptoAlertsPublisher - ---> [07] Crypto alert: {"name":"ANT","change":20,"up":true,"price":12.6,"timestamp":"2021-11-25T20:37:21.266043975"}
[com.polarsparc.kstreams.publisher.CryptoAlertsPublisher.main()] INFO com.polarsparc.kstreams.publisher.CryptoAlertsPublisher - ---> [08] Crypto alert: {"name":"CAT","change":35,"up":false,"price":7.9625,"timestamp":"2021-11-25T20:37:21.272748338"}
[com.polarsparc.kstreams.publisher.CryptoAlertsPublisher.main()] INFO com.polarsparc.kstreams.publisher.CryptoAlertsPublisher - ---> [09] Crypto alert: {"name":"EEL","change":40,"up":false,"price":4.8,"timestamp":"2021-11-25T20:37:21.278127632"}
[com.polarsparc.kstreams.publisher.CryptoAlertsPublisher.main()] INFO com.polarsparc.kstreams.publisher.CryptoAlertsPublisher - ---> [10] Crypto alert: {"name":"BEE","change":15,"up":true,"price":11.2125,"timestamp":"2021-11-25T20:37:21.282620234"}
[com.polarsparc.kstreams.publisher.CryptoAlertsPublisher.main()] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
... SNIP ...

The following would be the additional output in the terminal running the application from Listing.7 above:

Output.4

[crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1] INFO com.polarsparc.kstreams.CryptoAlertStateful - ---> [Start] Symbol: CAT, Alert: CryptoAlert[name=CAT, change=40, up=true, price=17.15, timestamp=2021-11-25T20:37:20.902646034]
[crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1] INFO com.polarsparc.kstreams.CryptoAlertStateful - ---> [Start] Symbol: BEE, Alert: CryptoAlert[name=BEE, change=35, up=false, price=6.3375, timestamp=2021-11-25T20:37:21.241126208]
[crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1] INFO com.polarsparc.kstreams.CryptoAlertStateful - ---> [Start] Symbol: DOG, Alert: CryptoAlert[name=DOG, change=45, up=true, price=16.675, timestamp=2021-11-25T20:37:21.246422576]
[crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1] INFO com.polarsparc.kstreams.CryptoAlertStateful - ---> [Start] Symbol: EEL, Alert: CryptoAlert[name=EEL, change=40, up=false, price=4.8, timestamp=2021-11-25T20:37:21.251814042]
[crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1] INFO com.polarsparc.kstreams.CryptoAlertStateful - ---> [Start] Symbol: ANT, Alert: CryptoAlert[name=ANT, change=40, up=false, price=6.3, timestamp=2021-11-25T20:37:21.257238922]
[crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1] INFO com.polarsparc.kstreams.CryptoAlertStateful - ---> [Start] Symbol: EEL, Alert: CryptoAlert[name=EEL, change=5, up=false, price=7.6, timestamp=2021-11-25T20:37:21.262220395]
[crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1] INFO com.polarsparc.kstreams.CryptoAlertStateful - ---> [Start] Symbol: ANT, Alert: CryptoAlert[name=ANT, change=20, up=true, price=12.6, timestamp=2021-11-25T20:37:21.266043975]
[crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1] INFO com.polarsparc.kstreams.CryptoAlertStateful - ---> [Start] Symbol: CAT, Alert: CryptoAlert[name=CAT, change=35, up=false, price=7.9625, timestamp=2021-11-25T20:37:21.272748338]
[crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1] INFO com.polarsparc.kstreams.CryptoAlertStateful - ---> [Start] Symbol: EEL, Alert: CryptoAlert[name=EEL, change=40, up=false, price=4.8, timestamp=2021-11-25T20:37:21.278127632]
[crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1] INFO com.polarsparc.kstreams.CryptoAlertStateful - ---> [Start] Symbol: BEE, Alert: CryptoAlert[name=BEE, change=15, up=true, price=11.2125, timestamp=2021-11-25T20:37:21.282620234]
[crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1] INFO com.polarsparc.kstreams.CryptoAlertStateful - ---> [Final] Symbol: BEE, Alert: CryptoAlert[name=BEE, change=35, up=false, price=6.3375, timestamp=2021-11-25T20:37:21.241126208]
[crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1] INFO com.polarsparc.kstreams.CryptoAlertStateful - ---> [Final] Symbol: ANT, Alert: CryptoAlert[name=ANT, change=40, up=false, price=6.3, timestamp=2021-11-25T20:37:21.257238922]
[crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1] INFO com.polarsparc.kstreams.CryptoAlertStateful - ---> [Final] Symbol: CAT, Alert: CryptoAlert[name=CAT, change=35, up=false, price=7.9625, timestamp=2021-11-25T20:37:21.272748338]
[crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1] INFO com.polarsparc.kstreams.CryptoAlertStateful - ---> [Final] Symbol: EEL, Alert: CryptoAlert[name=EEL, change=40, up=false, price=4.8, timestamp=2021-11-25T20:37:21.278127632]
[crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [crypto-alerts-watcher-9b28dd20-d38e-4503-825d-80794e027324-StreamThread-1] Processed 10 total records, ran 0 punctuators, and committed 1 total tasks since the last update

The lines in Output.4 above with "---> [Final]" are the result of the aggregate() operation.


References

Exploring Kafka Streams :: Part 3

Exploring Kafka Streams :: Part 2

Exploring Kafka Streams :: Part 1

Kafka Streams Developer Guide



© PolarSPARC