Spring 集成 Java DSL:缓冲消息流并将处理程序放在单独的线程中
Spring Integration Java DSL: buffering the message flow and put handler in separate thread
我在 Spring 集成 DSL 中配置了一个流程:
// A custom channel Bean
@Autowired
@Qualifier(INPUT_DATA_CHANNEL)
private PublishSubscribeChannel publishSubscribeChannel;
//A Service that can do database recording
@Autowired
private DatabaseActivator databaseActivator;
@Bean
public IntegrationFlow setupDatabaseFlow() {
return IntegrationFlows.from(publishSubscribeChannel)
.handle((p, h) -> databaseActivator.recordToDatabase(p))
.get();
}
根据日志,一切都在线程 "main" 中按顺序发生。顺便说一句,我同时使用 publishSubscribeChannel 我有兔子 publisher/handler 以同样的方式收听这个频道。
由于数据库操作需要时间,我应该如何正确地进行处理才不会减慢 "main"。最好,主线程必须尽快解除阻塞,并且处理应该在工作线程中继续。我说得对吗?
我可以在 Flow 中引入一个缓冲区,它会收集来自 publishSubscribeChannel 的突发消息吗?
此外,我更喜欢其他线程(池)来处理实际发送,以便从执行流程的主线程中移除负载。我很清楚 Spring 中的 ThreadPoolTaskExecutor 都有缓冲区和线程池。这是使用它的好方法吗?如何以 Java DSL 方式使用 ThreadPoolTaskExecutor?
有负责人:
/**
* Create a PublishSubscribeChannel that will use an {@link Executor}
* to invoke the handlers. If this is null, each invocation will occur in
* the message sender's thread.
*
* @param executor The executor.
*/
public PublishSubscribeChannel(Executor executor) {
使用 Java DSL,您可以像这样声明它:
@Bean
PublishSubscribeChannel publishSubscribeChannel(Executor executor) {
return Channels.publishSubscribe(executor).get();
}
我在 Spring 集成 DSL 中配置了一个流程:
// A custom channel Bean
@Autowired
@Qualifier(INPUT_DATA_CHANNEL)
private PublishSubscribeChannel publishSubscribeChannel;
//A Service that can do database recording
@Autowired
private DatabaseActivator databaseActivator;
@Bean
public IntegrationFlow setupDatabaseFlow() {
return IntegrationFlows.from(publishSubscribeChannel)
.handle((p, h) -> databaseActivator.recordToDatabase(p))
.get();
}
根据日志,一切都在线程 "main" 中按顺序发生。顺便说一句,我同时使用 publishSubscribeChannel 我有兔子 publisher/handler 以同样的方式收听这个频道。
由于数据库操作需要时间,我应该如何正确地进行处理才不会减慢 "main"。最好,主线程必须尽快解除阻塞,并且处理应该在工作线程中继续。我说得对吗?
我可以在 Flow 中引入一个缓冲区,它会收集来自 publishSubscribeChannel 的突发消息吗?
此外,我更喜欢其他线程(池)来处理实际发送,以便从执行流程的主线程中移除负载。我很清楚 Spring 中的 ThreadPoolTaskExecutor 都有缓冲区和线程池。这是使用它的好方法吗?如何以 Java DSL 方式使用 ThreadPoolTaskExecutor?
有负责人:
/**
* Create a PublishSubscribeChannel that will use an {@link Executor}
* to invoke the handlers. If this is null, each invocation will occur in
* the message sender's thread.
*
* @param executor The executor.
*/
public PublishSubscribeChannel(Executor executor) {
使用 Java DSL,您可以像这样声明它:
@Bean
PublishSubscribeChannel publishSubscribeChannel(Executor executor) {
return Channels.publishSubscribe(executor).get();
}