Camel:通过聚合丰富民意调查
Camel: Poll Enrich with Aggregation
来自 camel book,第 'Using pollEnrich to merge additional data with an existing message' 节,表明您可以将旧的 Exchange(来自 quarz)与新的(来自 ftp)合并。
问题是我有一个来自主题(旧 Exchange)的文件,我使用 pollEnrich 从 ftp 服务器获取一个新文件,我也想合并它。我有兴趣设置一些 headers 从 oldExchange 到 newExchange。
我面临的问题是 oldExchange 一直为空。
我已经阅读了 camel 书中的例子,对于聚合器,那里说:"The first message arrives for the first group. == null"。
我不明白,那我的oldExchange呢?主题中的那个。为什么仅在第二次迭代时交换不为空(对于同一组)。
from("myTopic")
.pollEnrich()
.simple("ftp://myUrl&fileName=${in.headers.test}")
.aggregate((Exchange oldExchange, Exchange newExchange) -> {
final String oldHeader = oldExchange.getIn().getHeader("test", String.class);
newExchange.getIn().setHeader("test", oldHeader);
return newExchange;
})
我已阅读:http://camel.465427.n5.nabble.com/Split-and-Aggregate-Old-Exchange-is-null-everytime-in-AggregationStrategy-td5746365.html#a5746405 但我仍然不明白为什么两条消息都属于同一个组。
The first message arrives for the first group. == null. I don't understand ...
这对于 standard aggregation 是正确的,例如,您将多条传入消息聚合为一条消息。在这种情况下,在第一条传入消息上,聚合器仍然是空的,因此oldExchange
(聚合器内容)是null
。您必须等待另一条(第二条)消息才能聚合某些内容。
但是,in your case (enrich) oldExchange
不应该是 null
因为第一条消息,即来自您主题的消息,已经那里。
您是否尝试过在调试器中检查来自主题的消息或在它到达 enricher 之前将其注销 ?只是为了确保它不为空。
测试后添加
这很有趣,我通过单元测试尝试了这个,当我像你一样定义 pollEnrich 时,我得到了相反的结果:我使用的消息由 .from(...)
路由是 oldExchange
和我的 newExchange
始终为空。
但是,如果我定义 pollEnrich "inline",它工作正常
.pollEnrich("URI", Timeout, (AggregationStrategy))
我怀疑如果你分析 DSL 使用这两个定义做了什么,这是可以解释的,但从我的快速测试角度来看,它看起来有点奇怪。
@burki 是的,它是否像你所说的那样在 pollEnrich() 中使用 aggregationStrategy 工作,但我需要简单的,因为我正在动态调用端点并且我不能在 pollEnrich 中执行此操作(或者至少我不这样做)不知道怎么做)。
我是这样解决的:
from("myTopic")
.pollEnrich()
.simple("ftp://myUrl&fileName=${in.headers.test}")
.aggregationStrategy((Exchange oldExchange, Exchange newExchange) -> {
final String oldHeader = oldExchange.getIn().getHeader("test", String.class);
newExchange.getIn().setHeader("test", oldHeader);
return newExchange;
})
因此,我使用的是 .aggregationStrategy 而不是 .aggregate 调用,我的理解是 .aggregate 调用用于标准聚合(如@burki 所述),如果我们想要聚合多条消息和 .aggregationStrategy call 可用于合并 2 条消息(其中一条来自外部服务)。
来自 camel book,第 'Using pollEnrich to merge additional data with an existing message' 节,表明您可以将旧的 Exchange(来自 quarz)与新的(来自 ftp)合并。
问题是我有一个来自主题(旧 Exchange)的文件,我使用 pollEnrich 从 ftp 服务器获取一个新文件,我也想合并它。我有兴趣设置一些 headers 从 oldExchange 到 newExchange。
我面临的问题是 oldExchange 一直为空。 我已经阅读了 camel 书中的例子,对于聚合器,那里说:"The first message arrives for the first group. == null"。 我不明白,那我的oldExchange呢?主题中的那个。为什么仅在第二次迭代时交换不为空(对于同一组)。
from("myTopic")
.pollEnrich()
.simple("ftp://myUrl&fileName=${in.headers.test}")
.aggregate((Exchange oldExchange, Exchange newExchange) -> {
final String oldHeader = oldExchange.getIn().getHeader("test", String.class);
newExchange.getIn().setHeader("test", oldHeader);
return newExchange;
})
我已阅读:http://camel.465427.n5.nabble.com/Split-and-Aggregate-Old-Exchange-is-null-everytime-in-AggregationStrategy-td5746365.html#a5746405 但我仍然不明白为什么两条消息都属于同一个组。
The first message arrives for the first group. == null. I don't understand ...
这对于 standard aggregation 是正确的,例如,您将多条传入消息聚合为一条消息。在这种情况下,在第一条传入消息上,聚合器仍然是空的,因此oldExchange
(聚合器内容)是null
。您必须等待另一条(第二条)消息才能聚合某些内容。
但是,in your case (enrich) oldExchange
不应该是 null
因为第一条消息,即来自您主题的消息,已经那里。
您是否尝试过在调试器中检查来自主题的消息或在它到达 enricher 之前将其注销 ?只是为了确保它不为空。
测试后添加
这很有趣,我通过单元测试尝试了这个,当我像你一样定义 pollEnrich 时,我得到了相反的结果:我使用的消息由 .from(...)
路由是 oldExchange
和我的 newExchange
始终为空。
但是,如果我定义 pollEnrich "inline",它工作正常
.pollEnrich("URI", Timeout, (AggregationStrategy))
我怀疑如果你分析 DSL 使用这两个定义做了什么,这是可以解释的,但从我的快速测试角度来看,它看起来有点奇怪。
@burki 是的,它是否像你所说的那样在 pollEnrich() 中使用 aggregationStrategy 工作,但我需要简单的,因为我正在动态调用端点并且我不能在 pollEnrich 中执行此操作(或者至少我不这样做)不知道怎么做)。
我是这样解决的:
from("myTopic")
.pollEnrich()
.simple("ftp://myUrl&fileName=${in.headers.test}")
.aggregationStrategy((Exchange oldExchange, Exchange newExchange) -> {
final String oldHeader = oldExchange.getIn().getHeader("test", String.class);
newExchange.getIn().setHeader("test", oldHeader);
return newExchange;
})
因此,我使用的是 .aggregationStrategy 而不是 .aggregate 调用,我的理解是 .aggregate 调用用于标准聚合(如@burki 所述),如果我们想要聚合多条消息和 .aggregationStrategy call 可用于合并 2 条消息(其中一条来自外部服务)。