Spring 集成 Java DSL:缓冲消息流并将处理程序放在单独的线程中

Spring Integration Java DSL: buffering the message flow and put handler in separate thread

我在 Spring 集成 DSL 中配置了一个流程:

// A custom channel Bean 
@Autowired
@Qualifier(INPUT_DATA_CHANNEL)
private PublishSubscribeChannel publishSubscribeChannel;

//A Service that can do database recording
@Autowired
private DatabaseActivator databaseActivator;

@Bean
public IntegrationFlow setupDatabaseFlow() {

    return IntegrationFlows.from(publishSubscribeChannel)
            .handle((p, h) -> databaseActivator.recordToDatabase(p))
            .get();
}

根据日志,一切都在线程 "main" 中按顺序发生。顺便说一句,我同时使用 publishSubscribeChannel 我有兔子 publisher/handler 以同样的方式收听这个频道。

由于数据库操作需要时间,我应该如何正确地进行处理才不会减慢 "main"。最好,主线程必须尽快解除阻塞,并且处理应该在工作线程中继续。我说得对吗?

我可以在 Flow 中引入一个缓冲区,它会收集来自 publishSubscribeChannel 的突发消息吗?

此外,我更喜欢其他线程(池)来处理实际发送,以便从执行流程的主线程中移除负载。我很清楚 Spring 中的 ThreadPoolTask​​Executor 都有缓冲区和线程池。这是使用它的好方法吗?如何以 Java DSL 方式使用 ThreadPoolTask​​Executor?

有负责人:

/**
 * Create a PublishSubscribeChannel that will use an {@link Executor}
 * to invoke the handlers. If this is null, each invocation will occur in
 * the message sender's thread.
 *
 * @param executor The executor.
 */
public PublishSubscribeChannel(Executor executor) {

http://docs.spring.io/spring-integration/reference/html/messaging-channels-section.html#channel-configuration-pubsubchannel

使用 Java DSL,您可以像这样声明它:

@Bean
PublishSubscribeChannel publishSubscribeChannel(Executor executor) {
    return Channels.publishSubscribe(executor).get();
}