Apache Camel EIP 路由 - 如何停止 split()
Apache Camel EIP route - How to stop split()
我在使用以下路线时遇到了一些问题:
// from("cxf:....")...
from("direct:start").process(startRequestProcessor) // STEP 1
.choice()
.when(body().isNull())
.to("direct:finish")
.otherwise()
.split(body()) // STEP 2
.bean(TypeMapper.class) // STEP 3
.log("Goes to DynamicRouter:: routeByTypeHeader with header: ${headers.type}")
.recipientList().method(Endpoint1DynamicRouter.class, "routeByTypeHeader") // STEP 4
.ignoreInvalidEndpoints();
from("direct:endpoint2") // STEP 6
.log("Goes to DynamicRouter::routeByCollectionHeader with header: ${headers.collection}")
.recipientList().method(Endpoint2DynamicRouter.class, "routeByCollectionHeader")
.ignoreInvalidEndpoints();
from("direct:endpoint1.1") // STEP 5
.process(new DateRangeProcessor())
.to("direct:collections");
from("direct:endpoint1.2") // STEP 5
.process(new SingleProcessor())
.to("direct:collections");
from("direct:endpoint2.2") // STEP 7
.aggregate(header("collection" /** endpoint2.2 */), CollectionAggregationStrategy)
.completionSize(exchangeProperty("endpoint22"))
.process(new QueryBuilderProcessor())
.bean(MyService, "getDbCriteria")
.setHeader("collection", constant("endpoint2.1"))
.to("direct:endpoint2.1").end();
from("direct:endpoint2.1") // STEP 8
.aggregate(header("collection" /** endpoint2.1 */), CollectionAggregationStrategy)
.completionSize(exchangeProperty("CamelSplitSize"))
.to("direct:finish").end();
from("direct:finish")
.process(new QueryBuilderProcessor())
.bean(MyRepository, "findAll")
.log("ResponseData: ${body}").
marshal().json(JsonLibrary.Gson).end();
路线
- 接收 json 字符串并将其转换为 JSONObjects 列表 (HashSet)。
- 将收到的列表拆分为 json objects.
- 根据object内容设置相应的headers
- 根据 headers 将消息路由到 endpoint1.1 或 endpoint1.2
- 将消息转换为 mongodb 条件并发送到端点 2
- Endpoint2 根据另一个 header 将消息路由到 endpoint2.1 或 endpoint2.2。
- Endpoint2.2聚合所有收到的消息,处理得到mongodbCriteria并发送给endpoint2.1(completionSize在第2步计算并保存在属性 "endpoint22" ).
- Enpoint2.1 聚合所有消息 (CamelSplitSize) 将聚合消息转换为 Query object 并将其发送到 Repository 以检索数据。
我可以在调试器中看到有效响应 object,但无论如何我都会收到错误消息:
No message body writer has been found for class java.util.HashSet, ContentType: application/json
问题不在响应 object 中,因为它与其他路由一起使用并且不包含 HashSet。
我的猜测是路由将第 1 步创建的 HashSet 发送到输出...
我的问题是:
- 路由输出有什么问题?
两个recipientList()都尝试转发
发送到无效端点的消息(我必须使用 .ignoreInvalidEndpoints() 来避免异常):
org.apache.camel.NoSuchEndpointException: No endpoint could be found for:
org.springframework.data.mongodb.core.query.Criteria@20f55e70, please
check your classpath contains the needed Camel component jar.
如有任何帮助,我们将不胜感激!
谢谢
我觉得很奇怪,但是.aggregate()函数没有回复exchange。它使用您的聚合策略,但始终回复传入的交换。阅读文档时不清楚,但您必须使用聚合策略和 split() 才能 return 交换。
我在使用以下路线时遇到了一些问题:
// from("cxf:....")...
from("direct:start").process(startRequestProcessor) // STEP 1
.choice()
.when(body().isNull())
.to("direct:finish")
.otherwise()
.split(body()) // STEP 2
.bean(TypeMapper.class) // STEP 3
.log("Goes to DynamicRouter:: routeByTypeHeader with header: ${headers.type}")
.recipientList().method(Endpoint1DynamicRouter.class, "routeByTypeHeader") // STEP 4
.ignoreInvalidEndpoints();
from("direct:endpoint2") // STEP 6
.log("Goes to DynamicRouter::routeByCollectionHeader with header: ${headers.collection}")
.recipientList().method(Endpoint2DynamicRouter.class, "routeByCollectionHeader")
.ignoreInvalidEndpoints();
from("direct:endpoint1.1") // STEP 5
.process(new DateRangeProcessor())
.to("direct:collections");
from("direct:endpoint1.2") // STEP 5
.process(new SingleProcessor())
.to("direct:collections");
from("direct:endpoint2.2") // STEP 7
.aggregate(header("collection" /** endpoint2.2 */), CollectionAggregationStrategy)
.completionSize(exchangeProperty("endpoint22"))
.process(new QueryBuilderProcessor())
.bean(MyService, "getDbCriteria")
.setHeader("collection", constant("endpoint2.1"))
.to("direct:endpoint2.1").end();
from("direct:endpoint2.1") // STEP 8
.aggregate(header("collection" /** endpoint2.1 */), CollectionAggregationStrategy)
.completionSize(exchangeProperty("CamelSplitSize"))
.to("direct:finish").end();
from("direct:finish")
.process(new QueryBuilderProcessor())
.bean(MyRepository, "findAll")
.log("ResponseData: ${body}").
marshal().json(JsonLibrary.Gson).end();
路线
- 接收 json 字符串并将其转换为 JSONObjects 列表 (HashSet)。
- 将收到的列表拆分为 json objects.
- 根据object内容设置相应的headers
- 根据 headers 将消息路由到 endpoint1.1 或 endpoint1.2
- 将消息转换为 mongodb 条件并发送到端点 2
- Endpoint2 根据另一个 header 将消息路由到 endpoint2.1 或 endpoint2.2。
- Endpoint2.2聚合所有收到的消息,处理得到mongodbCriteria并发送给endpoint2.1(completionSize在第2步计算并保存在属性 "endpoint22" ).
- Enpoint2.1 聚合所有消息 (CamelSplitSize) 将聚合消息转换为 Query object 并将其发送到 Repository 以检索数据。
我可以在调试器中看到有效响应 object,但无论如何我都会收到错误消息:
No message body writer has been found for class java.util.HashSet, ContentType: application/json
问题不在响应 object 中,因为它与其他路由一起使用并且不包含 HashSet。
我的猜测是路由将第 1 步创建的 HashSet 发送到输出...
我的问题是:
- 路由输出有什么问题?
两个recipientList()都尝试转发 发送到无效端点的消息(我必须使用 .ignoreInvalidEndpoints() 来避免异常):
org.apache.camel.NoSuchEndpointException: No endpoint could be found for: org.springframework.data.mongodb.core.query.Criteria@20f55e70, please check your classpath contains the needed Camel component jar.
如有任何帮助,我们将不胜感激! 谢谢
我觉得很奇怪,但是.aggregate()函数没有回复exchange。它使用您的聚合策略,但始终回复传入的交换。阅读文档时不清楚,但您必须使用聚合策略和 split() 才能 return 交换。