Spring 集成:让所有 headers 参与聚合,而不仅仅是最后一个
Spring Integration: get all headers involved in an aggration and not just the last one
我有一个 Spring 集成流程定义如下:
IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
.id("id")
.acknowledgeMode(MANUAL)
.autoStartup(autoStartup)
.concurrentConsumers(2)
.maxConcurrentConsumers(3)
.messageConverter(messageConverter()))
.aggregate(a -> ...)
.handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
.get();
服务激活器定义如下:
@Component
@Transactional
public class ServiceActivator {
@ServiceActivator
public void myMethod(List<> Collection<MyEvent> events) {
....
}
}
我想做的是更改 myMethod
以同时获取与聚合中每条消息关联的特定 header 的列表。在我的例子中,我想为每条消息获取 AmqpHeaders.CHANNEL
和 AmqpHeaders.DELIVERY_TAG
,这样我就可以对每条发送给 RabbitMQ 的消息执行 ACK 或 NACK(请注意,我故意使用手册IntegrationFlow 中的确认模式,因为我想在 myMethod
执行后发送 ACK's/NACK 的 。
我尝试过这种方法:
@Component
@Transactional
public class ServiceActivator {
@ServiceActivator
public void myMethod(@Header(AmqpHeaders.CHANNEL) List<Channel> channels,
@Header(AmqpHeaders.DELIVERY_TAG) List<Long> tags,
Collection<MyEvent> events) {
....
}
}
但在这里我似乎只得到最后一条消息的 header 值(即 channels
和 tags
的大小始终为 1,即使 events
collection).
我也尝试将 Collection<MyEvent>
更改为 Collection<Message>
(org.springframework.messaging.Message
) 以尝试手动提取 headers 但失败了:
org.springframework.messaging.MessageHandlingException: nested exception is java.lang.ClassCastException: com.x.y.MyEvent cannot be cast to org.springframework.messaging.Message
因为消息已经被 IntegrationFlow 中定义的消息转换器转换。
我怎样才能做到这一点?
您需要在 aggregator
上自定义 outputProcessor
到 return 您想要的消息(在 headers 中包含 channels/delivery 标签列表和负载中的事件)。
我有一个 Spring 集成流程定义如下:
IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
.id("id")
.acknowledgeMode(MANUAL)
.autoStartup(autoStartup)
.concurrentConsumers(2)
.maxConcurrentConsumers(3)
.messageConverter(messageConverter()))
.aggregate(a -> ...)
.handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
.get();
服务激活器定义如下:
@Component
@Transactional
public class ServiceActivator {
@ServiceActivator
public void myMethod(List<> Collection<MyEvent> events) {
....
}
}
我想做的是更改 myMethod
以同时获取与聚合中每条消息关联的特定 header 的列表。在我的例子中,我想为每条消息获取 AmqpHeaders.CHANNEL
和 AmqpHeaders.DELIVERY_TAG
,这样我就可以对每条发送给 RabbitMQ 的消息执行 ACK 或 NACK(请注意,我故意使用手册IntegrationFlow 中的确认模式,因为我想在 myMethod
执行后发送 ACK's/NACK 的 。
我尝试过这种方法:
@Component
@Transactional
public class ServiceActivator {
@ServiceActivator
public void myMethod(@Header(AmqpHeaders.CHANNEL) List<Channel> channels,
@Header(AmqpHeaders.DELIVERY_TAG) List<Long> tags,
Collection<MyEvent> events) {
....
}
}
但在这里我似乎只得到最后一条消息的 header 值(即 channels
和 tags
的大小始终为 1,即使 events
collection).
我也尝试将 Collection<MyEvent>
更改为 Collection<Message>
(org.springframework.messaging.Message
) 以尝试手动提取 headers 但失败了:
org.springframework.messaging.MessageHandlingException: nested exception is java.lang.ClassCastException: com.x.y.MyEvent cannot be cast to org.springframework.messaging.Message
因为消息已经被 IntegrationFlow 中定义的消息转换器转换。
我怎样才能做到这一点?
您需要在 aggregator
上自定义 outputProcessor
到 return 您想要的消息(在 headers 中包含 channels/delivery 标签列表和负载中的事件)。