PolarSPARC

Spring Integration Notes :: Part - 2


Bhaskar S 04/24/2021 (UPDATED)


Overview

In Part-1, we covered the basics of Spring Integration and got our hands dirty with two simple examples.

We will continue our journey to explore the following capabilities in Spring Integration:

Setup

To setup the Java directory structure for the demonstrations in this part, execute the following commands:

$ cd $HOME/java/SpringIntegration

$ mkdir -p src/main/java/com/polarsparc/si/p2

$ mkdir -p src/main/resources/p2


Hands-on Spring Integration

Logging Messages

Spring Integration provides facilities for intercepting and logging messages that flow through a channel. This can be useful for audit or debugging purposes.

XML based Approach

For demonstrating the logging capability, we will leverage the handler class defined in Listing.1 and the custom gateway interface defined in Listing.7 from Part-1.

The following is the XML based Spring Integration configuration that wires up the channels and the endpoint:


Listing.12
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:int="http://www.springframework.org/schema/integration"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
                            http://www.springframework.org/schema/beans/spring-beans.xsd
                            http://www.springframework.org/schema/integration
                            http://www.springframework.org/schema/integration/spring-integration.xsd">

    <bean id="inMsgHandler" class="com.polarsparc.si.p1.StrUpCaseHandler" />

    <int:channel id="inChannel">
        <int:interceptors>
            <int:wire-tap channel="logger1" />
        </int:interceptors>
    </int:channel>

    <int:channel id="outChannel">
        <int:queue capacity="5" />

        <int:interceptors>
            <int:wire-tap channel="logger2" />
        </int:interceptors>
    </int:channel>

    <int:logging-channel-adapter id="logger1" level="INFO" />

    <int:logging-channel-adapter id="logger2" level="INFO" log-full-message="true" />

    <!--
        The primary purpose of a Gateway is to hide the messaging API provided
        by Spring Integration
    -->

    <int:gateway id="gateway"
                  service-interface="com.polarsparc.si.p1.StrUpCaseGateway"
                  default-request-channel="inChannel"
                  default-reply-channel="outChannel" />

    <int:service-activator input-channel="inChannel"
                            output-channel="outChannel"
                            ref="inMsgHandler"
                            method="handler" />

</beans>

Some aspects of the Listing.12 from the above needs a little explanation.

The following is our main application to test logging of messages:


Listing.13
/*
 * Name:   StrUpCase3MainXml
 * Author: Bhaskar S
 * Date:   04/24/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p2;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.polarsparc.si.p1.StrUpCaseGateway;

public class StrUpCase3MainXml {
    private static final Logger LOGGER = LoggerFactory.getLogger(StrUpCase3MainXml.class);

    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("p2/StrUpCase3.xml");

        StrUpCaseGateway gateway = (StrUpCaseGateway) context.getBean("gateway");
        gateway.send("Spring Integration using WireTap with Xml");

        String msg = gateway.receive();

        LOGGER.info("OUTPUT: {}", msg);
    }
}

To execute the code from Listing.13, open a terminal window and run the following commands:

$ cd $HOME/java/SpringIntegration

$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p2.StrUpCase3MainXml"

The following would be the typical output:

Output.5

[INFO] Scanning for projects...
[INFO] 
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
2021-04-24 11:31:27:533 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-04-24 11:31:27:537 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-04-24 11:31:27:544 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-04-24 11:31:27:790 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.springframework.util.ReflectionUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to constructor java.lang.invoke.MethodHandles$Lookup(java.lang.Class)
WARNING: Please consider reporting this to the maintainers of org.springframework.util.ReflectionUtils
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2021-04-24 11:31:27:881 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:logger1.adapter} as a subscriber to the 'logger1' channel
2021-04-24 11:31:27:882 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7a15dd75.logger1' has 1 subscriber(s).
2021-04-24 11:31:27:883 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'logger1.adapter'; defined in: 'class path resource [p2/StrUpCase3.xml]'; from source: ''int:logging-channel-adapter' with id='logger1''
2021-04-24 11:31:27:884 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:logger2.adapter} as a subscriber to the 'logger2' channel
2021-04-24 11:31:27:884 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7a15dd75.logger2' has 1 subscriber(s).
2021-04-24 11:31:27:885 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'logger2.adapter'; defined in: 'class path resource [p2/StrUpCase3.xml]'; from source: ''int:logging-channel-adapter' with id='logger2''
2021-04-24 11:31:27:886 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'inChannel' channel
2021-04-24 11:31:27:887 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7a15dd75.inChannel' has 1 subscriber(s).
2021-04-24 11:31:27:887 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2021-04-24 11:31:27:888 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-04-24 11:31:27:889 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7a15dd75.errorChannel' has 1 subscriber(s).
2021-04-24 11:31:27:890 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-04-24 11:31:27:891 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#send(String)'
2021-04-24 11:31:27:891 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#receive()'
2021-04-24 11:31:27:892 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway'
2021-04-24 11:31:27:905 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.handler.LoggingHandler - Spring Integration using WireTap with Xml
2021-04-24 11:31:27:906 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-04-24 11:31:27:911 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO com.polarsparc.si.p1.StrUpCaseHandler - INPUT: msg = Spring Integration using WireTap with Xml
2021-04-24 11:31:27:913 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.handler.LoggingHandler - GenericMessage [payload=SPRING INTEGRATION USING WIRETAP WITH XML, headers={replyChannel=nullChannel, id=3423703a-f024-185c-d469-bc49e1750f1c, timestamp=1619278287913}]
2021-04-24 11:31:27:914 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO com.polarsparc.si.p2.StrUpCase3MainXml - OUTPUT: SPRING INTEGRATION USING WIRETAP WITH XML
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  1.552 s
[INFO] Finished at: 2021-04-24T11:31:27-04:00
[INFO] ------------------------------------------------------------------------
Java Config based Approach

The following is the Java Config based POJO that defines the input and output channels, the two logging-channel-adapters, and the two wire-taps similar to the one defined in the XML configuration file of Listing.12 above:


Listing.14
/*
 * Name:   StrUpCase3Config
 * Author: Bhaskar S
 * Date:   04/24/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p2;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.channel.interceptor.WireTap;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.messaging.MessageChannel;

@Configuration
@EnableIntegration
public class StrUpCase3Config {
    @Bean
    public MessageChannel inChannel() {
        AbstractMessageChannel channel = new DirectChannel();
        channel.addInterceptor(wireTapOne());
        return channel;
    }

    @Bean
    public MessageChannel outChannel() {
        AbstractMessageChannel channel = new QueueChannel(5);
        channel.addInterceptor(wireTapTwo());
        return channel;
    }

    @Bean
    @ServiceActivator(inputChannel = "logger1")
    public LoggingHandler loggerOne() {
        return new LoggingHandler(LoggingHandler.Level.INFO);
    }

    @Bean
    @ServiceActivator(inputChannel = "logger2")
    public LoggingHandler loggerTwo() {
        LoggingHandler logger = new LoggingHandler(LoggingHandler.Level.INFO);
        logger.setShouldLogFullMessage(true);
        return logger;
    }

    @Bean
    public WireTap wireTapOne() {
        return new WireTap("logger1");
    }

    @Bean
    public WireTap wireTapTwo() {
        return new WireTap("logger2");
    }
}

Some aspects of the Listing.14 from the above needs a little explanation.

And finally, the following is the main application that uses the POJOs from Listing.5, Listing.10 and Listing.14 to publish and consume data:


Listing.15
/*
 * Name:   StrUpCase3MainConfig
 * Author: Bhaskar S
 * Date:   04/24/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p2;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import com.polarsparc.si.p1.StrUpCaseGateway2;
import com.polarsparc.si.p1.StrUpCaseHandler2;

public class StrUpCase3MainConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(StrUpCase3MainConfig.class);

    public static void main(String[] args) {
        ApplicationContext context = new AnnotationConfigApplicationContext(StrUpCase3Config.class,
                StrUpCaseHandler2.class, StrUpCaseGateway2.class);

        StrUpCaseGateway2 gateway = (StrUpCaseGateway2) context.getBean("gateway");
        gateway.send("Spring Integration using WireTap with Config");

        String msg = gateway.receive();

        LOGGER.info("OUTPUT: {}", msg);
    }
}

To execute the code from Listing.15, open a terminal window and run the following commands:

$ cd $HOME/java/SpringIntegration

$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p2.StrUpCase3MainConfig"

The following would be the typical output:

Output.6

[INFO] Scanning for projects...
[INFO] 
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.springframework.cglib.core.ReflectUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to method java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain)
WARNING: Please consider reporting this to the maintainers of org.springframework.cglib.core.ReflectUtils
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2021-04-24 11:33:51:221 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-04-24 11:33:51:225 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-04-24 11:33:51:230 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-04-24 11:33:51:268 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-04-24 11:33:51:307 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-04-24 11:33:51:470 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2021-04-24 11:33:51:568 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-04-24 11:33:51:569 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@396abb96.errorChannel' has 1 subscriber(s).
2021-04-24 11:33:51:570 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-04-24 11:33:51:571 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {message-handler:strUpCase3Config.loggerOne.serviceActivator} as a subscriber to the 'logger1' channel
2021-04-24 11:33:51:571 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@396abb96.logger1' has 1 subscriber(s).
2021-04-24 11:33:51:572 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'strUpCase3Config.loggerOne.serviceActivator'
2021-04-24 11:33:51:573 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {message-handler:strUpCase3Config.loggerTwo.serviceActivator} as a subscriber to the 'logger2' channel
2021-04-24 11:33:51:574 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@396abb96.logger2' has 1 subscriber(s).
2021-04-24 11:33:51:574 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'strUpCase3Config.loggerTwo.serviceActivator'
2021-04-24 11:33:51:575 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator:strUpCaseHandler2.handler.serviceActivator} as a subscriber to the 'inChannel' channel
2021-04-24 11:33:51:576 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@396abb96.inChannel' has 1 subscriber(s).
2021-04-24 11:33:51:577 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'strUpCaseHandler2.handler.serviceActivator'
2021-04-24 11:33:51:578 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#send(String)'
2021-04-24 11:33:51:579 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#receive()'
2021-04-24 11:33:51:579 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway'
2021-04-24 11:33:51:592 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.handler.LoggingHandler - Spring Integration using WireTap with Config
2021-04-24 11:33:51:593 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-04-24 11:33:51:597 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO com.polarsparc.si.p1.StrUpCaseHandler2 - INPUT: msg = Spring Integration using WireTap with Config
2021-04-24 11:33:51:598 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.handler.LoggingHandler - GenericMessage [payload=SPRING INTEGRATION USING WIRETAP WITH CONFIG, headers={replyChannel=nullChannel, id=ef5b1978-35d8-7064-e500-bdf5da953e8f, timestamp=1619278431598}]
2021-04-24 11:33:51:600 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO com.polarsparc.si.p2.StrUpCase3MainConfig - OUTPUT: SPRING INTEGRATION USING WIRETAP WITH CONFIG
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  1.258 s
[INFO] Finished at: 2021-04-24T11:33:51-04:00
[INFO] ------------------------------------------------------------------------

As can be inferred from the Output.5 and Output.6 above, Spring Integration is using LoggingHandler to log messages intercepted by the wire-tap.

Multi-Threading

By default, Spring Integration uses the caller thread to process messages from end-to-end. There are situations when one needs to use a pool of thread(s) to process a stream of messages.

XML based Approach

For demonstrating the multi-threading capability, we will leverage the custom gateway interface defined in Listing.7 from Part-1.

The following is the POJO that converts the input text to upper-case and introduces an artificial delay of 1000ms. This will act as an endpoint:


Listing.16
/*
 * Name:   StrUpCaseHandler3
 * Author: Bhaskar S
 * Date:   04/16/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p2;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StrUpCaseHandler3 {
    private static final Logger LOGGER = LoggerFactory.getLogger(StrUpCaseHandler3.class);

    public String handler(String msg) {
        LOGGER.info("INPUT: msg = {}", msg);

        try {
            Thread.sleep(1000);
        }
        catch (Exception ignored) {
        }

        return msg.toUpperCase();
    }
}

The following is the XML based Spring Integration configuration that wires up the channels and the endpoint:


Listing.17
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:task="http://www.springframework.org/schema/task"
        xmlns:int="http://www.springframework.org/schema/integration"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
                            http://www.springframework.org/schema/beans/spring-beans.xsd
                            http://www.springframework.org/schema/task
                            http://www.springframework.org/schema/task/spring-task.xsd
                            http://www.springframework.org/schema/integration
                            http://www.springframework.org/schema/integration/spring-integration.xsd">

    <bean id="inMsgHandler" class="com.polarsparc.si.p2.StrUpCaseHandler3" />

    <task:executor id="executor" queue-capacity="5" pool-size="3" />

    <int:channel id="inChannel1">
        <int:dispatcher task-executor="executor" />
    </int:channel>

    <int:channel id="outChannel1">
        <int:queue capacity="5" />
    </int:channel>

    <!--
        The primary purpose of a Gateway is to hide the messaging API provided
        by Spring Integration
    -->

    <int:gateway id="gateway"
                  service-interface="com.polarsparc.si.p1.StrUpCaseGateway"
                  default-request-channel="inChannel1"
                  default-reply-channel="outChannel1" />

    <int:service-activator input-channel="inChannel1"
                            output-channel="outChannel1"
                            ref="inMsgHandler"
                            method="handler" />

</beans>

Some aspects of the Listing.17 from the above needs a little explanation.

The following is our main application to test the multi-threaded execution:


Listing.18
/*
 * Name:   StrUpCase4MainXml
 * Author: Bhaskar S
 * Date:   04/24/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p2;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.polarsparc.si.p1.StrUpCaseGateway;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class StrUpCase4MainXml {
    private static final Logger LOGGER = LoggerFactory.getLogger(StrUpCase4MainXml.class);

    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("p2/StrUpCase4.xml");

        StrUpCaseGateway gateway = (StrUpCaseGateway) context.getBean("gateway");

        for (int i = 1; i <= 5; i++) {
            gateway.send("Spring Integration using Executor - " + i + " with Xml");
        }

        for (int i = 1; i <= 5; i++) {
            String msg = gateway.receive();

            LOGGER.info("OUTPUT: {}", msg);
        }

        ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) context.getBean("executor");
        while (executor.getActiveCount() != 0) {
            try {
                Thread.sleep(1000);
            }
            catch (Exception ignored) {
            }
        }
        executor.shutdown();
    }
}

To execute the code from Listing.18, open a terminal window and run the following commands:

$ cd $HOME/java/SpringIntegration

$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p2.StrUpCase4MainXml"

The following would be the typical output:

Output.7

[INFO] Scanning for projects...
[INFO] 
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
2021-04-24 12:22:50:277 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-04-24 12:22:50:280 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-04-24 12:22:50:284 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-04-24 12:22:50:322 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService
2021-04-24 12:22:50:465 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.springframework.util.ReflectionUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to constructor java.lang.invoke.MethodHandles$Lookup(java.lang.Class)
WARNING: Please consider reporting this to the maintainers of org.springframework.util.ReflectionUtils
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2021-04-24 12:22:50:558 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'inChannel1' channel
2021-04-24 12:22:50:558 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.integration.channel.ExecutorChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7a15dd75.inChannel1' has 1 subscriber(s).
2021-04-24 12:22:50:559 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2021-04-24 12:22:50:560 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-04-24 12:22:50:560 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7a15dd75.errorChannel' has 1 subscriber(s).
2021-04-24 12:22:50:561 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-04-24 12:22:50:562 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#send(String)'
2021-04-24 12:22:50:563 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#receive()'
2021-04-24 12:22:50:563 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway'
2021-04-24 12:22:50:578 [executor-1] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-04-24 12:22:50:580 [executor-3] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-04-24 12:22:50:581 [executor-2] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-04-24 12:22:50:583 [executor-1] INFO com.polarsparc.si.p2.StrUpCaseHandler3 - INPUT: msg = Spring Integration using Executor - 1 with Xml
2021-04-24 12:22:50:583 [executor-3] INFO com.polarsparc.si.p2.StrUpCaseHandler3 - INPUT: msg = Spring Integration using Executor - 3 with Xml
2021-04-24 12:22:50:583 [executor-2] INFO com.polarsparc.si.p2.StrUpCaseHandler3 - INPUT: msg = Spring Integration using Executor - 2 with Xml
2021-04-24 12:22:51:586 [executor-3] INFO com.polarsparc.si.p2.StrUpCaseHandler3 - INPUT: msg = Spring Integration using Executor - 4 with Xml
2021-04-24 12:22:51:586 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO com.polarsparc.si.p2.StrUpCase4MainXml - OUTPUT: SPRING INTEGRATION USING EXECUTOR - 1 WITH XML
2021-04-24 12:22:51:586 [executor-2] INFO com.polarsparc.si.p2.StrUpCaseHandler3 - INPUT: msg = Spring Integration using Executor - 5 with Xml
2021-04-24 12:22:51:588 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO com.polarsparc.si.p2.StrUpCase4MainXml - OUTPUT: SPRING INTEGRATION USING EXECUTOR - 3 WITH XML
2021-04-24 12:22:51:590 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO com.polarsparc.si.p2.StrUpCase4MainXml - OUTPUT: SPRING INTEGRATION USING EXECUTOR - 2 WITH XML
2021-04-24 12:22:52:588 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO com.polarsparc.si.p2.StrUpCase4MainXml - OUTPUT: SPRING INTEGRATION USING EXECUTOR - 4 WITH XML
2021-04-24 12:22:52:589 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO com.polarsparc.si.p2.StrUpCase4MainXml - OUTPUT: SPRING INTEGRATION USING EXECUTOR - 5 WITH XML
2021-04-24 12:22:52:590 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Shutting down ExecutorService
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  3.221 s
[INFO] Finished at: 2021-04-24T12:22:52-04:00
[INFO] ------------------------------------------------------------------------
Java Config based Approach

The following is the Java Config based POJO that acts as an endpoint that converts the input text to upper-case (with an artificially introduced delay):


Listing.19
/*
 * Name:   StrUpCaseHandler4
 * Author: Bhaskar S
 * Date:   04/24/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p2;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;

@Configuration
@EnableIntegration
public class StrUpCaseHandler4 {
    private static final Logger LOGGER = LoggerFactory.getLogger(StrUpCaseHandler4.class);

    @ServiceActivator(inputChannel = "inChannel", outputChannel = "outChannel")
    public String handler(String msg) {
        LOGGER.info("INPUT: msg = {}", msg);

        try {
            Thread.sleep(1000);
        }
        catch (Exception ignored) {
        }

        return msg.toUpperCase();
    }
}

The following is the Java Config based POJO that defines the input and output channels and the executor thread pool similar to the one defined in the XML configuration file of Listing.17 above:


Listing.20
/*
 * Name:   StrUpCase4Config
 * Author: Bhaskar S
 * Date:   04/24/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p2;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.MessageChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableIntegration
public class StrUpCase4Config {
    @Bean
    public MessageChannel inChannel() {
        return new ExecutorChannel(threadPoolTaskExecutor());
    }

    @Bean
    public MessageChannel outChannel() {
        return new QueueChannel(5);
    }

    @Bean(name = "executor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setQueueCapacity(5);
        executor.setCorePoolSize(3);
        executor.setBeanName("executor");
        return executor;
    }
}

Some aspects of the Listing.20 from the above needs a little explanation.

And finally, the following is the main application that uses the POJOs from Listing.10, Listing.19, and Listing.20 to publish data that is consumed and processed by a pool of threads:


Listing.21
/*
 * Name:   StrUpCase4MainConfig
 * Author: Bhaskar S
 * Date:   04/24/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p2;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import com.polarsparc.si.p1.StrUpCaseGateway2;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class StrUpCase4MainConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(StrUpCase4MainConfig.class);

    public static void main(String[] args) {
        ApplicationContext context = new AnnotationConfigApplicationContext(StrUpCase4Config.class,
                StrUpCaseHandler4.class, StrUpCaseGateway2.class);

        StrUpCaseGateway2 gateway = (StrUpCaseGateway2) context.getBean("gateway");

        for (int i = 1; i <= 5; i++) {
            gateway.send("Spring Integration using Executor - " + i + " with Config");
        }

        for (int i = 1; i <= 5; i++) {
            String msg = gateway.receive();

            LOGGER.info("OUTPUT: {}", msg);
        }

        ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) context.getBean("executor");
        while (executor.getActiveCount() != 0) {
            try {
                Thread.sleep(1000);
            }
            catch (Exception ignored) {
            }
        }
        executor.shutdown();
    }
}

To execute the code from Listing.21, open a terminal window and run the following commands:

$ cd $HOME/java/SpringIntegration

$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p2.StrUpCase4MainConfig"

The following would be the typical output:

Output.8

[INFO] Scanning for projects...
[INFO] 
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.springframework.cglib.core.ReflectUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to method java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain)
WARNING: Please consider reporting this to the maintainers of org.springframework.cglib.core.ReflectUtils
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2021-04-24 12:27:25:339 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-04-24 12:27:25:345 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-04-24 12:27:25:348 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-04-24 12:27:25:384 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-04-24 12:27:25:423 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-04-24 12:27:25:465 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService 'executor'
2021-04-24 12:27:25:589 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2021-04-24 12:27:25:688 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-04-24 12:27:25:690 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@396abb96.errorChannel' has 1 subscriber(s).
2021-04-24 12:27:25:691 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-04-24 12:27:25:691 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator:strUpCaseHandler4.handler.serviceActivator} as a subscriber to the 'inChannel' channel
2021-04-24 12:27:25:692 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.integration.channel.ExecutorChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@396abb96.inChannel' has 1 subscriber(s).
2021-04-24 12:27:25:693 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'strUpCaseHandler4.handler.serviceActivator'
2021-04-24 12:27:25:695 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#send(String)'
2021-04-24 12:27:25:695 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#receive()'
2021-04-24 12:27:25:696 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway'
2021-04-24 12:27:25:715 [executor-1] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-04-24 12:27:25:718 [executor-3] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-04-24 12:27:25:718 [executor-2] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-04-24 12:27:25:721 [executor-3] INFO com.polarsparc.si.p2.StrUpCaseHandler4 - INPUT: msg = Spring Integration using Executor - 3 with Config
2021-04-24 12:27:25:721 [executor-2] INFO com.polarsparc.si.p2.StrUpCaseHandler4 - INPUT: msg = Spring Integration using Executor - 2 with Config
2021-04-24 12:27:25:721 [executor-1] INFO com.polarsparc.si.p2.StrUpCaseHandler4 - INPUT: msg = Spring Integration using Executor - 1 with Config
2021-04-24 12:27:26:723 [executor-1] INFO com.polarsparc.si.p2.StrUpCaseHandler4 - INPUT: msg = Spring Integration using Executor - 4 with Config
2021-04-24 12:27:26:724 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO com.polarsparc.si.p2.StrUpCase4MainConfig - OUTPUT: SPRING INTEGRATION USING EXECUTOR - 3 WITH CONFIG
2021-04-24 12:27:26:724 [executor-2] INFO com.polarsparc.si.p2.StrUpCaseHandler4 - INPUT: msg = Spring Integration using Executor - 5 with Config
2021-04-24 12:27:26:726 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO com.polarsparc.si.p2.StrUpCase4MainConfig - OUTPUT: SPRING INTEGRATION USING EXECUTOR - 1 WITH CONFIG
2021-04-24 12:27:26:727 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO com.polarsparc.si.p2.StrUpCase4MainConfig - OUTPUT: SPRING INTEGRATION USING EXECUTOR - 2 WITH CONFIG
2021-04-24 12:27:27:725 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO com.polarsparc.si.p2.StrUpCase4MainConfig - OUTPUT: SPRING INTEGRATION USING EXECUTOR - 4 WITH CONFIG
2021-04-24 12:27:27:726 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO com.polarsparc.si.p2.StrUpCase4MainConfig - OUTPUT: SPRING INTEGRATION USING EXECUTOR - 5 WITH CONFIG
2021-04-24 12:27:27:728 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Shutting down ExecutorService 'executor'
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  3.220 s
[INFO] Finished at: 2021-04-24T12:27:27-04:00
[INFO] ------------------------------------------------------------------------

As can be inferred from the Output.7 and Output.8 above, Spring Integration is using the configured thread pool executor to process messages.

Exception Handling

In Spring Integration, when a message is processed by an endpoint via a synchronous channel (without any queue or executor), any exception(s) thrown by the endpoint is sent back to the caller in the callers thread. In the case of an asynchronous channel, the message is processed by the endpoint in a thread that is different from the callers thread and the only way to handle the exception(s) is to configure an error-channel, where the exception(s) can be logged.

XML based Approach

For demonstrating the error handling capability, we will use a different example - the use-case of checking if a user-id is valid and throwing an exception if not valid.

The following is the POJO that checks if the specified user-id is valid. Else, it throws a run-time exception:


Listing.22
/*
 * Name:   CheckUserHandler
 * Author: Bhaskar S
 * Date:   04/24/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p2;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;

public class CheckUserHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(CheckUserHandler.class);

    List users = Arrays.asList("alice", "bob", "charlie");

    public boolean checkUser(String id) {
        LOGGER.info("Check user = {}", id);

        if (!users.contains(id)) {
            throw new RuntimeException("User " + id + " is invalid");
        }

        return true;
    }
}

The following is the POJO that handles the run-time exception:


Listing.23
/*
 * Name:   CheckUserErrorHandler
 * Author: Bhaskar S
 * Date:   04/24/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p2;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;

public class CheckUserErrorHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(CheckUserErrorHandler.class);

    @ServiceActivator(inputChannel = "errChannel")
    public void handleError(Message<MessageHandlingException> msg) {
        LOGGER.error("Exception message = {}", msg);
    }
}

Spring Integration wraps all exceptions into the class org.springframework.messaging.MessageHandlingException.

The following is the simple interface to check the validity of an user-id:


Listing.24
/*
 * Name:   CheckUserGateway
 * Author: Bhaskar S
 * Date:   04/24/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p2;

public interface CheckUserGateway {
    public boolean checkUser(String id);
}

The following is the XML based Spring Integration configuration that wires up the channels, a gateway with with the appropriate error-channel, and the endpoints:


Listing.25
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:task="http://www.springframework.org/schema/task"
        xmlns:int="http://www.springframework.org/schema/integration"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
                            http://www.springframework.org/schema/beans/spring-beans.xsd
                            http://www.springframework.org/schema/task
                            http://www.springframework.org/schema/task/spring-task.xsd
                            http://www.springframework.org/schema/integration
                            http://www.springframework.org/schema/integration/spring-integration.xsd">

    <bean id="inMsgHandler" class="com.polarsparc.si.p2.CheckUserHandler" />
    <bean id="errMsgHandler" class="com.polarsparc.si.p2.CheckUserErrorHandler" />

    <task:executor id="executor" queue-capacity="5" pool-size="3" />

    <int:channel id="inChannel">
        <int:dispatcher task-executor="executor" />
    </int:channel>

    <!--
        The primary purpose of a Gateway is to hide the messaging API provided
        by Spring Integration
    -->

    <int:gateway id="gateway"
                  service-interface="com.polarsparc.si.p2.CheckUserGateway"
                  default-request-channel="inChannel"
                  error-channel="errChannel" />

    <int:service-activator input-channel="inChannel"
                            ref="inMsgHandler"
                            method="checkUser" />

    <int:service-activator input-channel="errChannel"
                            ref="errMsgHandler"
                            method="handleError" />

</beans>

Only one aspect of the Listing.25 from the above needs a little explanation.

The following is our main application to test exception handling in Spring Integration:


Listing.26
/*
 * Name:   CheckUserMainXml
 * Author: Bhaskar S
 * Date:   04/24/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p2;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class CheckUserMainXml {
    private static final Logger LOGGER = LoggerFactory.getLogger(CheckUserMainXml.class);

    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("p2/CheckUser.xml");

        CheckUserGateway gateway = (CheckUserGateway) context.getBean("gateway");

        LOGGER.info("CheckUser for alice: {} with Xml", Boolean.toString(gateway.checkUser("alice")));

        LOGGER.info("CheckUser for john: {} with Xml", Boolean.toString(gateway.checkUser("john")));
    }
}

To execute the code from Listing.26, open a terminal window and run the following commands:

$ cd $HOME/java/SpringIntegration

$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p2.CheckUserMainXml"

The following would be the typical output:

Output.9

[INFO] Scanning for projects...
[INFO] 
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
2021-04-24 12:48:16:705 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-04-24 12:48:16:710 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-04-24 12:48:16:714 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-04-24 12:48:16:759 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService
2021-04-24 12:48:16:908 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.springframework.util.ReflectionUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to constructor java.lang.invoke.MethodHandles$Lookup(java.lang.Class)
WARNING: Please consider reporting this to the maintainers of org.springframework.util.ReflectionUtils
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2021-04-24 12:48:17:012 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'inChannel' channel
2021-04-24 12:48:17:013 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.channel.ExecutorChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@4278a583.inChannel' has 1 subscriber(s).
2021-04-24 12:48:17:014 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2021-04-24 12:48:17:014 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'errChannel' channel
2021-04-24 12:48:17:015 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@4278a583.errChannel' has 1 subscriber(s).
2021-04-24 12:48:17:016 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1'
2021-04-24 12:48:17:016 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-04-24 12:48:17:017 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@4278a583.errorChannel' has 1 subscriber(s).
2021-04-24 12:48:17:018 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-04-24 12:48:17:020 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#checkUser(String)'
2021-04-24 12:48:17:021 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway'
2021-04-24 12:48:17:036 [executor-1] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-04-24 12:48:17:043 [executor-1] INFO com.polarsparc.si.p2.CheckUserHandler - Check user = alice
2021-04-24 12:48:17:045 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO com.polarsparc.si.p2.CheckUserMainXml - CheckUser for alice: true with Xml
2021-04-24 12:48:17:047 [executor-2] INFO com.polarsparc.si.p2.CheckUserHandler - Check user = john
2021-04-24 12:48:17:054 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-04-24 12:48:17:056 [com.polarsparc.si.p2.CheckUserMainXml.main()] ERROR com.polarsparc.si.p2.CheckUserErrorHandler - Exception message = ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@62b3a0b]; nested exception is java.lang.RuntimeException: User john is invalid, failedMessage=GenericMessage [payload=john, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@d242962, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@d242962, id=4d37bdf5-2f87-461b-9ca3-069fd7b3b9bc, timestamp=1619282897047}], headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e640abb, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e640abb, id=7486dc01-1677-14eb-688a-93632a1aafa5, timestamp=1619282897054}]
--- CTRL-C ---
Java Config based Approach

The following is the Java Config based POJO that acts as an endpoint that checks if the specified user-id is valid. Else, it throws a run-time exception:


Listing.27
/*
 * Name:   CheckUserHandler2
 * Author: Bhaskar S
 * Date:   04/24/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p2;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;

import java.util.Arrays;
import java.util.List;

@Configuration
@EnableIntegration
public class CheckUserHandler2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(CheckUserHandler2.class);

    List users = Arrays.asList("alice", "bob", "charlie");

    @ServiceActivator(inputChannel = "inChannel")
    public boolean checkUser(String id) {
        LOGGER.info("Check user = {}", id);

        if (!users.contains(id)) {
            throw new RuntimeException("User " + id + " is invalid");
        }

        return true;
    }
}

The following is the Java Config based POJO that acts as an endpoint that handles the run-time exception:


Listing.28
/*
 * Name:   CheckUserErrorHandler2
 * Author: Bhaskar S
 * Date:   04/24/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p2;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;

@Configuration
@EnableIntegration
public class CheckUserErrorHandler2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(CheckUserErrorHandler2.class);

    @ServiceActivator(inputChannel = "errChannel")
    public void handleError(Message<MessageHandlingException> msg) {
        LOGGER.error("Exception message = {}", msg);
    }
}

The following is the Java Config based POJO that acts as a custom gateway interface that checks the validity of an user-id and defines the request and error channels:


Listing.29
/*
 * Name:   CheckUserGateway2
 * Author: Bhaskar S
 * Date:   04/24/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p2;

import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.config.EnableIntegration;

@Configuration
@EnableIntegration
@IntegrationComponentScan
@MessagingGateway(name = "gateway", defaultRequestChannel = "inChannel", errorChannel = "errChannel")
public interface CheckUserGateway2 {
    public boolean checkUser(String id);
}

And finally, the following is the main application that uses the POJOs from Listing.27, Listing.28, and Listing.29 to test exception handling:


Listing.30
/*
 * Name:   CheckUserMainConfig
 * Author: Bhaskar S
 * Date:   04/24/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p2;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class CheckUserMainConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(CheckUserMainConfig.class);

    public static void main(String[] args) {
        ApplicationContext context = new AnnotationConfigApplicationContext(CheckUserHandler2.class,
                CheckUserErrorHandler2.class, CheckUserGateway2.class);

        CheckUserGateway2 gateway = (CheckUserGateway2) context.getBean("gateway");

        LOGGER.info("CheckUser for alice: {} with Config", Boolean.toString(gateway.checkUser("alice")));

        LOGGER.info("CheckUser for john: {} with Config", Boolean.toString(gateway.checkUser("john")));
    }
}

To execute the code from Listing.30, open a terminal window and run the following commands:

$ cd $HOME/java/SpringIntegration

$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p2.CheckUserMainConfig"

The following would be the typical output:

Output.10

[INFO] Scanning for projects...
[INFO] 
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.springframework.cglib.core.ReflectUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to method java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain)
WARNING: Please consider reporting this to the maintainers of org.springframework.cglib.core.ReflectUtils
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2021-04-24 12:55:26:520 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-04-24 12:55:26:526 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-04-24 12:55:26:530 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-04-24 12:55:26:571 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-04-24 12:55:26:607 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-04-24 12:55:26:735 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2021-04-24 12:55:26:846 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-04-24 12:55:26:847 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@b1f9496.errorChannel' has 1 subscriber(s).
2021-04-24 12:55:26:848 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-04-24 12:55:26:848 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator:checkUserHandler2.checkUser.serviceActivator} as a subscriber to the 'inChannel' channel
2021-04-24 12:55:26:849 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@b1f9496.inChannel' has 1 subscriber(s).
2021-04-24 12:55:26:850 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'checkUserHandler2.checkUser.serviceActivator'
2021-04-24 12:55:26:850 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator:checkUserErrorHandler2.handleError.serviceActivator} as a subscriber to the 'errChannel' channel
2021-04-24 12:55:26:851 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@b1f9496.errChannel' has 1 subscriber(s).
2021-04-24 12:55:26:851 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'checkUserErrorHandler2.handleError.serviceActivator'
2021-04-24 12:55:26:852 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#checkUser(String)'
2021-04-24 12:55:26:853 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway'
2021-04-24 12:55:26:865 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-04-24 12:55:26:868 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO com.polarsparc.si.p2.CheckUserHandler2 - Check user = alice
2021-04-24 12:55:26:870 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO com.polarsparc.si.p2.CheckUserMainConfig - CheckUser for alice: true with Config
2021-04-24 12:55:26:871 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO com.polarsparc.si.p2.CheckUserHandler2 - Check user = john
2021-04-24 12:55:26:878 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-04-24 12:55:26:879 [com.polarsparc.si.p2.CheckUserMainConfig.main()] ERROR com.polarsparc.si.p2.CheckUserErrorHandler2 - Exception message = ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@1ee88cfc]; nested exception is java.lang.RuntimeException: User john is invalid, failedMessage=GenericMessage [payload=john, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@183d1409, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@183d1409, id=b174d92e-3559-9d82-7ff6-b75a2c389034, timestamp=1619283326871}], headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@6d477d5a, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@6d477d5a, id=9317d5f2-87f8-5add-bb8a-533050d57d32, timestamp=1619283326878}]
--- CTRL-C ---

As can be inferred from the Output.9 and Output.10 above, Spring Integration is wrapping the generated exception into MessageHandlingException and routing it to our exception handler.

References

Spring Integration Notes :: Part - 1

Integration - Error Handling



© PolarSPARC