如何将 GroupedExchangeAggregationStrategy 聚合交换拆分为原始交换?

How can I split a GroupedExchangeAggregationStrategy aggregate exchange into the original exchanges?

在使用 GroupedExchangeAggregationStrategy 聚合交换后,我需要将它们分开(以发出单独的处理时间指标)到原始交换中。

我尝试使用以下内容进行拆分,但生成的拆分交换包装了原始交换并将其放入 Message 正文中。

是否可以将 GroupedExchangeAggregationStrategy 聚合交换拆分为没有包装交换的原始交换?我需要使用原始交换属性,并希望使用 SpEL 表达式来实现。

.aggregate(constant(true), myGroupedExchangeAggregationStrategy)
    .completionInterval(1000)
    .completeAllOnStop()
    .process { /* do stuff */ }
.split(exchangeProperty(Exchange.GROUPED_EXCHANGE))
    .to(/* micrometer timer metric using SpEL expression */)
    // ^- the resulting split exchange is wrapped in another exchange

如果当前不支持此功能,我将尝试自行找出实现此行为的最佳方式,而无需为此单一功能创建自定义 Splitter 处理器。我希望以某种方式覆盖进行包装的 SplitterIterable 但它似乎不可能。

是的,另一方面,GroupedExchangeAggregationStrategy does nothing else than create a java.util.List of all Exchanges. The Splitter EIP 默认将 List 拆分为元素,并将元素放入消息 body 中。因此,您最终得到一个在其 body 中包含一个 Exchange 的 Exchange。

您需要的是 收集列表中所有 body Objects 而不是所有交换的 AggregationStrategy。

您可以尝试使用可通过流畅的 API.

配置的 Camels FlexibleAggregationStrategy

新的 FlexibleAggregationStrategy() .storeInBody() .accumulateInCollection(ArrayList.class) .pick(new SimpleExpression("${body}"));

这应该创建一个 AggregationStrategy 来提取每条消息的 body(您也许可以省略 pick 方法,因为 body 提取是 pick 默认值),将它们收集到一个列表中并存储消息中的聚合 body。

要再次拆分此聚合,一个简单的 split(body()) 应该就足够了。

根据评论编辑

是的,你是对的,我的解决方案的一个副作用是你失去了原始消息的属性和headers,因为它只聚合了消息正文。

您要做的是将交易所列表拆分回原始列表。即 Splitter 不能创建新的 Exchange,而是使用已经存在的 并丢弃聚合包装器 Exchange。

据我在 source code of the Splitter 中看到的,这是 目前不可能的

Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false);
...
if (part instanceof Message) {
    newExchange.setIn((Message) part);
} else {
    Message in = newExchange.getIn();
    in.setBody(part);
}

根据已接受的答案,它似乎不受本地支持。

此自定义处理器将 展开 拆分交换(即将嵌套交换 Message 和属性复制到根交换)。展开的交换将 几乎 与原始交换相同——它将保留根交换的所有非冲突属性(例如 Splitter 相关属性,如拆分索引等。 )

class ExchangeUnwrapper : Processor {

    override fun process(exchange: Exchange) {
        val wrappedExchange = exchange.`in`.body as Exchange
        ExchangeHelper.copyResultsPreservePattern(exchange, wrappedExchange)
    }
}

// Route.kt

from(...)
.aggregate(...)
.process { /* do things with aggregate */ }
.split(exchangeProperty(Exchange.GROUPED_EXCHANGE))
  .process(ExchangeUnwrapper())
  .process { /* do something with the original exchange */ }
.end()