运行 Spring 每个 Ftp 文件同时集成流程
Run Spring Integration flow concurrently for each Ftp file
我有一个使用 Java DSL 配置的集成流程,它使用 Ftp.inboundChannelAdapter
从 Ftp 服务器拉取文件,然后将其转换为 JobRequest
,然后我有一个 .handle()
触发我的批处理作业的方法,一切都按要求工作,但 运行 中的过程顺序为 FTP 文件夹中的每个文件
我在我的 Transformer Endpoint 中添加了 currentThreadName
它为每个文件打印相同的线程名称
这是我到目前为止尝试过的方法
1.task 执行器 bean
@Bean
public TaskExecutor taskExecutor(){
return new SimpleAsyncTaskExecutor("Integration");
}
2.Integration流量
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) throws IOException {
return IntegrationFlows.from(Ftp.inboundAdapter(myFtpSessionFactory)
.remoteDirectory("/bar")
.localDirectory(localDir.getFile())
,c -> c.poller(Pollers.fixedRate(1000).taskExecutor(taskExecutor()).maxMessagesPerPoll(20)))
.transform(fileMessageToJobRequest(importUserJob(step1())))
.handle(jobLaunchingGateway)
.log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
.route(JobExecution.class,j->j.getStatus().isUnsuccessful()?"jobFailedChannel":"jobSuccessfulChannel")
.get();
}
3.I 也在另一个我需要的 SO 线程中读到 ExecutorChannel
所以我配置了一个但是我不知道如何将这个通道注入我的 Ftp.inboundAdapter
,从日志是看到频道总是 integrationFlow.channel#0
我猜是 DirectChannel
@Bean
public MessageChannel inputChannel() {
return new ExecutorChannel(taskExecutor());
}
我不知道我在这里遗漏了什么,或者我可能没有正确理解 Spring 消息系统,因为我对 Spring 和 Spring-集成非常陌生
感谢任何帮助
谢谢
您可以简单地将 ExecutorChannel
注入到流中,框架将应用到 SourcePollingChannelAdapter
。因此,将 inputChannel
定义为一个 bean,您只需执行以下操作:
.channel(inputChannel())
在你的 .transform(fileMessageToJobRequest(importUserJob(step1())))
之前。
在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-channels
另一方面,要根据您的 .taskExecutor(taskExecutor())
配置并行处理您的文件,您只需要 .maxMessagesPerPoll(20)
作为 1
。 AbstractPollingEndpoint
中的逻辑是这样的:
this.taskExecutor.execute(() -> {
int count = 0;
while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
if (pollForMessage() == null) {
break;
}
count++;
}
因此,我们确实有并行任务,但只有当它们达到 maxMessagesPerPoll
当前情况下的 20
时。文档中也有一些解释:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#endpoint-pollingconsumer
The maxMessagesPerPoll property specifies the maximum number of messages to receive within a given poll operation. This means that the poller continues calling receive() without waiting, until either null is returned or the maximum value is reached. For example, if a poller has a ten-second interval trigger and a maxMessagesPerPoll setting of 25, and it is polling a channel that has 100 messages in its queue, all 100 messages can be retrieved within 40 seconds. It grabs 25, waits ten seconds, grabs the next 25, and so on.
我有一个使用 Java DSL 配置的集成流程,它使用 Ftp.inboundChannelAdapter
从 Ftp 服务器拉取文件,然后将其转换为 JobRequest
,然后我有一个 .handle()
触发我的批处理作业的方法,一切都按要求工作,但 运行 中的过程顺序为 FTP 文件夹中的每个文件
我在我的 Transformer Endpoint 中添加了 currentThreadName
它为每个文件打印相同的线程名称
这是我到目前为止尝试过的方法
1.task 执行器 bean
@Bean
public TaskExecutor taskExecutor(){
return new SimpleAsyncTaskExecutor("Integration");
}
2.Integration流量
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) throws IOException {
return IntegrationFlows.from(Ftp.inboundAdapter(myFtpSessionFactory)
.remoteDirectory("/bar")
.localDirectory(localDir.getFile())
,c -> c.poller(Pollers.fixedRate(1000).taskExecutor(taskExecutor()).maxMessagesPerPoll(20)))
.transform(fileMessageToJobRequest(importUserJob(step1())))
.handle(jobLaunchingGateway)
.log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
.route(JobExecution.class,j->j.getStatus().isUnsuccessful()?"jobFailedChannel":"jobSuccessfulChannel")
.get();
}
3.I 也在另一个我需要的 SO 线程中读到 ExecutorChannel
所以我配置了一个但是我不知道如何将这个通道注入我的 Ftp.inboundAdapter
,从日志是看到频道总是 integrationFlow.channel#0
我猜是 DirectChannel
@Bean
public MessageChannel inputChannel() {
return new ExecutorChannel(taskExecutor());
}
我不知道我在这里遗漏了什么,或者我可能没有正确理解 Spring 消息系统,因为我对 Spring 和 Spring-集成非常陌生
感谢任何帮助
谢谢
您可以简单地将 ExecutorChannel
注入到流中,框架将应用到 SourcePollingChannelAdapter
。因此,将 inputChannel
定义为一个 bean,您只需执行以下操作:
.channel(inputChannel())
在你的 .transform(fileMessageToJobRequest(importUserJob(step1())))
之前。
在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-channels
另一方面,要根据您的 .taskExecutor(taskExecutor())
配置并行处理您的文件,您只需要 .maxMessagesPerPoll(20)
作为 1
。 AbstractPollingEndpoint
中的逻辑是这样的:
this.taskExecutor.execute(() -> {
int count = 0;
while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
if (pollForMessage() == null) {
break;
}
count++;
}
因此,我们确实有并行任务,但只有当它们达到 maxMessagesPerPoll
当前情况下的 20
时。文档中也有一些解释:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#endpoint-pollingconsumer
The maxMessagesPerPoll property specifies the maximum number of messages to receive within a given poll operation. This means that the poller continues calling receive() without waiting, until either null is returned or the maximum value is reached. For example, if a poller has a ten-second interval trigger and a maxMessagesPerPoll setting of 25, and it is polling a channel that has 100 messages in its queue, all 100 messages can be retrieved within 40 seconds. It grabs 25, waits ten seconds, grabs the next 25, and so on.