Spring 集成 Java DSL - 异步执行多个服务激活器?

Spring Integration Java DSL - execute multiple service activators async?

有一个包含任务列表的作业。 每个任务都有 ID、名称、状态。

我已经为每个任务创建了服务激活器,如下所示:

@ServiceActivator
public Message<Task> execute(Message<Task> message){
    //do stuff
}

我已经为 Job 创建了一个网关 在集成流程中,从网关开始:

@Bean
    public IntegrationFlow startJob() {
        return f -> f
                .handle("jobService", "execute")
                .channel("TaskRoutingChannel");
    }

@Bean
    public IntegrationFlow startJobTask() {
        return IntegrationFlows.from("TaskRoutingChannel")
                .handle("jobService", "executeTasks")
                .route("headers['Destination-Channel']")
                .get();
    }

@Bean
    public IntegrationFlow TaskFlow() {
        return IntegrationFlows.from("testTaskChannel")
                .handle("aTaskService", "execute")
                .channel("TaskRoutingChannel")
                .get();
    }

@Bean
    public IntegrationFlow TaskFlow2() {
        return IntegrationFlows.from("test2TaskChannel")
                .handle("bTaskService", "execute")
                .channel("TaskRoutingChannel")
                .get();
    }

我已经得到要按顺序执行的任务,使用上面的路由器。

但是,我需要启动作业,并行执行所有任务。 我不知道该怎么做。我尝试在服务激活器方法上使用 @Async 并将其设为 return void。但在那种情况下,我如何将它链接回路由通道并让它开始下一个任务? 请帮忙。谢谢。

编辑:

我使用 RecepientListRouter 和 ExecutorChannel 来获得并行执行:

@Bean
public IntegrationFlow startJobTask() {
    return IntegrationFlows.from("TaskRoutingChannel")
            .handle("jobService", "executeTasks")
            .routeToRecipients(r -> r
                .recipient("testTaskChannel")
                .recipient("test2TaskChannel"))
            .get();
}

@Bean ExecutorChannel testTaskChannel(){
    return new ExecutorChannel(this.getAsyncExecutor());
}

@Bean ExecutorChannel test2TaskChannel(){
    return new ExecutorChannel(this.getAsyncExecutor());
}
@Bean
public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(5);
    executor.setMaxPoolSize(10);
    executor.setQueueCapacity(10);
    executor.initialize();
    return executor;
}

现在,3 个问题: 1) 如果这是一个好方法,我如何将有效负载的特定部分发送到每个接收通道。假设有效载荷是一个 List<>,我想将每个列表项发送到每个通道。 2) 如何动态设置收件人通道?从 header 说起?或清单? 3)这真的是一个好方法吗?有更好的方法吗?

提前致谢。

您的 TaskRoutingChannel 必须是 ExecutorChannel 的实例。例如:

return f -> f
            .handle("jobService", "execute")
            .channel(c -> c.executor("TaskRoutingChannel", threadPoolTaskExecutor()));

否则,是的:一切都是通过单个 Thread 调用的,这对您的任务不利。

更新

让我试着一一回答你的问题,尽管听起来每个问题都必须单独作为一个 :-)。

  1. 如果你确实需要向多个服务发送相同的消息,你可以使用routeToRecipients,或者可以返回publishSubscribe。甚至可以根据header做动态路由,比如

  2. 要将消息的一部分发送到每个频道,在您的 .routeToRecipients()

  3. 之前有足够的位置 .split()
  4. 为了回答您的最后一个问题,我需要了解该任务的业务需求。