骆驼,如何在直接端点上创建真正的同步

Camel, how to create genuine sync on direct endpoint

骆驼 (3.11 - 3.12.0-SNAPSHOT) Java 16.

在这个例子中,定时器发送了两条消息。当第一条消息尚未完成时,第二条消息开始处理。我希望消息在一条 DIRECT 路由中一条一条地处理。


    @Override
    public void configure() throws Exception {
        from("timer:cTimer?period=1&repeatCount=2")
                .process(e -> {
                    final Integer oldBody = timerMessageId.getAndIncrement();
                    log.info("Send message: {}", oldBody);
                    e.getIn().setHeader(KEY, oldBody);
                    final List<Integer> list = IntStream.range(1, 10).boxed().collect(Collectors.toList());
                    e.getIn().setBody(list);
                })
                .to("direct:direct-order-file-process-route:" + psId);

        from(direct("direct-order-file-process-route:" + psId))
                .routeId("routeId:" + psId)
                .process(exchange -> {
                    log.info("Started process message: {}", exchange.getIn().getHeader(KEY));
                })
                .split()
                .body()
                .aggregate(header(KEY), this::aggregatorExchanges)
                .completionSize(5)
                .completionTimeout(1000L)
                .process(exchange -> {
                    Thread.sleep(100);
                    log.info("DONE: {} / {}", exchange.getIn().getHeader(KEY), exchange.getIn().getBody());
                })
                .end();
    }

    private Exchange aggregatorExchanges(Exchange oldExchange, Exchange newExchange) {
        List<Integer> drinks;
        if (oldExchange == null) {
            drinks = new ArrayList<>();
        } else {
            drinks = (List<Integer>) oldExchange.getIn().getBody();
        }
        drinks.add((Integer) newExchange.getIn().getBody());
        newExchange.getIn().setBody(drinks);
        return newExchange;
    }

日志:

Send message: 1
Started process message: 1
Send message: 2
Started process message: 2
DONE: 1 / [1, 2, 3, 4, 5]
DONE: 2 / [1, 2, 3, 4, 5]
DONE: 1 / [6, 7, 8, 9]
DONE: 2 / [6, 7, 8, 9]

预期日志:

Send message: 1
Started process message: 1
DONE: 1 / [1, 2, 3, 4, 5]
DONE: 1 / [6, 7, 8, 9]

Send message: 2
Started process message: 2
DONE: 2 / [1, 2, 3, 4, 5]
DONE: 2 / [6, 7, 8, 9]

这里的问题是关于“聚合”的。没有聚合“直接”工作正常。 换句话说:direct 和 aggregate 是不同步的(它们 运行 在不同的线程中)。直接不等待聚合完成。

这完全不是同步处理与否的问题。 您的期望只是一条拆分消息,而不是聚合消息

  • 您发送 2 条具有不同密钥的消息,每条消息包含 9 个数字的列表
  • 两条消息都拆分为 9 条单独的消息
  • 您希望原始消息一个接一个地处理,但这违背了聚合器的概念

聚合器的角度来看,有 18 条单独的消息到达。 9 with key 1 and 9 with key 2. 它对这 2 条原始消息一无所知,但通过密钥聚合了这 18 条消息。

您的日志很好地显示了该过程。

  1. 第一条 密钥为 1 的消息已发送。它包含 9 个数字的列表。
  2. 该消息拆分为 9 条消息,每条消息有一个编号。
  3. 九条消息,都具有相同的密钥 1,到达聚合器
  4. 因为聚合器创建了 5 条消息的聚合(completionSize(5)第一个聚合很快完成并生成( DONE: 1 / [1, 2, 3, 4, 5])
  5. 保留key为1的4条消息,因为还需要1条消息才能完成聚合
  6. 同时包含 9 个数字的密钥 2 消息也被拆分,并且 9 个密钥 2 的消息到达聚合器
  7. 开始一个新的聚合因为KEY被用作区分符
  8. 因此第二个聚合很快完成并生成(DONE: 2 / [1, 2, 3, 4, 5])
  9. 同样,对于消息 2,剩余的 4 条消息被保留,因为还需要 1 条消息才能完成聚合
  10. 此时 2 个聚合(键 1 和键 2)每个保存 4 条消息,正在等待更多消息。
  11. 因为没有更多消息到达,completionTimeout 在没有新消息 1 秒后开始
  12. 因此 2 个“不完整”聚合仍然关闭 并生成(DONE: 1 / [6, 7, 8, 9]DONE: 2 / [6, 7, 8, 9]

如果你想得到你的“预期日志”,你根本不需要聚合器因为这只是一个创建 5 块的“自定义消息拆分”每条消息的数量。例如,您可以 implement the Camel Split EIP in a bean 来实现这一点。