Quick Tour of RabbitMQ - Part 3


Bhaskar S 11/19/2017


Overview

Continuing from Part-2 of this series, we will demonstrate the Topic Exchange type in this final part of the series.

Hands-on with RabbitMQ using Java

A Topic Exchange routes messages to Queue(s) based on some Routing Key pattern match. Consumer(s) bind to a Topic Exchange specifying a wildcard pattern for the Routing Key. A wildcard pattern can be formed using a list of one or more words plus the charaters '*' (asterisk) or '#' (hash), each separated by a '.' (period).

The wildcard character '*' (asterisk) matches a word at the specified position in the Routing Key, while the wildcard character '#' (hash) matches zero or more words.

For example, the Routing Key wildcard pattern deal.tech.* will match the Routing Keys 'deal.tech.mobile' or 'deal.tech.laptop' but not 'deal' or 'deal.tech'. On the other hand, the Routing Key wildcard pattern deal.# will match the Routing Keys 'deal', 'deal.tech', 'deal.furniture', 'deal.tech.mobile', etc.

The following picture illustrates the high level view of the Topic Exchange:

Topic
Topic

Now, to demonstrate the hypothetical deals alerting system with a deals alerter (publisher) and two consumers - one for all deals and the other for tech deals, we will implement the publisher and consumer classes DealAlerter and DealListener.

The following is the common class that captures and exposes some constants:

DealsConstants.java
/*
 *
 *  Name:        DealsConstants
 *  
 *  Description: Common constants used between the deals alerter (publisher) and the two listeners (consumers)
 *  
 */

package com.polarsparc.rabbitmq.common;

public interface DealsConstants {
    public final static String DEALS_EXCHANGE        = "deals";
    public final static String ROUTING_KEY_PATTERN_1 = "deal.#";
    public final static String ROUTING_KEY_PATTERN_2 = "deal.tech.*";
    public final static String ALL_DEALS_QUEUE_NAME  = "alldeals";
    public final static String TECH_DEALS_QUEUE_NAME = "techdeals";
}

The following is the code for the publisher of the deal alert messages:

DealAlerter.java
/*
 *
 *  Name:        DealAlerter
 *  
 *  Description: The hypothetical deals alerting system that will publish deal alert messages
 *               as a string in a CSV format to a topic exchange
 *  
 */

package com.polarsparc.rabbitmq.topic;

import java.util.logging.Logger;
import java.util.logging.Level;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.AMQP;
import com.polarsparc.rabbitmq.common.DealsConstants;
import com.polarsparc.rabbitmq.common.RabbitMqUtils;

public class DealAlerter {
    private static String[][] _DEAL_ALERTS = {
        { "deal.tech.mobile", "iPhone 8 64GB" , "699.99" , "T-Mobile" },
        { "deal.tech.laptop" , "Dell 15.6 i7 16GB" , "899.99", "Best Buy" },
        { "deal.furniture.desk", "Bush Computer Desk", "249.99", "Amazon" },
        { "deal.airfare", "Round-trip NY JFK", "79.99", "CheapAir" }
    };
    
    public static void main(String[] args) {
        if (args.length != 3) {
            System.out.printf("Usage: java com.polarsparc.rabbitmq.topic.DealAlerter <host> <user> <password>\n");
            System.exit(1);
        }
        
        Logger _logger = Logger.getLogger(DealAlerter.class.getName());
        
        try {
            String exchange = DealsConstants.DEALS_EXCHANGE;
            
            RabbitMqUtils.initRabbitMq(args[0], args[1], args[2]);
            
            _logger.log(Level.INFO, "Ready to create a communication channel ...");
            
            Channel channel = RabbitMqUtils.getRabbitMqChannel();
            
            _logger.log(Level.INFO, "Ready to create a topic exchange " + exchange);
            
            channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC);
            
            _logger.log(Level.INFO, "Ready to publish test deal alert messages");
            
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .appId("DealAlerter")
                    .contentType("text/plain")
                    .expiration("5000")
                    .build();
            
            for (String[] deal : _DEAL_ALERTS) {
                String msg = String.join(",", deal);
                channel.basicPublish(exchange, deal[0], properties, msg.getBytes());
            }
            
            _logger.log(Level.INFO, "Ready to close the communication channel");
            
            RabbitMqUtils.closeRabbitMqChannel(channel);
            
            _logger.log(Level.INFO, "Deal alerts processing done !!!");
        }
        catch (Exception ex) {
            _logger.log(Level.SEVERE, "Error in DealAlerter!", ex);
        }
        finally {
            RabbitMqUtils.cleanRabbitMq();
        }
    }
}

The following is the code for the consumer that will receive specific deal alert messages based on the wildcard Routing Key pattern:

DealListener.java
/*
 *
 *  Name:        DealListener
 *  
 *  Description: A hypothetical deal alert messages listener that will use routing key
 *               wildcard patterns when binding queues
 *  
 */

package com.polarsparc.rabbitmq.topic;

import java.util.logging.Logger;
import java.util.logging.Level;
import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP;
import com.polarsparc.rabbitmq.common.DealsConstants;
import com.polarsparc.rabbitmq.common.RabbitMqUtils;

public class DealListener {
    public static void main(String[] args) {
        if (args.length != 4) {
            System.out.printf("Usage: java com.polarsparc.rabbitmq.topic.DealListener <host> <user> <password> 'all|tech'\n");
            System.exit(1);
        }
        
        Logger _logger = Logger.getLogger(DealListener.class.getName());
        
        try {
            String queue = "all";
            if (args[3].equalsIgnoreCase("tech")) {
                queue = args[3];
            }
            String exchange = DealsConstants.DEALS_EXCHANGE;
            String routingKey = DealsConstants.ROUTING_KEY_PATTERN_1;
            if (args[3].equalsIgnoreCase("tech")) {
                routingKey = DealsConstants.ROUTING_KEY_PATTERN_2;
            }
            
            RabbitMqUtils.initRabbitMq(args[0], args[1], args[2]);
            
            _logger.log(Level.INFO, "Ready to create communication channel for " + queue);
            
            Channel channel = RabbitMqUtils.getRabbitMqChannel();
            
            _logger.log(Level.INFO, "Ready to create a topic exchange " + exchange);
            
            channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC);
            
            _logger.log(Level.INFO, "Ready to bind the queue " + queue + " to exchange " + exchange 
                    + " with routing key " + routingKey);
            
            channel.queueDeclare(queue, false, false, false, null);
            channel.queueBind(queue, exchange, routingKey);
            
            _logger.log(Level.INFO, "Ready to create a consumer for " + queue);
            
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, 
                        Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    String msg = new String(body, "UTF-8");
                    _logger.log(Level.INFO, "Received deal alert message (body): " + msg);
                }
            };
            
            _logger.log(Level.INFO, "Ready to consume test deal alert messages for " + routingKey);
            
            channel.basicConsume(queue, true, consumer);
        }
        catch (Exception ex) {
            _logger.log(Level.SEVERE, "Error in DealListener!", ex);
        }
    }
}

Now, for the demostration of messaging using Topic Exchange, open three terminal windows. Lets refer to them as the publisher, consumer-all, and consumer-tech respectively.

In the consumer-all terminal, execute the following command:

java -cp ./build/classes:./lib/amqp-client-5.0.0.jar:./lib/slf4j-api-1.7.25.jar:./lib/slf4j-simple-1.7.25.jar com.polarsparc.rabbitmq.topic.DealListener 172.17.0.2 rabbitusr s3cr3t all

The following should be the typical output:

Output.1

Nov 19, 2017 3:38:35 PM com.polarsparc.rabbitmq.topic.DealListener main
INFO: Ready to create communication channel for all
Nov 19, 2017 3:38:35 PM com.polarsparc.rabbitmq.topic.DealListener main
INFO: Ready to create a topic exchange deals
Nov 19, 2017 3:38:35 PM com.polarsparc.rabbitmq.topic.DealListener main
INFO: Ready to bind the queue all to exchange deals with routing key deal.#
Nov 19, 2017 3:38:35 PM com.polarsparc.rabbitmq.topic.DealListener main
INFO: Ready to create a consumer for all
Nov 19, 2017 3:38:35 PM com.polarsparc.rabbitmq.topic.DealListener main
INFO: Ready to consume test deal alert messages for deal.#

In the consumer-tech terminal, execute the following command:

java -cp ./build/classes:./lib/amqp-client-5.0.0.jar:./lib/slf4j-api-1.7.25.jar:./lib/slf4j-simple-1.7.25.jar com.polarsparc.rabbitmq.topic.DealListener 172.17.0.2 rabbitusr s3cr3t tech

The following should be the typical output:

Output.2

Nov 19, 2017 3:39:12 PM com.polarsparc.rabbitmq.topic.DealListener main
INFO: Ready to create communication channel for tech
Nov 19, 2017 3:39:12 PM com.polarsparc.rabbitmq.topic.DealListener main
INFO: Ready to create a topic exchange deals
Nov 19, 2017 3:39:12 PM com.polarsparc.rabbitmq.topic.DealListener main
INFO: Ready to bind the queue tech to exchange deals with routing key deal.tech.*
Nov 19, 2017 3:39:12 PM com.polarsparc.rabbitmq.topic.DealListener main
INFO: Ready to create a consumer for tech
Nov 19, 2017 3:39:12 PM com.polarsparc.rabbitmq.topic.DealListener main
INFO: Ready to consume test deal alert messages for deal.tech.*

In the publisher terminal, execute the following command:

java -cp ./build/classes:./lib/amqp-client-5.0.0.jar:./lib/slf4j-api-1.7.25.jar:./lib/slf4j-simple-1.7.25.jar com.polarsparc.rabbitmq.topic.DealAlerter 172.17.0.2 rabbitusr s3cr3t

The following should be the typical output:

Output.3

Nov 19, 2017 3:41:12 PM com.polarsparc.rabbitmq.topic.DealAlerter main
INFO: Ready to create a communication channel ...
Nov 19, 2017 3:41:12 PM com.polarsparc.rabbitmq.topic.DealAlerter main
INFO: Ready to create a topic exchange deals
Nov 19, 2017 3:41:12 PM com.polarsparc.rabbitmq.topic.DealAlerter main
INFO: Ready to publish test deal alert messages
Nov 19, 2017 3:41:12 PM com.polarsparc.rabbitmq.topic.DealAlerter main
INFO: Ready to close the communication channel
Nov 19, 2017 3:41:12 PM com.polarsparc.rabbitmq.topic.DealAlerter main
INFO: Deal alerts processing done !!!

In the consumer-all terminal, we should see the following messages pop-up:

Output.4

Nov 19, 2017 3:41:12 PM com.polarsparc.rabbitmq.topic.DealListener$1 handleDelivery
INFO: Received deal alert message (body): deal.tech.mobile,iPhone 8 64GB,699.99,T-Mobile
Nov 19, 2017 3:41:12 PM com.polarsparc.rabbitmq.topic.DealListener$1 handleDelivery
INFO: Received deal alert message (body): deal.tech.laptop,Dell 15.6 i7 16GB,899.99,Best Buy
Nov 19, 2017 3:41:12 PM com.polarsparc.rabbitmq.topic.DealListener$1 handleDelivery
INFO: Received deal alert message (body): deal.furniture.desk,Bush Computer Desk,249.99,Amazon
Nov 19, 2017 3:41:12 PM com.polarsparc.rabbitmq.topic.DealListener$1 handleDelivery
INFO: Received deal alert message (body): deal.airfare,Round-trip NY JFK,79.99,CheapAir

In the consumer-tech terminal, we should see the following messages pop-up:

Output.5

Nov 19, 2017 3:41:12 PM com.polarsparc.rabbitmq.topic.DealListener$1 handleDelivery
INFO: Received deal alert message (body): deal.tech.mobile,iPhone 8 64GB,699.99,T-Mobile
Nov 19, 2017 3:41:12 PM com.polarsparc.rabbitmq.topic.DealListener$1 handleDelivery
INFO: Received deal alert message (body): deal.tech.laptop,Dell 15.6 i7 16GB,899.99,Best Buy

From the consumer-all terminal, we infer that the DealListener consumer received all the deal alert messages that were published by the DealAlerter.

From the consumer-tech terminal, we infer that the DealListener consumer received *ONLY* the tech deal alert messages that were published by the DealAlerter.

We have successfully demonstrated RabbitMQ messaging using the Topic Exchange !!!

References

Quick Tour of RabbitMQ - Part 1

Quick Tour of RabbitMQ - Part 2

RabbitMQ Official Site