Spring 集成 Reactor 配置
Spring Integration Reactor configuration
我是 运行 一个使用 spring 集成处理任务的应用程序。
我想让它同时处理多个任务,但到目前为止任何尝试都失败了。
我的配置是:
ReactorConfiguration.java
@Configuration
@EnableAutoConfiguration
public class ReactorConfiguration {
@Bean
Environment reactorEnv() {
return new Environment();
}
@Bean
Reactor createReactor(Environment env) {
return Reactors.reactor()
.env(env)
.dispatcher(Environment.THREAD_POOL)
.get();
}
}
TaskProcessor.java
@MessagingGateway(reactorEnvironment = "reactorEnv")
public interface TaskProcessor {
@Gateway(requestChannel = "routeTaskByType", replyChannel = "")
Promise<Result> processTask(Task task);
}
IntegrationConfiguration.java(简体)
@Bean
public IntegrationFlow routeFlow() {
return IntegrationFlows.from(MessageChannels.executor("routeTaskByType", Executors.newFixedThreadPool(10)))
.handle(Task.class, (payload, headers) -> {
logger.info("Task submitted!" + payload);
payload.setRunning(true);
//Try-catch
Thread.sleep(999999);
return payload;
})
.route(/*...*/)
.get();
}
我的测试代码可以这样简化:
Task task1 = new Task();
Task task2 = new Task();
Promise<Result> resultPromise1 = taskProcessor.processTask(task1).flush();
Promise<Result> resultPromise2 = taskProcessor.processTask(task2).flush();
while( !task1.isRunning() || !task2.isRunning() ){
logger.info("Task2: {}, Task2: {}", task1, task2);
Thread.sleep(1000);
}
logger.info("Yes! your tasks are running in parallel!");
但不幸的是,最后一行日志永远不会被执行!
有什么想法吗?
非常感谢
好吧,我只是用简单的 Reactor 测试用例重现了它:
@Test
public void testParallelPromises() throws InterruptedException {
Environment environment = new Environment();
final AtomicBoolean first = new AtomicBoolean(true);
for (int i = 0; i < 10; i++) {
final Promise<String> promise = Promises.task(environment, () -> {
if (!first.getAndSet(false)) {
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
return "foo";
}
);
String result = promise.await(10, TimeUnit.SECONDS);
System.out.println(result);
assertNotNull(result);
}
}
(与 Reactor-2.0.6 一起使用)。
问题是因为:
public static <T> Promise<T> task(Environment env, Supplier<T> supplier) {
return task(env, env.getDefaultDispatcher(), supplier);
}
其中 DefaultDispatcher
是 RingBufferDispatcher extends SingleThreadDispatcher
。
由于 @MessagingGateway
基于 request/reply 场景,我们正在等待 RingBufferDispatcher
的线程中的回复。由于您没有 return 在那里回复 (Thread.sleep(999999);
),我们无法接受 RingBuffer
内的下一个活动。
您的 dispatcher(Environment.THREAD_POOL)
在这里没有帮助,因为它不影响 Environment
。您应该考虑使用 reactor.dispatchers.default = threadPoolExecutor
属性。类似于这个文件:https://github.com/reactor/reactor/blob/2.0.x/reactor-net/src/test/resources/META-INF/reactor/reactor-environment.properties#L46.
是的:请升级到最新的 Reactor。
我是 运行 一个使用 spring 集成处理任务的应用程序。
我想让它同时处理多个任务,但到目前为止任何尝试都失败了。
我的配置是:
ReactorConfiguration.java
@Configuration
@EnableAutoConfiguration
public class ReactorConfiguration {
@Bean
Environment reactorEnv() {
return new Environment();
}
@Bean
Reactor createReactor(Environment env) {
return Reactors.reactor()
.env(env)
.dispatcher(Environment.THREAD_POOL)
.get();
}
}
TaskProcessor.java
@MessagingGateway(reactorEnvironment = "reactorEnv")
public interface TaskProcessor {
@Gateway(requestChannel = "routeTaskByType", replyChannel = "")
Promise<Result> processTask(Task task);
}
IntegrationConfiguration.java(简体)
@Bean
public IntegrationFlow routeFlow() {
return IntegrationFlows.from(MessageChannels.executor("routeTaskByType", Executors.newFixedThreadPool(10)))
.handle(Task.class, (payload, headers) -> {
logger.info("Task submitted!" + payload);
payload.setRunning(true);
//Try-catch
Thread.sleep(999999);
return payload;
})
.route(/*...*/)
.get();
}
我的测试代码可以这样简化:
Task task1 = new Task();
Task task2 = new Task();
Promise<Result> resultPromise1 = taskProcessor.processTask(task1).flush();
Promise<Result> resultPromise2 = taskProcessor.processTask(task2).flush();
while( !task1.isRunning() || !task2.isRunning() ){
logger.info("Task2: {}, Task2: {}", task1, task2);
Thread.sleep(1000);
}
logger.info("Yes! your tasks are running in parallel!");
但不幸的是,最后一行日志永远不会被执行!
有什么想法吗?
非常感谢
好吧,我只是用简单的 Reactor 测试用例重现了它:
@Test
public void testParallelPromises() throws InterruptedException {
Environment environment = new Environment();
final AtomicBoolean first = new AtomicBoolean(true);
for (int i = 0; i < 10; i++) {
final Promise<String> promise = Promises.task(environment, () -> {
if (!first.getAndSet(false)) {
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
return "foo";
}
);
String result = promise.await(10, TimeUnit.SECONDS);
System.out.println(result);
assertNotNull(result);
}
}
(与 Reactor-2.0.6 一起使用)。
问题是因为:
public static <T> Promise<T> task(Environment env, Supplier<T> supplier) {
return task(env, env.getDefaultDispatcher(), supplier);
}
其中 DefaultDispatcher
是 RingBufferDispatcher extends SingleThreadDispatcher
。
由于 @MessagingGateway
基于 request/reply 场景,我们正在等待 RingBufferDispatcher
的线程中的回复。由于您没有 return 在那里回复 (Thread.sleep(999999);
),我们无法接受 RingBuffer
内的下一个活动。
您的 dispatcher(Environment.THREAD_POOL)
在这里没有帮助,因为它不影响 Environment
。您应该考虑使用 reactor.dispatchers.default = threadPoolExecutor
属性。类似于这个文件:https://github.com/reactor/reactor/blob/2.0.x/reactor-net/src/test/resources/META-INF/reactor/reactor-environment.properties#L46.
是的:请升级到最新的 Reactor。