运行 Spring 每个 Ftp 文件同时集成流程

Run Spring Integration flow concurrently for each Ftp file

我有一个使用 Java DSL 配置的集成流程,它使用 Ftp.inboundChannelAdapter 从 Ftp 服务器拉取文件,然后将其转换为 JobRequest,然后我有一个 .handle() 触发我的批处理作业的方法,一切都按要求工作,但 运行 中的过程顺序为 FTP 文件夹中的每个文件

我在我的 Transformer Endpoint 中添加了 currentThreadName 它为每个文件打印相同的线程名称

这是我到目前为止尝试过的方法

1.task 执行器 bean

 @Bean
    public TaskExecutor taskExecutor(){
        return new SimpleAsyncTaskExecutor("Integration");

    }

2.Integration流量

  @Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) throws IOException {
    return IntegrationFlows.from(Ftp.inboundAdapter(myFtpSessionFactory)
                    .remoteDirectory("/bar")
                    .localDirectory(localDir.getFile())
            ,c -> c.poller(Pollers.fixedRate(1000).taskExecutor(taskExecutor()).maxMessagesPerPoll(20)))
            .transform(fileMessageToJobRequest(importUserJob(step1())))
            .handle(jobLaunchingGateway)
            .log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
            .route(JobExecution.class,j->j.getStatus().isUnsuccessful()?"jobFailedChannel":"jobSuccessfulChannel")
            .get();
}

3.I 也在另一个我需要的 SO 线程中读到 ExecutorChannel 所以我配置了一个但是我不知道如何将这个通道注入我的 Ftp.inboundAdapter,从日志是看到频道总是 integrationFlow.channel#0 我猜是 DirectChannel

 @Bean
public MessageChannel inputChannel() {
    return new ExecutorChannel(taskExecutor());
}

我不知道我在这里遗漏了什么,或者我可能没有正确理解 Spring 消息系统,因为我对 Spring 和 Spring-集成非常陌生

感谢任何帮助

谢谢

您可以简单地将 ExecutorChannel 注入到流中,框架将应用到 SourcePollingChannelAdapter。因此,将 inputChannel 定义为一个 bean,您只需执行以下操作:

.channel(inputChannel())

在你的 .transform(fileMessageToJobRequest(importUserJob(step1()))) 之前。 在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-channels

另一方面,要根据您的 .taskExecutor(taskExecutor()) 配置并行处理您的文件,您只需要 .maxMessagesPerPoll(20) 作为 1AbstractPollingEndpoint中的逻辑是这样的:

this.taskExecutor.execute(() -> {
                int count = 0;
                while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
                    if (pollForMessage() == null) {
                        break;
                    }
                    count++;
                }

因此,我们确实有并行任务,但只有当它们达到 maxMessagesPerPoll 当前情况下的 20 时。文档中也有一些解释:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#endpoint-pollingconsumer

The maxMessagesPerPoll property specifies the maximum number of messages to receive within a given poll operation. This means that the poller continues calling receive() without waiting, until either null is returned or the maximum value is reached. For example, if a poller has a ten-second interval trigger and a maxMessagesPerPoll setting of 25, and it is polling a channel that has 100 messages in its queue, all 100 messages can be retrieved within 40 seconds. It grabs 25, waits ten seconds, grabs the next 25, and so on.