聚合 Future/Response 用于 SQS Acks/Error 在多个队列上 Spring 集成 SQS

Aggregated Future/Response for SQS Acks/Error on multiple queues with Spring Integration SQS

我正在使用 Gateway 并在网关内部循环以通过 sqs-outbound adapter.

将消息发送到 2 个队列

我想实现这样的目标:

jsonArray.forEach(data -> {
    Future<AggregatedAckorErrorFrom2Queues> result = dataPublishGateway.publishToService(data);
    futures.add(result);
});
futures.forEach(f -> {                      
    System.out.println("Future Result -> "+ f.get())
});

网关和 SQS 适配器配置

<!-- Gateway to Publish to SQS -->
<task:executor id="dataExecutor" pool-size="5"/>
<int:publish-subscribe-channel id="dataChannel" task-executor="dataExecutor" />
<int:gateway service-interface="com.integration.gateway.PublishGateway" id="dataPublishGateway">
    <int:method name="publishToDataService" payload-expression="#args[0]" request-channel="dataChannel" />
</int:gateway>

<!-- Send to 2 Queues -->

    <int:channel id="successChannel"/>
    <int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
                                          queue="queue-1"
                                          channel="dataChannel"
                                          success-channel="successChannel"
                                          failure-channel="errorChannel"/>
    <int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
                                          queue="queue-2"
                                          channel="dataChannel"
                                          success-channel="successChannel"
                                          failure-channel="errorChannel"/>

我正在 dataChannelaggregator 上查看 apply-sequence,但是 aggregator 必须能够同时处理 ackerror.

问题 : 我如何 return 从 2 个队列返回网关的聚合响应(ack + 错误)?

success-channelfailure-channel 你的方向是正确的。聚合器可以订阅 successChannel 并使用默认策略进行关联和发布。对于 failure-channel,您可能需要一个自定义错误通道,而不是那个全局的 errorChannel。从 SqsMessageHandler 发送到该通道的 ErrorMessage 有一个像这样的有效载荷 AwsRequestFailureException,其中它的 failedMessage 实际上包含一个请求消息,其中包含上述相关细节。因此,您需要添加一些转换步骤以在继续聚合器之前从异常中提取该信息。