仅聚合具有相同相关键的连续交换
Aggregate only consecutive exchanges with same correlation key
我正在使用 Apache Camel 并获得一个大文件作为输入,我必须逐行处理它。内容已经排序,我必须使用相同的相关键聚合所有连续的行。如果相关键发生变化,则必须完成先前的聚合。如果文件结束,则最后的聚合也已完成。
我有一些限制:
- 因为传入的文件相当大,我们希望以流方式处理它。
- 因为结果是给同步端点的,所以我不想使用超时完成谓词。否则我将失去调节数据源消耗速度的背压,并且交换将累积在 AggregateProcessor 的超时映射和聚合存储库中。
PreCompletionAwareAggregationStrategy 看起来是一个很有前途的解决方案,但事实证明,在下一个文件到达之前,最后一个聚合不会完成。如果我在 preComplete 中使用 CamelSplitComplete 属性,最后的聚合将完成但没有最后的传入交换。相反,最后一次交换将被添加到下一个到达的文件的内容中。
所以目前我找不到一个不太丑陋的解决方案。
好吧,也许一种方法是,因为您的数据已经排序,所以以流方式解析并将具有相同 correlationkey 的每一行添加到某个 hashmap 结构中。一旦遇到新的 correlationkey,您基本上想要 "flush" 哈希映射来创建新消息,然后重新启动相同的进程。看看这里:
http://camel.apache.org/how-do-i-write-a-custom-processor-which-sends-multiple-messages.html
在所描述的场景中,我会将拆分的消息发送到带有聚合器的路由(我们称之为 "AggregationRoute"),其聚合策略实现了 PreCompletionAwareAggregationStrategy(我猜你已经在使用它了)。
然后,当拆分结束时,将 AGGREGATION_COMPLETE_ALL_GROUPS header 设置为 true 并将其发送到 AggregationRoute。本次兑换仅作为完成所有聚合组的信号。
示例:
...
.split(body()).streaming()
.to("direct:aggregationRoute")
.end()
.setHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS,constant(true))
.to("direct:aggregationRoute");
from("direct:aggregationRoute")
.aggregate([your correlation expression]), myAggregationStrategy)
...
另一种方法是使用 AggregateController 通过调用其方法 forceCompletionOfAllGroups() 来结束所有组的聚合:
AggregateController aggregateController = new DefaultAggregateController();
from(...)
...
.split(body()).streaming()
.aggregate([correlation expression], aggregationStrategy).aggregateController(aggregateController)
...
// Do what you need to do with the aggregated exchange
...
.end()
.end()
.bean(aggregateController, "forceCompletionOfAllGroups")
我正在使用 Apache Camel 并获得一个大文件作为输入,我必须逐行处理它。内容已经排序,我必须使用相同的相关键聚合所有连续的行。如果相关键发生变化,则必须完成先前的聚合。如果文件结束,则最后的聚合也已完成。 我有一些限制: - 因为传入的文件相当大,我们希望以流方式处理它。 - 因为结果是给同步端点的,所以我不想使用超时完成谓词。否则我将失去调节数据源消耗速度的背压,并且交换将累积在 AggregateProcessor 的超时映射和聚合存储库中。
PreCompletionAwareAggregationStrategy 看起来是一个很有前途的解决方案,但事实证明,在下一个文件到达之前,最后一个聚合不会完成。如果我在 preComplete 中使用 CamelSplitComplete 属性,最后的聚合将完成但没有最后的传入交换。相反,最后一次交换将被添加到下一个到达的文件的内容中。
所以目前我找不到一个不太丑陋的解决方案。
好吧,也许一种方法是,因为您的数据已经排序,所以以流方式解析并将具有相同 correlationkey 的每一行添加到某个 hashmap 结构中。一旦遇到新的 correlationkey,您基本上想要 "flush" 哈希映射来创建新消息,然后重新启动相同的进程。看看这里: http://camel.apache.org/how-do-i-write-a-custom-processor-which-sends-multiple-messages.html
在所描述的场景中,我会将拆分的消息发送到带有聚合器的路由(我们称之为 "AggregationRoute"),其聚合策略实现了 PreCompletionAwareAggregationStrategy(我猜你已经在使用它了)。 然后,当拆分结束时,将 AGGREGATION_COMPLETE_ALL_GROUPS header 设置为 true 并将其发送到 AggregationRoute。本次兑换仅作为完成所有聚合组的信号。
示例:
...
.split(body()).streaming()
.to("direct:aggregationRoute")
.end()
.setHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS,constant(true))
.to("direct:aggregationRoute");
from("direct:aggregationRoute")
.aggregate([your correlation expression]), myAggregationStrategy)
...
另一种方法是使用 AggregateController 通过调用其方法 forceCompletionOfAllGroups() 来结束所有组的聚合:
AggregateController aggregateController = new DefaultAggregateController();
from(...)
...
.split(body()).streaming()
.aggregate([correlation expression], aggregationStrategy).aggregateController(aggregateController)
...
// Do what you need to do with the aggregated exchange
...
.end()
.end()
.bean(aggregateController, "forceCompletionOfAllGroups")