EIP/Apache Camel - 如何同时处理消息,但按组原子处理?

EIP/Apache Camel - How to process message concurrently, but atomically per group?

我有以下情况:

我开始骆驼路线如下:

public class MyMessage implements Runnable {
    public void run() {
        // omitted here
    }
}

from("netty:tcp://localhost:7777?textline=true&sync=false")
   ... // omitted here: parse message to pojo MyMessage, set header "group-identifier"
   .to(seda:process);

这个 Camel 路由消耗 TCP 流,解析每个传入消息的有效负载并将其转换为 MyMessage pojo,并在与 a 对应的交换上设置 group-identifier header留言...

现在我想消费 seda:process 如下:

我可以在这里应用哪些企业集成模式?我如何将这些概念映射到 Camel?

了解到ActiveMQ有消息组的概念(http://activemq.apache.org/message-groups.html)。这可能提供一种方法来确保同一组的两条消息永远不会同时执行。不过,我不确定仅仅为此引入 ActiveMQ 是否过分。这也可以用 'core' Camel/Java 来实现吗?

在 ActiveMQ 中很容易做到这一点。以下代码片段根据需要模拟执行消息:

  • 属于同一组的消息按顺序执行。
  • 属于不同组的消息并发执行。

这依赖于 http://activemq.apache.org/message-groups.html 中解释的 ActiveMQ 消息组。

final CamelContext context = new DefaultCamelContext();

context.addComponent("activemq", ActiveMQComponent.activeMQComponent("vm://localhost?broker.persistent=false"));
context.addRoutes(new RouteBuilder() {
    @Override
    public void configure() {
        from("activemq:queue:q?concurrentConsumers=5")
                .process(exchange -> {
                    System.out.println(Thread.currentThread() + " - " + exchange.getIn().getBody());
                    Thread.sleep(5000);
                });
    }
});
context.start();

for (int i = 0; i < 1000; ++i) {
    context.createFluentProducerTemplate()
            .withBody("This is a message from group : " + (i % 5))
            .withHeader("JMSXGroupID", "" + (i % 5))
            .to("activemq:queue:q")
            .send();
}

也就是说,我(仍然)想知道这是否可以用纯 EIPs/Camel-core 来完成。