聚合 Future/Response 用于 SQS Acks/Error 在多个队列上 Spring 集成 SQS
Aggregated Future/Response for SQS Acks/Error on multiple queues with Spring Integration SQS
我正在使用 Gateway
并在网关内部循环以通过 sqs-outbound adapter
.
将消息发送到 2 个队列
我想实现这样的目标:
jsonArray.forEach(data -> {
Future<AggregatedAckorErrorFrom2Queues> result = dataPublishGateway.publishToService(data);
futures.add(result);
});
futures.forEach(f -> {
System.out.println("Future Result -> "+ f.get())
});
网关和 SQS 适配器配置
<!-- Gateway to Publish to SQS -->
<task:executor id="dataExecutor" pool-size="5"/>
<int:publish-subscribe-channel id="dataChannel" task-executor="dataExecutor" />
<int:gateway service-interface="com.integration.gateway.PublishGateway" id="dataPublishGateway">
<int:method name="publishToDataService" payload-expression="#args[0]" request-channel="dataChannel" />
</int:gateway>
<!-- Send to 2 Queues -->
<int:channel id="successChannel"/>
<int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
queue="queue-1"
channel="dataChannel"
success-channel="successChannel"
failure-channel="errorChannel"/>
<int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
queue="queue-2"
channel="dataChannel"
success-channel="successChannel"
failure-channel="errorChannel"/>
我正在 dataChannel
和 aggregator
上查看 apply-sequence
,但是 aggregator
必须能够同时处理 ack
和 error
.
问题 : 我如何 return 从 2 个队列返回网关的聚合响应(ack + 错误)?
success-channel
和 failure-channel
你的方向是正确的。聚合器可以订阅 successChannel
并使用默认策略进行关联和发布。对于 failure-channel
,您可能需要一个自定义错误通道,而不是那个全局的 errorChannel
。从 SqsMessageHandler
发送到该通道的 ErrorMessage
有一个像这样的有效载荷 AwsRequestFailureException
,其中它的 failedMessage
实际上包含一个请求消息,其中包含上述相关细节。因此,您需要添加一些转换步骤以在继续聚合器之前从异常中提取该信息。
我正在使用 Gateway
并在网关内部循环以通过 sqs-outbound adapter
.
我想实现这样的目标:
jsonArray.forEach(data -> {
Future<AggregatedAckorErrorFrom2Queues> result = dataPublishGateway.publishToService(data);
futures.add(result);
});
futures.forEach(f -> {
System.out.println("Future Result -> "+ f.get())
});
网关和 SQS 适配器配置
<!-- Gateway to Publish to SQS -->
<task:executor id="dataExecutor" pool-size="5"/>
<int:publish-subscribe-channel id="dataChannel" task-executor="dataExecutor" />
<int:gateway service-interface="com.integration.gateway.PublishGateway" id="dataPublishGateway">
<int:method name="publishToDataService" payload-expression="#args[0]" request-channel="dataChannel" />
</int:gateway>
<!-- Send to 2 Queues -->
<int:channel id="successChannel"/>
<int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
queue="queue-1"
channel="dataChannel"
success-channel="successChannel"
failure-channel="errorChannel"/>
<int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
queue="queue-2"
channel="dataChannel"
success-channel="successChannel"
failure-channel="errorChannel"/>
我正在 dataChannel
和 aggregator
上查看 apply-sequence
,但是 aggregator
必须能够同时处理 ack
和 error
.
问题 : 我如何 return 从 2 个队列返回网关的聚合响应(ack + 错误)?
success-channel
和 failure-channel
你的方向是正确的。聚合器可以订阅 successChannel
并使用默认策略进行关联和发布。对于 failure-channel
,您可能需要一个自定义错误通道,而不是那个全局的 errorChannel
。从 SqsMessageHandler
发送到该通道的 ErrorMessage
有一个像这样的有效载荷 AwsRequestFailureException
,其中它的 failedMessage
实际上包含一个请求消息,其中包含上述相关细节。因此,您需要添加一些转换步骤以在继续聚合器之前从异常中提取该信息。