并行处理时不考虑 Apache Camel Scheduler 延迟

Apache Camel Scheduler delay not respected while parallel processing

Spring Boot (2.3.0.RELEASE) 应用程序上使用 Apache Camel (3.6.0 ),我在并行模式下遇到 scheduler 组件的奇怪行为;特别是,我想创建一个每 n 秒执行给定逻辑的路由。 为此,我编写了以下示例:

@Component
public class TestRoute extends RouteBuilder {
  @Override
  public void configure() {
    from("scheduler:testRoute?delay=2000")
      .log(LoggingLevel.INFO, "Test route begin")
      .setBody(this::generateRandomBody)
      .split(body())
        .parallelProcessing()
        .process(this::consumeElement)
      .end()
      .log(LoggingLevel.INFO, "Test route end");
}

private List<Integer> generateRandomBody(Exchange exchange) {
  return IntStream.range(0, random(20)).boxed().collect(toList());
}

private void consumeElement(Exchange exchange) throws InterruptedException {
  Thread.sleep(random(1000));
}

private int random(int max) {
  return new Random().nextInt(max) + 1;
}

scheduler.delay 参数设置为 2000 我希望 下一次轮询 当前一首的结尾;但是,只有禁用并行处理才能做到这一点。

实际上,这是一个并行处理的输出示例:

09:02:56.859    Test route begin
09:02:58.086    Test route end

09:02:58.868    Test route begin
09:02:59.266    Test route end

09:03:00.870    Test route begin
09:03:01.654    Test route end

09:03:02.871    Test route begin
09:03:04.028    Test route end

09:03:04.873    Test route begin

下面是一个没有并行处理的输出示例:

09:08:01.666    Test route begin
09:08:11.290    Test route end

09:08:13.292    Test route begin
09:08:21.707    Test route end

09:08:23.709    Test route begin
09:08:26.161    Test route end

09:08:28.162    Test route begin
09:08:37.761    Test route end

09:08:39.763    Test route begin

如您所见,在并行处理时,不会考虑延迟,因为下一次轮询时间是根据当前轮询开始计算的;另一方面,如果没有并行处理,下一次轮询时间大约是当前轮询结束后的 2 秒。

这似乎有点奇怪,因为并行处理文档指出:

If enabled then processing each splitted messages occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. Its only processing the sub messages from the splitter which happens concurrently.

我在这里错过了什么?如何并行处理消息并在实际路由完成后触发下一次轮询?

在 Camel 3 中,EIP 已被大修为反应式,因此拆分器允许调度程序线程更快完成,因此可以遵守 2 秒的延迟。

如果你想让scheduled等待整个exchange完成(不管不同线程如何处理它),那么你需要将它配置为同步的。即使是 Camel 2.

 from("scheduler:testRoute?delay=2000&synchronous=true")