需要在 Spring 集成中并行处理多个文件
Need to process multiple files in parallel in Spring Integration
我有一个 SFTP 目录和读取文件并将文件发送到 ServiceActivator.At 我需要使用处理程序并行处理它们的任何点。
这是我的 SPring 集成 java DSL 流程。
IntegrationFlows.from(Sftp.inboundAdapter(getSftpSessionFactory())
.temporaryFileSuffix("COPY")
.localDirectory(directory)
.deleteRemoteFiles(false)
.preserveTimestamp(true)
.remoteDirectory("remoteDir"))
.patternFilter("*.txt")), e -> e.poller(Pollers.fixedDelay(500).maxMessagesPerPoll(5)))
.handle("mybean", "myMethod")
.handle(Files.outboundAdapter(new File("success")))
.deleteSourceFiles(true)
.autoCreateDirectory(true))
.get();
更新:这是我的 ThreadPoolExecutor:
@Bean(name = "executor")
public Executor getExecutor()
{
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(4);
executor.setQueueCapacity(20);
executor.initialize();
return executor;
}
Sftp.inboundAdapter()
(SftpInboundFileSynchronizingMessageSource
) returns 无论如何远程文件一个一个。首先,它将它们同步到本地目录,然后才轮询它们以将消息处理为 File
有效负载。
要并行处理它们,只需将 taskExecutor
添加到您的 e.poller()
定义中,所有这些 maxMessagesPerPoll(5)
将分发到不同的线程。
对于我的用例,将 taskExecutor 分配给通道就可以了。
这是我用于轮询文件夹并处理文件的非常简单的应用程序的代码。文件是并行处理的,即使我只为轮询器分配了一个线程。
@Slf4j
@SpringBootApplication
public class SpringIntegrationDemoApplication {
public static void main( String[] args ) {
SpringApplication.run( SpringIntegrationDemoApplication.class, args );
}
@Bean
public IntegrationFlow fileReadingFlow() {
return IntegrationFlows
.from( Files.inboundAdapter( new File( "/tmp/in" ) ).patternFilter( "*.txt" ),
e ->
e.poller( Pollers.fixedDelay( 10000 )
// .taskExecutor( Executors.newFixedThreadPool( 1 ) )
.maxMessagesPerPoll( 100 )
) )
.channel( MessageChannels.executor( Executors.newFixedThreadPool( 100 ) ) )
.handle( "myFileHandler", "handleMessage" )
.get();
}
@Bean
public MessageHandler myFileHandler() {
return message -> {
try {
log.info( "START" + message.getPayload() );
Thread.sleep( 2000 );
log.info( "STOP" + message.getPayload() );
}
catch ( InterruptedException e ) {
e.printStackTrace();
}
};
}
}
我有一个 SFTP 目录和读取文件并将文件发送到 ServiceActivator.At 我需要使用处理程序并行处理它们的任何点。
这是我的 SPring 集成 java DSL 流程。
IntegrationFlows.from(Sftp.inboundAdapter(getSftpSessionFactory())
.temporaryFileSuffix("COPY")
.localDirectory(directory)
.deleteRemoteFiles(false)
.preserveTimestamp(true)
.remoteDirectory("remoteDir"))
.patternFilter("*.txt")), e -> e.poller(Pollers.fixedDelay(500).maxMessagesPerPoll(5)))
.handle("mybean", "myMethod")
.handle(Files.outboundAdapter(new File("success")))
.deleteSourceFiles(true)
.autoCreateDirectory(true))
.get();
更新:这是我的 ThreadPoolExecutor:
@Bean(name = "executor")
public Executor getExecutor()
{
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(4);
executor.setQueueCapacity(20);
executor.initialize();
return executor;
}
Sftp.inboundAdapter()
(SftpInboundFileSynchronizingMessageSource
) returns 无论如何远程文件一个一个。首先,它将它们同步到本地目录,然后才轮询它们以将消息处理为 File
有效负载。
要并行处理它们,只需将 taskExecutor
添加到您的 e.poller()
定义中,所有这些 maxMessagesPerPoll(5)
将分发到不同的线程。
对于我的用例,将 taskExecutor 分配给通道就可以了。 这是我用于轮询文件夹并处理文件的非常简单的应用程序的代码。文件是并行处理的,即使我只为轮询器分配了一个线程。
@Slf4j
@SpringBootApplication
public class SpringIntegrationDemoApplication {
public static void main( String[] args ) {
SpringApplication.run( SpringIntegrationDemoApplication.class, args );
}
@Bean
public IntegrationFlow fileReadingFlow() {
return IntegrationFlows
.from( Files.inboundAdapter( new File( "/tmp/in" ) ).patternFilter( "*.txt" ),
e ->
e.poller( Pollers.fixedDelay( 10000 )
// .taskExecutor( Executors.newFixedThreadPool( 1 ) )
.maxMessagesPerPoll( 100 )
) )
.channel( MessageChannels.executor( Executors.newFixedThreadPool( 100 ) ) )
.handle( "myFileHandler", "handleMessage" )
.get();
}
@Bean
public MessageHandler myFileHandler() {
return message -> {
try {
log.info( "START" + message.getPayload() );
Thread.sleep( 2000 );
log.info( "STOP" + message.getPayload() );
}
catch ( InterruptedException e ) {
e.printStackTrace();
}
};
}
}