骆驼,如何在直接端点上创建真正的同步
Camel, how to create genuine sync on direct endpoint
骆驼 (3.11 - 3.12.0-SNAPSHOT) Java 16.
在这个例子中,定时器发送了两条消息。当第一条消息尚未完成时,第二条消息开始处理。我希望消息在一条 DIRECT 路由中一条一条地处理。
@Override
public void configure() throws Exception {
from("timer:cTimer?period=1&repeatCount=2")
.process(e -> {
final Integer oldBody = timerMessageId.getAndIncrement();
log.info("Send message: {}", oldBody);
e.getIn().setHeader(KEY, oldBody);
final List<Integer> list = IntStream.range(1, 10).boxed().collect(Collectors.toList());
e.getIn().setBody(list);
})
.to("direct:direct-order-file-process-route:" + psId);
from(direct("direct-order-file-process-route:" + psId))
.routeId("routeId:" + psId)
.process(exchange -> {
log.info("Started process message: {}", exchange.getIn().getHeader(KEY));
})
.split()
.body()
.aggregate(header(KEY), this::aggregatorExchanges)
.completionSize(5)
.completionTimeout(1000L)
.process(exchange -> {
Thread.sleep(100);
log.info("DONE: {} / {}", exchange.getIn().getHeader(KEY), exchange.getIn().getBody());
})
.end();
}
private Exchange aggregatorExchanges(Exchange oldExchange, Exchange newExchange) {
List<Integer> drinks;
if (oldExchange == null) {
drinks = new ArrayList<>();
} else {
drinks = (List<Integer>) oldExchange.getIn().getBody();
}
drinks.add((Integer) newExchange.getIn().getBody());
newExchange.getIn().setBody(drinks);
return newExchange;
}
日志:
Send message: 1
Started process message: 1
Send message: 2
Started process message: 2
DONE: 1 / [1, 2, 3, 4, 5]
DONE: 2 / [1, 2, 3, 4, 5]
DONE: 1 / [6, 7, 8, 9]
DONE: 2 / [6, 7, 8, 9]
预期日志:
Send message: 1
Started process message: 1
DONE: 1 / [1, 2, 3, 4, 5]
DONE: 1 / [6, 7, 8, 9]
Send message: 2
Started process message: 2
DONE: 2 / [1, 2, 3, 4, 5]
DONE: 2 / [6, 7, 8, 9]
这里的问题是关于“聚合”的。没有聚合“直接”工作正常。
换句话说:direct 和 aggregate 是不同步的(它们 运行 在不同的线程中)。直接不等待聚合完成。
这完全不是同步处理与否的问题。 您的期望只是一条拆分消息,而不是聚合消息。
- 您发送 2 条具有不同密钥的消息,每条消息包含 9 个数字的列表
- 两条消息都拆分为 9 条单独的消息
- 您希望原始消息一个接一个地处理,但这违背了聚合器的概念
从聚合器的角度来看,有 18 条单独的消息到达。 9 with key 1 and 9 with key 2. 它对这 2 条原始消息一无所知,但通过密钥聚合了这 18 条消息。
您的日志很好地显示了该过程。
- 第一条 密钥为 1 的消息已发送。它包含 9 个数字的列表。
- 该消息拆分为 9 条消息,每条消息有一个编号。
- 九条消息,都具有相同的密钥 1,到达聚合器
- 因为聚合器创建了 5 条消息的聚合(
completionSize(5)
),第一个聚合很快完成并生成( DONE: 1 / [1, 2, 3, 4, 5]
)
- 保留key为1的4条消息,因为还需要1条消息才能完成聚合
- 同时包含 9 个数字的密钥 2 消息也被拆分,并且 9 个密钥 2 的消息到达聚合器
- 它开始一个新的聚合因为KEY被用作区分符
- 因此第二个聚合很快完成并生成(
DONE: 2 / [1, 2, 3, 4, 5]
)
- 同样,对于消息 2,剩余的 4 条消息被保留,因为还需要 1 条消息才能完成聚合
- 此时 2 个聚合(键 1 和键 2)每个保存 4 条消息,正在等待更多消息。
- 因为没有更多消息到达,
completionTimeout
在没有新消息 1 秒后开始
- 因此 2 个“不完整”聚合仍然关闭 并生成(
DONE: 1 / [6, 7, 8, 9]
和 DONE: 2 / [6, 7, 8, 9]
)
如果你想得到你的“预期日志”,你根本不需要聚合器因为这只是一个创建 5 块的“自定义消息拆分”每条消息的数量。例如,您可以 implement the Camel Split EIP in a bean 来实现这一点。
骆驼 (3.11 - 3.12.0-SNAPSHOT) Java 16.
在这个例子中,定时器发送了两条消息。当第一条消息尚未完成时,第二条消息开始处理。我希望消息在一条 DIRECT 路由中一条一条地处理。
@Override
public void configure() throws Exception {
from("timer:cTimer?period=1&repeatCount=2")
.process(e -> {
final Integer oldBody = timerMessageId.getAndIncrement();
log.info("Send message: {}", oldBody);
e.getIn().setHeader(KEY, oldBody);
final List<Integer> list = IntStream.range(1, 10).boxed().collect(Collectors.toList());
e.getIn().setBody(list);
})
.to("direct:direct-order-file-process-route:" + psId);
from(direct("direct-order-file-process-route:" + psId))
.routeId("routeId:" + psId)
.process(exchange -> {
log.info("Started process message: {}", exchange.getIn().getHeader(KEY));
})
.split()
.body()
.aggregate(header(KEY), this::aggregatorExchanges)
.completionSize(5)
.completionTimeout(1000L)
.process(exchange -> {
Thread.sleep(100);
log.info("DONE: {} / {}", exchange.getIn().getHeader(KEY), exchange.getIn().getBody());
})
.end();
}
private Exchange aggregatorExchanges(Exchange oldExchange, Exchange newExchange) {
List<Integer> drinks;
if (oldExchange == null) {
drinks = new ArrayList<>();
} else {
drinks = (List<Integer>) oldExchange.getIn().getBody();
}
drinks.add((Integer) newExchange.getIn().getBody());
newExchange.getIn().setBody(drinks);
return newExchange;
}
日志:
Send message: 1
Started process message: 1
Send message: 2
Started process message: 2
DONE: 1 / [1, 2, 3, 4, 5]
DONE: 2 / [1, 2, 3, 4, 5]
DONE: 1 / [6, 7, 8, 9]
DONE: 2 / [6, 7, 8, 9]
预期日志:
Send message: 1
Started process message: 1
DONE: 1 / [1, 2, 3, 4, 5]
DONE: 1 / [6, 7, 8, 9]
Send message: 2
Started process message: 2
DONE: 2 / [1, 2, 3, 4, 5]
DONE: 2 / [6, 7, 8, 9]
这里的问题是关于“聚合”的。没有聚合“直接”工作正常。 换句话说:direct 和 aggregate 是不同步的(它们 运行 在不同的线程中)。直接不等待聚合完成。
这完全不是同步处理与否的问题。 您的期望只是一条拆分消息,而不是聚合消息。
- 您发送 2 条具有不同密钥的消息,每条消息包含 9 个数字的列表
- 两条消息都拆分为 9 条单独的消息
- 您希望原始消息一个接一个地处理,但这违背了聚合器的概念
从聚合器的角度来看,有 18 条单独的消息到达。 9 with key 1 and 9 with key 2. 它对这 2 条原始消息一无所知,但通过密钥聚合了这 18 条消息。
您的日志很好地显示了该过程。
- 第一条 密钥为 1 的消息已发送。它包含 9 个数字的列表。
- 该消息拆分为 9 条消息,每条消息有一个编号。
- 九条消息,都具有相同的密钥 1,到达聚合器
- 因为聚合器创建了 5 条消息的聚合(
completionSize(5)
),第一个聚合很快完成并生成(DONE: 1 / [1, 2, 3, 4, 5]
) - 保留key为1的4条消息,因为还需要1条消息才能完成聚合
- 同时包含 9 个数字的密钥 2 消息也被拆分,并且 9 个密钥 2 的消息到达聚合器
- 它开始一个新的聚合因为KEY被用作区分符
- 因此第二个聚合很快完成并生成(
DONE: 2 / [1, 2, 3, 4, 5]
) - 同样,对于消息 2,剩余的 4 条消息被保留,因为还需要 1 条消息才能完成聚合
- 此时 2 个聚合(键 1 和键 2)每个保存 4 条消息,正在等待更多消息。
- 因为没有更多消息到达,
completionTimeout
在没有新消息 1 秒后开始 - 因此 2 个“不完整”聚合仍然关闭 并生成(
DONE: 1 / [6, 7, 8, 9]
和DONE: 2 / [6, 7, 8, 9]
)
如果你想得到你的“预期日志”,你根本不需要聚合器因为这只是一个创建 5 块的“自定义消息拆分”每条消息的数量。例如,您可以 implement the Camel Split EIP in a bean 来实现这一点。