Spring 集成:让所有 headers 参与聚合,而不仅仅是最后一个

Spring Integration: get all headers involved in an aggration and not just the last one

我有一个 Spring 集成流程定义如下:

IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
                    .id("id")
                    .acknowledgeMode(MANUAL)
                    .autoStartup(autoStartup)
                    .concurrentConsumers(2)
                    .maxConcurrentConsumers(3)
                    .messageConverter(messageConverter()))
                    .aggregate(a -> ...)
                    .handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
                    .get();

服务激活器定义如下:

@Component
@Transactional
public class ServiceActivator {

    @ServiceActivator    
    public void myMethod(List<> Collection<MyEvent> events) {
        ....
    }    
}

我想做的是更改 myMethod 以同时获取与聚合中每条消息关联的特定 header 的列表。在我的例子中,我想为每条消息获取 AmqpHeaders.CHANNELAmqpHeaders.DELIVERY_TAG,这样我就可以对每条发送给 RabbitMQ 的消息执行 ACK 或 NACK(请注意,我故意使用手册IntegrationFlow 中的确认模式,因为我想在 myMethod 执行后发送 ACK's/NACK 的

我尝试过这种方法:

@Component
@Transactional
public class ServiceActivator {

    @ServiceActivator    
    public void myMethod(@Header(AmqpHeaders.CHANNEL) List<Channel> channels, 
                         @Header(AmqpHeaders.DELIVERY_TAG) List<Long> tags, 
                         Collection<MyEvent> events) {
        ....
    }    
}

但在这里我似乎只得到最后一条消息的 header 值(即 channelstags 的大小始终为 1,即使 events collection).

我也尝试将 Collection<MyEvent> 更改为 Collection<Message> (org.springframework.messaging.Message) 以尝试手动提取 headers 但失败了:

org.springframework.messaging.MessageHandlingException: nested exception is java.lang.ClassCastException: com.x.y.MyEvent cannot be cast to org.springframework.messaging.Message

因为消息已经被 IntegrationFlow 中定义的消息转换器转换。

我怎样才能做到这一点?

您需要在 aggregator 上自定义 outputProcessor 到 return 您想要的消息(在 headers 中包含 channels/delivery 标签列表和负载中的事件)。