Quick Tour of RabbitMQ - Part 2


Bhaskar S 11/18/2017


Overview

Continuing from Part-1 of this series, we will demonstrate the Fanout Exchange type in this part.

Hands-on with RabbitMQ using Java

A Fanout Exchange routes a copy of a message to all the Queues bound to it (ignoring the Routing key). In other words, it broadcasts incoming messages to all the Queues bound to it. For example, when a credit-card processor processes an incoming transaction message, it publishes the transaction to both the fraud verifier and the analytics engine.
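To make the broadcast behaviour concrete, here is a minimal in-memory sketch of the idea. It is a toy model only and does not use the RabbitMQ client API; the class FanoutSketch and its methods are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of a fanout exchange; NOT the RabbitMQ client API.
class FanoutSketch {
    // Queue name -> messages delivered to that queue, in arrival order.
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    // Bind a queue to the exchange; binding the same name twice has no extra effect.
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Publish: the routing key is accepted but deliberately ignored,
    // and every bound queue receives its own copy of the message.
    void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    // Messages currently held by the named queue (empty if the queue is not bound).
    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        FanoutSketch credit = new FanoutSketch();
        credit.bind("fraud");
        credit.bind("analytics");
        credit.publish("", "txn-0000000001");
        credit.publish("some.key", "txn-0000000002");
        System.out.println("fraud     = " + credit.messagesIn("fraud"));
        System.out.println("analytics = " + credit.messagesIn("analytics"));
    }
}
```

Both queues end up with both messages, regardless of the routing key passed to publish.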

The following picture illustrates the high level view of the Fanout Exchange:

Fanout

Now, to demonstrate the hypothetical credit-card processing system with a credit-card processor (publisher) and two consumers - a fraud verifier and an analytics engine, we will implement the corresponding CreditCardProcessor, FraudVerifier, and AnalyticsEngine classes.

The following is the common class that captures and exposes some constants:

CreditConstants.java
/*
 *
 *  Name:        CreditConstants
 *  
 *  Description: Common constants used between the CreditCard Processor (publisher) and the two consumers
 *               Fraud Verifier and Analytics Engine
 *  
 */

package com.polarsparc.rabbitmq.common;

public interface CreditConstants {
    public final static String CREDIT_EXCHANGE      = "credit";
    public final static String ROUTING_KEY          = "";
    public final static String ANALYTICS_QUEUE_NAME = "analytics";
    public final static String FRAUD_QUEUE_NAME     = "fraud";
}

The following is the code for the credit-card processor, which acts as a publisher of credit transaction messages:

CreditCardProcessor.java
/*
 *
 *  Name:        CreditCardProcessor
 *  
 *  Description: The hypothetical credit-card processor that publishes credit transaction messages in a JSON
 *               format to a fanout exchange
 *  
 */

package com.polarsparc.rabbitmq.fanout;

import java.util.logging.Logger;
import java.util.logging.Level;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.AMQP;

import com.polarsparc.rabbitmq.common.CreditConstants;
import com.polarsparc.rabbitmq.common.RabbitMqUtils;

public class CreditCardProcessor {
    private static String _CC_TXN_MESSAGE_1 = String.join("\n",
            "{",
            "'cc_no': '1234-5678-9012-3456'",
            "'txn_id': '0000000001'",
            "'txn_dt': '11/01/2017 10:25:34'",
            "'txn_amt': '112.75'",
            "'merchant_id': '123'",
            "}"
        );
    private static String _CC_TXN_MESSAGE_2 = String.join("\n",
            "{",
            "'cc_no': '9876-5432-2109-8765'",
            "'txn_id': '0000000002'",
            "'txn_dt': '11/02/2017 16:44:21'",
            "'txn_amt': '33.09'",
            "'merchant_id': '456'",
            "}"
        );
    
    public static void main(String[] args) {
        if (args.length != 3) {
            System.out.printf("Usage: java com.polarsparc.rabbitmq.fanout.CreditCardProcessor <host> <user> <password>\n");
            System.exit(1);
        }
        
        Logger _logger = Logger.getLogger(CreditCardProcessor.class.getName());
        
        try {
            String exchange = CreditConstants.CREDIT_EXCHANGE;
            String routingKey = CreditConstants.ROUTING_KEY;
            
            RabbitMqUtils.initRabbitMq(args[0], args[1], args[2]);
            
            _logger.log(Level.INFO, "Ready to create a communication channel ...");
            
            Channel channel = RabbitMqUtils.getRabbitMqChannel();
            
            _logger.log(Level.INFO, "Ready to create a fanout exchange " + exchange);
            
            channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT);
            
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .appId("CreditCardProcessor")
                    .contentType("text/json")
                    .priority(1)
                    .build();
            
            _logger.log(Level.INFO, "Ready to publish test credit transaction messages");
            
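            // Encode explicitly as UTF-8 (the consumers decode the body as UTF-8)
            channel.basicPublish(exchange, routingKey, properties,
                    _CC_TXN_MESSAGE_1.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            channel.basicPublish(exchange, routingKey, properties,
                    _CC_TXN_MESSAGE_2.getBytes(java.nio.charset.StandardCharsets.UTF_8));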
            
            _logger.log(Level.INFO, "Ready to close the communication channel");
            
            RabbitMqUtils.closeRabbitMqChannel(channel);
            
            _logger.log(Level.INFO, "Credit-card processing done !!!");
        }
        catch (Exception ex) {
            _logger.log(Level.SEVERE, "Error in CreditCardProcessor!", ex);
        }
        finally {
            RabbitMqUtils.cleanRabbitMq();
        }
    }
}

Let us go over some of the classes/methods used in the CreditCardProcessor code shown above.

The enum com.rabbitmq.client.BuiltinExchangeType defines the exchange types supported in RabbitMQ.

The method exchangeDeclare(String, BuiltinExchangeType) on the Channel instance declares an Exchange of the specified type that is neither durable nor auto-deleted.

The class com.rabbitmq.client.AMQP.BasicProperties.Builder is a builder class. An instance of it allows one to create an instance of com.rabbitmq.client.AMQP.BasicProperties with message properties such as the app id, content type, priority, etc.
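One caveat about the test messages: they use single quotes and have no commas between the fields, so a strict JSON parser would reject them (the registered media type for JSON is also application/json rather than text/json). The following is a minimal sketch of how a strictly valid JSON body could be built and encoded as UTF-8 bytes using only the JDK. The helper class TxnJson is hypothetical and not part of the original code; it handles only flat string fields and escapes only backslashes and double quotes.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper (not part of the original code) that renders a flat
// map of string fields as a strict JSON object.
class TxnJson {
    // Wrap in double quotes, escaping backslashes and double quotes.
    // Minimal on purpose: control characters are not handled.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // One "key": "value" pair per line, separated by commas.
    static String toJson(Map<String, String> fields) {
        return fields.entrySet().stream()
                .map(e -> quote(e.getKey()) + ": " + quote(e.getValue()))
                .collect(Collectors.joining(",\n", "{\n", "\n}"));
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the fields in insertion order.
        Map<String, String> txn = new LinkedHashMap<>();
        txn.put("cc_no", "1234-5678-9012-3456");
        txn.put("txn_id", "0000000001");
        txn.put("txn_dt", "11/01/2017 10:25:34");
        txn.put("txn_amt", "112.75");
        txn.put("merchant_id", "123");

        String json = toJson(txn);
        // These bytes could be passed as the message body to basicPublish.
        byte[] body = json.getBytes(StandardCharsets.UTF_8);
        System.out.println(json);
        System.out.println(body.length + " bytes");
    }
}
```

In a real application a JSON library would be the safer choice; the sketch only shows what a well-formed payload looks like.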

The following is the code for the fraud verifier, which acts as a consumer of credit-card transaction messages:

FraudVerifier.java
/*
 *
 *  Name:        FraudVerifier
 *  
 *  Description: A hypothetical credit card fraud verifier
 *  
 */

package com.polarsparc.rabbitmq.fanout;

import java.util.logging.Logger;
import java.util.logging.Level;
import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP;

import com.polarsparc.rabbitmq.common.CreditConstants;
import com.polarsparc.rabbitmq.common.RabbitMqUtils;

public class FraudVerifier {
    public static void main(String[] args) {
        if (args.length != 3) {
            System.out.printf("Usage: java com.polarsparc.rabbitmq.fanout.FraudVerifier <host> <user> <password>\n");
            System.exit(1);
        }
        
        Logger _logger = Logger.getLogger(FraudVerifier.class.getName());
        
        try {
            String queue = CreditConstants.FRAUD_QUEUE_NAME;
            String exchange = CreditConstants.CREDIT_EXCHANGE;
            String routingKey = CreditConstants.ROUTING_KEY;
            
            RabbitMqUtils.initRabbitMq(args[0], args[1], args[2]);
            
            _logger.log(Level.INFO, "Ready to create communication channel for " + queue);
            
            Channel channel = RabbitMqUtils.getRabbitMqChannel();
            
            _logger.log(Level.INFO, "Ready to create a fanout exchange " + exchange);
            
            channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT);
            
            _logger.log(Level.INFO, "Ready to bind the queue " + queue + " to exchange " + exchange);
            
            channel.queueDeclare(queue, false, false, false, null);
            channel.queueBind(queue, exchange, routingKey);
            
            _logger.log(Level.INFO, "Ready to create a consumer for " + queue);
            
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, 
                        Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    String msg = new String(body, "UTF-8");
                    _logger.log(Level.INFO, "Received credit-card message (properties): " + properties);
                    _logger.log(Level.INFO, "Received credit-card message (body): " + msg);
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            
            _logger.log(Level.INFO, "Ready to consume test credit-card messages for " + queue);
            
            channel.basicConsume(queue, false, consumer);
        }
        catch (Exception ex) {
            _logger.log(Level.SEVERE, "Error in FraudVerifier!", ex);
        }
    }
}

Let us go over some of the classes/methods used in the FraudVerifier code shown above.

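The method queueBind(String, String, String) binds the specified Queue to the specified Exchange using the specified Routing key. A Fanout Exchange ignores the Routing key, which is why we use an empty string.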

The method basicAck(long, boolean) explicitly acknowledges one or more messages. If the second parameter is true, all the messages up to and including the one with the specified delivery tag are acknowledged. If it is false, only the message with the specified delivery tag is acknowledged. In some cases, we may want to acknowledge message(s) only after they have been processed successfully. In such scenarios, we disable auto-acknowledgement by specifying false as the second parameter to the basicConsume method and explicitly call basicAck after processing the message(s), as the FraudVerifier does.
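The difference between the two values of the second parameter can be illustrated with a small in-memory sketch. It is a toy model of the set of unacknowledged delivery tags on a channel, not the RabbitMQ client API; the class AckSketch and its methods are hypothetical names chosen for illustration.

```java
import java.util.TreeSet;

// Toy model of a channel's unacknowledged delivery tags; NOT the RabbitMQ client API.
class AckSketch {
    private final TreeSet<Long> unacked = new TreeSet<>();
    private long nextTag = 1;

    // Simulate a delivery: the message gets the next tag and stays unacknowledged.
    long deliver() {
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    // Mirrors the meaning of basicAck(deliveryTag, multiple).
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Acknowledge every outstanding tag up to and including deliveryTag.
            unacked.headSet(deliveryTag, true).clear();
        } else {
            // Acknowledge only the message with this delivery tag.
            unacked.remove(deliveryTag);
        }
    }

    // Snapshot of the tags that are still waiting for an acknowledgement.
    TreeSet<Long> outstanding() {
        return new TreeSet<>(unacked);
    }

    public static void main(String[] args) {
        AckSketch channel = new AckSketch();
        for (int i = 0; i < 5; i++) {
            channel.deliver();              // tags 1 through 5
        }
        channel.ack(2, false);              // single ack: only tag 2 is removed
        System.out.println("after ack(2, false): " + channel.outstanding());
        channel.ack(4, true);               // cumulative ack: tags 1, 3 and 4 are removed
        System.out.println("after ack(4, true):  " + channel.outstanding());
    }
}
```

Acknowledging with the second parameter set to true is useful when a consumer processes messages in batches and wants to confirm the whole batch with one call.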

The following is the code for the analytics engine, which acts as another consumer of credit-card transaction messages:

AnalyticsEngine.java
/*
 *
 *  Name:        AnalyticsEngine
 *  
 *  Description: A hypothetical analytics engine that consumes all transactions
 *  
 */

package com.polarsparc.rabbitmq.fanout;

import java.util.logging.Logger;
import java.util.logging.Level;
import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP;

import com.polarsparc.rabbitmq.common.CreditConstants;
import com.polarsparc.rabbitmq.common.RabbitMqUtils;

public class AnalyticsEngine {
    public static void main(String[] args) {
        if (args.length != 3) {
            System.out.printf("Usage: java com.polarsparc.rabbitmq.fanout.AnalyticsEngine <host> <user> <password>\n");
            System.exit(1);
        }
        
        Logger _logger = Logger.getLogger(AnalyticsEngine.class.getName());
        
        try {
            String queue = CreditConstants.ANALYTICS_QUEUE_NAME;
            String exchange = CreditConstants.CREDIT_EXCHANGE;
            String routingKey = CreditConstants.ROUTING_KEY;
            
            RabbitMqUtils.initRabbitMq(args[0], args[1], args[2]);
            
            _logger.log(Level.INFO, "Ready to create communication channel for " + queue);
            
            Channel channel = RabbitMqUtils.getRabbitMqChannel();
            
            _logger.log(Level.INFO, "Ready to create a fanout exchange " + exchange);
            
            channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT);
            
            _logger.log(Level.INFO, "Ready to bind the queue " + queue + " to exchange " + exchange);
            
            channel.queueDeclare(queue, false, false, false, null);
            channel.queueBind(queue, exchange, routingKey);
            
            _logger.log(Level.INFO, "Ready to create a consumer for " + queue);
            
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, 
                        Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    String msg = new String(body, "UTF-8");
                    _logger.log(Level.INFO, "Received credit-card message (body): " + msg);
                }
            };
            
            _logger.log(Level.INFO, "Ready to consume test credit-card messages for " + queue);
            
            channel.basicConsume(queue, true, consumer);
        }
        catch (Exception ex) {
            _logger.log(Level.SEVERE, "Error in AnalyticsEngine!", ex);
        }
    }
}

Now, for the demonstration of messaging using the Fanout Exchange, open three terminal windows. Let's refer to them as the publisher, consumer-fv, and consumer-ae respectively.

In the consumer-fv terminal, execute the following command:

java -cp ./build/classes:./lib/amqp-client-5.0.0.jar:./lib/slf4j-api-1.7.25.jar:./lib/slf4j-simple-1.7.25.jar com.polarsparc.rabbitmq.fanout.FraudVerifier 172.17.0.2 rabbitusr s3cr3t

The following should be the typical output:

Output.1

Nov 18, 2017 8:01:45 PM com.polarsparc.rabbitmq.fanout.FraudVerifier main
INFO: Ready to create communication channel for fraud
Nov 18, 2017 8:01:45 PM com.polarsparc.rabbitmq.fanout.FraudVerifier main
INFO: Ready to create a fanout exchange credit
Nov 18, 2017 8:01:45 PM com.polarsparc.rabbitmq.fanout.FraudVerifier main
INFO: Ready to bind the queue fraud to exchange credit
Nov 18, 2017 8:01:45 PM com.polarsparc.rabbitmq.fanout.FraudVerifier main
INFO: Ready to create a consumer for fraud
Nov 18, 2017 8:01:45 PM com.polarsparc.rabbitmq.fanout.FraudVerifier main
INFO: Ready to consume test credit-card messages for fraud

In the consumer-ae terminal, execute the following command:

java -cp ./build/classes:./lib/amqp-client-5.0.0.jar:./lib/slf4j-api-1.7.25.jar:./lib/slf4j-simple-1.7.25.jar com.polarsparc.rabbitmq.fanout.AnalyticsEngine 172.17.0.2 rabbitusr s3cr3t

The following should be the typical output:

Output.2

Nov 18, 2017 8:01:50 PM com.polarsparc.rabbitmq.fanout.AnalyticsEngine main
INFO: Ready to create communication channel for analytics
Nov 18, 2017 8:01:50 PM com.polarsparc.rabbitmq.fanout.AnalyticsEngine main
INFO: Ready to create a fanout exchange credit
Nov 18, 2017 8:01:50 PM com.polarsparc.rabbitmq.fanout.AnalyticsEngine main
INFO: Ready to bind the queue analytics to exchange credit
Nov 18, 2017 8:01:50 PM com.polarsparc.rabbitmq.fanout.AnalyticsEngine main
INFO: Ready to create a consumer for analytics

In the publisher terminal, execute the following command:

java -cp ./build/classes:./lib/amqp-client-5.0.0.jar:./lib/slf4j-api-1.7.25.jar:./lib/slf4j-simple-1.7.25.jar com.polarsparc.rabbitmq.fanout.CreditCardProcessor 172.17.0.2 rabbitusr s3cr3t

The following should be the typical output:

Output.3

Nov 18, 2017 8:01:56 PM com.polarsparc.rabbitmq.fanout.CreditCardProcessor main
INFO: Ready to create a communication channel ...
Nov 18, 2017 8:01:56 PM com.polarsparc.rabbitmq.fanout.CreditCardProcessor main
INFO: Ready to create a fanout exchange credit
Nov 18, 2017 8:01:56 PM com.polarsparc.rabbitmq.fanout.CreditCardProcessor main
INFO: Ready to publish test credit transaction messages
Nov 18, 2017 8:01:56 PM com.polarsparc.rabbitmq.fanout.CreditCardProcessor main
INFO: Ready to close the communication channel
Nov 18, 2017 8:01:56 PM com.polarsparc.rabbitmq.fanout.CreditCardProcessor main
INFO: Credit-card processing done !!!

In the consumer-fv terminal, we should see the following messages appear:

Output.4

Nov 18, 2017 8:01:56 PM com.polarsparc.rabbitmq.fanout.FraudVerifier$1 handleDelivery
INFO: Received credit-card message (properties): #contentHeader(content-type=text/json, content-encoding=null, headers=null, delivery-mode=null, priority=1, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=CreditCardProcessor, cluster-id=null)
Nov 18, 2017 8:01:56 PM com.polarsparc.rabbitmq.fanout.FraudVerifier$1 handleDelivery
INFO: Received credit-card message (body): {
'cc_no': '1234-5678-9012-3456'
'txn_id': '0000000001'
'txn_dt': '11/01/2017 10:25:34'
'txn_amt': '112.75'
'merchant_id': '123'
}
Nov 18, 2017 8:01:56 PM com.polarsparc.rabbitmq.fanout.FraudVerifier$1 handleDelivery
INFO: Received credit-card message (properties): #contentHeader(content-type=text/json, content-encoding=null, headers=null, delivery-mode=null, priority=1, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=CreditCardProcessor, cluster-id=null)
Nov 18, 2017 8:01:56 PM com.polarsparc.rabbitmq.fanout.FraudVerifier$1 handleDelivery
INFO: Received credit-card message (body): {
'cc_no': '9876-5432-2109-8765'
'txn_id': '0000000002'
'txn_dt': '11/02/2017 16:44:21'
'txn_amt': '33.09'
'merchant_id': '456'
}

In the consumer-ae terminal, we should see the following messages appear:

Output.5

Nov 18, 2017 8:01:56 PM com.polarsparc.rabbitmq.fanout.AnalyticsEngine$1 handleDelivery
INFO: Received credit-card message (body): {
'cc_no': '1234-5678-9012-3456'
'txn_id': '0000000001'
'txn_dt': '11/01/2017 10:25:34'
'txn_amt': '112.75'
'merchant_id': '123'
}
Nov 18, 2017 8:01:56 PM com.polarsparc.rabbitmq.fanout.AnalyticsEngine$1 handleDelivery
INFO: Received credit-card message (body): {
'cc_no': '9876-5432-2109-8765'
'txn_id': '0000000002'
'txn_dt': '11/02/2017 16:44:21'
'txn_amt': '33.09'
'merchant_id': '456'
}

From the consumer-fv and consumer-ae terminals, we infer that both the FraudVerifier and the AnalyticsEngine consumers have received the two credit-card transaction messages that were published by the CreditCardProcessor.

The following screenshot shows the Exchanges tab of the RabbitMQ web management console with our Exchange named credit highlighted:

Web Console

We have successfully demonstrated RabbitMQ messaging using the Fanout Exchange !!!

References

Quick Tour of RabbitMQ - Part 1

RabbitMQ Official Site