IntegrationFlowDefinition.aggregate 不起作用:也许 CorrelationStrategy 失败了?

IntegrationFlowDefinition.aggregate doesn't work: Maybe the CorrelationStrategy is failing?

错误消息:原因:java.lang.IllegalStateException:不允许空相关。也许 CorrelationStrategy 失败了?

我的实现,

    @Bean
    public IntegrationFlow start() {
        return IntegrationFlows
                .from("getOrders")
                .split()
                .publishSubscribeChannel(c -> c.subscribe(s -> s.channel(q -> q.queue(1))
                        .<Order, Message<?>>transform(p -> MessageBuilder.withPayload(new Item(p.getItems())).setHeader(ORDERID, p.getOrderId()).build())
                        .split(Item.class, Item::getItems)
                        .transform() // let's assume, an object created for each item, let's say ItemProperty to the object.
                                     //  Transform returns message; MessageBuilder.withPayload(createItemProperty(getItemName, getItemId)).build();
                        .aggregate() // so, here aggregate method needs to aggregate ItemProperties.
                        .handle() // handler gets List<ItemProperty> as an input.
      ))
      .get();
    }

两个分离器都工作正常。我还在第二个分离器之后测试了变压器,工作正常。但是,当涉及到聚合时,它就失败了。我在这里缺少什么?

您忽略了一个事实,即 transformer 是那种按原样处理整个消息的端点类型。如果您自己创建一条消息,它不会修改它。 因此,对于 MessageBuilder.withPayload(createItemProperty(getItemName, getItemId)).build();,您只是在拆分器之后错过了重要的序列细节 headers。因此,之后的聚合器不知道如何处理您的消息,因为您为默认关联策略配置了它,但您没有在消息中提供相应的 headers。

从技术上讲,我看不出有什么理由在那里手动创建消息:简单的 return createItemProperty(getItemName, getItemId); 对您来说应该足够了。该框架将代表您创建消息,并复制相应的请求消息 headers。

如果您真的仍然认为需要在该转换中自己创建一条消息,那么您需要考虑 copyHeaders() 在请求消息中 MessageBuilder 以携带所需的序列详细信息 headers.