IntegrationFlowDefinition.aggregate 不起作用:也许 CorrelationStrategy 失败了?
IntegrationFlowDefinition.aggregate doesn't work: Maybe the CorrelationStrategy is failing?
错误消息:原因:java.lang.IllegalStateException:不允许空相关。也许 CorrelationStrategy 失败了?
我的实现,
@Bean
public IntegrationFlow start() {
return IntegrationFlows
.from("getOrders")
.split()
.publishSubscribeChannel(c -> c.subscribe(s -> s.channel(q -> q.queue(1))
.<Order, Message<?>>transform(p -> MessageBuilder.withPayload(new Item(p.getItems())).setHeader(ORDERID, p.getOrderId()).build())
.split(Item.class, Item::getItems)
.transform() // let's assume, an object created for each item, let's say ItemProperty to the object.
// Transform returns message; MessageBuilder.withPayload(createItemProperty(getItemName, getItemId)).build();
.aggregate() // so, here aggregate method needs to aggregate ItemProperties.
.handle() // handler gets List<ItemProperty> as an input.
))
.get();
}
两个分离器都工作正常。我还在第二个分离器之后测试了变压器,工作正常。但是,当涉及到聚合时,它就失败了。我在这里缺少什么?
您忽略了一个事实,即 transformer
是那种按原样处理整个消息的端点类型。如果您自己创建一条消息,它不会修改它。
因此,对于 MessageBuilder.withPayload(createItemProperty(getItemName, getItemId)).build();
,您只是在拆分器之后错过了重要的序列细节 headers。因此,之后的聚合器不知道如何处理您的消息,因为您为默认关联策略配置了它,但您没有在消息中提供相应的 headers。
从技术上讲,我看不出有什么理由在那里手动创建消息:简单的 return createItemProperty(getItemName, getItemId);
对您来说应该足够了。该框架将代表您创建消息,并复制相应的请求消息 headers。
如果您真的仍然认为需要在该转换中自己创建一条消息,那么您需要考虑 copyHeaders()
在请求消息中 MessageBuilder
以携带所需的序列详细信息 headers.
错误消息:原因:java.lang.IllegalStateException:不允许空相关。也许 CorrelationStrategy 失败了?
我的实现,
@Bean
public IntegrationFlow start() {
return IntegrationFlows
.from("getOrders")
.split()
.publishSubscribeChannel(c -> c.subscribe(s -> s.channel(q -> q.queue(1))
.<Order, Message<?>>transform(p -> MessageBuilder.withPayload(new Item(p.getItems())).setHeader(ORDERID, p.getOrderId()).build())
.split(Item.class, Item::getItems)
.transform() // let's assume, an object created for each item, let's say ItemProperty to the object.
// Transform returns message; MessageBuilder.withPayload(createItemProperty(getItemName, getItemId)).build();
.aggregate() // so, here aggregate method needs to aggregate ItemProperties.
.handle() // handler gets List<ItemProperty> as an input.
))
.get();
}
两个分离器都工作正常。我还在第二个分离器之后测试了变压器,工作正常。但是,当涉及到聚合时,它就失败了。我在这里缺少什么?
您忽略了一个事实,即 transformer
是那种按原样处理整个消息的端点类型。如果您自己创建一条消息,它不会修改它。
因此,对于 MessageBuilder.withPayload(createItemProperty(getItemName, getItemId)).build();
,您只是在拆分器之后错过了重要的序列细节 headers。因此,之后的聚合器不知道如何处理您的消息,因为您为默认关联策略配置了它,但您没有在消息中提供相应的 headers。
从技术上讲,我看不出有什么理由在那里手动创建消息:简单的 return createItemProperty(getItemName, getItemId);
对您来说应该足够了。该框架将代表您创建消息,并复制相应的请求消息 headers。
如果您真的仍然认为需要在该转换中自己创建一条消息,那么您需要考虑 copyHeaders()
在请求消息中 MessageBuilder
以携带所需的序列详细信息 headers.