如何使用 Spring 集成处理来自 AWS SQS FiFo queue 的超过 10 条并发消息

How to process more than 10 concurrent messages from an AWS SQS FiFo queue using Spring Integration

我希望能够使用 Spring 集成工作流一次处理 10 条以上的 SQS 消息。

根据这个问题,建议使用 ExecutorChannel。我更新了我的代码,但仍然有相同的症状。

How execute Spring integration flow in multiple threads to consume more Amazon SQS queue messages in parallel?

进行此更新后,我的应用程序请求了 10 条消息,并处理了这些消息,只有在我接近流程结束时调用 amazonSQSClient.deleteMessage 之后,它才会执行接受来自 SQS queue.

的另外 10 条消息

应用程序使用 SQS FiFo queue。

还有什么我遗漏的吗,或者这是使用 SqsMessageDeletionPolicy.NEVER 然后在流程结束时删除消息的不可避免的症状?由于其他限制,在流程开始时接受消息并不是一个真正的选择。

这里是相关的代码片段,有一些简化,但我希望它能表达问题。

Queue 配置

@Bean
public AsyncTaskExecutor inputChannelTaskExecutor() {
    SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
    executor.setConcurrencyLimit(50);
    return executor;
}

@Bean
@Qualifier("inputChannel")
public ExecutorChannel inputChannel() {
    return new ExecutorChannel(inputChannelTaskExecutor());
}

我还尝试了 ThreadPoolTask​​Executor 而不是 SimpleAsyncTaskExecutor,结果相同,但我也会包括它,以防它提供其他见解。

    @Bean
    public AsyncTaskExecutor inputChannelTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(50);
        executor.setThreadNamePrefix("spring-async-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.afterPropertiesSet();
        executor.initialize();
        return executor;
    }

SQS 通道适配器

@Bean
public SqsMessageDrivenChannelAdapter changeQueueMessageAdapter() {
    SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(this.amazonSQSClient, changeQueue);
    adapter.setOutputChannel(inputChannel);
    adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.NEVER);
    return adapter;
}


@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
    return Pollers.fixedRate(500, TimeUnit.MILLISECONDS).maxMessagesPerPoll(10);
}

简化主流程

我们的一个常见场景是在短时间内获得多个 Branch 编辑。此流程仅 'cares' 至少发生了一次编辑。 messageTransformer 从 payload 文档中提取一个 id 并将其放入 header dsp_docId 然后我们用它来聚合on(我们在其他几个地方使用了这个 id,所以我们觉得 header 比在自定义聚合器中完成所有工作更有意义)。

provisioningServiceActivator 检索分支的最新版本,然后路由器决定是否需要进一步转换(在这种情况下,它会将其发送到 transformBranchChannel) 或者它可以发送到我们的 PI 实例(通过 sendToPiChannel)。

转换流(未显示,我认为您不需要它)最终导致发送到 PI 流,它只是先做更多的工作。

listingGroupProcessor 捕获所有 aws_receiptHandle header 并将它们添加到新的 header作为|分隔列表。

sendToPi 流(和 errorFlow)以对自定义处理程序的调用结束,该处理程序负责删除该 aws_receiptHandle 字符串列表引用的所有 SQS 消息。

@Bean
IntegrationFlow sqsListener() {
    return IntegrationFlows.from(inputChannel)
                           .transform(messageTransformer)
                           .aggregate(a -> a.correlationExpression("1")
                                            .outputProcessor(listingGroupProcessor)
                                            .autoStartup(true)
                                            .correlationStrategy(message -> message.getHeaders().get("dsp_docId"))
                                            .groupTimeout(messageAggregateTimeout)  // currently 25s
                                            .expireGroupsUponCompletion(true)
                                            .sendPartialResultOnExpiry(true)
                                            .get())

                           .handle(provisioningServiceActivator, "handleStandard")
                           .route(Branch.class, branch -> (branch.isSuppressed() == null || !branch.isSuppressed()),
                                  routerSpec -> routerSpec.channelMapping(true, "transformBranchChannel")
                                                          .resolutionRequired(false)
                                                          .defaultOutputToParentFlow())

                           .channel(sendtoPiChannel)
                           .get();
}

我想我会 post 这个作为答案,因为它解决了我的问题,并且可能对其他人有所帮助。作为答案,它更有可能被发现,而不是对可能被忽视的原始问题进行编辑。

首先,我应该注意到我们正在使用 FiFo 队列。

问题实际上出在链的更上端,我们将 MessageGroupId 设置为描述数据源的简单值。这意味着我们有非常大的消息组。

ReceiveMessage 文档中您可以看到,在这种情况下,它非常明智地阻止您从该组请求更多消息,因为如果需要放回消息,则无法保证顺序排队。

更新 post 消息的代码以设置适当的 MessageGroupId 然后意味着 ExecutorChannel 按预期工作.

While messages with a particular MessageGroupId are invisible, no more messages belonging to the same MessageGroupId are returned until the visibility timeout expires. You can still receive messages with another MessageGroupId as long as it is also visible.