需要在 Spring 集成中并行处理多个文件

Need to process multiple files in parallel in Spring Integration

我有一个 SFTP 目录和读取文件并将文件发送到 ServiceActivator.At 我需要使用处理程序并行处理它们的任何点。

这是我的 SPring 集成 java DSL 流程。

IntegrationFlows.from(Sftp.inboundAdapter(getSftpSessionFactory())
                        .temporaryFileSuffix("COPY")
                        .localDirectory(directory)
                        .deleteRemoteFiles(false)
                        .preserveTimestamp(true)
                        .remoteDirectory("remoteDir"))
                        .patternFilter("*.txt")), e -> e.poller(Pollers.fixedDelay(500).maxMessagesPerPoll(5)))
                        .handle("mybean", "myMethod")
                        .handle(Files.outboundAdapter(new File("success")))         
                        .deleteSourceFiles(true)
                        .autoCreateDirectory(true))
                        .get();

更新:这是我的 ThreadPoolExecutor:

@Bean(name = "executor")
public Executor getExecutor()
{
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);
    executor.setMaxPoolSize(4);
    executor.setQueueCapacity(20);          
    executor.initialize();
    return executor;
}

Sftp.inboundAdapter() (SftpInboundFileSynchronizingMessageSource) returns 无论如何远程文件一个一个。首先,它将它们同步到本地目录,然后才轮询它们以将消息处理为 File 有效负载。

要并行处理它们,只需将 taskExecutor 添加到您的 e.poller() 定义中,所有这些 maxMessagesPerPoll(5) 将分发到不同的线程。

对于我的用例,将 taskExecutor 分配给通道就可以了。 这是我用于轮询文件夹并处理文件的非常简单的应用程序的代码。文件是并行处理的,即使我只为轮询器分配了一个线程。

@Slf4j
@SpringBootApplication
public class SpringIntegrationDemoApplication {

  public static void main( String[] args ) {
    SpringApplication.run( SpringIntegrationDemoApplication.class, args );
  }

  @Bean
  public IntegrationFlow fileReadingFlow() {
    return IntegrationFlows
        .from( Files.inboundAdapter( new File( "/tmp/in" ) ).patternFilter( "*.txt" ),
               e ->
                   e.poller( Pollers.fixedDelay( 10000 )
//                               .taskExecutor( Executors.newFixedThreadPool( 1 ) )
                                 .maxMessagesPerPoll( 100 )
                   ) )
        .channel( MessageChannels.executor( Executors.newFixedThreadPool( 100 ) ) )
        .handle( "myFileHandler", "handleMessage" )
        .get();
  }

  @Bean
  public MessageHandler myFileHandler() {
    return message -> {
      try {
        log.info( "START" + message.getPayload() );
        Thread.sleep( 2000 );
        log.info( "STOP" + message.getPayload() );
      }
      catch ( InterruptedException e ) {
        e.printStackTrace();
      }
    };
  }
}