PolarSPARC

Introduction to Vert.x - Part 3


Bhaskar S 05/18/2019


Overview

In Part-2 of this series, we explored the use of a configuration file to externalize parameters (rather than hardcoding them). We also simplified the handling of nested asynchronous callbacks (callback hell) by chaining functions that return a Future in Vert.x.

In this part, we will continue with examples around the EventBus.

Hands-on with Vert.x - 3

As indicated in Part-1 of this series, the Event Bus is the messaging backbone through which different Verticle(s) communicate with each other. One Verticle instance can send a message to a named destination address (a basic string name), and other Verticle instance(s) can consume it via the Event Bus. The Event Bus supports the following 3 types of message passing patterns: Point-to-Point, Request-Reply, and Publish-Subscribe.

Vert.x uses a best-effort delivery mechanism. This means that Vert.x will do its best to deliver message(s), but a message can be lost if a failure occurs.

The following is the listing for Sample05.java, which demonstrates the Point-to-Point communication pattern:

Sample05.java
/*
 * Topic:  Introduction to Vert.x
 * 
 * Name:   Sample 5
 * 
 * Author: Bhaskar S
 * 
 * URL:    https://www.polarsparc.com
 */

package com.polarsparc.Vertx;

import java.util.logging.Level;
import java.util.logging.Logger;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Vertx;

public class Sample05 {
    private static Logger LOGGER = Logger.getLogger(Sample05.class.getName());
    
    private static String ADDRESS = "msg.address";
    private static String MESSAGE = "Hello from Vert.x";
    
    // Sender verticle (point-to-point)
    private static class MsgSendVerticle extends AbstractVerticle {
        @Override
        public void start() {
            vertx.eventBus().send(ADDRESS, MESSAGE);
            
            LOGGER.log(Level.INFO, "Message send to address " + ADDRESS);
        }
    }
    
    // Consumer verticle
    private static class MsgConsumerVerticle extends AbstractVerticle {
        @Override
        public void start() {
            vertx.eventBus().consumer(ADDRESS, res -> {
                 LOGGER.log(Level.INFO, "Received message - " + res.body());
            });
        }
    }
    
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(new MsgConsumerVerticle(), res1 -> {
            if (res1.succeeded()) {
                LOGGER.log(Level.INFO, "Deployed consumer instance ID: " + res1.result());
                
                vertx.deployVerticle(new MsgSendVerticle(), res2 -> {
                    if (res2.succeeded()) {
                        LOGGER.log(Level.INFO, "Deployed sender instance ID: " + res2.result());
                    } else {
                        res2.cause().printStackTrace();
                    }
                });
            } else {
                res1.cause().printStackTrace();
            }
        });
    }
}

Let us walk through the code from Sample05 listed above.

The method eventBus() on the class io.vertx.core.Vertx returns an instance of the class io.vertx.core.eventbus.EventBus, which is the messaging system of Vert.x.

The call to the method send() on the instance of EventBus takes two arguments - a destination address (which is a simple string) and the message object. The message object is serialized and sent to the specified address. The message is delivered to *AT MOST* one of the handlers registered at the specified address.
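To make the "at most one handler" behaviour concrete, here is a minimal sketch in plain Java. It is a toy model and not the Vert.x EventBus: the class ToyPointToPointBus and its methods are hypothetical names invented for this illustration, and the strict rotation between handlers is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy model only (hypothetical class, NOT the Vert.x EventBus):
// illustrates point-to-point delivery to at most one registered handler
class ToyPointToPointBus {
    private final Map<String, List<Consumer<String>>> handlers = new HashMap<>();
    private final Map<String, Integer> sendCount = new HashMap<>();

    // Register a handler at an address (loosely mirrors consumer())
    void consumer(String address, Consumer<String> handler) {
        handlers.computeIfAbsent(address, k -> new ArrayList<>()).add(handler);
    }

    // Deliver the message to exactly one handler, rotating through the
    // registered handlers; returns false when nobody is registered
    boolean send(String address, String message) {
        List<Consumer<String>> list = handlers.get(address);
        if (list == null || list.isEmpty()) {
            return false; // best effort: the message is simply dropped
        }
        int index = sendCount.merge(address, 1, Integer::sum) - 1;
        list.get(index % list.size()).accept(message);
        return true;
    }

    public static void main(String[] args) {
        ToyPointToPointBus bus = new ToyPointToPointBus();
        bus.consumer("msg.address", m -> System.out.println("H1 got " + m));
        bus.consumer("msg.address", m -> System.out.println("H2 got " + m));
        bus.send("msg.address", "one");
        bus.send("msg.address", "two");
        bus.send("msg.address", "three");
    }
}
```

In this sketch, each message reaches only one of the two handlers: H1 receives "one" and "three", while H2 receives "two".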

The call to the method consumer() on the instance of EventBus takes two arguments - a destination address (which is a simple string) and a callback handler of type io.vertx.core.Handler<E>, where <E> is of type io.vertx.core.eventbus.Message<T>. When a message arrives at the specified address, the registered handler(s) are invoked with the incoming message.

Executing the Java program Sample05 listed above should generate an output similar to the following:

Output.1

May 18, 2019 11:45:04 AM com.polarsparc.Vertx.Sample05 lambda$0
INFO: Deployed consumer instance ID: 748cbbca-6db9-4c46-bba0-f0775d830526
May 18, 2019 11:45:04 AM com.polarsparc.Vertx.Sample05$MsgSendVerticle start
INFO: Message send to address msg.address
May 18, 2019 11:45:04 AM com.polarsparc.Vertx.Sample05 lambda$1
INFO: Deployed sender instance ID: f5b8e7d7-eba5-41fe-ba02-35f414becdbb
May 18, 2019 11:45:04 AM com.polarsparc.Vertx.Sample05$MsgConsumerVerticle lambda$0
INFO: Received message - Hello from Vert.x

Moving on to the next example, we will demonstrate the Request-Reply communication pattern.

The following is the listing for Sample06.java:

Sample06.java
/*
 * Topic:  Introduction to Vert.x
 * 
 * Name:   Sample 6
 * 
 * Author: Bhaskar S
 * 
 * URL:    https://www.polarsparc.com
 */

package com.polarsparc.Vertx;

import java.util.logging.Level;
import java.util.logging.Logger;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Vertx;

public class Sample06 {
    private static Logger LOGGER = Logger.getLogger(Sample06.class.getName());
    
    private static String ADDRESS = "msg.address";
    private static String MESSAGE = "Hola from Vert.x";
    
    // Producer verticle that expects a reply from the consumer
    private static class MsgSendVerticle extends AbstractVerticle {
        @Override
        public void start() {
            vertx.eventBus().send(ADDRESS, MESSAGE, reply -> {
                if (reply.succeeded()) {
                    LOGGER.log(Level.INFO, "Reply from " + ADDRESS + " => " + reply.result().body());
                } else {
                    reply.cause().printStackTrace();
                }
            });
            
            LOGGER.log(Level.INFO, "Message send to address " + ADDRESS);
        }
    }
    
    // Consumer verticle that sends a reply back to the producer
    private static class MsgConsumerVerticle extends AbstractVerticle {
        @Override
        public void start() {
            vertx.eventBus().consumer(ADDRESS, message -> {
                 LOGGER.log(Level.INFO, "Received message - " + message.body());
                 
                 message.reply("Fantastico !!!");
            });
        }
    }
    
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(new MsgConsumerVerticle(), res1 -> {
            if (res1.succeeded()) {
                LOGGER.log(Level.INFO, "Deployed consumer instance ID: " + res1.result());
                
                vertx.deployVerticle(new MsgSendVerticle(), res2 -> {
                    if (res2.succeeded()) {
                        LOGGER.log(Level.INFO, "Deployed sender instance ID: " + res2.result());
                    } else {
                        res2.cause().printStackTrace();
                    }
                });
            } else {
                res1.cause().printStackTrace();
            }
        });
    }
}

Let us walk through the code from Sample06 listed above.

The call to the method send() on the instance of EventBus in this example takes three arguments - a destination address (which is a simple string), the message object to dispatch, and a callback handler of type io.vertx.core.Handler<E>, where <E> is of type io.vertx.core.AsyncResult<Message<T>>. The AsyncResult indicates whether a reply was received and, on success, encapsulates the reply message.

The call to the method reply() on the instance of Message takes one argument - the reply message object. The reply is targeted at the sender of the original message. When the reply arrives at the sender, the registered reply callback handler is invoked.
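The shape of the request-reply exchange can be sketched in plain Java as follows. This is a toy model and not the Vert.x API: the class ToyRequestReplyBus and the method request() are hypothetical names, the handler's return value stands in for message.reply(), and a CompletableFuture stands in for the AsyncResult passed to the reply callback.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Toy model only (hypothetical class, NOT the Vert.x API):
// illustrates the request-reply flow between a sender and a consumer
class ToyRequestReplyBus {
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    // The value returned by the handler plays the role of message.reply(...)
    void consumer(String address, Function<String, String> handler) {
        handlers.put(address, handler);
    }

    // Returns a future that completes with the reply body, or fails
    // when no handler is registered at the address
    CompletableFuture<String> request(String address, String message) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        Function<String, String> handler = handlers.get(address);
        if (handler == null) {
            reply.completeExceptionally(
                new IllegalStateException("No handler for " + address));
        } else {
            reply.complete(handler.apply(message));
        }
        return reply;
    }

    public static void main(String[] args) {
        ToyRequestReplyBus bus = new ToyRequestReplyBus();
        bus.consumer("msg.address", body -> {
            System.out.println("Received message - " + body);
            return "Fantastico !!!";
        });
        bus.request("msg.address", "Hola from Vert.x").whenComplete((body, err) -> {
            if (err == null) {
                System.out.println("Reply => " + body);
            } else {
                err.printStackTrace();
            }
        });
    }
}
```

As in Sample06, the sender has to handle both outcomes: a successful reply and a failure (here, the case where nobody is listening at the address).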

Executing the Java program Sample06 listed above should generate an output similar to the following:

Output.2

May 18, 2019 12:05:02 PM com.polarsparc.Vertx.Sample06 lambda$0
INFO: Deployed consumer instance ID: 50fcd279-e23c-4a4f-8a9e-7a0dafd642b4
May 18, 2019 12:05:02 PM com.polarsparc.Vertx.Sample06$MsgSendVerticle start
INFO: Message send to address msg.address
May 18, 2019 12:05:02 PM com.polarsparc.Vertx.Sample06 lambda$1
INFO: Deployed sender instance ID: 6b1e1bd8-f08d-42a4-afe3-946e755b56a4
May 18, 2019 12:05:02 PM com.polarsparc.Vertx.Sample06$MsgConsumerVerticle lambda$0
INFO: Received message - Hola from Vert.x
May 18, 2019 12:05:02 PM com.polarsparc.Vertx.Sample06$MsgSendVerticle lambda$0
INFO: Reply from msg.address => Fantastico !!!

Moving on to the next example, we will demonstrate the Publish-Subscribe communication pattern.

The following is the listing for Sample07.java, which demonstrates a single publisher and multiple consumers:

Sample07.java
/*
 * Topic:  Introduction to Vert.x
 * 
 * Name:   Sample 7
 * 
 * Author: Bhaskar S
 * 
 * URL:    https://www.polarsparc.com
 */

package com.polarsparc.Vertx;

import java.util.logging.Level;
import java.util.logging.Logger;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Vertx;

public class Sample07 {
    private static Logger LOGGER = Logger.getLogger(Sample07.class.getName());
    
    private static String ADDRESS = "msg.address";
    private static String MESSAGE = "Bojour from Vert.x";
    
    // Publisher verticle
    private static class MsgPublisherVerticle extends AbstractVerticle {
        @Override
        public void start(Future<Void> fut) {
            vertx.eventBus().publish(ADDRESS, String.format("[1] %s", MESSAGE));
            vertx.eventBus().publish(ADDRESS, String.format("[2] %s", MESSAGE));
            vertx.eventBus().publish(ADDRESS, String.format("[3] %s", MESSAGE));
            
            LOGGER.log(Level.INFO, String.format("Messages published to address %s", ADDRESS));
            
            fut.complete();
        }
    }
    
    // Consumer verticle
    private static class MsgConsumerVerticle extends AbstractVerticle {
        String name;
        
        MsgConsumerVerticle(String str) {
            this.name = str;
        }
        
        @Override
        public void start() {
            vertx.eventBus().consumer(ADDRESS, res -> {
                 LOGGER.log(Level.INFO, String.format("[%s] - Received message - %s", name, res.body()));
            });
        }
    }
    
    private static Future<Void> deployConsumer(String name, Vertx vertx) {
        Future<Void> fut = Future.future();
        
        vertx.deployVerticle(new MsgConsumerVerticle(name), res -> {
            if (res.succeeded()) {
                LOGGER.log(Level.INFO, "Deployed consumer <" + name + "> with instance ID: " + res.result());
                
                fut.complete();
            } else {
                fut.fail(res.cause());
            }
        });
        
        return fut;
    }
    
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        
        Future<Void> f1 = deployConsumer("C1", vertx);
        Future<Void> f2 = deployConsumer("C2", vertx);
        
        // Deploy the publisher only after both consumers have been deployed
        CompositeFuture.join(f1, f2).setHandler(res -> {
            if (res.succeeded()) {
                LOGGER.log(Level.INFO, "Deployed consumer instances");
                
                vertx.deployVerticle(new MsgPublisherVerticle(), res2 -> {
                    if (res2.succeeded()) {
                        LOGGER.log(Level.INFO, "Deployed publisher instance ID: " + res2.result());
                    } else {
                        res2.cause().printStackTrace();
                    }
                });
            } else {
                res.cause().printStackTrace();
            }
        });
    }
}

Let us walk through the code from Sample07 listed above.

The call to the method publish() on the instance of EventBus takes two arguments - a destination address (which is a simple string) and the message object. The message object is serialized and sent to the specified address. The message is delivered to *ALL* the handlers that are registered at the specified address.
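The fan-out behaviour of publish() can be illustrated with a small plain-Java sketch. It is a toy model and not the Vert.x EventBus: the class ToyPubSubBus is a hypothetical name, and delivery here is a simple synchronous loop over the registered handlers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy model only (hypothetical class, NOT the Vert.x EventBus):
// illustrates publish-subscribe delivery to every registered handler
class ToyPubSubBus {
    private final Map<String, List<Consumer<String>>> handlers = new HashMap<>();

    // Register a handler at an address (loosely mirrors consumer())
    void consumer(String address, Consumer<String> handler) {
        handlers.computeIfAbsent(address, k -> new ArrayList<>()).add(handler);
    }

    // Deliver the message to ALL handlers at the address;
    // returns the number of handlers that received it
    int publish(String address, String message) {
        List<Consumer<String>> list =
            handlers.getOrDefault(address, Collections.emptyList());
        for (Consumer<String> handler : list) {
            handler.accept(message);
        }
        return list.size();
    }

    public static void main(String[] args) {
        ToyPubSubBus bus = new ToyPubSubBus();
        bus.consumer("msg.address", m -> System.out.println("[C1] - Received message - " + m));
        bus.consumer("msg.address", m -> System.out.println("[C2] - Received message - " + m));
        bus.publish("msg.address", "[1] hello");
        bus.publish("msg.address", "[2] hello");
    }
}
```

Unlike a point-to-point send, every published message in this sketch is seen by both C1 and C2.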

An instance of type io.vertx.core.CompositeFuture allows one to wrap and coordinate the completion of a list of Future instance(s).

The static method join(Future<T1>, Future<T2>) on CompositeFuture returns a CompositeFuture that waits until all the specified Future instance(s) have completed. It succeeds when all of them complete successfully and fails if any of them fails.
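For readers more familiar with the JDK, a rough analogy to this "wait for all" behaviour is CompletableFuture.allOf(). The sketch below uses only the standard library and is an analogy, not the Vert.x implementation; the class JoinAnalogy and the method joinBoth() are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

// JDK analogy only (hypothetical class, NOT Vert.x code):
// allOf() waits for every input future, similar in spirit to join()
class JoinAnalogy {
    // Completes once BOTH inputs are done; fails if either one failed
    static CompletableFuture<Void> joinBoth(CompletableFuture<?> a, CompletableFuture<?> b) {
        return CompletableFuture.allOf(a, b);
    }

    public static void main(String[] args) {
        CompletableFuture<String> c1 = new CompletableFuture<>();
        CompletableFuture<String> c2 = new CompletableFuture<>();
        CompletableFuture<Void> both = joinBoth(c1, c2);

        c1.complete("C1 deployed");
        System.out.println("Done after C1 only? " + both.isDone());

        c2.complete("C2 deployed");
        System.out.println("Done after C1 and C2? " + both.isDone());
    }
}
```

The combined future is not done after only the first input completes; it becomes done once the second one completes as well.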

Executing the Java program Sample07 listed above should generate an output similar to the following:

Output.3

May 18, 2019 3:36:55 PM com.polarsparc.Vertx.Sample07 lambda$0
INFO: Deployed consumer  with instance ID: 98b2fee4-01ec-4698-8ddf-eb48155dbf18
May 18, 2019 3:36:55 PM com.polarsparc.Vertx.Sample07 lambda$0
INFO: Deployed consumer  with instance ID: 47afe704-6b7b-4691-a060-c916cc4a1be1
May 18, 2019 3:36:55 PM com.polarsparc.Vertx.Sample07 lambda$1
INFO: Deployed consumer instances
May 18, 2019 3:36:56 PM com.polarsparc.Vertx.Sample07$MsgPublisherVerticle start
INFO: Messages published to address msg.address
May 18, 2019 3:36:56 PM com.polarsparc.Vertx.Sample07 lambda$2
INFO: Deployed publisher instance ID: 0d6aebf9-6585-4aaf-9fa0-1f7b1fada241
May 18, 2019 3:36:56 PM com.polarsparc.Vertx.Sample07$MsgConsumerVerticle lambda$0
INFO: [C2] - Received message - [1] Bojour from Vert.x
May 18, 2019 3:36:56 PM com.polarsparc.Vertx.Sample07$MsgConsumerVerticle lambda$0
INFO: [C1] - Received message - [1] Bojour from Vert.x
May 18, 2019 3:36:56 PM com.polarsparc.Vertx.Sample07$MsgConsumerVerticle lambda$0
INFO: [C2] - Received message - [2] Bojour from Vert.x
May 18, 2019 3:36:56 PM com.polarsparc.Vertx.Sample07$MsgConsumerVerticle lambda$0
INFO: [C1] - Received message - [2] Bojour from Vert.x
May 18, 2019 3:36:56 PM com.polarsparc.Vertx.Sample07$MsgConsumerVerticle lambda$0
INFO: [C2] - Received message - [3] Bojour from Vert.x
May 18, 2019 3:36:56 PM com.polarsparc.Vertx.Sample07$MsgConsumerVerticle lambda$0
INFO: [C1] - Received message - [3] Bojour from Vert.x

More to be covered in the next part of this series ... 😎

References

[1] Introduction to Vert.x - Part-1

[2] Introduction to Vert.x - Part-2

[3] Vert.x Core Manual (Java)



© PolarSPARC