PolarSPARC

Spring Integration Notes :: Part - 4


Bhaskar S 05/08/2021 (UPDATED)


Overview

In Part-3, we covered basic examples of Spring Integration relating to the File and SFTP channel adapters.

We will continue our journey on channel adapters by exploring examples in Spring Integration relating to the following: JDBC (polling a database for records) and HTTP (invoking a REST service).

Setup

To set up the Java directory structure for the demonstrations in this part, execute the following commands:

$ cd $HOME/java/SpringIntegration

$ mkdir -p src/main/java/com/polarsparc/si/p4

$ mkdir -p src/main/resources/p4

To set up the directory structure for the database server, execute the following command:

$ mkdir -p $HOME/Downloads/DATA/postgres

To download the required docker image for the PostgreSQL database server, execute the following command:

$ docker pull postgres:13.2

The following is the listing for the updated Maven project file pom.xml to support the jdbc and http channel adapters:


pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.polarsparc.si</groupId>
    <artifactId>SpringIntegration</artifactId>
    <version>1.0</version>
    <name>SpringIntegration</name>
    <description>Spring Integration Examples</description>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.1</version>
                    <configuration>
                        <fork>true</fork>
                        <meminitial>128m</meminitial>
                        <maxmem>512m</maxmem>
                        <source>11</source>
                        <target>11</target>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <dependencies>
        <dependency>
            <groupId>javax.annotation</groupId>
            <artifactId>javax.annotation-api</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>5.3.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.3.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>5.3.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-file</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-sftp</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-jdbc</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-http</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>log4j-over-slf4j</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.2.19</version>
        </dependency>
    </dependencies>

</project>

Hands-on Spring Integration

Polling a Database for Records

For this example, one needs access to a relational database, hence we will start a PostgreSQL database server on the localhost using docker. Open a terminal window and execute the following command:

$ docker run -d --rm --name postgres-13.2 -e POSTGRES_USER=polarsparc -e POSTGRES_PASSWORD=polarsparc\$123 -p 5432:5432 -v $HOME/Downloads/DATA/postgres:/var/lib/postgresql/data postgres:13.2

To create a database called my_test_db, execute the following command in the terminal:

$ docker exec -it postgres-13.2 sh

The prompt changes to #. Continue by executing the following command:

# psql -U polarsparc

The prompt changes to polarsparc=#. Continue by executing the following commands:

polarsparc=# CREATE DATABASE my_test_db;

polarsparc=# GRANT ALL PRIVILEGES ON DATABASE my_test_db TO polarsparc;

polarsparc=# \q

The prompt changes back to #. Continue by executing the following command:

# psql my_test_db -U polarsparc

The prompt changes to my_test_db=>. Continue by executing the following commands:

my_test_db=> CREATE TABLE ORDERS_TBL (ORDER_NO VARCHAR(10) NOT NULL, ITEM VARCHAR(100) NOT NULL, SHIPPED CHAR(1) DEFAULT 'N', PRIMARY KEY (ORDER_NO));

my_test_db=> \q

The prompt changes back to #. Continue by executing the following command:

# exit

The following is the properties file jdbc.properties located in the resources/p4 directory:


Listing.44
#
# Properties for jdbc processing
#

jdbc.driver.class=org.postgresql.Driver
jdbc.url=jdbc:postgresql://localhost:5432/my_test_db
jdbc.username=polarsparc
jdbc.password=polarsparc$123

query.orders=SELECT * FROM ORDERS_TBL WHERE SHIPPED = 'N'
update.orders=UPDATE ORDERS_TBL SET SHIPPED = 'Y' WHERE ORDER_NO IN (:ORDER_NO)
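
Note that the password is written as polarsparc\$123 in the docker command only because the shell would otherwise try to expand the $, whereas a properties file takes the $ literally. The following is a minimal sketch of that point, using only java.util.Properties and an in-memory copy of two of the entries (the class name PropsCheck is purely illustrative and not part of the project):

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

final class PropsCheck {
    // Parses the text the same way a .properties file would be parsed
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        String text = "jdbc.username=polarsparc\n"
                + "jdbc.password=polarsparc$123\n";
        Properties props = load(text);
        // The '$' is kept as-is; no escaping is needed in the properties file
        System.out.println(props.getProperty("jdbc.password"));
    }
}
```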

XML based Approach

The following is the database handler POJO that displays the rows retrieved from the database:


Listing.45
/*
 * Name:   DbProcessHandler
 * Author: Bhaskar S
 * Date:   05/08/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p4;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;

public class DbProcessHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(DbProcessHandler.class);

    private final static String ORDER_NO = "ORDER_NO";
    private final static String ITEM = "ITEM";

    public void handler(List<Map<String, Object>> list) {
        LOGGER.info("No of records to process: {}", list.size());

        for (Map<String, Object> rec : list) {
            LOGGER.info("Processed order {} for item {}", rec.get(ORDER_NO), rec.get(ITEM));
        }
    }
}
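
To make the shape of the handler's argument concrete, the following is a small standalone sketch that builds a list of column-to-value maps by hand and walks it the same way the handler does. The class and method names (PayloadShapeSketch, row, describe) are assumptions made for illustration only. One difference worth noting: Spring's default row mapper returns case-insensitive maps, whereas the plain LinkedHashMap used here is case-sensitive.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PayloadShapeSketch {
    // Builds one row as a column name -> value map
    static Map<String, Object> row(String orderNo, String item) {
        Map<String, Object> rec = new LinkedHashMap<>();
        rec.put("ORDER_NO", orderNo);
        rec.put("ITEM", item);
        rec.put("SHIPPED", "N");
        return rec;
    }

    // Mirrors the loop in DbProcessHandler.handler(), but returns the lines instead of logging them
    static List<String> describe(List<Map<String, Object>> list) {
        List<String> lines = new ArrayList<>();
        for (Map<String, Object> rec : list) {
            lines.add("Processed order " + rec.get("ORDER_NO") + " for item " + rec.get("ITEM"));
        }
        return lines;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> payload =
                List.of(row("1", "iPad Air 128G WiFi"), row("2", "Roku Express 4K+"));
        describe(payload).forEach(System.out::println);
    }
}
```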

The following is the XML based Spring Integration configuration that wires up the channels, the jdbc channel adapter, and the endpoint:


Listing.46
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:int="http://www.springframework.org/schema/integration"
        xmlns:jdbc="http://www.springframework.org/schema/integration/jdbc"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
                            http://www.springframework.org/schema/beans/spring-beans.xsd
                            http://www.springframework.org/schema/context
                            http://www.springframework.org/schema/context/spring-context.xsd
                            http://www.springframework.org/schema/integration
                            http://www.springframework.org/schema/integration/spring-integration.xsd
                            http://www.springframework.org/schema/integration/jdbc
                            http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd">

    <context:property-placeholder location="classpath:p4/jdbc.properties" />

    <bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <property name="driverClassName" value="${jdbc.driver.class}" />
        <property name="url" value="${jdbc.url}" />
        <property name="username" value="${jdbc.username}" />
        <property name="password" value="${jdbc.password}" />
    </bean>

    <bean id="inDbHandler" class="com.polarsparc.si.p4.DbProcessHandler" />

    <jdbc:inbound-channel-adapter channel="inChannel"
                                  data-source="dataSource"
                                  query="${query.orders}"
                                  update="${update.orders}"
                                  max-rows-per-poll="5">
        <int:poller id="poller" fixed-delay="10000" />
    </jdbc:inbound-channel-adapter>


    <int:service-activator input-channel="inChannel"
                            ref="inDbHandler"
                            method="handler" />

</beans>

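Some aspects of Listing.46 above need a little explanation. The jdbc:inbound-channel-adapter element polls the database through the dataSource bean: every 10 seconds (the poller's fixed-delay is in milliseconds) it runs the SELECT given in query; if any rows come back, it runs the statement given in update, with :ORDER_NO bound to the ORDER_NO values of the rows just read, and delivers the rows as a single message with a List<Map<String, Object>> payload to inChannel. Because the update sets SHIPPED to 'Y', those rows are not picked up by the next poll. The max-rows-per-poll attribute limits each poll to at most 5 rows, and the int:service-activator element hands each message on inChannel to the handler method of the inDbHandler bean.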
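
The select-then-update polling cycle configured in Listing.46 can be pictured with a tiny in-memory model. This is only a sketch under stated assumptions: the class PollCycleSketch and its methods are hypothetical, no database or Spring class is involved, and the real adapter produces no message at all when the query returns no rows (the model simply returns an empty list).

```java
import java.util.ArrayList;
import java.util.List;

final class PollCycleSketch {
    // Minimal stand-in for a row of ORDERS_TBL
    static final class Order {
        final String orderNo;
        boolean shipped;

        Order(String orderNo) {
            this.orderNo = orderNo;
        }
    }

    private final List<Order> table = new ArrayList<>();
    private final int maxRows;

    PollCycleSketch(int maxRows) {
        this.maxRows = maxRows;
    }

    void insert(String orderNo) {
        table.add(new Order(orderNo));
    }

    // One poll: pick unshipped rows (capped at maxRows) and mark exactly those rows as shipped
    List<String> poll() {
        List<String> picked = new ArrayList<>();
        for (Order o : table) {
            if (!o.shipped && picked.size() < maxRows) {
                picked.add(o.orderNo);
                o.shipped = true;
            }
        }
        return picked;
    }

    public static void main(String[] args) {
        PollCycleSketch db = new PollCycleSketch(5);
        for (int i = 1; i <= 7; i++) {
            db.insert(String.valueOf(i));
        }
        System.out.println(db.poll()); // [1, 2, 3, 4, 5]
        System.out.println(db.poll()); // [6, 7]
        System.out.println(db.poll()); // []
    }
}
```

With seven pending rows and a cap of five, the rows are drained over two polls and the third poll finds nothing, which is why each order is processed only once.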

The following is our main application to test the jdbc channel adapter:


Listing.47
/*
 * Name:   JdbcProcessMainXml
 * Author: Bhaskar S
 * Date:   05/08/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p4;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class JdbcProcessMainXml {
    public static void main(String[] args) {
        new ClassPathXmlApplicationContext("p4/JdbcProcess.xml");
    }
}

To execute the code from Listing.47, open a terminal window and run the following commands:

$ cd $HOME/java/SpringIntegration

$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p4.JdbcProcessMainXml"

The following would be the typical output:

Output.19

[INFO] Scanning for projects...
[INFO] 
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
2021-05-08 14:58:16:681 [com.polarsparc.si.p4.JdbcProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-05-08 14:58:16:686 [com.polarsparc.si.p4.JdbcProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-05-08 14:58:16:690 [com.polarsparc.si.p4.JdbcProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-05-08 14:58:16:885 [com.polarsparc.si.p4.JdbcProcessMainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2021-05-08 14:58:16:910 [com.polarsparc.si.p4.JdbcProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'inChannel' channel
2021-05-08 14:58:16:910 [com.polarsparc.si.p4.JdbcProcessMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@3b71ca90.inChannel' has 1 subscriber(s).
2021-05-08 14:58:16:911 [com.polarsparc.si.p4.JdbcProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2021-05-08 14:58:16:912 [com.polarsparc.si.p4.JdbcProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-05-08 14:58:16:913 [com.polarsparc.si.p4.JdbcProcessMainXml.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@3b71ca90.errorChannel' has 1 subscriber(s).
2021-05-08 14:58:16:913 [com.polarsparc.si.p4.JdbcProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-05-08 14:58:16:916 [com.polarsparc.si.p4.JdbcProcessMainXml.main()] INFO org.springframework.integration.endpoint.SourcePollingChannelAdapter - started bean 'org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0'; defined in: 'class path resource [p4/JdbcProcess.xml]'; from source: ''jdbc:inbound-channel-adapter''

Execute the following SQL commands using a database client such as DBeaver:

INSERT INTO ORDERS_TBL (ORDER_NO, ITEM, SHIPPED) VALUES('1', 'iPad Air 128G WiFi', 'N');

INSERT INTO ORDERS_TBL (ORDER_NO, ITEM, SHIPPED) VALUES('2', 'Roku Express 4K+', 'N');

INSERT INTO ORDERS_TBL (ORDER_NO, ITEM, SHIPPED) VALUES('3', '128GB MicroSD Card', 'N');
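
If a graphical database client is not at hand, the same three rows could also be inserted from plain JDBC. The following is a minimal sketch, assuming the PostgreSQL driver from pom.xml is on the classpath and the database container is running; the class name InsertOrdersSketch and the helper insertOrders are made up for illustration, while the URL and credentials are the ones from jdbc.properties.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

final class InsertOrdersSketch {
    static final String INSERT_SQL =
            "INSERT INTO ORDERS_TBL (ORDER_NO, ITEM, SHIPPED) VALUES (?, ?, 'N')";

    // Inserts one unshipped order per item, numbering them from 1; returns the rows inserted
    static int insertOrders(Connection conn, String... items) throws SQLException {
        int count = 0;
        try (PreparedStatement ps = conn.prepareStatement(INSERT_SQL)) {
            for (int i = 0; i < items.length; i++) {
                ps.setString(1, String.valueOf(i + 1));
                ps.setString(2, items[i]);
                count += ps.executeUpdate();
            }
        }
        return count;
    }

    public static void main(String[] args) throws SQLException {
        // Connection details taken from jdbc.properties
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/my_test_db", "polarsparc", "polarsparc$123")) {
            int n = insertOrders(conn, "iPad Air 128G WiFi", "Roku Express 4K+", "128GB MicroSD Card");
            System.out.println("Inserted " + n + " orders");
        }
    }
}
```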

We will see the following update in the terminal output:

Output.20

2021-05-08 14:59:47:113 [task-scheduler-3] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-08 14:59:47:118 [task-scheduler-3] INFO com.polarsparc.si.p4.DbProcessHandler - No of records to process: 3
2021-05-08 14:59:47:118 [task-scheduler-3] INFO com.polarsparc.si.p4.DbProcessHandler - Processed order 1 for item iPad Air 128G WiFi
2021-05-08 14:59:47:119 [task-scheduler-3] INFO com.polarsparc.si.p4.DbProcessHandler - Processed order 2 for item Roku Express 4K+
2021-05-08 14:59:47:119 [task-scheduler-3] INFO com.polarsparc.si.p4.DbProcessHandler - Processed order 3 for item 128GB MicroSD Card

The rows in the database are automatically updated: the column SHIPPED is set to 'Y'.

Java Config based Approach

The following is the Java Config based POJO that defines the handler to display the rows retrieved from the database:


Listing.48
/*
 * Name:   DbProcessHandler2
 * Author: Bhaskar S
 * Date:   05/08/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p4;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;

import java.util.List;
import java.util.Map;

@Configuration
@EnableIntegration
public class DbProcessHandler2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(DbProcessHandler2.class);

    private final static String ORDER_NO = "ORDER_NO";
    private final static String ITEM = "ITEM";

    @ServiceActivator(inputChannel = "inChannel")
    public void handler(List<Map<String, Object>> list) {
        LOGGER.info("No of records to process: {}", list.size());

        for (Map<String, Object> rec : list) {
            LOGGER.info("Processed order {} for item {}", rec.get(ORDER_NO), rec.get(ITEM));
        }
    }
}

The following is the Java Config based POJO that refers to the external jdbc.properties file and defines the jdbc data source and the inbound jdbc channel adapter, mirroring the XML configuration file of Listing.46 above:


Listing.49
/*
 * Name:   JdbcProcessConfig
 * Author: Bhaskar S
 * Date:   05/08/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p4;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.jdbc.datasource.DriverManagerDataSource;

import javax.sql.DataSource;

@Configuration
@EnableIntegration
@PropertySource("classpath:p4/jdbc.properties")
public class JdbcProcessConfig {
    @Value("${jdbc.driver.class}")
    private String jdbcDriver;

    @Value("${jdbc.url}")
    private String jdbcUrl;

    @Value("${jdbc.username}")
    private String jdbcUser;

    @Value("${jdbc.password}")
    private String jdbcPassword;

    @Value("${query.orders}")
    private String selectOrders;

    @Value("${update.orders}")
    private String updateOrders;

    @Bean
    public DataSource jdbcDataSource() {
        DriverManagerDataSource source = new DriverManagerDataSource();
        source.setDriverClassName(jdbcDriver);
        source.setUrl(jdbcUrl);
        source.setUsername(jdbcUser);
        source.setPassword(jdbcPassword);
        return source;
    }

    @Bean
    @InboundChannelAdapter(channel = "inChannel", poller = @Poller(fixedDelay = "10000"))
    public MessageSource<Object> jdbcChannelAdapter() {
        JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(jdbcDataSource(), selectOrders);
        adapter.setUpdateSql(updateOrders);
        adapter.setMaxRows(5);
        return adapter;
    }
}

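Some aspects of Listing.49 above need a little explanation. The @PropertySource annotation loads jdbc.properties so that the @Value fields can be resolved. The @InboundChannelAdapter annotation on the jdbcChannelAdapter() bean method registers the returned MessageSource as a polled source for inChannel, and @Poller(fixedDelay = "10000") polls it every 10 seconds. The JdbcPollingChannelAdapter is constructed with the data source and the SELECT statement; setUpdateSql() and setMaxRows() correspond to the update and max-rows-per-poll attributes of Listing.46.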
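
The fixed-delay style of polling used in Listing.49 means the delay is counted from the end of one poll to the start of the next. The JDK offers the same semantics through ScheduledExecutorService.scheduleWithFixedDelay, which the following sketch uses as an analogue. It is not the scheduler Spring Integration uses internally; the class FixedDelaySketch and the method runPolls are hypothetical names, and a short delay stands in for the 10000 ms of the listing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class FixedDelaySketch {
    // Runs a fake "poll" with the given fixed delay until it has run 'polls' times;
    // returns how many polls were counted before the scheduler was stopped
    static int runPolls(int polls, long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(polls);
        AtomicInteger seen = new AtomicInteger();
        // The delay is measured from the end of one run to the start of the next
        scheduler.scheduleWithFixedDelay(() -> {
            if (done.getCount() > 0) {
                seen.incrementAndGet();
                done.countDown();
            }
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        // 100 ms here stands in for the 10000 ms used in the listing
        System.out.println("polls observed: " + runPolls(3, 100));
    }
}
```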

And finally, the following is the main application that uses the POJOs from Listing.48 and Listing.49 to test the jdbc channel adapter:


Listing.50
/*
 * Name:   JdbcProcessMainConfig
 * Author: Bhaskar S
 * Date:   05/08/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p4;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class JdbcProcessMainConfig {
    public static void main(String[] args) {
        new AnnotationConfigApplicationContext(DbProcessHandler2.class, JdbcProcessConfig.class);
    }
}

To execute the code from Listing.50, open a terminal window and run the following commands:

$ cd $HOME/java/SpringIntegration

$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p4.JdbcProcessMainConfig"

The following would be the typical output:

Output.21

[INFO] Scanning for projects...
[INFO] 
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
2021-05-08 15:45:03:965 [com.polarsparc.si.p4.JdbcProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-05-08 15:45:03:970 [com.polarsparc.si.p4.JdbcProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-05-08 15:45:03:975 [com.polarsparc.si.p4.JdbcProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-05-08 15:45:04:014 [com.polarsparc.si.p4.JdbcProcessMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-05-08 15:45:04:019 [com.polarsparc.si.p4.JdbcProcessMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-05-08 15:45:04:235 [com.polarsparc.si.p4.JdbcProcessMainConfig.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2021-05-08 15:45:04:313 [com.polarsparc.si.p4.JdbcProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-05-08 15:45:04:314 [com.polarsparc.si.p4.JdbcProcessMainConfig.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@4c4f6598.errorChannel' has 1 subscriber(s).
2021-05-08 15:45:04:315 [com.polarsparc.si.p4.JdbcProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-05-08 15:45:04:315 [com.polarsparc.si.p4.JdbcProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator:dbProcessHandler2.handler.serviceActivator} as a subscriber to the 'inChannel' channel
2021-05-08 15:45:04:316 [com.polarsparc.si.p4.JdbcProcessMainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@4c4f6598.inChannel' has 1 subscriber(s).
2021-05-08 15:45:04:317 [com.polarsparc.si.p4.JdbcProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'dbProcessHandler2.handler.serviceActivator'
2021-05-08 15:45:04:320 [com.polarsparc.si.p4.JdbcProcessMainConfig.main()] INFO org.springframework.integration.endpoint.SourcePollingChannelAdapter - started bean 'jdbcProcessConfig.jdbcChannelAdapter.inboundChannelAdapter'

Execute the following SQL commands using a database client such as DBeaver:

DELETE FROM ORDERS_TBL;

INSERT INTO ORDERS_TBL (ORDER_NO, ITEM, SHIPPED) VALUES('1', 'iPad Air 128G WiFi', 'N');

INSERT INTO ORDERS_TBL (ORDER_NO, ITEM, SHIPPED) VALUES('2', 'Roku Express 4K+', 'N');

INSERT INTO ORDERS_TBL (ORDER_NO, ITEM, SHIPPED) VALUES('3', '128GB MicroSD Card', 'N');

We will see the following update in the terminal output:

Output.22

2021-05-08 15:46:24:516 [task-scheduler-5] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-08 15:46:24:519 [task-scheduler-5] INFO com.polarsparc.si.p4.DbProcessHandler2 - No of records to process: 3
2021-05-08 15:46:24:520 [task-scheduler-5] INFO com.polarsparc.si.p4.DbProcessHandler2 - Processed order 1 for item iPad Air 128G WiFi
2021-05-08 15:46:24:520 [task-scheduler-5] INFO com.polarsparc.si.p4.DbProcessHandler2 - Processed order 2 for item Roku Express 4K+
2021-05-08 15:46:24:521 [task-scheduler-5] INFO com.polarsparc.si.p4.DbProcessHandler2 - Processed order 3 for item 128GB MicroSD Card

The rows in the database are automatically updated: the column SHIPPED is set to 'Y'.

As can be inferred from the Output.20 and Output.22 above, Spring Integration successfully processed the rows from the database.

Invoking a REST Service using HTTP

XML based Approach

For this example, we will invoke a free external web-service to get the spot prices on precious metals in JSON format. The web-service endpoint to invoke is Spot Prices on Precious Metals (http://services.packetizer.com/spotprices/).

The following is a simple interface to fetch the spot prices on precious metals:


Listing.51
/*
 * Name:   SpotPricesGateway
 * Author: Bhaskar S
 * Date:   05/08/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p4;

public interface SpotPricesGateway {
    public String preciousMetalPrices(String type);
}

The following is the XML based Spring Integration configuration that wires up the channels, the http channel adapter, and the endpoint:


Listing.52
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:int="http://www.springframework.org/schema/integration"
        xmlns:http="http://www.springframework.org/schema/integration/http"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
                            http://www.springframework.org/schema/beans/spring-beans.xsd
                            http://www.springframework.org/schema/integration
                            http://www.springframework.org/schema/integration/spring-integration.xsd
                            http://www.springframework.org/schema/integration/http
                            http://www.springframework.org/schema/integration/http/spring-integration-http.xsd">

    <int:logging-channel-adapter id="logger" level="INFO" log-full-message="true" />

    <int:channel id="inChannel">
        <int:interceptors>
            <int:wire-tap channel="logger" />
        </int:interceptors>
    </int:channel>

    <int:channel id="outChannel">
        <int:queue capacity="5" />
    </int:channel>

    <int:gateway id="gateway"
                  service-interface="com.polarsparc.si.p4.SpotPricesGateway">
        <int:method name="preciousMetalPrices"
                    request-channel="inChannel"
                    reply-channel="outChannel">
            <int:header name="f" expression="#args[0]" />
            <int:header name="Content-Type" value="application/json" />
        </int:method>
    </int:gateway>

    <http:outbound-gateway id="httpGateway"
                            request-channel="inChannel"
                            url="http://services.packetizer.com/spotprices/?f={f}"
                            http-method="GET"
                            expected-response-type="java.lang.String"
                            reply-timeout="10000"
                            reply-channel="outChannel">
        <http:uri-variable name="f" expression="headers.f" />
    </http:outbound-gateway>

</beans>

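Some aspects of Listing.52 above need a little explanation. The int:gateway element generates a proxy for the SpotPricesGateway interface; calling preciousMetalPrices() sends a message on inChannel with the method argument as the payload, copies that argument into the header f (the expression #args[0]), and receives the reply via outChannel. The wire-tap on inChannel sends a copy of each message to the logger channel adapter. The http:outbound-gateway element issues an HTTP GET to the url, replacing {f} with the value of the header f, converts the response body to a java.lang.String, and sends it to outChannel.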
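
For comparison, the request that the http outbound gateway of Listing.52 performs can be approximated with the HTTP client built into Java 11. This is a rough sketch and not what Spring Integration does internally: the class SpotPricesHttpSketch and the method expand are illustrative names, and the simple string replacement does not URL-encode the value the way a real URI template expansion would.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class SpotPricesHttpSketch {
    static final String URL_TEMPLATE = "http://services.packetizer.com/spotprices/?f={f}";

    // Expands the {f} placeholder, the role played by the uri-variable in the listing
    static URI expand(String format) {
        return URI.create(URL_TEMPLATE.replace("{f}", format));
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(expand("json")).GET().build();
        // The body is read as a String, matching expected-response-type="java.lang.String"
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}
```

Running main requires network access to the external service; the expand helper can be checked on its own.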

The following is our main application to test the http channel adapter:


Listing.53
/*
 * Name:   SpotPricesMainXml
 * Author: Bhaskar S
 * Date:   05/08/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p4;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SpotPricesMainXml {
    private static final Logger LOGGER = LoggerFactory.getLogger(SpotPricesMainXml.class);

    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("p4/SpotPrices.xml");

        SpotPricesGateway gateway = (SpotPricesGateway) context.getBean("gateway");

        LOGGER.info("Precious Metal Prices = {}", gateway.preciousMetalPrices("json"));
    }
}

To execute the code from Listing.53, open a terminal window and run the following commands:

$ cd $HOME/java/SpringIntegration

$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p4.SpotPricesMainXml"

The following would be the typical output:

Output.23

[INFO] Scanning for projects...
[INFO] 
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
2021-05-08 16:26:37:579 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-05-08 16:26:37:582 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-05-08 16:26:37:588 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-05-08 16:26:37:807 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.springframework.util.ReflectionUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to constructor java.lang.invoke.MethodHandles$Lookup(java.lang.Class)
WARNING: Please consider reporting this to the maintainers of org.springframework.util.ReflectionUtils
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2021-05-08 16:26:37:909 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:logger.adapter} as a subscriber to the 'logger' channel
2021-05-08 16:26:37:909 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@761cb475.logger' has 1 subscriber(s).
2021-05-08 16:26:37:910 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'logger.adapter'; defined in: 'class path resource [p4/SpotPrices.xml]'; from source: ''int:logging-channel-adapter' with id='logger''
2021-05-08 16:26:37:911 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {http:outbound-gateway:httpGateway} as a subscriber to the 'inChannel' channel
2021-05-08 16:26:37:911 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@761cb475.inChannel' has 1 subscriber(s).
2021-05-08 16:26:37:912 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'httpGateway'
2021-05-08 16:26:37:913 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-05-08 16:26:37:913 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@761cb475.errorChannel' has 1 subscriber(s).
2021-05-08 16:26:37:914 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-05-08 16:26:37:915 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#preciousMetalPrices(String)'
2021-05-08 16:26:37:915 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway'
2021-05-08 16:26:37:926 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.endpoint.PollingConsumer - started org.springframework.integration.endpoint.PollingConsumer@3ccf0884
2021-05-08 16:26:37:933 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.handler.LoggingHandler - GenericMessage [payload=json, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@4b153b1d, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@4b153b1d, id=2a42c117-1c9c-0b9a-0745-83dcc71aff6e, f=json, Content-Type=application/json, timestamp=1620505597932}]
2021-05-08 16:26:38:075 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO com.polarsparc.si.p4.SpotPricesMainXml - Precious Metal Prices = {
    "date" : "2021-05-07",
    "gold" : "1831.10",
    "silver" : "27.46",
    "platinum" : "1256.75"
}
--- CTRL-C ---
Java Config based Approach

The following is the Java Config based gateway interface, similar to the gateway defined in the XML configuration file of Listing.52 above:


Listing.54
/*
 * Name:   SpotPricesGateway2
 * Author: Bhaskar S
 * Date:   05/08/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p4;

import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.GatewayHeader;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.config.EnableIntegration;

@Configuration
@EnableIntegration
@IntegrationComponentScan
@MessagingGateway(name = "gateway", defaultRequestChannel = "inChannel", defaultReplyChannel = "outChannel")
public interface SpotPricesGateway2 {
    @Gateway(requestChannel = "inChannel", replyChannel = "outChannel",
            headers = { @GatewayHeader(name = "f", expression = "#args[0]"),
                    @GatewayHeader(name = "Content-Type", value = "application/json") })
    public String preciousMetalPrices(String type);
}

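Some aspects of Listing.54 above need a little explanation.

The @MessagingGateway annotation marks the interface so that Spring Integration generates a proxy bean for it, here named gateway (the name Listing.56 uses to look it up). Its defaultRequestChannel and defaultReplyChannel attributes name the channels used by any method that does not specify its own. The @IntegrationComponentScan annotation is what makes Spring Integration scan for such annotated interfaces.

The @Gateway annotation configures a single method: requestChannel and replyChannel set its channels, and headers lists the headers to add to each outgoing message. Each @GatewayHeader takes either a fixed value (Content-Type is always application/json) or a SpEL expression evaluated on every call (#args[0] is the first method argument, so the header f carries the value passed in as type).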
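To make the header mapping concrete, the following is a minimal plain-Java sketch (no Spring involved) of the headers that a call such as preciousMetalPrices("json") ends up carrying. The class GatewayHeaderSketch and its method buildHeaders are hypothetical names used only for illustration. The real gateway proxy also adds headers such as id, timestamp, and replyChannel, as the logged message in the output shows.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, Spring-free model of the header mapping declared by the
// @Gateway annotation in Listing.54. Illustration only, not the real proxy.
public class GatewayHeaderSketch {
    // Mimics the two @GatewayHeader entries:
    //   f            <- SpEL "#args[0]" (the first method argument)
    //   Content-Type <- the static value "application/json"
    public static Map<String, Object> buildHeaders(Object... args) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("f", args[0]);
        headers.put("Content-Type", "application/json");
        return headers;
    }

    public static void main(String[] args) {
        // Prints: {f=json, Content-Type=application/json}
        System.out.println(buildHeaders("json"));
    }
}
```

The payload of the message is the method argument itself, which is why the logged message shows both payload=json and f=json.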

The following is the Java Config based POJO that defines the input and output channels, a logging wiretap, and the outbound HTTP gateway, mirroring the XML configuration file of Listing.52 above:


Listing.55
/*
 * Name:   SpotPricesConfig
 * Author: Bhaskar S
 * Date:   05/08/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p4;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.http.HttpMethod;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.channel.interceptor.WireTap;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableIntegration
public class SpotPricesConfig {
    @Bean
    @ServiceActivator(inputChannel = "logger")
    public LoggingHandler logHandler() {
        LoggingHandler logger = new LoggingHandler(LoggingHandler.Level.INFO);
        logger.setShouldLogFullMessage(true);
        return logger;
    }

    @Bean
    public MessageChannel inChannel() {
        AbstractMessageChannel channel = new DirectChannel();
        channel.addInterceptor(wireTap());
        return channel;
    }

    @Bean
    public WireTap wireTap() {
        return new WireTap("logger");
    }

    @Bean
    public MessageChannel outChannel() {
        return new QueueChannel(5);
    }

    @Bean
    @ServiceActivator(inputChannel = "inChannel")
    public MessageHandler httpGateway() {
        HttpRequestExecutingMessageHandler handler =
                new HttpRequestExecutingMessageHandler("http://services.packetizer.com/spotprices/?f={f}");
        handler.setHttpMethod(HttpMethod.GET);
        handler.setExpectedResponseType(String.class);
        handler.setSendTimeout(10000);
        handler.setOutputChannel(outChannel());

        ExpressionParser parser = new SpelExpressionParser();
        Map<String, Expression> variables = new HashMap<>();
        variables.put("f", parser.parseExpression("headers.f"));

        handler.setUriVariableExpressions(variables);
        return handler;
    }
}

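Some aspects of Listing.55 above need a little explanation.

The logHandler bean is a LoggingHandler that, through @ServiceActivator, subscribes to the logger channel and logs the full message at the INFO level. No bean named logger is defined, so Spring Integration creates that channel automatically as a DirectChannel.

The inChannel bean is a DirectChannel with a WireTap interceptor, which sends a copy of every message passing through inChannel to the logger channel. The outChannel bean is a QueueChannel with a capacity of 5 messages.

The httpGateway bean is an HttpRequestExecutingMessageHandler, the Java counterpart of the http:outbound-gateway element. It issues an HTTP GET to the given URI, converts the response body to a String, and sends the reply to outChannel. The {f} placeholder in the URI is filled in at runtime through setUriVariableExpressions, which maps the variable f to the SpEL expression headers.f, that is, the f header set by the gateway in Listing.54. Note that setSendTimeout sets how long (in milliseconds) the handler waits when sending the reply message to the output channel; it is not an HTTP connection or read timeout.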
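As a rough illustration of the request that the handler in Listing.55 is configured to make, the following plain-Java sketch expands the {f} placeholder and builds the corresponding GET request with the JDK 11 java.net.http API. It builds the request without sending it. The class SpotPricesRequestSketch and its methods expand and buildRequest are hypothetical names, and the simple string replacement stands in for Spring's URI template handling, which also URL-encodes the values.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Map;

// Hypothetical, Spring-free sketch of the request that the handler in
// Listing.55 is configured to send. Illustration only.
public class SpotPricesRequestSketch {
    static final String TEMPLATE = "http://services.packetizer.com/spotprices/?f={f}";

    // Replaces each {name} placeholder with its value. Unlike real URI
    // template handling, this sketch does not URL-encode the values.
    public static String expand(String template, Map<String, String> variables) {
        String result = template;
        for (Map.Entry<String, String> entry : variables.entrySet()) {
            result = result.replace("{" + entry.getKey() + "}", entry.getValue());
        }
        return result;
    }

    // Builds (but does not send) the GET request for the given value of f,
    // playing the role of the "f" <- headers.f URI variable mapping.
    public static HttpRequest buildRequest(String format) {
        String url = expand(TEMPLATE, Map.of("f", format));
        return HttpRequest.newBuilder(URI.create(url)).GET().build();
    }

    public static void main(String[] args) {
        HttpRequest request = buildRequest("json");
        // Prints: GET http://services.packetizer.com/spotprices/?f=json
        System.out.println(request.method() + " " + request.uri());
    }
}
```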

And finally, the following is the main application that uses the POJOs from Listing.54 and Listing.55 to test the http channel adapter:


Listing.56
/*
 * Name:   SpotPricesMainConfig
 * Author: Bhaskar S
 * Date:   05/08/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p4;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class SpotPricesMainConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(SpotPricesMainConfig.class);

    public static void main(String[] args) {
        ApplicationContext context = new AnnotationConfigApplicationContext(SpotPricesGateway2.class,
                SpotPricesConfig.class);

        SpotPricesGateway2 gateway = (SpotPricesGateway2) context.getBean("gateway");

        LOGGER.info("Precious Metal Prices with Config = {}", gateway.preciousMetalPrices("json"));
    }
}

To execute the code from Listing.56, open a terminal window and run the following commands:

$ cd $HOME/java/SpringIntegration

$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p4.SpotPricesMainConfig"

The following would be the typical output:

Output.24

[INFO] Scanning for projects...
[INFO] 
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.springframework.cglib.core.ReflectUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to method java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain)
WARNING: Please consider reporting this to the maintainers of org.springframework.cglib.core.ReflectUtils
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2021-05-08 20:09:11:006 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-05-08 20:09:11:010 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-05-08 20:09:11:014 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-05-08 20:09:11:048 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-05-08 20:09:11:081 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-05-08 20:09:11:211 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2021-05-08 20:09:11:383 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-05-08 20:09:11:384 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@7bb01ac9.errorChannel' has 1 subscriber(s).
2021-05-08 20:09:11:385 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-05-08 20:09:11:385 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {message-handler:spotPricesConfig.logHandler.serviceActivator} as a subscriber to the 'logger' channel
2021-05-08 20:09:11:386 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@7bb01ac9.logger' has 1 subscriber(s).
2021-05-08 20:09:11:387 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'spotPricesConfig.logHandler.serviceActivator'
2021-05-08 20:09:11:387 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {http:outbound-gateway:spotPricesConfig.httpGateway.serviceActivator} as a subscriber to the 'inChannel' channel
2021-05-08 20:09:11:388 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@7bb01ac9.inChannel' has 1 subscriber(s).
2021-05-08 20:09:11:389 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'spotPricesConfig.httpGateway.serviceActivator'
2021-05-08 20:09:11:390 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#preciousMetalPrices(String)'
2021-05-08 20:09:11:390 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway'
2021-05-08 20:09:11:403 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.endpoint.PollingConsumer - started org.springframework.integration.endpoint.PollingConsumer@3d38adba
2021-05-08 20:09:11:410 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.handler.LoggingHandler - GenericMessage [payload=json, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@bcdb17b, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@bcdb17b, id=22abb0b7-fccc-d9cd-649e-7f1741c4eaa7, f=json, Content-Type=application/json, timestamp=1620565391409}]
2021-05-08 20:09:11:564 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO com.polarsparc.si.p4.SpotPricesMainConfig - Precious Metal Prices with Config = {
    "date" : "2021-05-07",
    "gold" : "1831.10",
    "silver" : "27.46",
    "platinum" : "1256.75"
}
--- CTRL-C ---

As Output.23 and Output.24 above show, Spring Integration successfully invoked the web-service in both the XML based and the Java Config based approaches and fetched the latest spot prices of precious metals.

References

Spring Integration Notes :: Part - 3

Spring Integration Notes :: Part - 2

Spring Integration Notes :: Part - 1

Spring Integration - JDBC Support

Spring Integration - HTTP Support



© PolarSPARC