如何将 GroupedExchangeAggregationStrategy 聚合交换拆分为原始交换?
How can I split a GroupedExchangeAggregationStrategy aggregate exchange into the original exchanges?
在使用 GroupedExchangeAggregationStrategy
聚合交换后,我需要将它们分开(以发出单独的处理时间指标)到原始交换中。
我尝试使用以下内容进行拆分,但生成的拆分交换包装了原始交换并将其放入 Message
正文中。
是否可以将 GroupedExchangeAggregationStrategy
聚合交换拆分为没有包装交换的原始交换?我需要使用原始交换属性,并希望使用 SpEL 表达式来实现。
.aggregate(constant(true), myGroupedExchangeAggregationStrategy)
.completionInterval(1000)
.completeAllOnStop()
.process { /* do stuff */ }
.split(exchangeProperty(Exchange.GROUPED_EXCHANGE))
.to(/* micrometer timer metric using SpEL expression */)
// ^- the resulting split exchange is wrapped in another exchange
如果当前不支持此功能,我将尝试自行找出实现此行为的最佳方式,而无需为此单一功能创建自定义 Splitter
处理器。我希望以某种方式覆盖进行包装的 SplitterIterable
但它似乎不可能。
是的,另一方面,GroupedExchangeAggregationStrategy
does nothing else than create a java.util.List
of all Exchanges. The Splitter EIP 默认将 List 拆分为元素,并将元素放入消息 body 中。因此,您最终得到一个在其 body 中包含一个 Exchange 的 Exchange。
您需要的是 收集列表中所有 body Objects 而不是所有交换的 AggregationStrategy。
您可以尝试使用可通过流畅的 API.
配置的 Camels FlexibleAggregationStrategy
新的 FlexibleAggregationStrategy()
.storeInBody()
.accumulateInCollection(ArrayList.class)
.pick(new SimpleExpression("${body}"));
这应该创建一个 AggregationStrategy 来提取每条消息的 body(您也许可以省略 pick 方法,因为 body 提取是 pick 默认值),将它们收集到一个列表中并存储消息中的聚合 body。
要再次拆分此聚合,一个简单的 split(body())
应该就足够了。
根据评论编辑
是的,你是对的,我的解决方案
的一个副作用是你失去了原始消息的属性和headers,因为它只聚合了消息正文。
您要做的是将交易所列表拆分回原始列表。即 Splitter 不能创建新的 Exchange,而是使用已经存在的 并丢弃聚合包装器 Exchange。
据我在 source code of the Splitter 中看到的,这是 目前不可能的:
Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false);
...
if (part instanceof Message) {
newExchange.setIn((Message) part);
} else {
Message in = newExchange.getIn();
in.setBody(part);
}
根据已接受的答案,它似乎不受本地支持。
此自定义处理器将 展开 拆分交换(即将嵌套交换 Message
和属性复制到根交换)。展开的交换将 几乎 与原始交换相同——它将保留根交换的所有非冲突属性(例如 Splitter
相关属性,如拆分索引等。 )
class ExchangeUnwrapper : Processor {
override fun process(exchange: Exchange) {
val wrappedExchange = exchange.`in`.body as Exchange
ExchangeHelper.copyResultsPreservePattern(exchange, wrappedExchange)
}
}
// Route.kt
from(...)
.aggregate(...)
.process { /* do things with aggregate */ }
.split(exchangeProperty(Exchange.GROUPED_EXCHANGE))
.process(ExchangeUnwrapper())
.process { /* do something with the original exchange */ }
.end()
在使用 GroupedExchangeAggregationStrategy
聚合交换后,我需要将它们分开(以发出单独的处理时间指标)到原始交换中。
我尝试使用以下内容进行拆分,但生成的拆分交换包装了原始交换并将其放入 Message
正文中。
是否可以将 GroupedExchangeAggregationStrategy
聚合交换拆分为没有包装交换的原始交换?我需要使用原始交换属性,并希望使用 SpEL 表达式来实现。
.aggregate(constant(true), myGroupedExchangeAggregationStrategy)
.completionInterval(1000)
.completeAllOnStop()
.process { /* do stuff */ }
.split(exchangeProperty(Exchange.GROUPED_EXCHANGE))
.to(/* micrometer timer metric using SpEL expression */)
// ^- the resulting split exchange is wrapped in another exchange
如果当前不支持此功能,我将尝试自行找出实现此行为的最佳方式,而无需为此单一功能创建自定义 Splitter
处理器。我希望以某种方式覆盖进行包装的 SplitterIterable
但它似乎不可能。
是的,另一方面,GroupedExchangeAggregationStrategy
does nothing else than create a java.util.List
of all Exchanges. The Splitter EIP 默认将 List 拆分为元素,并将元素放入消息 body 中。因此,您最终得到一个在其 body 中包含一个 Exchange 的 Exchange。
您需要的是 收集列表中所有 body Objects 而不是所有交换的 AggregationStrategy。
您可以尝试使用可通过流畅的 API.
配置的 CamelsFlexibleAggregationStrategy
新的 FlexibleAggregationStrategy() .storeInBody() .accumulateInCollection(ArrayList.class) .pick(new SimpleExpression("${body}"));
这应该创建一个 AggregationStrategy 来提取每条消息的 body(您也许可以省略 pick 方法,因为 body 提取是 pick 默认值),将它们收集到一个列表中并存储消息中的聚合 body。
要再次拆分此聚合,一个简单的 split(body())
应该就足够了。
根据评论编辑
是的,你是对的,我的解决方案
的一个副作用是你失去了原始消息的属性和headers,因为它只聚合了消息正文。您要做的是将交易所列表拆分回原始列表。即 Splitter 不能创建新的 Exchange,而是使用已经存在的 并丢弃聚合包装器 Exchange。
据我在 source code of the Splitter 中看到的,这是 目前不可能的:
Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false);
...
if (part instanceof Message) {
newExchange.setIn((Message) part);
} else {
Message in = newExchange.getIn();
in.setBody(part);
}
根据已接受的答案,它似乎不受本地支持。
此自定义处理器将 展开 拆分交换(即将嵌套交换 Message
和属性复制到根交换)。展开的交换将 几乎 与原始交换相同——它将保留根交换的所有非冲突属性(例如 Splitter
相关属性,如拆分索引等。 )
class ExchangeUnwrapper : Processor {
override fun process(exchange: Exchange) {
val wrappedExchange = exchange.`in`.body as Exchange
ExchangeHelper.copyResultsPreservePattern(exchange, wrappedExchange)
}
}
// Route.kt
from(...)
.aggregate(...)
.process { /* do things with aggregate */ }
.split(exchangeProperty(Exchange.GROUPED_EXCHANGE))
.process(ExchangeUnwrapper())
.process { /* do something with the original exchange */ }
.end()