Quick Tour of RabbitMQ - Part 1


Bhaskar S 11/12/2017


Overview

RabbitMQ is an open source, language-neutral Message-oriented Middleware that implements the open standard Advanced Message Queuing Protocol (AMQP) specification. AMQP is a wire-level application layer protocol that specifies how messages should be queued, routed, and delivered in a reliable and secure way.

Terminology

The following is a high level architectural view of RabbitMQ:

[Figure: Architecture]

In this section, we will list and briefly describe some of the terms referred to in this article.

Term          Description
Broker        Server that receives messages from producers and routes the messages to the appropriate consumers
Exchange      Server component through which a producer connects to send messages. It is like a Mailbox
Queue         Server component from which consumers get messages. It is the storage buffer (memory and/or disk) for messages
Binding       The logical relationship between Exchanges and Queues. It establishes the rules to route messages from Exchanges to appropriate Queues
Channel       Logical connection between a client (producer or consumer) and the message broker allowing for isolation of communication
Routing Key   A string attribute of the message that an Exchange will look at to determine which Queue(s) to route the incoming message to

Setup

The setup will be on an Ubuntu 17.04 based Linux desktop.

Ensure Docker is installed and set up. Otherwise, refer to the article Introduction to Docker.

Assume a hypothetical user alice with the home directory located at /home/alice.

Create a directory called rabbitmq under /home/alice by executing the following command:

mkdir rabbitmq

Create another directory called lib under /home/alice by executing the following command:

mkdir lib

Since we will be implementing code in Java (for demonstration), we need to download some dependent JARs. Download and copy the SLF4J API and SLF4J Simple JARs to /home/alice/lib. Also, download and copy the RabbitMQ Java Client JAR to /home/alice/lib.

For our exploration, we will be downloading and using the official docker image rabbitmq:management.

To pull and download the docker image for RabbitMQ with the management plugin enabled, execute the following command:

docker pull rabbitmq:management

The following should be the typical output:

Output.1

management: Pulling from library/rabbitmq
bc95e04b23c0: Pull complete 
41a230aa3726: Pull complete 
9190ffbd2271: Pull complete 
69e829d8f0d2: Pull complete 
2404945c98d3: Pull complete 
629c315aa810: Pull complete 
cff2b1994ea2: Pull complete 
3246485e4a5d: Pull complete 
dd004dbdde7c: Pull complete 
ef3f0c64cb0d: Pull complete 
f7d9dcfb8c20: Pull complete 
30545194ec57: Pull complete 
Digest: sha256:3840069fcc704db533cd83ec7fad30daabdd2fda321df4fb788b81b6ed2323b2
Status: Downloaded newer image for rabbitmq:management

To launch the docker instance for RabbitMQ with the management plugin enabled, execute the following command:

docker run -d --hostname rabbit-dev --name rabbit-dev -v /home/alice/rabbitmq:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=rabbitusr -e RABBITMQ_DEFAULT_PASS=s3cr3t rabbitmq:management

The following should be the typical output:

Output.2

f7cfd461ce68256b91e387cc37dde864f752b5c909e70e50c5f1b20c20bf0701

To check if the docker instance for RabbitMQ is up and running, execute the following command:

docker ps

The following should be the typical output:

Output.3

CONTAINER ID    IMAGE                 COMMAND                  CREATED             STATUS              PORTS                                                                     NAMES
f7cfd461ce68    rabbitmq:management   "docker-entrypoint.sh"   7 seconds ago       Up 4 seconds        4369/tcp, 5671-5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp   rabbit-dev

Fire up a web-browser and open the URL http://localhost:15672. The following should be the typical output:

[Figure: Login]

Log in to the management console with the user-id rabbitusr and password s3cr3t. The following should be the typical output:

[Figure: Management]

Hands-on with RabbitMQ using Java

An Exchange type controls how incoming messages are routed to the appropriate Queue(s). The simplest of these is the Direct Exchange.

A Direct Exchange routes messages to Queues based on a Routing key. A Routing key is just a string to allow for directed routing. For example, if a payment processor accepts only a Mastercard or a Visa card, then a message with the Routing key "mastercard" will be routed to the Mastercard verifier, while a message with the Routing key "visa" will be routed to the Visa verifier.
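To make the routing rule concrete, here is a minimal in-memory sketch of how a direct exchange matches a routing key against its bindings. It is only a model for illustration and does not use the RabbitMQ client API; the class DirectExchangeSketch and its methods are hypothetical names introduced here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory model of direct-exchange routing (NOT the RabbitMQ API)
final class DirectExchangeSketch {
    // Queue name -> buffered messages
    private final Map<String, Queue<String>> queues = new HashMap<>();
    // Binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    // Creates the queue if it does not already exist
    void declareQueue(String name) {
        queues.computeIfAbsent(name, k -> new ArrayDeque<>());
    }

    // Binds a queue to this exchange under the given binding key
    void bind(String queueName, String bindingKey) {
        declareQueue(queueName);
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queueName);
    }

    // Routes the message to every queue whose binding key equals the routing key.
    // Returns the number of queues that received the message (0 if none matched).
    int publish(String routingKey, String message) {
        List<String> targets = bindings.getOrDefault(routingKey, Collections.emptyList());
        for (String queueName : targets) {
            queues.get(queueName).add(message);
        }
        return targets.size();
    }

    // Returns the next message from the queue, or null if it is empty or unknown
    String poll(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return (queue == null) ? null : queue.poll();
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("mastercard", "mastercard");
        exchange.bind("visa", "visa");

        exchange.publish("mastercard", "12345,19.99");
        exchange.publish("visa", "98765,21.99");

        System.out.println("mastercard queue: " + exchange.poll("mastercard"));
        System.out.println("visa queue: " + exchange.poll("visa"));
    }
}
```

In this model, a message published with a routing key that matches no binding is simply not queued anywhere, which mirrors the exact-match behaviour described above.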

The following picture illustrates the high level view of the Direct Exchange:

[Figure: Direct]

Now, to demonstrate the hypothetical Payments system with a payment processor (publisher) and two payment verifiers (consumers - Mastercard and Visa), we will implement the corresponding PaymentProcessor and PaymentVerifier classes.

The following is the common class that captures and exposes some constants:

PaymentConstants.java
/*
 *
 *  Name:        PaymentConstants
 *  
 *  Description: Common constants used between the Payment Processor (publisher) and the payment
 *               verifiers for Mastercard and Visa (consumers)
 *  
 */

package com.polarsparc.rabbitmq.common;

public interface PaymentConstants {
    public final static String DEFAULT_EXCHANGE = "";
    public final static String MASTERCARD_QUEUE_NAME = "mastercard";
    public final static String VISA_QUEUE_NAME       = "visa";
}

The following is the common utility class that implements the commonly used methods between the producer and the consumer classes:

RabbitMqUtils.java
/*
 *
 *  Name:        RabbitMqUtils
 *  
 *  Description: Common utility methods used by producers as well as consumers
 *  
 */

package com.polarsparc.rabbitmq.common;

import java.util.logging.Logger;
import java.util.logging.Level;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public final class RabbitMqUtils {
    private static  Logger _logger = Logger.getLogger(RabbitMqUtils.class.getName());
    
    private static Connection _connection = null;
    
    private RabbitMqUtils() {
    }
    
    public static void initRabbitMq(String host, String user, String pass) 
            throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setUsername(user);
        factory.setPassword(pass);
        
        _connection = factory.newConnection();
    }
    
    public static Channel getRabbitMqChannel() 
            throws Exception {
        if (_connection == null) {
            throw new Exception("RabbitMQ not initialized!");
        }
        
        return _connection.createChannel();
    }
    
    public static void closeRabbitMqChannel(final Channel channel) {
        try {
            if (channel != null) {
                channel.close();
            }
        }
        catch (Exception ex) {
            _logger.log(Level.SEVERE, "Error on channel close!", ex);
        }
    }
    
    public static void cleanRabbitMq() {
        try {
            if (_connection != null) {
                _connection.close();
                _connection = null;
            }
        }
        catch (Exception ex) {
            _logger.log(Level.SEVERE, "Error on connection close!", ex);
        }
    }
}

Let us explain and understand some of the classes/methods used in the RabbitMqUtils code shown above.

The class com.rabbitmq.client.ConnectionFactory is a factory class that is used to configure various options for creating a connection instance to the RabbitMQ message broker. In our case, we are configuring the host name on which the message broker is running, the user-id with which to connect, and the user credential to use.

The method setHost(String) on the factory class instance configures the host name of the RabbitMQ message broker.

The method setUsername(String) on the factory class instance configures the user name to use when connecting to the RabbitMQ message broker.

The method setPassword(String) on the factory class instance configures the password (credential) of the user to use when connecting to the RabbitMQ message broker.

The class com.rabbitmq.client.Connection represents a connection to the RabbitMQ message broker, which under the hood uses TCP, a reliable lower-level transport protocol.

The method newConnection() on the factory class instance creates and returns an instance of the connection to the RabbitMQ message broker.

The class com.rabbitmq.client.Channel represents a logical communication path that is multiplexed on top of an instance of a connection to the RabbitMQ message broker. In some use-cases, an application may need multiple communication paths to the RabbitMQ message broker. Rather than creating multiple com.rabbitmq.client.Connection instances (which is expensive as it uses system network resources), it is much more efficient to create multiple channels.

The method createChannel() on the connection class instance creates and returns an instance of the channel (a communication path to the RabbitMQ message broker).

The method close() on the connection class instance closes the connection path to the RabbitMQ message broker.

The method close() on the channel class instance closes the communication to the RabbitMQ message broker.

The following is the code for the payments processor, which acts as a publisher of payment messages. We declare two channels and corresponding queues and publish two sample messages (one for Mastercard and one for Visa):

PaymentProcessor.java
/*
 *
 *  Name:        PaymentProcessor
 *  
 *  Description: RabbitMQ publisher that generates hypothetical payment messages for either Mastercard
 *               or Visa payment 
 *  
 */

package com.polarsparc.rabbitmq.direct;

import java.nio.charset.StandardCharsets;
import java.util.logging.Logger;
import java.util.logging.Level;

import com.rabbitmq.client.Channel;

import com.polarsparc.rabbitmq.common.PaymentConstants;
import com.polarsparc.rabbitmq.common.RabbitMqUtils;

public class PaymentProcessor {
    private static String _MASTERCARD_PAYMENT_MSG = "12345,19.99";
    private static String _VISA_PAYMENT_MSG       = "98765,21.99";

    public static void main(String[] args) {
        if (args.length != 3) {
            System.out.printf("Usage: java com.polarsparc.rabbitmq.direct.PaymentProcessor <host> <user> <password>\n");
            System.exit(1);
        }

        Logger _logger = Logger.getLogger(PaymentProcessor.class.getName());

        try {
            // Open the single shared connection to the broker
            RabbitMqUtils.initRabbitMq(args[0], args[1], args[2]);

            _logger.log(Level.INFO, "Ready to create communication channels ...");

            // One channel per card type, both multiplexed over the same connection
            Channel channel1 = RabbitMqUtils.getRabbitMqChannel();
            Channel channel2 = RabbitMqUtils.getRabbitMqChannel();

            _logger.log(Level.INFO, "Ready to create queues for Mastercard and Visa");

            // Non-durable, non-exclusive, non-auto-delete queues with no extra arguments
            channel1.queueDeclare(PaymentConstants.MASTERCARD_QUEUE_NAME, false, false, false, null);
            channel2.queueDeclare(PaymentConstants.VISA_QUEUE_NAME, false, false, false, null);

            _logger.log(Level.INFO, "Ready to publish test messages for Mastercard and Visa");

            // Publish via the default exchange; the routing key is the queue name.
            // The body is encoded as UTF-8 to match the decoding in PaymentVerifier.
            channel1.basicPublish(PaymentConstants.DEFAULT_EXCHANGE,
                    PaymentConstants.MASTERCARD_QUEUE_NAME,
                    null,
                    _MASTERCARD_PAYMENT_MSG.getBytes(StandardCharsets.UTF_8));
            channel2.basicPublish(PaymentConstants.DEFAULT_EXCHANGE,
                    PaymentConstants.VISA_QUEUE_NAME,
                    null,
                    _VISA_PAYMENT_MSG.getBytes(StandardCharsets.UTF_8));

            _logger.log(Level.INFO, "Ready to close the communication channels");

            RabbitMqUtils.closeRabbitMqChannel(channel1);
            RabbitMqUtils.closeRabbitMqChannel(channel2);

            _logger.log(Level.INFO, "Payment processing done !!!");
        }
        catch (Exception ex) {
            _logger.log(Level.SEVERE, "Error in PaymentProcessor!", ex);
        }
        finally {
            // Always release the connection, even on failure
            RabbitMqUtils.cleanRabbitMq();
        }
    }
}

Let us explain and understand some of the classes/methods used in the PaymentProcessor code shown above.

The method queueDeclare(String, boolean, boolean, boolean, Map) on the channel class instance declares a queue in the RabbitMQ message broker, creating it if it does not already exist. The first parameter is the queue name. If the second parameter is true, then we are declaring a durable queue. Non-durable queues get cleaned up if the RabbitMQ message broker is shut down or crashes. If the third parameter is true, then we are declaring an exclusive queue, meaning it is restricted to just this connection. If the fourth parameter is true, then we are declaring an auto delete queue, meaning the server will delete the queue if no longer in use. The last parameter can be used to specify additional queue properties, such as the message TTL for the queue, etc.
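As an illustration of the last parameter, the following sketch builds an arguments map that sets a per-queue message TTL through the x-message-ttl argument (a value in milliseconds). The class QueueArgumentsSketch, the helper withMessageTtl, and the queue name payments-ttl are hypothetical names used only for this example; the actual queueDeclare call is shown in a comment since it needs a live channel.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that prepares the optional arguments for queueDeclare
final class QueueArgumentsSketch {
    // Builds an arguments map with a per-queue message TTL in milliseconds
    static Map<String, Object> withMessageTtl(int ttlMillis) {
        if (ttlMillis < 0) {
            throw new IllegalArgumentException("TTL must be non-negative");
        }
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl", ttlMillis);
        return arguments;
    }

    public static void main(String[] args) {
        Map<String, Object> arguments = withMessageTtl(60_000);
        System.out.println("Queue arguments: " + arguments);

        // With a live channel, the map would be passed as the last parameter.
        // "payments-ttl" is a hypothetical queue name for illustration:
        // channel.queueDeclare("payments-ttl", true, false, false, arguments);
    }
}
```

A new queue name is used on purpose: redeclaring an existing queue with different attributes or arguments is rejected by the broker, so the queues from this article should not be redeclared with a TTL without deleting them first.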

The method basicPublish(String, String, AMQP.BasicProperties, byte[]) on the channel class instance allows one to publish a message (specified as a byte array in the last parameter) with the specified routing key (second parameter). The first parameter specifies the exchange to use. In our case, we use the default RabbitMQ exchange that has no name. Every queue is automatically bound to the default exchange with a binding key equal to the queue name, so the routing key should match the name of the destination queue. The third parameter can be used to specify additional message properties, such as the message priority, correlation id, etc.
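Because the message body travels as a raw byte array, the publisher and the consumer have to agree on a character encoding. The following sketch shows a UTF-8 round trip for the sample payment message; the class PayloadCodecSketch and its methods are hypothetical helpers and not part of the RabbitMQ client.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper showing a UTF-8 round trip for message bodies
final class PayloadCodecSketch {
    // Encodes the text into the byte array that would be handed to basicPublish
    static byte[] encode(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    // Decodes a received body back into text using the same charset
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode("12345,19.99");
        System.out.println("Encoded length: " + body.length);
        System.out.println("Decoded text: " + decode(body));
    }
}
```

Calling getBytes() without a charset uses the platform default, which can differ between the publisher and consumer hosts, so naming the charset on both sides is the safer choice.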

The following is the code for the payments verifier, which acts as a consumer of payment messages:

PaymentVerifier.java
/*
 *
 *  Name:        PaymentVerifier
 *  
 *  Description: RabbitMQ Mastercard or Visa (consumer) that receives the hypothetical payment messages
 *  
 */

package com.polarsparc.rabbitmq.direct;

import java.util.logging.Logger;
import java.util.logging.Level;
import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP;

import com.polarsparc.rabbitmq.common.PaymentConstants;
import com.polarsparc.rabbitmq.common.RabbitMqUtils;

public class PaymentVerifier {
    public static void main(String[] args) {
        if (args.length != 4) {
            System.out.printf("Usage: java com.polarsparc.rabbitmq.direct.PaymentVerifier <host> <user> <password> <queue>\n");
            System.exit(1);
        }

        Logger _logger = Logger.getLogger(PaymentVerifier.class.getName());

        try {
            // Default to the Visa queue unless the Mastercard queue is requested
            String queue = PaymentConstants.VISA_QUEUE_NAME;
            if (args[3].equalsIgnoreCase(PaymentConstants.MASTERCARD_QUEUE_NAME)) {
                queue = PaymentConstants.MASTERCARD_QUEUE_NAME;
            }

            // Open the shared connection to the broker
            RabbitMqUtils.initRabbitMq(args[0], args[1], args[2]);

            _logger.log(Level.INFO, "Ready to create communication channel for " + queue);

            Channel channel = RabbitMqUtils.getRabbitMqChannel();
            
            _logger.log(Level.INFO, "Ready to create a queue for " + queue);
            
            channel.queueDeclare(queue, false, false, false, null);
            
            _logger.log(Level.INFO, "Ready to create a consumer for " + args[3]);
            
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, 
                        Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    String msg = new String(body, "UTF-8");
                    _logger.log(Level.INFO, "Received message: " + msg);
                }
            };
            
            _logger.log(Level.INFO, "Ready to consume test messages for " + args[3]);
            
            channel.basicConsume(queue, true, consumer);
        }
        catch (Exception ex) {
            _logger.log(Level.SEVERE, "Error in PaymentVerifier!", ex);
        }
    }
}

Let us explain and understand some of the classes/methods used in the PaymentVerifier code shown above.

The interface com.rabbitmq.client.Consumer defines the application callbacks for messages received on a queue.

The class com.rabbitmq.client.DefaultConsumer is a convenience class that provides a default implementation of the interface com.rabbitmq.client.Consumer. The constructor takes an instance of com.rabbitmq.client.Channel that is associated with the corresponding queue. This class provides dummy (no-op) implementations of the callback methods.

The class com.rabbitmq.client.Envelope encapsulates delivery metadata such as the delivery tag, the exchange through which the message was published, the routing key, etc.

The class com.rabbitmq.client.AMQP.BasicProperties encapsulates some of the basic properties associated with the message such as the message id, priority, timestamp, message expiration, etc.

The method handleDelivery(String, Envelope, AMQP.BasicProperties, byte[]) on the class com.rabbitmq.client.DefaultConsumer is called every time a message is received on the corresponding message queue.

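The method basicConsume(String, boolean, Consumer) on the channel class instance associates the specified com.rabbitmq.client.Consumer instance with the specified queue (first parameter). If the second parameter is true, messages are considered acknowledged as soon as they are delivered (automatic acknowledgement), which is what we use in our case.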
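The boolean passed to basicConsume in the listing above is true, which selects automatic acknowledgement. The following in-memory sketch contrasts that mode with manual acknowledgement. It is a simplified model and not the broker's actual implementation; the class AckModeSketch and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.TreeMap;

// Hypothetical in-memory model of automatic versus manual acknowledgement
final class AckModeSketch {
    // Messages waiting to be delivered
    private final Deque<String> ready = new ArrayDeque<>();
    // Delivered but not yet acknowledged messages, keyed by delivery tag
    private final TreeMap<Long, String> unacked = new TreeMap<>();
    private long nextTag = 1;

    void publish(String message) {
        ready.addLast(message);
    }

    // Delivers the next message and returns its delivery tag, or -1 if none is ready.
    // With autoAck the message is forgotten immediately; otherwise it is tracked.
    long deliver(boolean autoAck) {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextTag++;
        if (!autoAck) {
            unacked.put(tag, message);
        }
        return tag;
    }

    // Manual acknowledgement: the message is now safe to discard
    void ack(long tag) {
        unacked.remove(tag);
    }

    // Models a consumer that disappears: unacknowledged messages are requeued
    // at the front of the queue in their original order
    void consumerLost() {
        for (String message : unacked.descendingMap().values()) {
            ready.addFirst(message);
        }
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        AckModeSketch queue = new AckModeSketch();
        queue.publish("12345,19.99");
        queue.deliver(false);   // manual acknowledgement mode
        queue.consumerLost();   // consumer fails before calling ack
        System.out.println("Ready after consumer loss: " + queue.readyCount());
    }
}
```

In this model, a message delivered with automatic acknowledgement is gone if the consumer fails while handling it, whereas an unacknowledged message in manual mode returns to the queue. That trade-off is the reason manual acknowledgement is usually preferred when messages must not be lost.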

Now, for the demonstration of messaging using Direct Exchange, open three terminal windows. Let us refer to them as the publisher, consumer-mc, and consumer-v respectively.

In the consumer-mc terminal, execute the following command:

java -cp ./build/classes:./lib/amqp-client-5.0.0.jar:./lib/slf4j-api-1.7.25.jar:./lib/slf4j-simple-1.7.25.jar com.polarsparc.rabbitmq.direct.PaymentVerifier 172.17.0.2 rabbitusr s3cr3t mastercard

The following should be the typical output:

Output.4

Nov 12, 2017 2:51:52 PM com.polarsparc.rabbitmq.direct.PaymentVerifier main
INFO: Ready to create communication channel for mastercard
Nov 12, 2017 2:51:52 PM com.polarsparc.rabbitmq.direct.PaymentVerifier main
INFO: Ready to create a queue for mastercard
Nov 12, 2017 2:51:52 PM com.polarsparc.rabbitmq.direct.PaymentVerifier main
INFO: Ready to create a consumer for mastercard
Nov 12, 2017 2:51:52 PM com.polarsparc.rabbitmq.direct.PaymentVerifier main
INFO: Ready to consume test messages for mastercard

In the consumer-v terminal, execute the following command:

java -cp ./build/classes:./lib/amqp-client-5.0.0.jar:./lib/slf4j-api-1.7.25.jar:./lib/slf4j-simple-1.7.25.jar com.polarsparc.rabbitmq.direct.PaymentVerifier 172.17.0.2 rabbitusr s3cr3t visa

The following should be the typical output:

Output.5

Nov 12, 2017 2:52:32 PM com.polarsparc.rabbitmq.direct.PaymentVerifier main
INFO: Ready to create communication channel for visa
Nov 12, 2017 2:52:32 PM com.polarsparc.rabbitmq.direct.PaymentVerifier main
INFO: Ready to create a queue for visa
Nov 12, 2017 2:52:32 PM com.polarsparc.rabbitmq.direct.PaymentVerifier main
INFO: Ready to create a consumer for visa
Nov 12, 2017 2:52:32 PM com.polarsparc.rabbitmq.direct.PaymentVerifier main
INFO: Ready to consume test messages for visa

In the publisher terminal, execute the following command:

java -cp ./build/classes:./lib/amqp-client-5.0.0.jar:./lib/slf4j-api-1.7.25.jar:./lib/slf4j-simple-1.7.25.jar com.polarsparc.rabbitmq.direct.PaymentProcessor 172.17.0.2 rabbitusr s3cr3t

The following should be the typical output:

Output.6

Nov 12, 2017 2:53:33 PM com.polarsparc.rabbitmq.direct.PaymentProcessor main
INFO: Ready to create communication channels ...
Nov 12, 2017 2:53:33 PM com.polarsparc.rabbitmq.direct.PaymentProcessor main
INFO: Ready to create queues for Mastercard and Visa
Nov 12, 2017 2:53:33 PM com.polarsparc.rabbitmq.direct.PaymentProcessor main
INFO: Ready to publish test messages for Mastercard and Visa
Nov 12, 2017 2:53:33 PM com.polarsparc.rabbitmq.direct.PaymentProcessor main
INFO: Ready to close the communication channels
Nov 12, 2017 2:53:33 PM com.polarsparc.rabbitmq.direct.PaymentProcessor main
INFO: Payment processing done !!!

In the consumer-mc terminal, we should see the following message pop-up:

Output.7

Nov 12, 2017 2:55:41 PM com.polarsparc.rabbitmq.direct.PaymentVerifier$1 handleDelivery
INFO: Received message: 12345,19.99

In the consumer-v terminal, we should see the following message pop-up:

Output.8

Nov 12, 2017 2:55:41 PM com.polarsparc.rabbitmq.direct.PaymentVerifier$1 handleDelivery
INFO: Received message: 98765,21.99

The following picture shows the screenshot of the RabbitMQ web management console:

[Figure: Web Console]

We have successfully demonstrated RabbitMQ messaging using the Direct Exchange !!!

References

Introduction to Docker

RabbitMQ Official Site