Spring 集成 HTTP 到 Scatter Gather
Spring Integration HTTP to Scatter Gather
我是 Spring 集成的新手,并试图利用 scatter-gather 的企业模式,但我正在为实现细节和在线可用示例而苦苦挣扎。
简而言之,我的场景是:
- HTTP 请求从用户发送到系统 A。
- 在响应(又名同步)之前,系统 A 向 N 个系统 X 异步发送 N 个消息。
- 系统 A 等待响应。
- 一旦每个请求系统都有响应,系统 A 就会将这些响应整理成一个更大的响应。
- 系统A最终以较大的响应响应用户。
基本上,就原始消费者而言,一个单一的请求会响应一个答案,而不必 'come back later'。然而,该请求实际上是针对掩盖其背后复杂性的外观(可能会影响数百个系统,在 back-end non-performant 处发出同步请求并且不可行)。
到目前为止我有这个实现(擦除细节所以可能不是我正在玩的 1:1 示例,例如我已经制定的 correlationStrategy 没有做我想要的期望):
@Bean
public IntegrationFlow overallRequest(final AmqpTemplate amqpTemplate) {
return IntegrationFlows.from( // HTTP endpoint to user makes requests on
Http.inboundChannelAdapter("/request-overall-document")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(String.class))
.log()
// Arbitrary header to simplify example, realistically would generate a UUID
// and attach to some correlating header that works for systems involved
.enrichHeaders(p -> p.header("someHeader", "someValue"))
.log()
.scatterGather(
recipientListRouterSpec ->
recipientListRouterSpec
.applySequence(true)
.recipientFlow(
flow ->
flow.handle( // Straight pass through of msg received to see in response
Amqp.outboundAdapter(amqpTemplate)
.exchangeName( // RabbitMQ fanout exchange to N queues to N systems
"request-overall-document-exchange"))),
aggregatorSpec ->
aggregatorSpec
// Again for example, arbitrary once two correlated responses
.correlationStrategy(msg -> msg.getHeaders().get("someHeader"))
.releaseStrategy(gm -> gm.size() == 2)
// Simple string concatenation for overall response
.outputProcessor(
msgrp ->
msgrp.getMessages().stream()
.map(msg -> msg.getPayload().toString())
.reduce("Overall response: ", (nexus, txt) -> nexus + "|" + txt))
// Reset group on each response
.expireGroupsUponCompletion(true),
scatterGatherSpec ->
scatterGatherSpec.gatherChannel(
responseChannel())) // The channel to listen for responses to request on
.log()
.get();
}
以此作为响应通道配置:
@Bean
public MessageChannel responseChannel() {
return new QueueChannel();
}
@Bean
public AmqpInboundChannelAdapter responseChannelAdapter(
SimpleMessageListenerContainer listenerContainer,
@Qualifier("responseChannel") MessageChannel channel) {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(channel);
return adapter;
}
@Bean
public SimpleMessageListenerContainer responseContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("request-overall-document-responses");
return container;
}
所有响应都被发送到 一个单独的 Spring 应用程序,该应用程序只是将请求有效负载再次通过管道传输回来(又名用于测试而无需与实际系统集成):
@Bean
public IntegrationFlow systemOneReception(final ConnectionFactory connectionFactory, final AmqpTemplate amqpTemplate) {
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "request-overall-document-system-1"))
.log()
.handle(Amqp.outboundAdapter(amqpTemplate).routingKey("request-overall-document-responses"))
.get();
}
@Bean
public IntegrationFlow systemTwoReception(final ConnectionFactory connectionFactory, final AmqpTemplate amqpTemplate) {
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "request-overall-document-system-2"))
.log()
.handle(Amqp.outboundAdapter(amqpTemplate).routingKey("request-overall-document-responses"))
.get();
}
根据 scatter-gather 实施中的聚合/发布策略,成功发布后系统 A 出现以下错误:
2020-02-29 20:06:39.255 ERROR 152 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: The 'gatherResultChannel' header is required to deliver the gather result., failedMessage=GenericMessage [payload=Overall response: |somerequesttobesent|somerequesttobesent, headers={amqp_receivedDeliveryMode=PERSISTENT, content-length=19, amqp_deliveryTag=2, sequenceSize=1, amqp_redelivered=false, amqp_contentEncoding=UTF-8, host=localhost:18081, someHeader=someValue, connection=keep-alive, correlationId=182ee203-85ab-9ef6-7b19-3a8e2da8f5a7, id=994a0cf5-ad2b-02c3-dc93-74fae2f5092b, cache-control=no-cache, contentType=text/plain, timestamp=1583006799252, http_requestMethod=POST, sequenceNumber=1, amqp_consumerQueue=request-overall-document-responses, accept=*/*, amqp_receivedRoutingKey=request-overall-document-responses, amqp_timestamp=Sat Feb 29 20:06:39 GMT 2020, amqp_messageId=3341deae-7ed0-a042-0bb7-d2d2be871165, http_requestUrl=http://localhost:18081/request-overall-document, amqp_consumerTag=amq.ctag-ULxwuAjp8ZzcopBZYvcbZQ, accept-encoding=gzip, deflate, br, user-agent=PostmanRuntime/7.22.0}]
at org.springframework.integration.scattergather.ScatterGatherHandler.lambda$doInit(ScatterGatherHandler.java:160)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:77)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:71)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:431)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:823)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:475)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:143)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:390)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null(AbstractPollingEndpoint.java:277)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute[=14=](ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller(AbstractPollingEndpoint.java:274)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
现在我明白我还有一些差距,但我正在努力寻找前进的方向:
- 给出的错误:没有一些 'gatherResultChannel' 输出。我原以为这会是随后的 'handles' / 'logs' / w.e 对 scatterGather(...) 调用的结果,但不是这样玩。
- 需要某种形式的映射,从 scatter-gather 聚合的结果返回到原始 Http.XXX 请求。
编辑:从进一步挖掘来看,给出的问题似乎是因为当通过 AMQP(在我的例子中是 RabbitMQ)出去时,有问题的 header deliberately dropped as it's a MessageChannel (see lines 230 to 257)。不确定这里的含义是否是 splitting/aggregation 不打算在多个独立应用程序之间交叉(我的假设是它被删除是因为它是 Java object 的一个实例,这将是有问题的传递)...
进一步编辑:用新的眼光注意到了我以前没有注意到的东西,我粘贴的异常引用了失败的消息,这似乎是输出的明确结果处理(虽然摆弄,在 DirectChannel 和 QueueChannel 之间轻弹,只有 DirectChannel 不打印有效负载,所以没有寻找它)。为了确保它没有做一些克隆或奇怪的事情,更新了存根服务以转换和附加唯一的后缀(如下所示),是的,它实际上是在聚合。
.transform(msg -> MessageFormat.format("{0}_system1response", msg))
.transform(msg -> MessageFormat.format("{0}_system2response", msg))
The 'gatherResultChannel' header is required to deliver the gather result., failedMessage=GenericMessage [payload=Overall response: |sometext_system2response|sometext_system1response, hea...
所以看起来分散、聚集和聚合都在起作用,唯一不好的是给定的处理不知道之后将消息推送到哪里?
再一次: 根据 Gary 的回复,用网关替换了所有适配器,但是这样做不能再扇出了吗?因此从 scatterGather 调用中删除了 scatterGatherSpec 参数,并在两个接收者中替换/添加如下:
.recipientFlow(flow -> flow.handle(Amqp.asyncOutboundGateway(asyncTemplate).routingKeyFunction(m -> "request-overall-document-system-1"), e -> e.id("sytemOneOutboundGateway")))
.recipientFlow(flow -> flow.handle(Amqp.asyncOutboundGateway(asyncTemplate).routingKeyFunction(m -> "request-overall-document-system-2"), e -> e.id("sytemTwoOutboundGateway")))
这是我能找到的最接近工作示例的方法,但是,虽然这确实 sort-of 有效,但它会导致多次重新处理消息 on/off queues,其中我POST 和 'msgtosend' 的预期输出应该是:
Overall message: |msgtosend_system1response|msgtosend_system2response
相反,我得到零星的输出,如:
Overall message: |msgtosend|msgtosend_system1response
Overall message: |msgtosend_system2response|msgtosend_system1response_system1response
Overall message: |msgtosend|msgtosend_system1response_system1response
Overall message: |msgtosend_system2response|msgtosend_system1response_system1response
我假设有一些配置/bean 重叠,但尽我所能尝试我无法隔离它是什么,即连接工厂、侦听器容器、异步模板等。
使用 AMQP 出站网关而不是出站和入站通道适配器;这样频道 header 将被保留。有一个 AsyncAmqpOutboundGateway
可能最适合您的目的。
如果出于某种原因必须使用通道适配器,请使用 header enricher 和 Header Channel Registry 将通道转换为字符串表示,以便保留。
编辑
这是一个简单的例子:
@SpringBootApplication
public class So60469260Application {
public static void main(String[] args) {
SpringApplication.run(So60469260Application.class, args);
}
@Bean
public IntegrationFlow flow(AsyncRabbitTemplate aTemp) {
return IntegrationFlows.from(Gate.class)
.enrichHeaders(he -> he.headerExpression("corr", "payload"))
.scatterGather(rlr -> rlr
.applySequence(true)
.recipientFlow(f1 -> f1.handle(Amqp.asyncOutboundGateway(aTemp)
.routingKey("foo")))
.recipientFlow(f2 -> f2.handle(Amqp.asyncOutboundGateway(aTemp)
.routingKey("bar"))),
agg -> agg.correlationStrategy(msg -> msg.getHeaders().get("corr")))
.get();
}
@Bean
public AsyncRabbitTemplate aTemp(RabbitTemplate template) {
return new AsyncRabbitTemplate(template);
}
@Bean
@DependsOn("flow")
public ApplicationRunner runner(Gate gate) {
return args -> System.out.println(gate.doIt("foo"));
}
@RabbitListener(queues = "foo")
public String foo(String in) {
return in.toUpperCase();
}
@RabbitListener(queues = "bar")
public String bar(String in) {
return in + in;
}
}
interface Gate {
List<String> doIt(String in);
}
[foofoo, FOO]
我是 Spring 集成的新手,并试图利用 scatter-gather 的企业模式,但我正在为实现细节和在线可用示例而苦苦挣扎。
简而言之,我的场景是:
- HTTP 请求从用户发送到系统 A。
- 在响应(又名同步)之前,系统 A 向 N 个系统 X 异步发送 N 个消息。
- 系统 A 等待响应。
- 一旦每个请求系统都有响应,系统 A 就会将这些响应整理成一个更大的响应。
- 系统A最终以较大的响应响应用户。
基本上,就原始消费者而言,一个单一的请求会响应一个答案,而不必 'come back later'。然而,该请求实际上是针对掩盖其背后复杂性的外观(可能会影响数百个系统,在 back-end non-performant 处发出同步请求并且不可行)。
到目前为止我有这个实现(擦除细节所以可能不是我正在玩的 1:1 示例,例如我已经制定的 correlationStrategy 没有做我想要的期望):
@Bean
public IntegrationFlow overallRequest(final AmqpTemplate amqpTemplate) {
return IntegrationFlows.from( // HTTP endpoint to user makes requests on
Http.inboundChannelAdapter("/request-overall-document")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(String.class))
.log()
// Arbitrary header to simplify example, realistically would generate a UUID
// and attach to some correlating header that works for systems involved
.enrichHeaders(p -> p.header("someHeader", "someValue"))
.log()
.scatterGather(
recipientListRouterSpec ->
recipientListRouterSpec
.applySequence(true)
.recipientFlow(
flow ->
flow.handle( // Straight pass through of msg received to see in response
Amqp.outboundAdapter(amqpTemplate)
.exchangeName( // RabbitMQ fanout exchange to N queues to N systems
"request-overall-document-exchange"))),
aggregatorSpec ->
aggregatorSpec
// Again for example, arbitrary once two correlated responses
.correlationStrategy(msg -> msg.getHeaders().get("someHeader"))
.releaseStrategy(gm -> gm.size() == 2)
// Simple string concatenation for overall response
.outputProcessor(
msgrp ->
msgrp.getMessages().stream()
.map(msg -> msg.getPayload().toString())
.reduce("Overall response: ", (nexus, txt) -> nexus + "|" + txt))
// Reset group on each response
.expireGroupsUponCompletion(true),
scatterGatherSpec ->
scatterGatherSpec.gatherChannel(
responseChannel())) // The channel to listen for responses to request on
.log()
.get();
}
以此作为响应通道配置:
@Bean
public MessageChannel responseChannel() {
return new QueueChannel();
}
@Bean
public AmqpInboundChannelAdapter responseChannelAdapter(
SimpleMessageListenerContainer listenerContainer,
@Qualifier("responseChannel") MessageChannel channel) {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(channel);
return adapter;
}
@Bean
public SimpleMessageListenerContainer responseContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("request-overall-document-responses");
return container;
}
所有响应都被发送到 一个单独的 Spring 应用程序,该应用程序只是将请求有效负载再次通过管道传输回来(又名用于测试而无需与实际系统集成):
@Bean
public IntegrationFlow systemOneReception(final ConnectionFactory connectionFactory, final AmqpTemplate amqpTemplate) {
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "request-overall-document-system-1"))
.log()
.handle(Amqp.outboundAdapter(amqpTemplate).routingKey("request-overall-document-responses"))
.get();
}
@Bean
public IntegrationFlow systemTwoReception(final ConnectionFactory connectionFactory, final AmqpTemplate amqpTemplate) {
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "request-overall-document-system-2"))
.log()
.handle(Amqp.outboundAdapter(amqpTemplate).routingKey("request-overall-document-responses"))
.get();
}
根据 scatter-gather 实施中的聚合/发布策略,成功发布后系统 A 出现以下错误:
2020-02-29 20:06:39.255 ERROR 152 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: The 'gatherResultChannel' header is required to deliver the gather result., failedMessage=GenericMessage [payload=Overall response: |somerequesttobesent|somerequesttobesent, headers={amqp_receivedDeliveryMode=PERSISTENT, content-length=19, amqp_deliveryTag=2, sequenceSize=1, amqp_redelivered=false, amqp_contentEncoding=UTF-8, host=localhost:18081, someHeader=someValue, connection=keep-alive, correlationId=182ee203-85ab-9ef6-7b19-3a8e2da8f5a7, id=994a0cf5-ad2b-02c3-dc93-74fae2f5092b, cache-control=no-cache, contentType=text/plain, timestamp=1583006799252, http_requestMethod=POST, sequenceNumber=1, amqp_consumerQueue=request-overall-document-responses, accept=*/*, amqp_receivedRoutingKey=request-overall-document-responses, amqp_timestamp=Sat Feb 29 20:06:39 GMT 2020, amqp_messageId=3341deae-7ed0-a042-0bb7-d2d2be871165, http_requestUrl=http://localhost:18081/request-overall-document, amqp_consumerTag=amq.ctag-ULxwuAjp8ZzcopBZYvcbZQ, accept-encoding=gzip, deflate, br, user-agent=PostmanRuntime/7.22.0}]
at org.springframework.integration.scattergather.ScatterGatherHandler.lambda$doInit(ScatterGatherHandler.java:160)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:77)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:71)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:431)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:823)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:475)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:143)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:390)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null(AbstractPollingEndpoint.java:277)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute[=14=](ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller(AbstractPollingEndpoint.java:274)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
现在我明白我还有一些差距,但我正在努力寻找前进的方向:
- 给出的错误:没有一些 'gatherResultChannel' 输出。我原以为这会是随后的 'handles' / 'logs' / w.e 对 scatterGather(...) 调用的结果,但不是这样玩。
- 需要某种形式的映射,从 scatter-gather 聚合的结果返回到原始 Http.XXX 请求。
编辑:从进一步挖掘来看,给出的问题似乎是因为当通过 AMQP(在我的例子中是 RabbitMQ)出去时,有问题的 header deliberately dropped as it's a MessageChannel (see lines 230 to 257)。不确定这里的含义是否是 splitting/aggregation 不打算在多个独立应用程序之间交叉(我的假设是它被删除是因为它是 Java object 的一个实例,这将是有问题的传递)...
进一步编辑:用新的眼光注意到了我以前没有注意到的东西,我粘贴的异常引用了失败的消息,这似乎是输出的明确结果处理(虽然摆弄,在 DirectChannel 和 QueueChannel 之间轻弹,只有 DirectChannel 不打印有效负载,所以没有寻找它)。为了确保它没有做一些克隆或奇怪的事情,更新了存根服务以转换和附加唯一的后缀(如下所示),是的,它实际上是在聚合。
.transform(msg -> MessageFormat.format("{0}_system1response", msg))
.transform(msg -> MessageFormat.format("{0}_system2response", msg))
The 'gatherResultChannel' header is required to deliver the gather result., failedMessage=GenericMessage [payload=Overall response: |sometext_system2response|sometext_system1response, hea...
所以看起来分散、聚集和聚合都在起作用,唯一不好的是给定的处理不知道之后将消息推送到哪里?
再一次: 根据 Gary 的回复,用网关替换了所有适配器,但是这样做不能再扇出了吗?因此从 scatterGather 调用中删除了 scatterGatherSpec 参数,并在两个接收者中替换/添加如下:
.recipientFlow(flow -> flow.handle(Amqp.asyncOutboundGateway(asyncTemplate).routingKeyFunction(m -> "request-overall-document-system-1"), e -> e.id("sytemOneOutboundGateway")))
.recipientFlow(flow -> flow.handle(Amqp.asyncOutboundGateway(asyncTemplate).routingKeyFunction(m -> "request-overall-document-system-2"), e -> e.id("sytemTwoOutboundGateway")))
这是我能找到的最接近工作示例的方法,但是,虽然这确实 sort-of 有效,但它会导致多次重新处理消息 on/off queues,其中我POST 和 'msgtosend' 的预期输出应该是:
Overall message: |msgtosend_system1response|msgtosend_system2response
相反,我得到零星的输出,如:
Overall message: |msgtosend|msgtosend_system1response
Overall message: |msgtosend_system2response|msgtosend_system1response_system1response
Overall message: |msgtosend|msgtosend_system1response_system1response
Overall message: |msgtosend_system2response|msgtosend_system1response_system1response
我假设有一些配置/bean 重叠,但尽我所能尝试我无法隔离它是什么,即连接工厂、侦听器容器、异步模板等。
使用 AMQP 出站网关而不是出站和入站通道适配器;这样频道 header 将被保留。有一个 AsyncAmqpOutboundGateway
可能最适合您的目的。
如果出于某种原因必须使用通道适配器,请使用 header enricher 和 Header Channel Registry 将通道转换为字符串表示,以便保留。
编辑
这是一个简单的例子:
@SpringBootApplication
public class So60469260Application {
public static void main(String[] args) {
SpringApplication.run(So60469260Application.class, args);
}
@Bean
public IntegrationFlow flow(AsyncRabbitTemplate aTemp) {
return IntegrationFlows.from(Gate.class)
.enrichHeaders(he -> he.headerExpression("corr", "payload"))
.scatterGather(rlr -> rlr
.applySequence(true)
.recipientFlow(f1 -> f1.handle(Amqp.asyncOutboundGateway(aTemp)
.routingKey("foo")))
.recipientFlow(f2 -> f2.handle(Amqp.asyncOutboundGateway(aTemp)
.routingKey("bar"))),
agg -> agg.correlationStrategy(msg -> msg.getHeaders().get("corr")))
.get();
}
@Bean
public AsyncRabbitTemplate aTemp(RabbitTemplate template) {
return new AsyncRabbitTemplate(template);
}
@Bean
@DependsOn("flow")
public ApplicationRunner runner(Gate gate) {
return args -> System.out.println(gate.doIt("foo"));
}
@RabbitListener(queues = "foo")
public String foo(String in) {
return in.toUpperCase();
}
@RabbitListener(queues = "bar")
public String bar(String in) {
return in + in;
}
}
interface Gate {
List<String> doIt(String in);
}
[foofoo, FOO]