PolarSPARC

Exploring Kafka Streams :: Part 7


Bhaskar S 12/17/2021


Overview

In Part 6 of this series, we demonstrated how to extract the data event (or Kafka message) timestamp and use an in-memory state store in Kafka Streams applications.

In this part of the series, we will demonstrate the different Join operations between two streams of data events.

Kafka Streams Concepts

Joins

Kafka Streams allows one to join two streams, each of which can be either a KStream or a KTable. From Part 1, we know that a KStream is an unbounded stream of events with append-only semantics, while a KTable has update semantics similar to a database table.
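The difference between the two semantics can be illustrated with a minimal plain Java sketch. It does not use the Kafka Streams API; the class StreamVsTableSketch and its methods are hypothetical names chosen for this illustration, with a list standing in for a KStream and a map standing in for a KTable:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class StreamVsTableSketch {
    // Sample events as (key, value) pairs, in arrival order
    static List<Map.Entry<String, Integer>> sample() {
        return List.of(Map.entry("A", 30), Map.entry("A", 14), Map.entry("T", 14));
    }

    // KStream-like view: every event is kept, in arrival order (append-only)
    static List<Map.Entry<String, Integer>> asStream(List<Map.Entry<String, Integer>> events) {
        return new ArrayList<>(events);
    }

    // KTable-like view: a later event for a key replaces the earlier one (update)
    static Map<String, Integer> asTable(List<Map.Entry<String, Integer>> events) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> event : events) {
            table.put(event.getKey(), event.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        System.out.println(asStream(sample())); // [A=30, A=14, T=14]
        System.out.println(asTable(sample()));  // {A=14, T=14}
    }
}
```

The stream view keeps both events for the key A, while the table view only retains the latest value for that key.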

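Kafka Streams supports the following three Join types: Inner Join (emits an output only when both sides have an event for the key), Left Join (emits an output for every event on the left side, with a null for the right side when there is no match), and Outer Join (emits an output for an event on either side, with a null for the side that has no match).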

A KStream can be joined with another KStream only with windowing enabled, which makes sense as a KStream is an unbounded stream and hence needs to be bounded using a time window. Windowing is not needed when a KTable is involved as it has the semantics of a database table.
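To make the role of the time window concrete, the following is a minimal plain Java sketch of the window check, under the assumption that two events are candidates for a join when their timestamps differ by no more than the window duration (in either direction). The class JoinWindowSketch and the method withinWindow are hypothetical names for this illustration and are not part of the Kafka Streams API:

```java
import java.time.Duration;

class JoinWindowSketch {
    // True when the two event timestamps (in milliseconds) are at most 'window' apart
    static boolean withinWindow(long leftTsMs, long rightTsMs, Duration window) {
        return Math.abs(leftTsMs - rightTsMs) <= window.toMillis();
    }

    public static void main(String[] args) {
        Duration window = Duration.ofSeconds(3);

        // 2.5 secs apart, so the two events can be joined
        System.out.println(withinWindow(1_000L, 3_500L, window)); // true

        // 4 secs apart, so the two events are never joined
        System.out.println(withinWindow(0L, 4_000L, window));     // false
    }
}
```

Without such a bound, a KStream-KStream join would have to remember every event ever seen on both sides.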

The following table illustrates the stream types and their supported join types:


Stream Options
Figure.4

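In order for the joins to work in Kafka Streams, the following important criteria, referred to as the Co-Partitioning rules, have to be met: the Kafka topics on both sides of the join must have the same number of partitions, the producers writing to those topics must use the same partitioning strategy (so that events with the same key land on the same partition number), and the keys on both sides must be of the same type.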
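The reason the partition counts matter can be seen with a small plain Java sketch. Events with the same key must land on the same partition number in both topics, so that a single task sees both sides of the join. The partitioner below is a simplified, hypothetical stand-in (key hash modulo partition count); Kafka's default partitioner uses a different hash (murmur2) over the serialized key bytes, but the modulo effect is the same:

```java
class CoPartitionSketch {
    // Hypothetical partitioner for illustration only: key hash modulo partition count
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // Both topics have 2 partitions: key "A" lands on partition 1 in both
        System.out.println(partitionFor("A", 2) + " vs " + partitionFor("A", 2)); // 1 vs 1

        // One topic has 2 partitions, the other 3: key "A" lands on different partitions
        System.out.println(partitionFor("A", 2) + " vs " + partitionFor("A", 3)); // 1 vs 2
    }
}
```

In the demonstrations that follow, both the topics are created with a single partition, so the rules are trivially satisfied.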

Hands-on with Kafka Streams

Sixth Application

In the Sixth module, we will simulate two data event streams, where each event contains a key (as a string) and a metric (as an integer) as value. We will demonstrate a STATEFUL Kafka Streams application that will consume the two streams of data events from the specified Kafka topics and join them using three combinations - a KStream-KStream inner join, a KStream-KTable left join, and a KTable-KTable outer join.

To set up the Java directory structure for the Sixth application, execute the following commands:

$ cd $HOME/java/KafkaStreams

$ mkdir -p $HOME/java/KafkaStreams/Sixth

$ mkdir -p Sixth/src/main/java Sixth/src/main/resources Sixth/target

$ mkdir -p Sixth/src/main/java/com/polarsparc/kstreams

$ cd $HOME/java/KafkaStreams/Sixth


The following is the listing for the Maven project file pom.xml that will be used:


pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <artifactId>KafkaStreams</artifactId>
        <groupId>com.polarsparc.kstreams</groupId>
        <version>1.0</version>
    </parent>

    <artifactId>Sixth</artifactId>
    <version>1.0</version>

    <dependencies>
        <dependency>
            <artifactId>Common</artifactId>
            <groupId>com.polarsparc.kstreams</groupId>
            <version>1.0</version>
        </dependency>
    </dependencies>
</project>

We need to modify the <modules> section in the parent pom.xml to include the Sixth module as shown below:


pom.xml (parent)
<modules>
  <module>Common</module>
  <module>First</module>
  <module>Second</module>
  <module>Third</module>
  <module>Fourth</module>
  <module>Fifth</module>
  <module>Sixth</module>
</modules>

We will need a Kafka producer that will generate the stream of data events on two Kafka topics. The following is the Java Kafka publisher that uses two threads (one for each Kafka topic) to publish events:


Listing.1
/*
 * Name:   Stream Data Event Generator for 2 Topics
 * Author: Bhaskar S
 * Date:   12/11/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.kstreams;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Random;

public class TwoMetricsGenerator {
    private static final Logger log = LoggerFactory.getLogger(TwoMetricsGenerator.class.getName());

    private final static List<String> keysList = Arrays.asList("A", "M", "S", "B", "N", "T");

    private final static Random random = new Random(1001);

    private static KafkaProducer<String, Integer> createMetricsProducer() {
        Properties config = new Properties();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:20001");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        return new KafkaProducer<>(config);
    }

    private static void generateMetricEvent(boolean flag, int type, String topic, Producer<String, Integer> producer) {
        int ki = random.nextInt(keysList.size());

        String key = keysList.get(ki);

        int value;
        if (type == 1) {
            value = random.nextInt(101);
        } else {
            value = random.nextInt(4, 32);
        }

        log.info(String.format("---> [%d] Topic: %s, Key: %s, Value: %d", type, topic, key, value));

        if (!flag) {
            try {
                producer.send(new ProducerRecord<>(topic, key, value)).get();
            }
            catch (Exception ex) {
                log.error(ex.getMessage());
            }
        }

        try {
            Thread.sleep(500);
        }
        catch (Exception ignore)  {
        }
    }

    public static void main(String[] args) {
        if (args.length < 2) {
            System.out.printf("Usage: java %s <topic-1> <topic-2> [--dry-run]\n", TwoMetricsGenerator.class.getName());
            System.exit(1);
        }

        boolean dryRun = args.length == 3 && args[2].equalsIgnoreCase("--dry-run");

        Producer<String, Integer> producer = null;
        if (!dryRun) {
            producer = createMetricsProducer();
        }

        class TaskOne extends Thread {
            final Producer<String, Integer> kp;

            TaskOne(Producer<String, Integer> kp) {
                this.kp = kp;
            }

            @Override
            public void run() {
                for (int i = 1; i <= 10; i++) {
                    generateMetricEvent(dryRun, 1, args[0], kp);
                }
            }
        }

        class TaskTwo extends Thread {
            final Producer<String, Integer> kp;

            TaskTwo(Producer<String, Integer> kp) {
                this.kp = kp;
            }

            @Override
            public void run() {
                for (int i = 1; i <= 10; i++) {
                    generateMetricEvent(dryRun, 2, args[1], kp);
                }
            }
        }

        TaskOne t1 = new TaskOne(producer);
        TaskTwo t2 = new TaskTwo(producer);

        t1.start();
        t2.start();

        try {
            t1.join();
        }
        catch (Exception ex) {
            // Ignore
        }

        try {
            t2.join();
        }
        catch (Exception ex) {
            // Ignore
        }

        if (!dryRun) {
            producer.close();
        }
    }
}

The following is the Java based Kafka Streams application that builds two streams, each consuming events (Kafka messages) from the specified Kafka topics and joining the two streams based on the third argument (that controls the type of streams/join to use):


Listing.2
/*
 * Name:   Two Data Streams Joiner
 * Author: Bhaskar S
 * Date:   12/11/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.kstreams;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;

public class StreamsJoiner {
    private static void usage() {
        System.out.printf("Usage: java %s <topic-1> <topic-2> <SSI | STL | TTO>\n", StreamsJoiner.class.getName());
        System.exit(1);
    }

    public static void main(String[] args) {
        if (args.length != 3) {
            usage();
        }

        Logger log = LoggerFactory.getLogger(StreamsJoiner.class.getName());

        StreamsConfig config = new StreamsConfig(KafkaConsumerConfig.kafkaConfigurationFour(
                String.format("%s-%s", args[0], args[1]), 1, Serdes.Integer()));

        Serde<String> stringSerde = Serdes.String();
        Serde<Integer> integerSerde = Serdes.Integer();

        // Window duration - 3 secs
        Duration windowSz = Duration.ofSeconds(3);

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, Integer> streamOne = builder.stream(args[0], Consumed.with(stringSerde, integerSerde))
                .peek((key, metric) -> log.info(String.format(".S1> Key: %s, Metrics-1: %d", key, metric)));

        KStream<String, Integer> streamTwo = builder.stream(args[1], Consumed.with(stringSerde, integerSerde))
                .peek((key, metric) -> log.info(String.format(".S2> Key: %s, Metrics-2: %d", key, metric)));

        switch (args[2]) {
            case "SSI" -> { // KStream-KStream Inner Join
                log.info("------> KStream-KStream Inner Join");

                streamOne.join(streamTwo,
                                (oneVal, twoVal) -> String.format("Metrics-1: %d, Metrics-2: %d", oneVal, twoVal),
                                JoinWindows.of(windowSz))
                        .peek((key, value) -> log.info(String.format("SSI> Key: %s, Value: %s", key, value)));
            }
            case "STL" -> { // KStream-KTable Left Join
                log.info("------> KStream-KTable Left Join");

                KTable<String, Integer> tableTwo = streamTwo.toTable();
                tableTwo.toStream()
                        .peek((key, metric) -> log.info(String.format(".T2> Key: %s, Metrics-2: %d", key, metric)));

                streamOne.leftJoin(tableTwo,
                                (oneVal, twoVal) -> String.format("Metrics-1: %d, Metrics-2: %d", oneVal, twoVal))
                        .peek((key, value) -> log.info(String.format("STL> Key: %s, Value: %s", key, value)));
            }
            case "TTO" -> { // KTable-KTable Outer Join
                log.info("------> KTable-KTable Outer Join");

                KTable<String, Integer> tableOne = streamOne.toTable();
                tableOne.toStream()
                        .peek((key, metric) -> log.info(String.format(".T1> Key: %s, Metrics-1: %d", key, metric)));

                KTable<String, Integer> tableTwo = streamTwo.toTable();
                tableTwo.toStream()
                        .peek((key, metric) -> log.info(String.format(".T2> Key: %s, Metrics-2: %d", key, metric)));

                KTable<String, String> joined = tableOne.outerJoin(tableTwo,
                        (oneVal, twoVal) -> String.format("Metrics-1: %d, Metrics-2: %d", oneVal, twoVal));
                joined.toStream()
                        .peek((key, value) -> log.info(String.format("TTO> Key: %s, Value: %s", key, value)));
            }
            default -> usage(); // Unknown join type, so display usage and exit
        }

        Topology topology = builder.build();

        log.info(String.format("---> %s", topology.describe().toString()));

        KafkaStreams streams = new KafkaStreams(topology, config);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

The code from Listing.2 above needs some explanation:

The method KSTREAM-1.join(KSTREAM-2, VALUE-JOINER, JOIN-WINDOW) performs an inner join of the events from KSTREAM-1 with the events from KSTREAM-2 that have the same key and lie within the specified time window period JOIN-WINDOW, using the specified joiner operator VALUE-JOINER. The two streams use the default key and value serde to extract the events from the respective streams.
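The semantics of the windowed inner join can be approximated with the following plain Java sketch. It is only a simulation and not how Kafka Streams implements the join; the class InnerJoinSketch, its nested Event class, and the event timestamps are all assumptions made up for this illustration (the keys and values are borrowed from the T events that appear in the demonstration below):

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

class InnerJoinSketch {
    // Minimal timestamped event (hypothetical type for this sketch)
    static final class Event {
        final String key;
        final int value;
        final long tsMs;

        Event(String key, int value, long tsMs) {
            this.key = key;
            this.value = value;
            this.tsMs = tsMs;
        }
    }

    // Pairs every left event with every right event that has the same key
    // and a timestamp no more than 'window' away
    static List<String> innerJoin(List<Event> left, List<Event> right, Duration window) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean inWindow = Math.abs(l.tsMs - r.tsMs) <= window.toMillis();
                if (sameKey && inWindow) {
                    out.add(String.format("Key: %s, Value: Metrics-1: %d, Metrics-2: %d",
                            l.key, l.value, r.value));
                }
            }
        }
        return out;
    }

    // Made-up timestamps (in milliseconds) for the events with key T
    static List<String> demo() {
        List<Event> one = List.of(new Event("T", 33, 0L), new Event("T", 74, 3_000L),
                new Event("T", 83, 4_100L));
        List<Event> two = List.of(new Event("T", 14, 1_000L), new Event("T", 24, 4_000L));
        return innerJoin(one, two, Duration.ofSeconds(3));
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

With these assumed timestamps, the sketch produces four joined pairs: (33, 14), (74, 14), (74, 24), and (83, 24). The pair (33, 24) is absent because the two events are 4 secs apart, which is outside the 3 secs window.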

The method KSTREAM.leftJoin(KTABLE, VALUE-JOINER) performs a left join of the events from KSTREAM with the current state of events in KTABLE, using the specified joiner operator VALUE-JOINER. The stream and the table use the default key and value serde to extract the events from the respective sources.
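The behavior of the stream-table left join can be sketched in plain Java as a lookup into a map. This is a simplified simulation under the assumption that only events on the stream side trigger a join output, while events on the table side just update the latest value for the key. The class LeftJoinSketch and its methods are hypothetical names for this illustration:

```java
import java.util.HashMap;
import java.util.Map;

class LeftJoinSketch {
    // Stands in for the current state of the KTable
    private final Map<String, Integer> table = new HashMap<>();

    // An event on the table side only updates the state; no join output is produced
    void onTableEvent(String key, int value) {
        table.put(key, value);
    }

    // An event on the stream side is joined with the current table value,
    // which is null when the table has no entry for the key
    String onStreamEvent(String key, int value) {
        Integer tableValue = table.get(key);
        return String.format("Key: %s, Value: Metrics-1: %d, Metrics-2: %d", key, value, tableValue);
    }

    public static void main(String[] args) {
        LeftJoinSketch join = new LeftJoinSketch();
        System.out.println(join.onStreamEvent("T", 33)); // Key: T, Value: Metrics-1: 33, Metrics-2: null
        join.onTableEvent("T", 14);
        System.out.println(join.onStreamEvent("T", 74)); // Key: T, Value: Metrics-1: 74, Metrics-2: 14
    }
}
```

Note that String.format renders a null argument as the text null, which is why a left join with no matching table entry shows Metrics-2: null.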

The method KTABLE-1.outerJoin(KTABLE-2, VALUE-JOINER) performs an outer join of the current state of events in KTABLE-1 with the current state of events in KTABLE-2, using the specified joiner operator VALUE-JOINER. The two tables use the default key and value serde to extract the events from the respective sources.
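The table-table outer join can likewise be sketched in plain Java with two maps. In this simplified simulation, an update on either side produces a join output for the key, with a null for the side that has no entry yet. It deliberately ignores details of the real implementation such as record caching and deletes; the class OuterJoinSketch and its methods are hypothetical names for this illustration:

```java
import java.util.HashMap;
import java.util.Map;

class OuterJoinSketch {
    // Stand-ins for the current state of the two KTables
    private final Map<String, Integer> tableOne = new HashMap<>();
    private final Map<String, Integer> tableTwo = new HashMap<>();

    // An update on the first table emits a join output for the key
    String onTableOne(String key, int value) {
        tableOne.put(key, value);
        return joined(key);
    }

    // An update on the second table also emits a join output for the key
    String onTableTwo(String key, int value) {
        tableTwo.put(key, value);
        return joined(key);
    }

    // Either side may be null when that table has no entry for the key
    private String joined(String key) {
        return String.format("Key: %s, Value: Metrics-1: %d, Metrics-2: %d",
                key, tableOne.get(key), tableTwo.get(key));
    }

    public static void main(String[] args) {
        OuterJoinSketch join = new OuterJoinSketch();
        System.out.println(join.onTableOne("T", 33)); // Key: T, Value: Metrics-1: 33, Metrics-2: null
        System.out.println(join.onTableTwo("A", 30)); // Key: A, Value: Metrics-1: null, Metrics-2: 30
        System.out.println(join.onTableTwo("T", 14)); // Key: T, Value: Metrics-1: 33, Metrics-2: 14
    }
}
```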

Now, we need to create the two Kafka topics events-one and events-two corresponding to the two sources of events. To do that, execute the following commands:

docker run --rm --net=host confluentinc/cp-kafka:7.0.0 kafka-topics --create --topic events-one --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server localhost:20001

docker run --rm --net=host confluentinc/cp-kafka:7.0.0 kafka-topics --create --topic events-two --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server localhost:20001

Now, it is time to compile the code for module Sixth. To do that, open a terminal window and execute the following commands:

$ cd $HOME/java/KafkaStreams/Sixth

$ mvn clean compile

To run the code from Listing.2 to demonstrate KStream-KStream inner join, open a terminal window and execute the following commands:

$ cd $HOME/java/KafkaStreams/Sixth

$ mvn exec:java -Dexec.mainClass=com.polarsparc.kstreams.StreamsJoiner -Dexec.args="events-one events-two SSI"

The following should be the typical output:

Output.1

... SNIP ...
[com.polarsparc.kstreams.StreamsJoiner.main()] INFO com.polarsparc.kstreams.StreamsJoiner - ------> KStream-KStream Inner Join
[com.polarsparc.kstreams.StreamsJoiner.main()] INFO com.polarsparc.kstreams.StreamsJoiner - ---> Topologies:
    Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [events-one])
      --> KSTREAM-PEEK-0000000001
    Source: KSTREAM-SOURCE-0000000002 (topics: [events-two])
      --> KSTREAM-PEEK-0000000003
    Processor: KSTREAM-PEEK-0000000001 (stores: [])
      --> KSTREAM-WINDOWED-0000000004
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-PEEK-0000000003 (stores: [])
      --> KSTREAM-WINDOWED-0000000005
      <-- KSTREAM-SOURCE-0000000002
    Processor: KSTREAM-WINDOWED-0000000004 (stores: [KSTREAM-JOINTHIS-0000000006-store])
      --> KSTREAM-JOINTHIS-0000000006
      <-- KSTREAM-PEEK-0000000001
    Processor: KSTREAM-WINDOWED-0000000005 (stores: [KSTREAM-JOINOTHER-0000000007-store])
      --> KSTREAM-JOINOTHER-0000000007
      <-- KSTREAM-PEEK-0000000003
    Processor: KSTREAM-JOINOTHER-0000000007 (stores: [KSTREAM-JOINTHIS-0000000006-store])
      --> KSTREAM-MERGE-0000000008
      <-- KSTREAM-WINDOWED-0000000005
    Processor: KSTREAM-JOINTHIS-0000000006 (stores: [KSTREAM-JOINOTHER-0000000007-store])
      --> KSTREAM-MERGE-0000000008
      <-- KSTREAM-WINDOWED-0000000004
    Processor: KSTREAM-MERGE-0000000008 (stores: [])
      --> KSTREAM-PEEK-0000000009
      <-- KSTREAM-JOINTHIS-0000000006, KSTREAM-JOINOTHER-0000000007
    Processor: KSTREAM-PEEK-0000000009 (stores: [])
      --> none
      <-- KSTREAM-MERGE-0000000008
... SNIP ...
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4] State transition from REBALANCING to RUNNING
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1-consumer, groupId=events-one-events-two] Requesting the log end offset for events-one-0 in order to compute lag

To run the events producer code from Listing.1 to publish a stream of events to the two Kafka topics events-one and events-two respectively, open a terminal window and execute the following commands:

$ cd $HOME/java/KafkaStreams/Sixth

$ mvn exec:java -Dexec.mainClass=com.polarsparc.kstreams.TwoMetricsGenerator -Dexec.args="events-one events-two"

The following should be the typical output:

Output.2

... SNIP ...
[com.polarsparc.kstreams.TwoMetricsGenerator.main()] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.0.0
[com.polarsparc.kstreams.TwoMetricsGenerator.main()] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962
[com.polarsparc.kstreams.TwoMetricsGenerator.main()] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1639337443497
[Thread-1] INFO com.polarsparc.kstreams.TwoMetricsGenerator - ---> [1] Topic: events-one, Key: T, Value: 33
[Thread-2] INFO com.polarsparc.kstreams.TwoMetricsGenerator - ---> [2] Topic: events-two, Key: A, Value: 30
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: T2SYasm_Twu9z2CH1tITkQ
[Thread-1] INFO com.polarsparc.kstreams.TwoMetricsGenerator - ---> [1] Topic: events-one, Key: B, Value: 32
[Thread-2] INFO com.polarsparc.kstreams.TwoMetricsGenerator - ---> [2] Topic: events-two, Key: A, Value: 14
[Thread-1] INFO com.polarsparc.kstreams.TwoMetricsGenerator - ---> [1] Topic: events-one, Key: N, Value: 84
[Thread-2] INFO com.polarsparc.kstreams.TwoMetricsGenerator - ---> [2] Topic: events-two, Key: T, Value: 14
[Thread-1] INFO com.polarsparc.kstreams.TwoMetricsGenerator - ---> [1] Topic: events-one, Key: B, Value: 74
[Thread-2] INFO com.polarsparc.kstreams.TwoMetricsGenerator - ---> [2] Topic: events-two, Key: M, Value: 9
[Thread-1] INFO com.polarsparc.kstreams.TwoMetricsGenerator - ---> [1] Topic: events-one, Key: M, Value: 45
[Thread-2] INFO com.polarsparc.kstreams.TwoMetricsGenerator - ---> [2] Topic: events-two, Key: B, Value: 14
[Thread-1] INFO com.polarsparc.kstreams.TwoMetricsGenerator - ---> [1] Topic: events-one, Key: M, Value: 27
[Thread-2] INFO com.polarsparc.kstreams.TwoMetricsGenerator - ---> [2] Topic: events-two, Key: A, Value: 10
[Thread-1] INFO com.polarsparc.kstreams.TwoMetricsGenerator - ---> [1] Topic: events-one, Key: T, Value: 74
[Thread-2] INFO com.polarsparc.kstreams.TwoMetricsGenerator - ---> [2] Topic: events-two, Key: M, Value: 21
[Thread-1] INFO com.polarsparc.kstreams.TwoMetricsGenerator - ---> [1] Topic: events-one, Key: A, Value: 50
[Thread-2] INFO com.polarsparc.kstreams.TwoMetricsGenerator - ---> [2] Topic: events-two, Key: S, Value: 5
[Thread-1] INFO com.polarsparc.kstreams.TwoMetricsGenerator - ---> [1] Topic: events-one, Key: T, Value: 83
[Thread-2] INFO com.polarsparc.kstreams.TwoMetricsGenerator - ---> [2] Topic: events-two, Key: T, Value: 24
[Thread-1] INFO com.polarsparc.kstreams.TwoMetricsGenerator - ---> [1] Topic: events-one, Key: S, Value: 82
[Thread-2] INFO com.polarsparc.kstreams.TwoMetricsGenerator - ---> [2] Topic: events-two, Key: A, Value: 30
[com.polarsparc.kstreams.TwoMetricsGenerator.main()] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[com.polarsparc.kstreams.TwoMetricsGenerator.main()] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[com.polarsparc.kstreams.TwoMetricsGenerator.main()] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[com.polarsparc.kstreams.TwoMetricsGenerator.main()] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[com.polarsparc.kstreams.TwoMetricsGenerator.main()] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregistered

The following would be the additional output in the terminal running the application from Listing.2 above:

Output.3

[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: T, Metrics-1: 33
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: A, Metrics-2: 30
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: B, Metrics-1: 32
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: A, Metrics-2: 14
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: N, Metrics-1: 84
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: T, Metrics-2: 14
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - SSI> Key: T, Value: Metrics-1: 33, Metrics-2: 14
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: B, Metrics-1: 74
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: M, Metrics-2: 9
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: M, Metrics-1: 45
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - SSI> Key: M, Value: Metrics-1: 45, Metrics-2: 9
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: B, Metrics-2: 14
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - SSI> Key: B, Value: Metrics-1: 32, Metrics-2: 14
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - SSI> Key: B, Value: Metrics-1: 74, Metrics-2: 14
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: M, Metrics-1: 27
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - SSI> Key: M, Value: Metrics-1: 27, Metrics-2: 9
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: A, Metrics-2: 10
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: T, Metrics-1: 74
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - SSI> Key: T, Value: Metrics-1: 74, Metrics-2: 14
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: M, Metrics-2: 21
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - SSI> Key: M, Value: Metrics-1: 45, Metrics-2: 21
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - SSI> Key: M, Value: Metrics-1: 27, Metrics-2: 21
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: A, Metrics-1: 50
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - SSI> Key: A, Value: Metrics-1: 50, Metrics-2: 10
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: S, Metrics-2: 5
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: T, Metrics-1: 83
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: T, Metrics-2: 24
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - SSI> Key: T, Value: Metrics-1: 74, Metrics-2: 24
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - SSI> Key: T, Value: Metrics-1: 83, Metrics-2: 24
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: S, Metrics-1: 82
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - SSI> Key: S, Value: Metrics-1: 82, Metrics-2: 5
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: A, Metrics-2: 30
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - SSI> Key: A, Value: Metrics-1: 50, Metrics-2: 30

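Analyzing the lines from Output.3 above, we can see the inner join working as expected. For example, the event (T, 24) from events-two is joined with the events (T, 74) and (T, 83) from events-one, but not with the very first event (T, 33), which was published about 4 secs earlier and hence falls outside the 3 secs join window.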

We need to reset the two Kafka topics events-one and events-two for the next demonstration. To do that, execute the following commands:

docker run --rm --net=host confluentinc/cp-kafka:7.0.0 kafka-topics --delete --topic events-one --bootstrap-server localhost:20001

docker run --rm --net=host confluentinc/cp-kafka:7.0.0 kafka-topics --create --topic events-one --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server localhost:20001

docker run --rm --net=host confluentinc/cp-kafka:7.0.0 kafka-topics --delete --topic events-two --bootstrap-server localhost:20001

docker run --rm --net=host confluentinc/cp-kafka:7.0.0 kafka-topics --create --topic events-two --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server localhost:20001

Next, to run the code from Listing.2 to demonstrate KStream-KTable left join, open a terminal window and execute the following commands:

$ cd $HOME/java/KafkaStreams/Sixth

$ mvn exec:java -Dexec.mainClass=com.polarsparc.kstreams.StreamsJoiner -Dexec.args="events-one events-two STL"

We will not show the output as it will be similar in nature to Output.1 above, except that it will indicate KStream-KTable Left Join.

Now, re-run the producer code from Listing.1.

The following would be the additional output in the terminal running the application from Listing.2 above (for the KStream-KTable left join):

Output.4

[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: T, Metrics-1: 33
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - STL> Key: T, Value: Metrics-1: 33, Metrics-2: null
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: A, Metrics-2: 30
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: B, Metrics-1: 32
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - STL> Key: B, Value: Metrics-1: 32, Metrics-2: null
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: A, Metrics-2: 14
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .T2> Key: A, Metrics-2: 14
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: N, Metrics-1: 84
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - STL> Key: N, Value: Metrics-1: 84, Metrics-2: null
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: T, Metrics-2: 14
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: B, Metrics-1: 74
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - STL> Key: B, Value: Metrics-1: 74, Metrics-2: null
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: M, Metrics-2: 9
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .T2> Key: T, Metrics-2: 14
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .T2> Key: M, Metrics-2: 9
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: M, Metrics-1: 45
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - STL> Key: M, Value: Metrics-1: 45, Metrics-2: 9
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: B, Metrics-2: 14
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: M, Metrics-1: 27
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - STL> Key: M, Value: Metrics-1: 27, Metrics-2: 9
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: A, Metrics-2: 10
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .T2> Key: B, Metrics-2: 14
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .T2> Key: A, Metrics-2: 10
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: T, Metrics-1: 74
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - STL> Key: T, Value: Metrics-1: 74, Metrics-2: 14
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: M, Metrics-2: 21
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: A, Metrics-1: 50
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - STL> Key: A, Value: Metrics-1: 50, Metrics-2: 10
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: S, Metrics-2: 5
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .T2> Key: M, Metrics-2: 21
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .T2> Key: S, Metrics-2: 5
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: T, Metrics-1: 83
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - STL> Key: T, Value: Metrics-1: 83, Metrics-2: 14
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: T, Metrics-2: 24
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: S, Metrics-1: 82
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - STL> Key: S, Value: Metrics-1: 82, Metrics-2: 5
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: A, Metrics-2: 30
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .T2> Key: T, Metrics-2: 24
[events-one-events-two-895a43fe-51a7-4f20-af66-3f08b81ff9b4-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .T2> Key: A, Metrics-2: 30

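Analyzing the lines from Output.4 above, we can see the left join working as expected: each events-one record (.S1>) immediately produces a joined result (STL>) that pairs it with the latest events-two table value for the same key, while updates to the table (.T2>) produce no output on their own.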
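To make the behavior seen in Output.4 concrete, the following is a minimal plain-Java sketch that models the KStream-KTable left join semantics using an in-memory map. It is only an illustration and does not use the Kafka Streams API or the code from Listing.2; the class name, the method names, and the key Z are hypothetical and are not part of the original application.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of KStream-KTable left join semantics (NOT the Kafka Streams API).
class StreamTableLeftJoinSketch {
    // Latest value per key, standing in for the KTable built from events-two.
    private final Map<String, Integer> table = new HashMap<>();
    // Joined records, formatted like the STL> lines in Output.4.
    private final List<String> results = new ArrayList<>();

    // A table-side update only refreshes the lookup state; it emits nothing.
    public void onTableUpdate(String key, Integer value) {
        table.put(key, value);
    }

    // A stream-side event always emits one result; the right side is null
    // when the table has no entry for the key (the "left" in left join).
    public void onStreamEvent(String key, Integer value) {
        Integer right = table.get(key);
        results.add(String.format("STL> Key: %s, Value: Metrics-1: %s, Metrics-2: %s",
                key, value, right));
    }

    public List<String> results() {
        return results;
    }

    public static void main(String[] args) {
        StreamTableLeftJoinSketch join = new StreamTableLeftJoinSketch();
        join.onTableUpdate("A", 10);   // mirrors .T2> Key: A, Metrics-2: 10
        join.onStreamEvent("A", 50);   // mirrors .S1> Key: A, Metrics-1: 50
        join.onTableUpdate("S", 5);    // mirrors .T2> Key: S, Metrics-2: 5
        join.onStreamEvent("S", 82);   // mirrors .S1> Key: S, Metrics-1: 82
        join.onStreamEvent("Z", 7);    // hypothetical key with no table entry
        join.results().forEach(System.out::println);
    }
}
```

The first two results correspond to the STL> lines for keys A and S in Output.4, while the hypothetical key Z shows how a left join would still emit a result with a null on the table side.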

Once again, we need to reset the two Kafka topics events-one and events-two for the final demonstration. To do that, execute the following commands:

docker run --rm --net=host confluentinc/cp-kafka:7.0.0 kafka-topics --delete --topic events-one --bootstrap-server localhost:20001

docker run --rm --net=host confluentinc/cp-kafka:7.0.0 kafka-topics --create --topic events-one --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server localhost:20001

docker run --rm --net=host confluentinc/cp-kafka:7.0.0 kafka-topics --delete --topic events-two --bootstrap-server localhost:20001

docker run --rm --net=host confluentinc/cp-kafka:7.0.0 kafka-topics --create --topic events-two --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server localhost:20001

Finally, to run the code from Listing.2 to demonstrate KTable-KTable outer join, open a terminal window and execute the following commands:

$ cd $HOME/java/KafkaStreams/Sixth

$ mvn exec:java -Dexec.mainClass=com.polarsparc.kstreams.StreamsJoiner -Dexec.args="events-one events-two TTO"

We will not show the startup output, as it is similar in nature to Output.1 above, except that it indicates a KTable-KTable Outer Join.

Now, re-run the producer code from Listing.1.

The following would be the additional output in the terminal running the application from Listing.2 above (for the KTable-KTable outer join):

Output.5

[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: T, Metrics-1: 33
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: A, Metrics-2: 30
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: B, Metrics-1: 32
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: A, Metrics-2: 14
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .T1> Key: T, Metrics-1: 33
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - TTO> Key: T, Value: Metrics-1: 33, Metrics-2: null
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .T1> Key: B, Metrics-1: 32
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - TTO> Key: B, Value: Metrics-1: 32, Metrics-2: null
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .T2> Key: A, Metrics-2: 14
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - TTO> Key: A, Value: Metrics-1: 30, Metrics-2: 14
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: N, Metrics-1: 84
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: T, Metrics-2: 14
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: B, Metrics-1: 74
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: M, Metrics-2: 9
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .T1> Key: N, Metrics-1: 84
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - TTO> Key: N, Value: Metrics-1: 84, Metrics-2: null
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .T1> Key: B, Metrics-1: 74
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - TTO> Key: B, Value: Metrics-1: 74, Metrics-2: null
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .T2> Key: T, Metrics-2: 14
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - TTO> Key: T, Value: Metrics-1: 33, Metrics-2: 14
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .T2> Key: M, Metrics-2: 9
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - TTO> Key: M, Value: Metrics-1: 21, Metrics-2: 9
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: M, Metrics-1: 45
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: B, Metrics-2: 14
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: M, Metrics-1: 27
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: A, Metrics-2: 10
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .T1> Key: M, Metrics-1: 27
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - TTO> Key: M, Value: Metrics-1: 27, Metrics-2: 9
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .T2> Key: B, Metrics-2: 14
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - TTO> Key: B, Value: Metrics-1: 74, Metrics-2: 14
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .T2> Key: A, Metrics-2: 10
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - TTO> Key: A, Value: Metrics-1: 30, Metrics-2: 10
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: T, Metrics-1: 74
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: M, Metrics-2: 21
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: A, Metrics-1: 50
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: S, Metrics-2: 5
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .T1> Key: T, Metrics-1: 74
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - TTO> Key: T, Value: Metrics-1: 74, Metrics-2: 14
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .T1> Key: A, Metrics-1: 50
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - TTO> Key: A, Value: Metrics-1: 50, Metrics-2: 10
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .T2> Key: M, Metrics-2: 21
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - TTO> Key: M, Value: Metrics-1: 27, Metrics-2: 21
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .T2> Key: S, Metrics-2: 5
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - TTO> Key: S, Value: Metrics-1: 5, Metrics-2: 5
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: T, Metrics-1: 83
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: T, Metrics-2: 24
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S1> Key: S, Metrics-1: 82
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .S2> Key: A, Metrics-2: 30
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .T1> Key: T, Metrics-1: 83
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - TTO> Key: T, Value: Metrics-1: 83, Metrics-2: 24
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .T1> Key: S, Metrics-1: 82
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - TTO> Key: S, Value: Metrics-1: 82, Metrics-2: 5
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .T2> Key: T, Metrics-2: 24
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - TTO> Key: T, Value: Metrics-1: 83, Metrics-2: 24
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - .T2> Key: A, Metrics-2: 30
[events-one-events-two-68441114-279c-4c52-89f8-8a6e5da1a57c-StreamThread-1] INFO com.polarsparc.kstreams.StreamsJoiner - TTO> Key: A, Value: Metrics-1: 50, Metrics-2: 30

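Analyzing the lines from Output.5 above, we can see the outer join working as expected: an update on either table (.T1> or .T2>) produces a joined result (TTO>), and a key with no match in the other table is emitted with a null on that side (for example, keys T, B, and N initially show Metrics-2: null).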
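As a rough illustration of the outer join behavior in Output.5, the following plain-Java sketch models a KTable-KTable outer join with two in-memory maps. It is a simplified model under stated assumptions and not the Kafka Streams API or the code from Listing.2: the class and method names are hypothetical, and deletions (tombstones) and record caching are deliberately left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of KTable-KTable outer join semantics (NOT the Kafka Streams API).
class TableTableOuterJoinSketch {
    // Latest value per key for each side, standing in for the two KTables.
    private final Map<String, Integer> left = new HashMap<>();
    private final Map<String, Integer> right = new HashMap<>();
    // Joined records, formatted like the TTO> lines in Output.5.
    private final List<String> results = new ArrayList<>();

    // An update on the left table (events-one) re-evaluates the join for that key.
    public void onLeftUpdate(String key, Integer value) {
        left.put(key, value);
        emit(key);
    }

    // An update on the right table (events-two) does the same.
    public void onRightUpdate(String key, Integer value) {
        right.put(key, value);
        emit(key);
    }

    // Outer join: emit even when one side has no entry; that side is null.
    private void emit(String key) {
        results.add(String.format("TTO> Key: %s, Value: Metrics-1: %s, Metrics-2: %s",
                key, left.get(key), right.get(key)));
    }

    public List<String> results() {
        return results;
    }

    public static void main(String[] args) {
        TableTableOuterJoinSketch join = new TableTableOuterJoinSketch();
        join.onLeftUpdate("T", 33);    // mirrors .T1> Key: T, Metrics-1: 33
        join.onRightUpdate("T", 14);   // mirrors .T2> Key: T, Metrics-2: 14
        join.onLeftUpdate("T", 74);    // mirrors .T1> Key: T, Metrics-1: 74
        join.results().forEach(System.out::println);
    }
}
```

Replaying the three updates for key T reproduces the progression seen in Output.5: first a result with Metrics-2 as null, then both values once the right side arrives, and then a refreshed result when the left side changes.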

References

GitHub :: Source Code

Exploring Kafka Streams :: Part 6

Exploring Kafka Streams :: Part 5

Exploring Kafka Streams :: Part 4

Exploring Kafka Streams :: Part 3

Exploring Kafka Streams :: Part 2

Exploring Kafka Streams :: Part 1

Kafka Streams Developer Guide


