Camel 2.11 批量聚合如何与单独的路由一起使用?
How Camel 2.11 batch aggregation works with separate route?
首先有一个类似的未回答的问题Joining routes into single aggregator
我们有一些消费者路由(ftp、文件、smb)从远程系统读取文件。
简化了直接路由测试,但与批量消费者的行为相似:
from("direct:"+routeId).id(routeId)
.setProperty(AGGREGATION_PROPERTY, constant(routeId))
.log(String.format("Sending (${body}) to %s", "direct:start1"))
.to("direct:aggregate");
转换后,一次投票的所有结果都在单独的路径中按批次汇总:
from("direct:aggregate")
.aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregatingStrategy())
.completionFromBatchConsumer()
.to("log:result", "mock:result");
如果每个消费者分开运行,一切正常。但如果多个消费者并行运行,聚合将拆分轮询。例如,如果文件消费者轮询 500 条消息并且第二个路由开始从 ftp 读取 6 个文件,则期望我们得到 2 个聚合,1 个包含来自文件的 500 条消息,1 个包含来自 ftp 的 6 条消息。
测试用例:
public void testAggregateByProperty() throws Exception {
MockEndpoint result = getMockEndpoint("mock:result");
result.expectedBodiesReceived("A+A+A", "B+B", "A", "Z");
template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 1);
template.sendBodyAndProperty("direct:Z", "Z", Exchange.BATCH_SIZE, 7);
assertMockEndpointsSatisfied();
}
结果是:"A+A"、"B"、"A"、"B"、"A" 而不是预期的 "A+A+A"、"B+B"、"A"、"Z"。
问题:
- 我们关于聚合的假设是错误的吗?
- 我们如何实现预期的行为?
- 如果我们设置 completionTimeout,它会从第一次交换开始超时 - 如果还有新的交换则独立?
你几乎可以正常工作了。这是您需要的更改(稍后我会解释)。
from("direct:aggregate").id("aggregate")
.aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregationStrategy())
.completionSize(property(Exchange.BATCH_SIZE))
.to("log:result", "mock:result")
结果将是:
Exchange received, body: A+A+A
Exchange received, body: B+B
Exchange received, body: A
注意:您不会收到 "Z"
的结果,因为批量大小为 7
。
解释一下 - 如您所读,Aggregator 是一个多功能的 camel 组件,正确定义的关键是:
- 聚合表达式
- 完成规则
现在,在您的情况下,您正在聚合 属性 AGGREGATION_PROPERTY
,这将是 A
、B
或 Z
。此外,您还指定了批量大小。
但是您没有在路线中表达 completionSize()
。相反,您使用的是 completionFromBatchConsumer
- 它做了一些不同的事情(代码声明它寻找 Exchange#BATCH_COMPLETE
属性),因此结果很奇怪。
无论如何,.completionSize(Exchange.BATCH_SIZE)
会让你的测试运行如愿。
祝你好运。
首先有一个类似的未回答的问题Joining routes into single aggregator
我们有一些消费者路由(ftp、文件、smb)从远程系统读取文件。 简化了直接路由测试,但与批量消费者的行为相似:
from("direct:"+routeId).id(routeId)
.setProperty(AGGREGATION_PROPERTY, constant(routeId))
.log(String.format("Sending (${body}) to %s", "direct:start1"))
.to("direct:aggregate");
转换后,一次投票的所有结果都在单独的路径中按批次汇总:
from("direct:aggregate")
.aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregatingStrategy())
.completionFromBatchConsumer()
.to("log:result", "mock:result");
如果每个消费者分开运行,一切正常。但如果多个消费者并行运行,聚合将拆分轮询。例如,如果文件消费者轮询 500 条消息并且第二个路由开始从 ftp 读取 6 个文件,则期望我们得到 2 个聚合,1 个包含来自文件的 500 条消息,1 个包含来自 ftp 的 6 条消息。
测试用例:
public void testAggregateByProperty() throws Exception {
MockEndpoint result = getMockEndpoint("mock:result");
result.expectedBodiesReceived("A+A+A", "B+B", "A", "Z");
template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 1);
template.sendBodyAndProperty("direct:Z", "Z", Exchange.BATCH_SIZE, 7);
assertMockEndpointsSatisfied();
}
结果是:"A+A"、"B"、"A"、"B"、"A" 而不是预期的 "A+A+A"、"B+B"、"A"、"Z"。 问题:
- 我们关于聚合的假设是错误的吗?
- 我们如何实现预期的行为?
- 如果我们设置 completionTimeout,它会从第一次交换开始超时 - 如果还有新的交换则独立?
你几乎可以正常工作了。这是您需要的更改(稍后我会解释)。
from("direct:aggregate").id("aggregate")
.aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregationStrategy())
.completionSize(property(Exchange.BATCH_SIZE))
.to("log:result", "mock:result")
结果将是:
Exchange received, body: A+A+A
Exchange received, body: B+B
Exchange received, body: A
注意:您不会收到 "Z"
的结果,因为批量大小为 7
。
解释一下 - 如您所读,Aggregator 是一个多功能的 camel 组件,正确定义的关键是:
- 聚合表达式
- 完成规则
现在,在您的情况下,您正在聚合 属性 AGGREGATION_PROPERTY
,这将是 A
、B
或 Z
。此外,您还指定了批量大小。
但是您没有在路线中表达 completionSize()
。相反,您使用的是 completionFromBatchConsumer
- 它做了一些不同的事情(代码声明它寻找 Exchange#BATCH_COMPLETE
属性),因此结果很奇怪。
无论如何,.completionSize(Exchange.BATCH_SIZE)
会让你的测试运行如愿。
祝你好运。