PolarSPARC

Spring Integration Notes :: Part - 3


Bhaskar S 05/01/2021 (UPDATED)


Overview

In Part-2, we covered basic examples of Spring Integration relating to Logging of Messages, Multi-Threading, and Exception Handling.

We will continue our journey to explore some basic examples in Spring Integration relating to the following:

In Spring Integration, to connect to an external system such as a Filesystem, or an SFTP, or a Database, or a Messaging System etc., one needs to use the appropriate Channel Adapter.

A Channel Adapter is an endpoint that connects a channel to the external system(s) or transport(s).

A Channel Adapter can be:

Spring Integration comes with a number of out-of-the-box Channel Adapters.

Setup

Ensure Docker is setup in the Linux system as we will be demonstrating the channel adapters relating to SFTP using the appropriate docker image.

To setup the Java directory structure for the demonstrations in this part, execute the following commands:

$ cd $HOME/java/SpringIntegration

$ mkdir -p src/main/java/com/polarsparc/si/p3

$ mkdir -p src/main/resources/p3

$ mkdir -p src/main/resources/META-INF/keys

To setup the other directory structure(s), execute the following commands:

$ mkdir -p /tmp/in-dir

$ mkdir -p /tmp/out-dir

To download the required docker image for the sftp server, execute the following command:

$ docker pull atmoz/sftp

To create the asymmetric public-private keys for sftp, execute the following commands:

$ mkdir -p $HOME/Downloads/sftp/share

$ cd $HOME/Downloads/sftp

$ ssh-keygen -m pem -t rsa -b 2048 -f sftp_rsa_key -C noname < /dev/null

The private key is stored in the file $HOME/Downloads/sftp/sftp_rsa_key and the public key is stored in the file $HOME/Downloads/sftp/sftp_rsa_key.pub.

To setup the private key for the sftp client, execute the following commands:

$ cd $HOME/Downloads/sftp

$ mv sftp_rsa_key $HOME/java/SpringIntegration/src/main/resources/META-INF/keys


The following is the listing for the updated Maven project file pom.xml to support the file and sftp channel adapters:


pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.polarsparc.si</groupId>
    <artifactId>SpringIntegration</artifactId>
    <version>1.0</version>
    <name>SpringIntegration</name>
    <description>Spring Integration Examples</description>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.1</version>
                    <configuration>
                        <fork>true</fork>
                        <meminitial>128m</meminitial>
                        <maxmem>512m</maxmem>
                        <source>11</source>
                        <target>11</target>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <dependencies>
        <dependency>
            <groupId>javax.annotation</groupId>
            <artifactId>javax.annotation-api</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>5.3.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.3.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>5.3.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-file</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-sftp</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>log4j-over-slf4j</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.30</version>
        </dependency>
    </dependencies>

</project>

Hands-on Spring Integration

Polling for Files in a Directory

In this example, we poll for file(s) matching a regular expression file name pattern in a specified input directory. On finding new file(s), they are processed by a handler, moved to a specified output directory, and deleted from the input directory.

We will use an external properties file to configure the locations of the source (input) and target (output) directories versus hardcoding the locations in Spring Integration configuration.

The following is the properties file file.properties located in the resources/p3 directory:


Listing.31
#
# Properties for file processing
#

input.files.directory=/tmp/in-dir
output.files.directory=/tmp/out-dir
files.regex.pattern="[a-z]+.dat"

XML based Approach

The following is the file handler POJO that displays the file name and file size:


Listing.32
/*
 * Name:   FileProcessHandler
 * Author: Bhaskar S
 * Date:   05/01/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p3;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;

public class FileProcessHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(FileProcessHandler.class);

    public File handler(File input) {
        LOGGER.info("Processed input file: {}, size: {} (using Xml)", input.getAbsolutePath(), input.length());

        return input;
    }
}

The following is the XML based Spring Integration configuration that refers to the external file.properties file and wires up the channels, the input and output file channel adapters, and the endpoint:


Listing.33
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:int="http://www.springframework.org/schema/integration"
        xmlns:file="http://www.springframework.org/schema/integration/file"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
                            http://www.springframework.org/schema/beans/spring-beans.xsd
                            http://www.springframework.org/schema/context
                            http://www.springframework.org/schema/context/spring-context.xsd
                            http://www.springframework.org/schema/integration
                            http://www.springframework.org/schema/integration/spring-integration.xsd
                            http://www.springframework.org/schema/integration/file
                            http://www.springframework.org/schema/integration/file/spring-integration-file.xsd">

    <context:property-placeholder location="classpath:p3/file.properties" />

    <bean id="inFileHandler" class="com.polarsparc.si.p3.FileProcessHandler" />

    <file:inbound-channel-adapter id="inChannel"
                                  directory="file:${input.files.directory}"
                                  filename-regex="${files.regex.pattern}"
                                  prevent-duplicates="true">
        <int:poller id="poller" fixed-delay="5000" />
    </file:inbound-channel-adapter>

    <file:outbound-channel-adapter id="outChannel"
                                    directory="file:${output.files.directory}"
                                    delete-source-files="true" />

    <int:service-activator input-channel="inChannel"
                            output-channel="outChannel"
                            ref="inFileHandler"
                            method="handler" />

</beans>

Some aspects of the Listing.33 from the above needs a little explanation.

The following is our main application to test the file channel adapter:


Listing.34
/*
 * Name:   FileProcessMainXml
 * Author: Bhaskar S
 * Date:   05/01/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p3;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class FileProcessMainXml {
    public static void main(String[] args) {
        new ClassPathXmlApplicationContext("p3/FileProcess.xml");
    }
}

To execute the code from Listing.34, open a terminal window and run the following commands:

$ cd $HOME/java/SpringIntegration

$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p3.FileProcessMainXml"

The following would be the typical output:

Output.11

[INFO] Scanning for projects...
[INFO] 
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
2021-05-01 11:56:37:290 [com.polarsparc.si.p3.FileProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-05-01 11:56:37:293 [com.polarsparc.si.p3.FileProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-05-01 11:56:37:298 [com.polarsparc.si.p3.FileProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-05-01 11:56:37:518 [com.polarsparc.si.p3.FileProcessMainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2021-05-01 11:56:37:543 [com.polarsparc.si.p3.FileProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {file:outbound-channel-adapter:outChannel.adapter} as a subscriber to the 'outChannel' channel
2021-05-01 11:56:37:544 [com.polarsparc.si.p3.FileProcessMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@42062658.outChannel' has 1 subscriber(s).
2021-05-01 11:56:37:545 [com.polarsparc.si.p3.FileProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'outChannel.adapter'; defined in: 'class path resource [p3/FileProcess.xml]'; from source: ''file:outbound-channel-adapter' with id='outChannel''
2021-05-01 11:56:37:546 [com.polarsparc.si.p3.FileProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'inChannel' channel
2021-05-01 11:56:37:547 [com.polarsparc.si.p3.FileProcessMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@42062658.inChannel' has 1 subscriber(s).
2021-05-01 11:56:37:547 [com.polarsparc.si.p3.FileProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2021-05-01 11:56:37:548 [com.polarsparc.si.p3.FileProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-05-01 11:56:37:549 [com.polarsparc.si.p3.FileProcessMainXml.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@42062658.errorChannel' has 1 subscriber(s).
2021-05-01 11:56:37:550 [com.polarsparc.si.p3.FileProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-05-01 11:56:37:554 [com.polarsparc.si.p3.FileProcessMainXml.main()] INFO org.springframework.integration.endpoint.SourcePollingChannelAdapter - started bean 'inChannel.adapter'; defined in: 'class path resource [p3/FileProcess.xml]'; from source: ''file:inbound-channel-adapter' with id='inChannel''

Create a file called abc.dat in the directory /tmp/in-dir by executing the following command:

$ echo 'Spring Integration' > /tmp/in-dir/abc.dat

We will see the following update in the terminal output:

Output.12

2021-05-01 11:58:02:569 [task-scheduler-6] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-01 11:58:02:575 [task-scheduler-6] INFO com.polarsparc.si.p3.FileProcessHandler - Processed input file: /tmp/in-dir/abc.dat, size: 19 (using Xml)

Now, create another file called def.txt in the directory /tmp/in-dir by executing the following command:

$ echo 'Ignored' > /tmp/in-dir/def.txt

Nothing will happen as the file name does not match the pattern.

Java Config based Approach

The following is the Java Config based POJO that defines the file handler endpoint that displays the file name and file size:


Listing.35
/*
 * Name:   FileProcessHandler2
 * Author: Bhaskar S
 * Date:   05/01/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p3;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;

import java.io.File;

@Configuration
@EnableIntegration
public class FileProcessHandler2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(FileProcessHandler2.class);

    @ServiceActivator(inputChannel = "inChannel", outputChannel = "outChannel")
    public File handler(File input) {
        LOGGER.info("Processed input file: {}, size: {} (using Config)", input.getAbsolutePath(), input.length());

        return input;
    }
}

The following is the Java Config based POJO that refers to the external file.properties file and defines the input and output file channel adapters as well as the endpoint similar to the way defined in the XML configuration file of Listing.33 above:


Listing.36
/*
 * Name:   FileProcessConfig
 * Author: Bhaskar S
 * Date:   05/01/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p3;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.FileWritingMessageHandler;
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
import org.springframework.integration.file.filters.CompositeFileListFilter;
import org.springframework.integration.file.filters.RegexPatternFileListFilter;
import org.springframework.messaging.MessageHandler;

import java.io.File;

@Configuration
@EnableIntegration
@PropertySource("classpath:p3/file.properties")
public class FileProcessConfig {
    @Value("${input.files.directory}")
    private String filesInDir;

    @Value("${output.files.directory}")
    private String filesOutDir;

    @Bean
    @InboundChannelAdapter(value = "inChannel", poller = @Poller(fixedDelay = "5000"))
    public MessageSource<File> fileInboundChannelAdapter() {
        FileReadingMessageSource adapter = new FileReadingMessageSource();
        adapter.setDirectory(new File(filesInDir));
        CompositeFileListFilter<File> filter = new CompositeFileListFilter<>();
        filter.addFilter(new AcceptOnceFileListFilter<>());
        filter.addFilter(new RegexPatternFileListFilter("[a-z]+.dat"));
        adapter.setFilter(filter);
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "outChannel")
    public MessageHandler fileOutboundChannelAdapter() {
        FileWritingMessageHandler adapter = new FileWritingMessageHandler(new File(filesOutDir));
        adapter.setDeleteSourceFiles(true);
        adapter.setExpectReply(false);
        return adapter;
    }
}

Some aspects of the Listing.36 from the above needs a little explanation.

And finally, the following is the main application that uses the POJOs from Listing.35 and Listing.36 to test the file channel adapter:


Listing.37
/*
 * Name:   FileProcessMainConfig
 * Author: Bhaskar S
 * Date:   05/01/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p3;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class FileProcessMainConfig {
    public static void main(String[] args) {
        new AnnotationConfigApplicationContext(FileProcessHandler2.class, FileProcessConfig.class);
    }
}

To execute the code from Listing.37, open a terminal window and run the following commands:

$ cd $HOME/java/SpringIntegration

$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p3.FileProcessMainConfig"

The following would be the typical output:

Output.13

[INFO] Scanning for projects...
[INFO] 
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
2021-05-01 12:11:33:439 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-05-01 12:11:33:444 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-05-01 12:11:33:448 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-05-01 12:11:33:485 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-05-01 12:11:33:489 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-05-01 12:11:33:702 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2021-05-01 12:11:33:792 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-05-01 12:11:33:793 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@396abb96.errorChannel' has 1 subscriber(s).
2021-05-01 12:11:33:794 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-05-01 12:11:33:794 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator:fileProcessHandler2.handler.serviceActivator} as a subscriber to the 'inChannel' channel
2021-05-01 12:11:33:795 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@396abb96.inChannel' has 1 subscriber(s).
2021-05-01 12:11:33:796 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'fileProcessHandler2.handler.serviceActivator'
2021-05-01 12:11:33:796 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {file:outbound-channel-adapter:fileProcessConfig.fileOutboundChannelAdapter.serviceActivator} as a subscriber to the 'outChannel' channel
2021-05-01 12:11:33:797 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@396abb96.outChannel' has 1 subscriber(s).
2021-05-01 12:11:33:798 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'fileProcessConfig.fileOutboundChannelAdapter.serviceActivator'
2021-05-01 12:11:33:802 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.integration.endpoint.SourcePollingChannelAdapter - started bean 'fileProcessConfig.fileInboundChannelAdapter.inboundChannelAdapter'

Once again, create a file called abc.dat in the directory /tmp/in-dir by executing the following command:

$ echo 'Spring Integration' > /tmp/in-dir/abc.dat

We will see the following update in the terminal output:

Output.14

2021-05-01 12:13:33:818 [task-scheduler-3] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-01 12:13:33:824 [task-scheduler-3] INFO com.polarsparc.si.p3.FileProcessHandler2 - Processed input file: /tmp/in-dir/abc.dat, size: 19 (using Config)

Now, create another file called def.txt in the directory /tmp/in-dir by executing the following command:

$ echo 'Ignored' > /tmp/in-dir/def.txt

Nothing will happen as the file name does not match the pattern.

As can be inferred from the Output.12 and Output.14 above, Spring Integration successfully processed the two input files using the file channel adapter.

Transfer Files from a Remote Directory using SFTP

The Secure File Transfer Protocol (SFTP) is a network protocol that allows one to transfer file(s) between two computers over the network using a reliable network stream.

In order to transfer file(s) using SFTP, a client (consumer) initiates a connection to a remote host running the SFTP server. For this demonstration, we will run the SFTP server using docker.

Assuming the currently logged in user-id is polarsparc, to start the SFTP server, open a terminal window and execute the following command:

$ docker run --rm --name atmoz-sftp -v $HOME/Downloads/sftp/sftp_rsa_key.pub:/home/polarsparc/.ssh/keys/sftp_rsa_key.pub:ro -v $HOME/Downloads/sftp/share:/home/polarsparc/ -p 2222:22 -d atmoz/sftp polarsparc:polarsparc:1000

The following are the contents of the properties file sftp.properties:


Listing.38
#
# Properties for sftp processing
#

sftp.host=localhost
sftp.port=2222
sftp.username=polarsparc
sftp.privateKey=classpath:META-INF/keys/sftp_rsa_key
sftp.allowUnknownKeys=true
sftp.file.pattern=*.dat
sftp.remote.dir=.
sftp.local.dir=/tmp/out-dir

XML based Approach

For processing the files from the SFTP site, we will leverage the same POJO defined in Listing.32 above.

The following is the XML based Spring Integration configuration that wires up a SFTP session factory, the necessary channels, the sftp channel adapter, and the endpoint:


Listing.39
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:int="http://www.springframework.org/schema/integration"
        xmlns:sftp="http://www.springframework.org/schema/integration/sftp"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
                            http://www.springframework.org/schema/beans/spring-beans.xsd
                            http://www.springframework.org/schema/context
                            http://www.springframework.org/schema/context/spring-context.xsd
                            http://www.springframework.org/schema/integration
                            http://www.springframework.org/schema/integration/spring-integration.xsd
                            http://www.springframework.org/schema/integration/sftp
                            http://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd">

    <context:property-placeholder location="classpath:p3/sftp.properties" />

    <bean id="inFileHandler" class="com.polarsparc.si.p3.FileProcessHandler" />

    <bean id="sftpSessionFactory" class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
        <property name="host" value="${sftp.host}" />
        <property name="port" value="${sftp.port}" />
        <property name="user" value="${sftp.username}" />
        <property name="privateKey" value="${sftp.privateKey}" />
        <property name="allowUnknownKeys" value="${sftp.allowUnknownKeys}" />
    </bean>

    <sftp:inbound-channel-adapter id="sftpInBound"
                                  channel="inChannel"
                                  session-factory="sftpSessionFactory"
                                  filename-pattern="${sftp.file.pattern}"
                                  delete-remote-files="false"
                                  remote-directory="${sftp.remote.dir}"
                                  local-directory="${sftp.local.dir}">
        <int:poller id="poller" fixed-rate="5000" />
    </sftp:inbound-channel-adapter>

    <int:service-activator input-channel="inChannel"
                            output-channel="nullChannel"
                            ref="inFileHandler"
                            method="handler" />

</beans>

Some aspects of the Listing.39 from the above needs a little explanation.

The following is our main application to test the sftp channel adapter:


Listing.40
/*
 * Name:   SftpProcessMainXml
 * Author: Bhaskar S
 * Date:   05/01/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p3;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SftpProcessMainXml {
    public static void main(String[] args) {
        new ClassPathXmlApplicationContext("p3/SftpProcess.xml");
    }
}

To execute the code from Listing.40, open a terminal window and run the following commands:

$ cd $HOME/java/SpringIntegration

$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p3.SftpProcessMainXml"

The following would be the typical output:

Output.15

[INFO] Scanning for projects...
[INFO] 
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
2021-05-01 12:51:32:146 [com.polarsparc.si.p3.SftpProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-05-01 12:51:32:152 [com.polarsparc.si.p3.SftpProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-05-01 12:51:32:156 [com.polarsparc.si.p3.SftpProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-05-01 12:51:32:384 [com.polarsparc.si.p3.SftpProcessMainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2021-05-01 12:51:32:411 [com.polarsparc.si.p3.SftpProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'inChannel' channel
2021-05-01 12:51:32:412 [com.polarsparc.si.p3.SftpProcessMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@4278a583.inChannel' has 1 subscriber(s).
2021-05-01 12:51:32:413 [com.polarsparc.si.p3.SftpProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2021-05-01 12:51:32:413 [com.polarsparc.si.p3.SftpProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-05-01 12:51:32:414 [com.polarsparc.si.p3.SftpProcessMainXml.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@4278a583.errorChannel' has 1 subscriber(s).
2021-05-01 12:51:32:415 [com.polarsparc.si.p3.SftpProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-05-01 12:51:32:418 [com.polarsparc.si.p3.SftpProcessMainXml.main()] INFO org.springframework.integration.endpoint.SourcePollingChannelAdapter - started bean 'sftpInBound'; defined in: 'class path resource [p3/SftpProcess.xml]'; from source: ''sftp:inbound-channel-adapter' with id='sftpInBound''
2021-05-01 12:51:32:665 [task-scheduler-1] INFO org.springframework.integration.sftp.session.DefaultSftpSessionFactory - The authenticity of host 'localhost' can't be established.
RSA key fingerprint is f6:24:d1:19:a9:4e:8d:6b:47:15:36:54:55:31:44:21.
Are you sure you want to continue connecting?
2021-05-01 12:51:32:666 [task-scheduler-1] WARN com.jcraft.jsch - Permanently added 'localhost' (RSA) to the list of known hosts.

Create a file called abc.dat in the directory /tmp/in-dir and transfer to the sftp remote directory by executing the following commands:

$ echo 'Spring Integration' > /tmp/in-dir/abc.dat

$ sudo mv /tmp/in-dir/abc.dat $HOME/Downloads/DATA/sftp/share

We will see the following update in the terminal output:

Output.16

2021-05-01 12:55:32:801 [task-scheduler-3] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-01 12:55:32:805 [task-scheduler-3] INFO com.polarsparc.si.p3.FileProcessHandler - Processed input file: /tmp/out-dir/abc.dat, size: 19 (using Xml)

Now, create another file called def.txt in the directory /tmp/in-dir and transfer to the sftp remote directory by executing the following commands:

$ echo 'Ignored' > /tmp/in-dir/def.txt

$ sudo mv /tmp/in-dir/def.txt $HOME/Downloads/DATA/sftp/share

Nothing will happen as the file name does not match the pattern.

Java Config based Approach

The following is the Java Config based POJO that defines the file handler endpoint that displays the file name and file size:


Listing.41
/*
 * Name:   FileProcessHandler3
 * Author: Bhaskar S
 * Date:   05/01/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p3;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;

import java.io.File;

@Configuration
@EnableIntegration
public class FileProcessHandler3 {
    private static final Logger LOGGER = LoggerFactory.getLogger(FileProcessHandler3.class);

    @ServiceActivator(inputChannel = "inChannel", outputChannel = "nullChannel")
    public File handler(File input) {
        LOGGER.info("Processed input file: {}, size: {} (using Config)", input.getAbsolutePath(), input.length());

        return input;
    }
}

The following is the Java Config based POJO refers to the external sftp.properties file and defines the SFTP session factory, the sftp channel adapter, and the endpoint similar to the way defined in the XML configuration file of Listing.39 above:


Listing.42
/*
 * Name:   SftpProcessConfig
 * Author: Bhaskar S
 * Date:   04/24/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p3;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.io.Resource;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
import org.springframework.integration.sftp.filters.SftpSimplePatternFileListFilter;
import org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizer;
import org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizingMessageSource;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;

import java.io.File;

@Configuration
@EnableIntegration
@PropertySource("classpath:p3/sftp.properties")
public class SftpProcessConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(SftpProcessConfig.class);

    @Value("${sftp.host}")
    private String sftpHost;

    @Value("${sftp.port:22}")
    private int sftpPort;

    @Value("${sftp.username}")
    private String sftpUser;

    @Value("${sftp.privateKey}")
    private Resource sftpPrivateKey;

    @Value("${sftp.allowUnknownKeys}")
    private boolean sftpAllowUnknownKeys;

    @Value("${sftp.remote.dir}")
    private String sftpRemoteDir;

    @Value("${sftp.local.dir}")
    private String sftpLocalDir;

    @Value("${sftp.file.pattern}")
    private String sftpFilePattern;

    @Bean
    public DefaultSftpSessionFactory sftpSessionFactory() {
        LOGGER.info(String.format("Host: %s, Port: %d, User: %s", sftpHost, sftpPort, sftpUser));

        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
        factory.setHost(sftpHost);
        factory.setPort(sftpPort);
        factory.setUser(sftpUser);
        factory.setPrivateKey(sftpPrivateKey);
        factory.setAllowUnknownKeys(sftpAllowUnknownKeys);
        return factory;
    }

    @Bean
    public SftpInboundFileSynchronizer sftpFileSynchronizer() {
        LOGGER.info(String.format("Remote Dir: %s", sftpRemoteDir));

        SftpInboundFileSynchronizer synchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
        synchronizer.setDeleteRemoteFiles(false);
        synchronizer.setRemoteDirectory(sftpRemoteDir);
        synchronizer.setFilter(new SftpSimplePatternFileListFilter(sftpFilePattern));
        return synchronizer;
    }

    @Bean
    @InboundChannelAdapter(channel = "inChannel", poller = @Poller(fixedDelay = "5000"))
    public MessageSource<File> sftpChannelAdapter() {
        LOGGER.info(String.format("Local Dir: %s", sftpLocalDir));

        SftpInboundFileSynchronizingMessageSource adapter =
                new SftpInboundFileSynchronizingMessageSource(sftpFileSynchronizer());
        adapter.setLocalDirectory(new File(sftpLocalDir));
        adapter.setMaxFetchSize(1);
        adapter.setLocalFilter(new AcceptOnceFileListFilter<File>());
        return adapter;
    }
}

Some aspects of the Listing.42 from the above needs a little explanation.

And finally, the following is the main application that uses the POJOs from Listing.41 and Listing.42 to test the sftp channel adapter:


Listing.43
/*
 * Name:   SftpProcessMainConfig
 * Author: Bhaskar S
 * Date:   05/01/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p3;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class SftpProcessMainConfig {
    public static void main(String[] args) {
        new AnnotationConfigApplicationContext(FileProcessHandler3.class, SftpProcessConfig.class);
    }
}

To execute the code from Listing.43, open a terminal window and run the following commands:

$ cd $HOME/java/SpringIntegration

$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p3.SftpProcessMainConfig"

The following would be the typical output:

Output.17

[INFO] Scanning for projects...
[INFO] 
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
2021-05-01 13:04:51:311 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-05-01 13:04:51:317 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-05-01 13:04:51:321 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-05-01 13:04:51:358 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-05-01 13:04:51:362 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-05-01 13:04:51:488 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO com.polarsparc.si.p3.SftpProcessConfig - Host: localhost, Port: 2222, User: bswamina
2021-05-01 13:04:51:495 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO com.polarsparc.si.p3.SftpProcessConfig - Remote Dir: .
2021-05-01 13:04:51:509 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO com.polarsparc.si.p3.SftpProcessConfig - Local Dir: /tmp/out-dir
2021-05-01 13:04:51:575 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2021-05-01 13:04:51:653 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-05-01 13:04:51:654 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@396abb96.errorChannel' has 1 subscriber(s).
2021-05-01 13:04:51:655 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-05-01 13:04:51:655 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator:fileProcessHandler3.handler.serviceActivator} as a subscriber to the 'inChannel' channel
2021-05-01 13:04:51:656 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@396abb96.inChannel' has 1 subscriber(s).
2021-05-01 13:04:51:657 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'fileProcessHandler3.handler.serviceActivator'
2021-05-01 13:04:51:660 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO org.springframework.integration.endpoint.SourcePollingChannelAdapter - started bean 'sftpProcessConfig.sftpChannelAdapter.inboundChannelAdapter'
2021-05-01 13:04:51:885 [task-scheduler-1] INFO org.springframework.integration.sftp.session.DefaultSftpSessionFactory - The authenticity of host 'localhost' can't be established.
RSA key fingerprint is f6:24:d1:19:a9:4e:8d:6b:47:15:36:54:55:31:44:21.
Are you sure you want to continue connecting?
2021-05-01 13:04:51:886 [task-scheduler-1] WARN com.jcraft.jsch - Permanently added 'localhost' (RSA) to the list of known hosts.

Once again, create a file called abc.dat in the directory /tmp/in-dir and transfer to the sftp remote directory by executing the following commands:

$ echo 'Spring Integration' > /tmp/in-dir/abc.dat

$ sudo mv /tmp/in-dir/abc.dat $HOME/Downloads/DATA/sftp/share

We will see the following update in the terminal output:

Output.18

2021-05-01 13:07:11:763 [task-scheduler-2] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-01 13:07:11:766 [task-scheduler-2] INFO com.polarsparc.si.p3.FileProcessHandler3 - Processed input file: /tmp/out-dir/abc.dat, size: 19 (using Config)

Now, create another file called def.txt in the directory /tmp/in-dir and transfer to the sftp remote directory by executing the following commands:

$ echo 'Ignored' > /tmp/in-dir/def.txt

$ sudo mv /tmp/in-dir/def.txt $HOME/Downloads/DATA/sftp/share

Nothing will happen as the file name does not match the pattern.

As can be inferred from the Output.16 and Output.18 above, Spring Integration successfully transferred and processed the two files.

References

Spring Integration Notes :: Part - 2

Spring Integration Notes :: Part - 1

Spring Integration - File Support

Spring Integration - SFTP Adapters



© PolarSPARC