并行处理时不考虑 Apache Camel Scheduler 延迟
Apache Camel Scheduler delay not respected while parallel processing
在 Spring Boot (2.3.0.RELEASE) 应用程序上使用 Apache Camel (3.6.0 ),我在并行模式下遇到 scheduler 组件的奇怪行为;特别是,我想创建一个每 n 秒执行给定逻辑的路由。
为此,我编写了以下示例:
@Component
public class TestRoute extends RouteBuilder {
@Override
public void configure() {
from("scheduler:testRoute?delay=2000")
.log(LoggingLevel.INFO, "Test route begin")
.setBody(this::generateRandomBody)
.split(body())
.parallelProcessing()
.process(this::consumeElement)
.end()
.log(LoggingLevel.INFO, "Test route end");
}
private List<Integer> generateRandomBody(Exchange exchange) {
return IntStream.range(0, random(20)).boxed().collect(toList());
}
private void consumeElement(Exchange exchange) throws InterruptedException {
Thread.sleep(random(1000));
}
private int random(int max) {
return new Random().nextInt(max) + 1;
}
将 scheduler.delay 参数设置为 2000 我希望 下一次轮询 在 当前一首的结尾;但是,只有禁用并行处理才能做到这一点。
实际上,这是一个并行处理的输出示例:
09:02:56.859 Test route begin
09:02:58.086 Test route end
09:02:58.868 Test route begin
09:02:59.266 Test route end
09:03:00.870 Test route begin
09:03:01.654 Test route end
09:03:02.871 Test route begin
09:03:04.028 Test route end
09:03:04.873 Test route begin
下面是一个没有并行处理的输出示例:
09:08:01.666 Test route begin
09:08:11.290 Test route end
09:08:13.292 Test route begin
09:08:21.707 Test route end
09:08:23.709 Test route begin
09:08:26.161 Test route end
09:08:28.162 Test route begin
09:08:37.761 Test route end
09:08:39.763 Test route begin
如您所见,在并行处理时,不会考虑延迟,因为下一次轮询时间是根据当前轮询开始计算的;另一方面,如果没有并行处理,下一次轮询时间大约是当前轮询结束后的 2 秒。
这似乎有点奇怪,因为并行处理文档指出:
If enabled then processing each splitted messages occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. Its only processing the sub messages from the splitter which happens concurrently.
我在这里错过了什么?如何并行处理消息并在实际路由完成后触发下一次轮询?
在 Camel 3 中,EIP 已被大修为反应式,因此拆分器允许调度程序线程更快完成,因此可以遵守 2 秒的延迟。
如果你想让scheduled等待整个exchange完成(不管不同线程如何处理它),那么你需要将它配置为同步的。即使是 Camel 2.
from("scheduler:testRoute?delay=2000&synchronous=true")
在 Spring Boot (2.3.0.RELEASE) 应用程序上使用 Apache Camel (3.6.0 ),我在并行模式下遇到 scheduler 组件的奇怪行为;特别是,我想创建一个每 n 秒执行给定逻辑的路由。 为此,我编写了以下示例:
@Component
public class TestRoute extends RouteBuilder {
@Override
public void configure() {
from("scheduler:testRoute?delay=2000")
.log(LoggingLevel.INFO, "Test route begin")
.setBody(this::generateRandomBody)
.split(body())
.parallelProcessing()
.process(this::consumeElement)
.end()
.log(LoggingLevel.INFO, "Test route end");
}
private List<Integer> generateRandomBody(Exchange exchange) {
return IntStream.range(0, random(20)).boxed().collect(toList());
}
private void consumeElement(Exchange exchange) throws InterruptedException {
Thread.sleep(random(1000));
}
private int random(int max) {
return new Random().nextInt(max) + 1;
}
将 scheduler.delay 参数设置为 2000 我希望 下一次轮询 在 当前一首的结尾;但是,只有禁用并行处理才能做到这一点。
实际上,这是一个并行处理的输出示例:
09:02:56.859 Test route begin
09:02:58.086 Test route end
09:02:58.868 Test route begin
09:02:59.266 Test route end
09:03:00.870 Test route begin
09:03:01.654 Test route end
09:03:02.871 Test route begin
09:03:04.028 Test route end
09:03:04.873 Test route begin
下面是一个没有并行处理的输出示例:
09:08:01.666 Test route begin
09:08:11.290 Test route end
09:08:13.292 Test route begin
09:08:21.707 Test route end
09:08:23.709 Test route begin
09:08:26.161 Test route end
09:08:28.162 Test route begin
09:08:37.761 Test route end
09:08:39.763 Test route begin
如您所见,在并行处理时,不会考虑延迟,因为下一次轮询时间是根据当前轮询开始计算的;另一方面,如果没有并行处理,下一次轮询时间大约是当前轮询结束后的 2 秒。
这似乎有点奇怪,因为并行处理文档指出:
If enabled then processing each splitted messages occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. Its only processing the sub messages from the splitter which happens concurrently.
我在这里错过了什么?如何并行处理消息并在实际路由完成后触发下一次轮询?
在 Camel 3 中,EIP 已被大修为反应式,因此拆分器允许调度程序线程更快完成,因此可以遵守 2 秒的延迟。
如果你想让scheduled等待整个exchange完成(不管不同线程如何处理它),那么你需要将它配置为同步的。即使是 Camel 2.
from("scheduler:testRoute?delay=2000&synchronous=true")