Spring 集成 Java DSL - 异步执行多个服务激活器?
Spring Integration Java DSL - execute multiple service activators async?
有一个包含任务列表的作业。
每个任务都有 ID、名称、状态。
我已经为每个任务创建了服务激活器,如下所示:
@ServiceActivator
public Message<Task> execute(Message<Task> message){
//do stuff
}
我已经为 Job 创建了一个网关
在集成流程中,从网关开始:
@Bean
public IntegrationFlow startJob() {
return f -> f
.handle("jobService", "execute")
.channel("TaskRoutingChannel");
}
@Bean
public IntegrationFlow startJobTask() {
return IntegrationFlows.from("TaskRoutingChannel")
.handle("jobService", "executeTasks")
.route("headers['Destination-Channel']")
.get();
}
@Bean
public IntegrationFlow TaskFlow() {
return IntegrationFlows.from("testTaskChannel")
.handle("aTaskService", "execute")
.channel("TaskRoutingChannel")
.get();
}
@Bean
public IntegrationFlow TaskFlow2() {
return IntegrationFlows.from("test2TaskChannel")
.handle("bTaskService", "execute")
.channel("TaskRoutingChannel")
.get();
}
我已经得到要按顺序执行的任务,使用上面的路由器。
但是,我需要启动作业,并行执行所有任务。
我不知道该怎么做。我尝试在服务激活器方法上使用 @Async
并将其设为 return void
。但在那种情况下,我如何将它链接回路由通道并让它开始下一个任务?
请帮忙。谢谢。
编辑:
我使用 RecepientListRouter 和 ExecutorChannel 来获得并行执行:
@Bean
public IntegrationFlow startJobTask() {
return IntegrationFlows.from("TaskRoutingChannel")
.handle("jobService", "executeTasks")
.routeToRecipients(r -> r
.recipient("testTaskChannel")
.recipient("test2TaskChannel"))
.get();
}
@Bean ExecutorChannel testTaskChannel(){
return new ExecutorChannel(this.getAsyncExecutor());
}
@Bean ExecutorChannel test2TaskChannel(){
return new ExecutorChannel(this.getAsyncExecutor());
}
@Bean
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(10);
executor.initialize();
return executor;
}
现在,3 个问题:
1) 如果这是一个好方法,我如何将有效负载的特定部分发送到每个接收通道。假设有效载荷是一个 List<>,我想将每个列表项发送到每个通道。
2) 如何动态设置收件人通道?从 header 说起?或清单?
3)这真的是一个好方法吗?有更好的方法吗?
提前致谢。
您的 TaskRoutingChannel
必须是 ExecutorChannel
的实例。例如:
return f -> f
.handle("jobService", "execute")
.channel(c -> c.executor("TaskRoutingChannel", threadPoolTaskExecutor()));
否则,是的:一切都是通过单个 Thread
调用的,这对您的任务不利。
更新
让我试着一一回答你的问题,尽管听起来每个问题都必须单独作为一个 :-)。
如果你确实需要向多个服务发送相同的消息,你可以使用routeToRecipients
,或者可以返回publishSubscribe
。甚至可以根据header
做动态路由,比如
要将消息的一部分发送到每个频道,在您的 .routeToRecipients()
之前有足够的位置 .split()
为了回答您的最后一个问题,我需要了解该任务的业务需求。
有一个包含任务列表的作业。 每个任务都有 ID、名称、状态。
我已经为每个任务创建了服务激活器,如下所示:
@ServiceActivator
public Message<Task> execute(Message<Task> message){
//do stuff
}
我已经为 Job 创建了一个网关 在集成流程中,从网关开始:
@Bean
public IntegrationFlow startJob() {
return f -> f
.handle("jobService", "execute")
.channel("TaskRoutingChannel");
}
@Bean
public IntegrationFlow startJobTask() {
return IntegrationFlows.from("TaskRoutingChannel")
.handle("jobService", "executeTasks")
.route("headers['Destination-Channel']")
.get();
}
@Bean
public IntegrationFlow TaskFlow() {
return IntegrationFlows.from("testTaskChannel")
.handle("aTaskService", "execute")
.channel("TaskRoutingChannel")
.get();
}
@Bean
public IntegrationFlow TaskFlow2() {
return IntegrationFlows.from("test2TaskChannel")
.handle("bTaskService", "execute")
.channel("TaskRoutingChannel")
.get();
}
我已经得到要按顺序执行的任务,使用上面的路由器。
但是,我需要启动作业,并行执行所有任务。
我不知道该怎么做。我尝试在服务激活器方法上使用 @Async
并将其设为 return void
。但在那种情况下,我如何将它链接回路由通道并让它开始下一个任务?
请帮忙。谢谢。
编辑:
我使用 RecepientListRouter 和 ExecutorChannel 来获得并行执行:
@Bean
public IntegrationFlow startJobTask() {
return IntegrationFlows.from("TaskRoutingChannel")
.handle("jobService", "executeTasks")
.routeToRecipients(r -> r
.recipient("testTaskChannel")
.recipient("test2TaskChannel"))
.get();
}
@Bean ExecutorChannel testTaskChannel(){
return new ExecutorChannel(this.getAsyncExecutor());
}
@Bean ExecutorChannel test2TaskChannel(){
return new ExecutorChannel(this.getAsyncExecutor());
}
@Bean
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(10);
executor.initialize();
return executor;
}
现在,3 个问题: 1) 如果这是一个好方法,我如何将有效负载的特定部分发送到每个接收通道。假设有效载荷是一个 List<>,我想将每个列表项发送到每个通道。 2) 如何动态设置收件人通道?从 header 说起?或清单? 3)这真的是一个好方法吗?有更好的方法吗?
提前致谢。
您的 TaskRoutingChannel
必须是 ExecutorChannel
的实例。例如:
return f -> f
.handle("jobService", "execute")
.channel(c -> c.executor("TaskRoutingChannel", threadPoolTaskExecutor()));
否则,是的:一切都是通过单个 Thread
调用的,这对您的任务不利。
更新
让我试着一一回答你的问题,尽管听起来每个问题都必须单独作为一个 :-)。
如果你确实需要向多个服务发送相同的消息,你可以使用
routeToRecipients
,或者可以返回publishSubscribe
。甚至可以根据header
做动态路由,比如要将消息的一部分发送到每个频道,在您的
.routeToRecipients()
之前有足够的位置 为了回答您的最后一个问题,我需要了解该任务的业务需求。
.split()