PolarSPARC

Spring Integration Notes :: Part - 6


Bhaskar S 05/22/2021 (UPDATED)


Overview

In Part-5, we covered basic examples of Spring Integration relating to the AMQP (using RabbitMQ) and Apache Kafka channel adapters.

We will continue our journey by exploring basic examples of further concepts in Spring Integration, beginning with chaining endpoints.

Setup

To set up the Java directory structure for the demonstrations in this part, execute the following commands:

$ cd $HOME/java/SpringIntegration

$ mkdir -p src/main/java/com/polarsparc/si/p6

$ mkdir -p src/main/resources/p6

The following is the listing for the Maven project file pom.xml, updated to add the dependencies for stream and Lombok support:


pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.polarsparc.si</groupId>
    <artifactId>SpringIntegration</artifactId>
    <version>1.0</version>
    <name>SpringIntegration</name>
    <description>Spring Integration Examples</description>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.1</version>
                    <configuration>
                        <fork>true</fork>
                        <meminitial>128m</meminitial>
                        <maxmem>512m</maxmem>
                        <source>11</source>
                        <target>11</target>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <dependencies>
        <dependency>
            <groupId>javax.annotation</groupId>
            <artifactId>javax.annotation-api</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>5.3.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.3.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>5.3.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-file</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-sftp</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-jdbc</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-http</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-amqp</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.3.6</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>log4j-over-slf4j</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.2.19</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

</project>

Hands-on Spring Integration

We will consider a hypothetical car service use-case to demonstrate the various concepts in this part.

Chaining Endpoints

In this hypothetical example, as cars come in for servicing, we want to assign each car a reference number on arrival and then, based on the current miles on the car, assign the recommended services.
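
Before bringing in Spring Integration, the two-step flow described above can be pictured in plain Java. The following is only a sketch under our own assumptions: the names ChainSketch, Job, assignRef, recommend, and runChain are hypothetical and are not part of Spring Integration or of the listings in this article. It shows the essence of chaining, where the result of one step becomes the input of the next:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class ChainSketch {
    // Hypothetical stand-in for the car being serviced
    static final class Job {
        final String make;
        final long miles;
        String ref;
        final List<String> services = new ArrayList<>();

        Job(String make, long miles) {
            this.make = make;
            this.miles = miles;
        }
    }

    private static int nextNo = 1;

    // Step 1: assign a reference number on arrival
    static Job assignRef(Job job) {
        job.ref = String.format("%s-%05d", job.make.substring(0, 1), nextNo++);
        return job;
    }

    // Step 2: recommend services based on the current miles
    static Job recommend(Job job) {
        job.services.add("OIL_CHANGE");
        job.services.add("AIR_FILTER");
        if (job.miles % 15000 == 0) {
            job.services.add("WHEEL_BALANCE");
        }
        return job;
    }

    // Run the steps in order, feeding the result of one into the next
    static Job runChain(Job job, List<UnaryOperator<Job>> steps) {
        Job current = job;
        for (UnaryOperator<Job> step : steps) {
            current = step.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        List<UnaryOperator<Job>> steps = List.of(ChainSketch::assignRef, ChainSketch::recommend);
        Job done = runChain(new Job("Toyota", 30000), steps);
        System.out.println(done.ref + " " + done.services);
    }
}
```

In Spring Integration, the steps become message handlers and the framework takes care of passing the message from one handler to the next.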

XML based Approach

The following is the Car POJO that encapsulates the car make, the current miles, the reference number, and a java.util.Map of services:


Listing.73
/*
 * Name:   Car
 * Author: Bhaskar S
 * Date:   05/22/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p6;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import java.util.Map;
import java.util.TreeMap;

@Getter
@Setter
@ToString
public class Car {
    private String ref;
    private String make;

    private long miles;

    private Map<String, Float> services = new TreeMap<>();

    public Car(String make, long miles) {
        this.make = make;
        this.miles = miles;
    }

    // Copies only the make and the miles; the clone starts with no ref and no services
    public static Car makeClone(Car car) {
        return new Car(car.make, car.miles);
    }
}
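
For readers not familiar with Lombok, the following sketch shows roughly what the annotations on the Car class stand for, written by hand. The class name PlainCar is hypothetical and only a subset of the accessors is shown. It also illustrates why the services print in alphabetical order: a TreeMap keeps its keys sorted, regardless of the order in which they were inserted:

```java
import java.util.Map;
import java.util.TreeMap;

public class PlainCar {
    private String ref;
    private final String make;
    private final long miles;

    // TreeMap keeps the keys sorted, so the print order is alphabetical
    private final Map<String, Float> services = new TreeMap<>();

    public PlainCar(String make, long miles) {
        this.make = make;
        this.miles = miles;
    }

    public String getRef() {
        return ref;
    }

    public void setRef(String ref) {
        this.ref = ref;
    }

    public Map<String, Float> getServices() {
        return services;
    }

    // Hand-written text in the same Name(field=value, ...) shape used by @ToString
    @Override
    public String toString() {
        return "PlainCar(ref=" + ref + ", make=" + make + ", miles=" + miles
                + ", services=" + services + ")";
    }

    public static void main(String[] args) {
        PlainCar car = new PlainCar("Toyota", 30000);
        car.getServices().put("OIL_CHANGE", 29.95f);
        car.getServices().put("AIR_FILTER", 19.95f);
        // prints: PlainCar(ref=null, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95})
        System.out.println(car);
    }
}
```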

The following handler is a Java POJO that assigns a reference number to an arriving Car:


Listing.74
/*
 * Name:   RefNoHandler
 * Author: Bhaskar S
 * Date:   05/22/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p6;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;

public class RefNoHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(RefNoHandler.class);

    private static int nextNo = 1;

    private static final String REF_NO_FMT = "%s-%05d";

    public Message<Car> assignRefNo(Message<Car> input) {
        input.getPayload().setRef(String.format(REF_NO_FMT, input.getPayload().getMake().substring(0, 1), nextNo++));

        LOGGER.info("Input: {} (in Xml)", input.toString());

        return input;
    }
}
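
The reference number is the first letter of the make, a hyphen, and a zero-padded five-digit sequence number. The handler above keeps the sequence in a plain static int, which is fine for a demonstration driven from a single thread. If the handler were ever invoked from several threads at once, one possible alternative (our assumption, not something this example requires) is an AtomicInteger, as in the following sketch. The names RefNoSketch and nextRef are hypothetical:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RefNoSketch {
    private static final String REF_NO_FMT = "%s-%05d";

    // Atomic counter so that concurrent callers never receive the same number
    private static final AtomicInteger NEXT_NO = new AtomicInteger(1);

    static String nextRef(String make) {
        return String.format(REF_NO_FMT, make.substring(0, 1), NEXT_NO.getAndIncrement());
    }

    public static void main(String[] args) {
        System.out.println(nextRef("Toyota")); // T-00001 on a fresh run
        System.out.println(nextRef("Honda"));  // H-00002 on a fresh run
    }
}
```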

The following handler is a Java POJO that assigns the recommended services for an arriving Car based on the current miles:


Listing.75
/*
 * Name:   CarServiceHandler
 * Author: Bhaskar S
 * Date:   05/22/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p6;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;

public class CarServiceHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(CarServiceHandler.class);

    private static final String OIL_CHANGE = "OIL_CHANGE";
    private static final String AIR_FILTER = "AIR_FILTER";
    private static final String WHEEL_BALANCE = "WHEEL_BALANCE";

    public Message<?> recommendedServices(Message<Car> input) {
        input.getPayload().getServices().put(OIL_CHANGE, (float) 29.95);
        input.getPayload().getServices().put(AIR_FILTER, (float) 19.95);

        if (input.getPayload().getMiles() % 15000 == 0) {
            input.getPayload().getServices().put(WHEEL_BALANCE, (float) 69.95);
        }

        LOGGER.info("Input: {} (in Xml)", input.toString());

        return input;
    }
}
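
The wheel balance rule in the handler above is a simple remainder check on the current miles. The following sketch isolates that check to show which mileages trigger it. Note one edge case: a car with 0 miles also satisfies the check. The stricter variant needsWheelBalanceStrict is purely our own hypothetical alternative and is not used anywhere in this article:

```java
public class MileageRuleSketch {
    // Same check as in the handler: true at every exact multiple of 15000
    static boolean needsWheelBalance(long miles) {
        return miles % 15000 == 0;
    }

    // Hypothetical variant that excludes a car with zero miles
    static boolean needsWheelBalanceStrict(long miles) {
        return miles > 0 && miles % 15000 == 0;
    }

    public static void main(String[] args) {
        long[] samples = {0, 5000, 15000, 30000, 30001};
        for (long miles : samples) {
            System.out.println(miles + " -> " + needsWheelBalance(miles)
                    + " (strict: " + needsWheelBalanceStrict(miles) + ")");
        }
    }
}
```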

The following is a simple interface to service a Car:


Listing.76
/*
 * Name:   CarServiceGateway
 * Author: Bhaskar S
 * Date:   05/22/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p6;

public interface CarServiceGateway {
    public void serviceCar(Car car);
}
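
Notice that we never write a class that implements this interface. Spring Integration generates a proxy for it at runtime and turns each method call into a message sent to a channel. The following plain Java sketch conveys the idea using java.lang.reflect.Proxy from the JDK. It is only an analogy and not how Spring Integration is implemented internally; the names GatewayProxySketch, ServiceGateway, and createGateway are hypothetical, and a Consumer plays the role of the request channel:

```java
import java.lang.reflect.Proxy;
import java.util.function.Consumer;

public class GatewayProxySketch {
    // Hypothetical service interface, similar in shape to CarServiceGateway
    interface ServiceGateway {
        void service(String car);
    }

    // Builds an implementation of the interface at runtime; every call to
    // service() is forwarded to the consumer standing in for the channel
    static ServiceGateway createGateway(Consumer<Object> requestChannel) {
        return (ServiceGateway) Proxy.newProxyInstance(
                ServiceGateway.class.getClassLoader(),
                new Class<?>[] { ServiceGateway.class },
                (proxy, method, methodArgs) -> {
                    if (method.getDeclaringClass() == Object.class) {
                        // Delegate toString(), equals(), and hashCode() to the consumer
                        return method.invoke(requestChannel, methodArgs);
                    }
                    requestChannel.accept(methodArgs[0]);
                    return null; // the interface method returns void
                });
    }

    public static void main(String[] args) {
        ServiceGateway gateway = createGateway(msg -> System.out.println("Received: " + msg));
        gateway.service("Toyota");
        gateway.service("Honda");
    }
}
```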

The following is the XML based Spring Integration configuration that wires up the channels and the endpoints as a chain:


Listing.77
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:int="http://www.springframework.org/schema/integration"
        xmlns:stream="http://www.springframework.org/schema/integration/stream"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
                            http://www.springframework.org/schema/beans/spring-beans.xsd
                            http://www.springframework.org/schema/integration
                            http://www.springframework.org/schema/integration/spring-integration.xsd
                            http://www.springframework.org/schema/integration/stream
                            http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

    <bean id="refNoHandler" class="com.polarsparc.si.p6.RefNoHandler" />
    <bean id="carServiceHandler" class="com.polarsparc.si.p6.CarServiceHandler" />

    <int:channel id="inChannel" />
    <int:channel id="outChannel" />

    <int:gateway id="gateway"
                  service-interface="com.polarsparc.si.p6.CarServiceGateway"
                  default-request-channel="inChannel" />

    <int:chain input-channel="inChannel" output-channel="outChannel">
        <int:service-activator ref="refNoHandler"
                                method="assignRefNo" />

        <int:service-activator ref="carServiceHandler"
                                method="recommendedServices" />
    </int:chain>

    <stream:stdout-channel-adapter channel="outChannel"
                                    append-newline="true" />
</beans>

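Some aspects of Listing.77 need a little explanation. The <int:gateway> element generates a proxy for the CarServiceGateway interface and sends each Car passed to serviceCar() as a message to inChannel. The <int:chain> element groups the two service activators into a single endpoint: only the chain declares an input-channel and an output-channel, the handlers inside it are invoked in the order listed, and the message returned by one handler is passed directly to the next without an intermediate channel. The <stream:stdout-channel-adapter> element writes the payload of each message arriving on outChannel to the standard output.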
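
The wiring in Listing.77 can also be pictured in plain Java. The following is a rough sketch under our own assumptions and does not use any Spring Integration classes; the names WiringSketch, SimpleChannel, and wire are hypothetical. An input channel hands each message to the chain, the chain applies its two steps in order, and the result is sent to an output channel whose subscriber prints it. As with a direct channel, everything runs in the thread of the sender:

```java
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

public class WiringSketch {
    // Hypothetical stand-in for a direct channel: one subscriber,
    // invoked synchronously in the thread of the sender
    static final class SimpleChannel<T> {
        private Consumer<T> subscriber;

        void subscribe(Consumer<T> subscriber) {
            this.subscriber = subscriber;
        }

        void send(T message) {
            if (subscriber == null) {
                throw new IllegalStateException("No subscriber for this channel");
            }
            subscriber.accept(message);
        }
    }

    // Wires: in -> (step1, then step2) -> out -> sink, and returns the input channel
    static SimpleChannel<String> wire(UnaryOperator<String> step1,
                                      UnaryOperator<String> step2,
                                      Consumer<String> sink) {
        SimpleChannel<String> in = new SimpleChannel<>();
        SimpleChannel<String> out = new SimpleChannel<>();
        in.subscribe(msg -> out.send(step2.apply(step1.apply(msg))));
        out.subscribe(sink);
        return in;
    }

    public static void main(String[] args) {
        SimpleChannel<String> in = wire(s -> s + " +ref", s -> s + " +services", System.out::println);
        in.send("Toyota"); // prints: Toyota +ref +services
        in.send("Honda");  // prints: Honda +ref +services
    }
}
```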

The following is our main application to test the hypothetical car service:


Listing.78
/*
 * Name:   CarServiceMainXml
 * Author: Bhaskar S
 * Date:   05/22/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p6;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class CarServiceMainXml {
    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext(args[0]);

        CarServiceGateway gateway = (CarServiceGateway) context.getBean("gateway");

        gateway.serviceCar(new Car("Toyota", 30000));
        gateway.serviceCar(new Car("Honda", 5000));
    }
}

To execute the code from Listing.78 with the argument from Listing.77, open a terminal window and run the following commands:

$ cd $HOME/java/SpringIntegration

$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p6.CarServiceMainXml" -Dexec.args="p6/CarService.xml"

The following would be the typical output:

Output.35

[INFO] Scanning for projects...
[INFO] 
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
2021-05-22 14:01:38:134 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.xml.ChainParser - It is useful to provide an explicit 'id' attribute on 'chain' elements to simplify the identification of child elements in logs etc.
2021-05-22 14:01:38:211 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-05-22 14:01:38:214 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-05-22 14:01:38:222 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-05-22 14:01:38:411 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.springframework.util.ReflectionUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to constructor java.lang.invoke.MethodHandles$Lookup(java.lang.Class)
WARNING: Please consider reporting this to the maintainers of org.springframework.util.ReflectionUtils
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2021-05-22 14:01:38:623 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.MessageHandlerChain - started bean 'org.springframework.integration.handler.MessageHandlerChain#0'; defined in: 'class path resource [p6/CarService.xml]'; from source: ''int:chain''
2021-05-22 14:01:38:624 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {chain} as a subscriber to the 'inChannel' channel
2021-05-22 14:01:38:625 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@346f8b44.inChannel' has 1 subscriber(s).
2021-05-22 14:01:38:626 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2021-05-22 14:01:38:626 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {stream:outbound-channel-adapter(character)} as a subscriber to the 'outChannel' channel
2021-05-22 14:01:38:627 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@346f8b44.outChannel' has 1 subscriber(s).
2021-05-22 14:01:38:628 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1'; defined in: 'class path resource [p6/CarService.xml]'; from source: ''stream:stdout-channel-adapter''
2021-05-22 14:01:38:628 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-05-22 14:01:38:629 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@346f8b44.errorChannel' has 1 subscriber(s).
2021-05-22 14:01:38:630 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-05-22 14:01:38:631 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#serviceCar(Car)'
2021-05-22 14:01:38:631 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway'
2021-05-22 14:01:38:643 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-22 14:01:38:654 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.RefNoHandler - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={}), headers={replyChannel=nullChannel, id=25ea080f-0449-8eb3-5a08-e42712779f76, timestamp=1621706498643}] (in Xml)
2021-05-22 14:01:38:656 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-22 14:01:38:657 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.CarServiceHandler - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}), headers={replyChannel=nullChannel, id=25ea080f-0449-8eb3-5a08-e42712779f76, timestamp=1621706498643}] (in Xml)
Car(ref=T-00001, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95})
2021-05-22 14:01:38:660 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.RefNoHandler - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={}), headers={replyChannel=nullChannel, id=9b7c4304-639a-4189-2596-2152694af8e2, timestamp=1621706498659}] (in Xml)
2021-05-22 14:01:38:661 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.CarServiceHandler - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95}), headers={replyChannel=nullChannel, id=9b7c4304-639a-4189-2596-2152694af8e2, timestamp=1621706498659}] (in Xml)
Car(ref=H-00002, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95})
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  1.429 s
[INFO] Finished at: 2021-05-22T14:01:38-04:00
[INFO] ------------------------------------------------------------------------

Java Config based Approach

The following is the Java Config based POJO handler that assigns a reference number to an arriving Car:


Listing.79
/*
 * Name:   RefNoHandler2
 * Author: Bhaskar S
 * Date:   05/22/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p6;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.Message;

@Configuration
@EnableIntegration
public class RefNoHandler2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(RefNoHandler2.class);

    private static int nextNo = 1;

    private static final String REF_NO_FMT = "%s-%05d";

    @ServiceActivator
    public Message<Car> assignRefNo(Message<Car> input) {
        input.getPayload().setRef(String.format(REF_NO_FMT, input.getPayload().getMake().substring(0, 1), nextNo++));

        LOGGER.info("Input: {} (in Config)", input.toString());

        return input;
    }
}

The following is the Java Config based POJO handler that assigns the recommended services for an arriving Car based on the current miles:


Listing.80
/*
 * Name:   CarServiceHandler2
 * Author: Bhaskar S
 * Date:   05/22/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p6;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.Message;

@Configuration
@EnableIntegration
public class CarServiceHandler2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(CarServiceHandler2.class);

    private static final String OIL_CHANGE = "OIL_CHANGE";
    private static final String AIR_FILTER = "AIR_FILTER";
    private static final String WHEEL_BALANCE = "WHEEL_BALANCE";

    @ServiceActivator
    public Message<?> recommendedServices(Message<Car> input) {
        input.getPayload().getServices().put(OIL_CHANGE, (float) 29.95);
        input.getPayload().getServices().put(AIR_FILTER, (float) 19.95);

        if (input.getPayload().getMiles() % 15000 == 0) {
            input.getPayload().getServices().put(WHEEL_BALANCE, (float) 69.95);
        }

        LOGGER.info("Input: {} (in Config)", input.toString());

        return input;
    }
}

The following is the Java Config based simple interface to service a Car:


Listing.81
/*
 * Name:   CarServiceGateway2
 * Author: Bhaskar S
 * Date:   05/22/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p6;

import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.config.EnableIntegration;

@Configuration
@EnableIntegration
@IntegrationComponentScan
@MessagingGateway(name = "gateway", defaultRequestChannel = "inChannel")
public interface CarServiceGateway2 {
    public void serviceCar(Car car);
}

The following is the Java Config based POJO that defines the channels and the endpoints as a chain:


Listing.82
/*
 * Name:   CarServiceConfig
 * Author: Bhaskar S
 * Date:   05/22/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p6;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.handler.MessageHandlerChain;
import org.springframework.integration.handler.MethodInvokingMessageProcessor;
import org.springframework.integration.handler.ServiceActivatingHandler;
import org.springframework.integration.stream.CharacterStreamWritingMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import java.util.Arrays;

@Configuration
@EnableIntegration
public class CarServiceConfig {
    private RefNoHandler2 refNoHandler2;
    private CarServiceHandler2 carServiceHandler2;

    @Autowired
    public void setRefNoHandler2(RefNoHandler2 refNoHandler2) {
        this.refNoHandler2 = refNoHandler2;
    }

    @Autowired
    public void setCarServiceHandler2(CarServiceHandler2 carServiceHandler2) {
        this.carServiceHandler2 = carServiceHandler2;
    }

    @Bean
    public MessageChannel inChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel outChannel() {
        return new DirectChannel();
    }

    /* --- BEGIN: For RefNoHandler2 --- */
    @Bean
    public MethodInvokingMessageProcessor<?> refNoMessageHandler2() {
        return new MethodInvokingMessageProcessor<>(refNoHandler2, "assignRefNo");
    }

    @Bean
    public MessageHandler refNoEndpoint2() {
        return new ServiceActivatingHandler(refNoMessageHandler2());
    }
    /* --- END: For RefNoHandler2 --- */

    /* --- BEGIN: For CarServiceHandler2 --- */
    @Bean
    public MethodInvokingMessageProcessor<?> carServiceMessageHandler2() {
        return new MethodInvokingMessageProcessor<>(carServiceHandler2, "recommendedServices");
    }

    @Bean
    public MessageHandler carServiceEndpoint2() {
        return new ServiceActivatingHandler(carServiceMessageHandler2());
    }
    /* --- END: For CarServiceHandler2 --- */

    @Bean
    @ServiceActivator(inputChannel = "inChannel")
    public MessageHandler chainEndpoints() {
        MessageHandlerChain chain = new MessageHandlerChain();
        chain.setHandlers(Arrays.asList(refNoEndpoint2(),
                carServiceEndpoint2()));
        chain.setOutputChannel(outChannel());
        return chain;
    }

    @Bean
    @ServiceActivator(inputChannel = "outChannel")
    public MessageHandler stdoutAdapter() {
        CharacterStreamWritingMessageHandler handler = CharacterStreamWritingMessageHandler.stdout();
        handler.setShouldAppendNewLine(true);
        return handler;
    }
}

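Some aspects of Listing.82 need a little explanation. Each handler POJO method is wrapped in a MethodInvokingMessageProcessor, which in turn is wrapped in a ServiceActivatingHandler. The MessageHandlerChain created in chainEndpoints() invokes these two handlers in the order given to setHandlers() and sends the final result to outChannel. The @ServiceActivator annotation on chainEndpoints() subscribes the chain to inChannel, and the one on stdoutAdapter() subscribes the CharacterStreamWritingMessageHandler to outChannel.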

And finally, the following is the main application that uses the POJOs from Listing.79, Listing.80, Listing.81, and Listing.82 to test the hypothetical car service:


Listing.83
/*
 * Name:   CarServiceMainConfig
 * Author: Bhaskar S
 * Date:   05/22/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p6;

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class CarServiceMainConfig {
    public static void main(String[] args) {
        ApplicationContext context = new AnnotationConfigApplicationContext(RefNoHandler2.class,
                CarServiceHandler2.class,
                CarServiceGateway2.class,
                CarServiceConfig.class);

        CarServiceGateway2 gateway = (CarServiceGateway2) context.getBean("gateway");

        gateway.serviceCar(new Car("Toyota", 30000));
        gateway.serviceCar(new Car("Honda", 5000));
    }
}

To execute the code from Listing.83, open a terminal window and run the following commands:

$ cd $HOME/java/SpringIntegration

$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p6.CarServiceMainConfig"

The following would be the typical output:

Output.36

[INFO] Scanning for projects...
[INFO] 
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.springframework.cglib.core.ReflectUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to method java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain)
WARNING: Please consider reporting this to the maintainers of org.springframework.cglib.core.ReflectUtils
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2021-05-22 14:10:06:603 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-05-22 14:10:06:609 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-05-22 14:10:06:618 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-05-22 14:10:06:674 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-05-22 14:10:06:690 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-05-22 14:10:06:824 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2021-05-22 14:10:07:063 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-05-22 14:10:07:064 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.errorChannel' has 1 subscriber(s).
2021-05-22 14:10:07:065 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-05-22 14:10:07:066 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {message-handler:carServiceConfig.chainEndpoints.serviceActivator} as a subscriber to the 'inChannel' channel
2021-05-22 14:10:07:067 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.inChannel' has 1 subscriber(s).
2021-05-22 14:10:07:068 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.handler.MessageHandlerChain - started bean 'chainEndpoints'; defined in: 'com.polarsparc.si.p6.CarServiceConfig'; from source: 'org.springframework.core.type.StandardMethodMetadata@6826ec1a'
2021-05-22 14:10:07:069 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'carServiceConfig.chainEndpoints.serviceActivator'
2021-05-22 14:10:07:070 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {message-handler:carServiceConfig.stdoutAdapter.serviceActivator} as a subscriber to the 'outChannel' channel
2021-05-22 14:10:07:071 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.outChannel' has 1 subscriber(s).
2021-05-22 14:10:07:071 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'carServiceConfig.stdoutAdapter.serviceActivator'
2021-05-22 14:10:07:073 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#serviceCar(Car)'
2021-05-22 14:10:07:073 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway'
2021-05-22 14:10:07:088 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-22 14:10:07:107 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO com.polarsparc.si.p6.RefNoHandler2 - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={}), headers={replyChannel=nullChannel, id=0db165fe-48ac-bbcb-a94b-d4a9f631b1b2, timestamp=1621707007088}] (in Config)
2021-05-22 14:10:07:109 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-22 14:10:07:110 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO com.polarsparc.si.p6.CarServiceHandler2 - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}), headers={replyChannel=nullChannel, id=0db165fe-48ac-bbcb-a94b-d4a9f631b1b2, timestamp=1621707007088}] (in Config)
Car(ref=T-00001, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95})
2021-05-22 14:10:07:112 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO com.polarsparc.si.p6.RefNoHandler2 - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={}), headers={replyChannel=nullChannel, id=e99e996f-22a4-023b-52cd-7516a8774c1a, timestamp=1621707007112}] (in Config)
2021-05-22 14:10:07:113 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO com.polarsparc.si.p6.CarServiceHandler2 - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95}), headers={replyChannel=nullChannel, id=e99e996f-22a4-023b-52cd-7516a8774c1a, timestamp=1621707007112}] (in Config)
Car(ref=H-00002, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95})
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  1.363 s
[INFO] Finished at: 2021-05-22T14:10:07-04:00
[INFO] ------------------------------------------------------------------------

Header Enrichment

In this hypothetical car service example, we will add a header property to the incoming message as it flows through the chain.

XML based Approach

The following is the XML based Spring Integration configuration that wires up the channels and the endpoints as a chain, including a header enricher that adds a header property called CAR_MAKE whose value is the make of the Car in the message payload:


Listing.84
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:int="http://www.springframework.org/schema/integration"
        xmlns:stream="http://www.springframework.org/schema/integration/stream"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
                            http://www.springframework.org/schema/beans/spring-beans.xsd
                            http://www.springframework.org/schema/integration
                            http://www.springframework.org/schema/integration/spring-integration.xsd
                            http://www.springframework.org/schema/integration/stream
                            http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

    <bean id="refNoHandler" class="com.polarsparc.si.p6.RefNoHandler" />
    <bean id="carServiceHandler" class="com.polarsparc.si.p6.CarServiceHandler" />

    <int:channel id="inChannel" />
    <int:channel id="outChannel" />

    <int:gateway id="gateway"
                  service-interface="com.polarsparc.si.p6.CarServiceGateway"
                  default-request-channel="inChannel" />

    <int:chain input-channel="inChannel" output-channel="outChannel">
        <int:service-activator ref="refNoHandler" method="assignRefNo" />

        <int:header-enricher>
            <int:header name="CAR_MAKE" expression="payload.getMake()"/>
        </int:header-enricher>

        <int:service-activator ref="carServiceHandler" method="recommendedServices" />
    </int:chain>

    <stream:stdout-channel-adapter channel="outChannel"
                                    append-newline="true" />
</beans>

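Some aspects of Listing.84 need a little explanation.

The header-enricher element sits between the two service-activator elements inside the chain, so the CAR_MAKE header is added after assignRefNo runs and before recommendedServices runs.

The expression attribute of the header element is a SpEL expression that is evaluated against the message. Here payload.getMake() calls getMake() on the Car payload and the result becomes the header value.

Because a Spring message is immutable, the enricher produces a new message. This is why the id and timestamp headers seen by CarServiceHandler in Output.37 differ from those seen by RefNoHandler.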
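To make the effect of header enrichment concrete, the following is a minimal sketch in plain Java, not Spring Integration code. It assumes a simplified message whose payload is just the car make as a string. The names SimpleMessage, HeaderEnrichSketch, and enrich are hypothetical and exist only for this illustration. It shows the general idea that an enricher leaves the incoming message untouched and hands a new message, with the extra header and a fresh id, to the next endpoint:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for a message; NOT the Spring Message interface
final class SimpleMessage {
    private final String payload;               // simplification: just the car make
    private final Map<String, Object> headers;  // read-only header map

    SimpleMessage(String payload, Map<String, Object> headers) {
        Map<String, Object> copy = new HashMap<>(headers);
        copy.put("id", UUID.randomUUID().toString()); // every new message gets a fresh id
        this.payload = payload;
        this.headers = Collections.unmodifiableMap(copy);
    }

    String getPayload() { return payload; }

    Map<String, Object> getHeaders() { return headers; }
}

final class HeaderEnrichSketch {
    // Returns a NEW message carrying the extra header; the input is not modified
    static SimpleMessage enrich(SimpleMessage in, String name, Object value) {
        Map<String, Object> headers = new HashMap<>(in.getHeaders());
        headers.put(name, value);
        return new SimpleMessage(in.getPayload(), headers);
    }

    public static void main(String[] args) {
        SimpleMessage before = new SimpleMessage("Toyota", new HashMap<>());
        SimpleMessage after = enrich(before, "CAR_MAKE", before.getPayload());

        System.out.println(before.getHeaders().containsKey("CAR_MAKE")); // false
        System.out.println(after.getHeaders().get("CAR_MAKE"));          // Toyota
    }
}
```

The same pattern is visible in Output.37: the handler that runs before the enricher logs no CAR_MAKE header, while the handler that runs after it logs CAR_MAKE along with a different id.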

To execute the code from Listing.78 with the argument from Listing.84, open a terminal window and run the following commands:

$ cd $HOME/java/SpringIntegration

$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p6.CarServiceMainXml" -Dexec.args="p6/CarService2.xml"

The following would be the typical output:

Output.37

[INFO] Scanning for projects...
[INFO] 
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
2021-05-22 14:11:53:956 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.xml.ChainParser - It is useful to provide an explicit 'id' attribute on 'chain' elements to simplify the identification of child elements in logs etc.
2021-05-22 14:11:54:035 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-05-22 14:11:54:038 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-05-22 14:11:54:046 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-05-22 14:11:54:240 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.springframework.util.ReflectionUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to constructor java.lang.invoke.MethodHandles$Lookup(java.lang.Class)
WARNING: Please consider reporting this to the maintainers of org.springframework.util.ReflectionUtils
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2021-05-22 14:11:54:453 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.MessageHandlerChain - started bean 'org.springframework.integration.handler.MessageHandlerChain#0'; defined in: 'class path resource [p6/CarService2.xml]'; from source: ''int:chain''
2021-05-22 14:11:54:454 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {chain} as a subscriber to the 'inChannel' channel
2021-05-22 14:11:54:454 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7504efe4.inChannel' has 1 subscriber(s).
2021-05-22 14:11:54:455 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2021-05-22 14:11:54:456 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {stream:outbound-channel-adapter(character)} as a subscriber to the 'outChannel' channel
2021-05-22 14:11:54:456 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7504efe4.outChannel' has 1 subscriber(s).
2021-05-22 14:11:54:457 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1'; defined in: 'class path resource [p6/CarService2.xml]'; from source: ''stream:stdout-channel-adapter''
2021-05-22 14:11:54:457 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-05-22 14:11:54:458 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7504efe4.errorChannel' has 1 subscriber(s).
2021-05-22 14:11:54:459 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-05-22 14:11:54:460 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#serviceCar(Car)'
2021-05-22 14:11:54:460 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway'
2021-05-22 14:11:54:478 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-22 14:11:54:488 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.RefNoHandler - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={}), headers={replyChannel=nullChannel, id=ab6f3ea0-bcd1-f4e1-9057-2e64507766de, timestamp=1621707114477}] (in Xml)
2021-05-22 14:11:54:490 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-22 14:11:54:492 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.CarServiceHandler - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}), headers={replyChannel=nullChannel, id=2da4a1d9-a7b3-82af-c9c6-7b5d115cad0a, CAR_MAKE=Toyota, timestamp=1621707114490}] (in Xml)
Car(ref=T-00001, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95})
2021-05-22 14:11:54:494 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.RefNoHandler - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={}), headers={replyChannel=nullChannel, id=273bfdf0-3290-24ff-c604-fc4733ab13d2, timestamp=1621707114494}] (in Xml)
2021-05-22 14:11:54:496 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.CarServiceHandler - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95}), headers={replyChannel=nullChannel, id=80c35f67-1b02-c039-fb2b-20974bb5ae02, CAR_MAKE=Honda, timestamp=1621707114496}] (in Xml)
Car(ref=H-00002, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95})
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  1.446 s
[INFO] Finished at: 2021-05-22T14:11:54-04:00
[INFO] ------------------------------------------------------------------------

Java Config based Approach

The following is the Java Config based POJO that defines the channels and the endpoints as a chain, including a header enricher that adds a header property called CAR_MAKE whose value is the make of the Car in the message payload:


Listing.85
/*
 * Name:   CarServiceConfig2
 * Author: Bhaskar S
 * Date:   05/22/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p6;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.handler.MessageHandlerChain;
import org.springframework.integration.handler.MethodInvokingMessageProcessor;
import org.springframework.integration.handler.ServiceActivatingHandler;
import org.springframework.integration.stream.CharacterStreamWritingMessageHandler;
import org.springframework.integration.transformer.HeaderEnricher;
import org.springframework.integration.transformer.MessageTransformingHandler;
import org.springframework.integration.transformer.support.ExpressionEvaluatingHeaderValueMessageProcessor;
import org.springframework.integration.transformer.support.HeaderValueMessageProcessor;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableIntegration
public class CarServiceConfig2 {
    private RefNoHandler2 refNoHandler2;
    private CarServiceHandler2 carServiceHandler2;

    @Autowired
    public void setRefNoHandler2(RefNoHandler2 refNoHandler2) {
        this.refNoHandler2 = refNoHandler2;
    }

    @Autowired
    public void setCarServiceHandler2(CarServiceHandler2 carServiceHandler2) {
        this.carServiceHandler2 = carServiceHandler2;
    }

    @Bean
    public MessageChannel inChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel outChannel() {
        return new DirectChannel();
    }

    /* --- BEGIN: For RefNoHandler2 --- */
    @Bean
    public MethodInvokingMessageProcessor<?> refNoMessageHandler2() {
        return new MethodInvokingMessageProcessor<>(refNoHandler2, "assignRefNo");
    }

    @Bean
    public MessageHandler refNoEndpoint2() {
        return new ServiceActivatingHandler(refNoMessageHandler2());
    }
    /* --- END: For RefNoHandler2 --- */

    /* --- BEGIN: For Header Enricher --- */
    @Bean
    public HeaderEnricher carMakeHeader() {
        Map<String, HeaderValueMessageProcessor<?>> headers = new HashMap<>();
        Expression expression = new SpelExpressionParser().parseExpression("payload.getMake()");
        headers.put("CAR_MAKE", new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class));
        return new HeaderEnricher(headers);
    }

    @Bean
    public MessageHandler carMakeTransformer() {
        return new MessageTransformingHandler(carMakeHeader());
    }
    /* --- END: For Header Enricher --- */

    /* --- BEGIN: For CarServiceHandler2 --- */
    @Bean
    public MethodInvokingMessageProcessor<?> carServiceMessageHandler2() {
        return new MethodInvokingMessageProcessor<>(carServiceHandler2, "recommendedServices");
    }

    @Bean
    public MessageHandler carServiceEndpoint2() {
        return new ServiceActivatingHandler(carServiceMessageHandler2());
    }
    /* --- END: For CarServiceHandler2 --- */

    @Bean
    @ServiceActivator(inputChannel = "inChannel")
    public MessageHandler chainEndpoints() {
        MessageHandlerChain chain = new MessageHandlerChain();
        chain.setHandlers(Arrays.asList(refNoEndpoint2(),
                carMakeTransformer(),
                carServiceEndpoint2()));
        chain.setOutputChannel(outChannel());
        return chain;
    }

    @Bean
    @ServiceActivator(inputChannel = "outChannel")
    public MessageHandler stdoutAdapter() {
        CharacterStreamWritingMessageHandler handler = CharacterStreamWritingMessageHandler.stdout();
        handler.setShouldAppendNewLine(true);
        return handler;
    }
}

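Some aspects of Listing.85 need a little explanation.

The class ExpressionEvaluatingHeaderValueMessageProcessor evaluates the SpEL expression payload.getMake() against each message and returns the result as a String. The class HeaderEnricher takes a map of header names to such processors. Here the map has the single entry CAR_MAKE.

HeaderEnricher is a transformer and not a MessageHandler, so it is wrapped in a MessageTransformingHandler before being added to the MessageHandlerChain between the two ServiceActivatingHandler beans.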
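The order of the handlers passed to chain.setHandlers(...) in Listing.85 decides which endpoint can see the new header. The following is a minimal sketch in plain Java, not Spring Integration code, that models a chain as an ordered list of functions. The names ChainSketch, Msg, logging, enrich, and runChain are hypothetical, and the payload is reduced to the car make for brevity:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

final class ChainSketch {
    // Hypothetical message shape: the car make as payload plus a read-only header map
    static final class Msg {
        final String make;
        final Map<String, Object> headers;

        Msg(String make, Map<String, Object> headers) {
            this.make = make;
            this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
        }
    }

    // A step that records whether CAR_MAKE was visible when it ran, then passes the message on
    static UnaryOperator<Msg> logging(String name, List<String> trace) {
        return m -> {
            trace.add(name + ":" + m.headers.containsKey("CAR_MAKE"));
            return m;
        };
    }

    // The enrichment step: builds a new message with CAR_MAKE set from the payload
    static Msg enrich(Msg m) {
        Map<String, Object> headers = new HashMap<>(m.headers);
        headers.put("CAR_MAKE", m.make);
        return new Msg(m.make, headers);
    }

    // Applies the steps in list order, feeding each result to the next step
    static Msg runChain(List<UnaryOperator<Msg>> steps, Msg in) {
        Msg current = in;
        for (UnaryOperator<Msg> step : steps) {
            current = step.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        List<UnaryOperator<Msg>> steps = Arrays.asList(
                logging("refNo", trace),
                ChainSketch::enrich,
                logging("carService", trace));

        runChain(steps, new Msg("Honda", new HashMap<>()));
        System.out.println(trace); // [refNo:false, carService:true]
    }
}
```

Moving the enrichment step to the front of the list would make the header visible to both logging steps, which is the same choice one makes when ordering the beans in chainEndpoints().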

And finally, the following is the main application that uses the POJOs from Listing.79, Listing.80, Listing.81, and Listing.85 to test the hypothetical car service:


Listing.86
/*
 * Name:   CarServiceMainConfig2
 * Author: Bhaskar S
 * Date:   05/22/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p6;

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class CarServiceMainConfig2 {
    public static void main(String[] args) {
        ApplicationContext context = new AnnotationConfigApplicationContext(RefNoHandler2.class,
                CarServiceHandler2.class,
                CarServiceGateway2.class,
                CarServiceConfig2.class);

        CarServiceGateway2 gateway = (CarServiceGateway2) context.getBean("gateway");

        gateway.serviceCar(new Car("Toyota", 30000));
        gateway.serviceCar(new Car("Honda", 5000));
    }
}

To execute the code from Listing.86, open a terminal window and run the following commands:

$ cd $HOME/java/SpringIntegration

$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p6.CarServiceMainConfig2"

The following would be the typical output:

Output.38

[INFO] Scanning for projects...
[INFO] 
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.springframework.cglib.core.ReflectUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to method java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain)
WARNING: Please consider reporting this to the maintainers of org.springframework.cglib.core.ReflectUtils
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2021-05-22 14:14:46:222 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-05-22 14:14:46:228 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-05-22 14:14:46:235 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-05-22 14:14:46:285 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-05-22 14:14:46:302 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-05-22 14:14:46:437 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2021-05-22 14:14:46:689 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-05-22 14:14:46:690 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.errorChannel' has 1 subscriber(s).
2021-05-22 14:14:46:691 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-05-22 14:14:46:691 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {message-handler:carServiceConfig2.chainEndpoints.serviceActivator} as a subscriber to the 'inChannel' channel
2021-05-22 14:14:46:692 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.inChannel' has 1 subscriber(s).
2021-05-22 14:14:46:692 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.handler.MessageHandlerChain - started bean 'chainEndpoints'; defined in: 'com.polarsparc.si.p6.CarServiceConfig2'; from source: 'org.springframework.core.type.StandardMethodMetadata@4d9800b2'
2021-05-22 14:14:46:693 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'carServiceConfig2.chainEndpoints.serviceActivator'
2021-05-22 14:14:46:694 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {message-handler:carServiceConfig2.stdoutAdapter.serviceActivator} as a subscriber to the 'outChannel' channel
2021-05-22 14:14:46:694 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.outChannel' has 1 subscriber(s).
2021-05-22 14:14:46:695 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'carServiceConfig2.stdoutAdapter.serviceActivator'
2021-05-22 14:14:46:696 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#serviceCar(Car)'
2021-05-22 14:14:46:697 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway'
2021-05-22 14:14:46:709 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-22 14:14:46:729 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO com.polarsparc.si.p6.RefNoHandler2 - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={}), headers={replyChannel=nullChannel, id=abf81eba-d821-6df1-29d2-0024f0f92e94, timestamp=1621707286709}] (in Config)
2021-05-22 14:14:46:732 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-22 14:14:46:733 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO com.polarsparc.si.p6.CarServiceHandler2 - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}), headers={replyChannel=nullChannel, id=4660e666-25c1-abb7-b134-be652b126bd3, CAR_MAKE=Toyota, timestamp=1621707286732}] (in Config)
Car(ref=T-00001, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95})
2021-05-22 14:14:46:734 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO com.polarsparc.si.p6.RefNoHandler2 - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={}), headers={replyChannel=nullChannel, id=ec7fefc1-0e62-2d3d-467a-16e3dc843dcf, timestamp=1621707286734}] (in Config)
2021-05-22 14:14:46:736 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO com.polarsparc.si.p6.CarServiceHandler2 - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95}), headers={replyChannel=nullChannel, id=39f97081-c90a-2bf6-807f-29f9527a593c, CAR_MAKE=Honda, timestamp=1621707286735}] (in Config)
Car(ref=H-00002, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95})
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  1.427 s
[INFO] Finished at: 2021-05-22T14:14:46-04:00
[INFO] ------------------------------------------------------------------------

References

Spring Integration Notes :: Part - 5

Spring Integration Notes :: Part - 4

Spring Integration Notes :: Part - 3

Spring Integration Notes :: Part - 2

Spring Integration Notes :: Part - 1

Spring Integration - Message Handler Chain

Spring Integration - Content Enricher



© PolarSPARC