Spring 集成:AmqpInboundChannelAdapter 上的 TaskExecutor 和 MaxConcurrentConsumers

Spring Integration: TaskExecutor and MaxConcurrentConsumers on AmqpInboundChannelAdapter

我的 Spring 集成应用程序使用来自 RabbitMQ 的消息,将它们转换为 SOAP 消息并执行 Web 服务请求。

每秒可以从队列中获取很多 (10 – 50) 条消息。 或者在应用程序重新启动后,RabbitMQ 队列中可能有数千条消息。

在并行线程中处理最多 10 条消息的最佳方案是什么(消息排序很好,但不是必需的功能,如果 Web 服务响应业务失败,那么失败的消息应该重试直到成功) .

Amqp 侦听器不应从队列中消耗更多消息,因为任务执行器中没有可用的繁忙线程。 我可以在这样的通道中定义一个 ThreadExecutor:

@Bean
public AmqpInboundChannelAdapterSMLCSpec 
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue) 
{
  return Amqp.inboundAdapter(connectionFactory, queue);
}

IntegrationFlow integrationFlow = IntegrationFlows
  .from(amqpInboundChannelAdapter)
  .channel(c -> c.executor(exportFlowsExecutor))
  .transform(businessObjectToSoapRequestTransformer)
  .handle(webServiceOutboundGatewayFactory.getObject())
  .get();

或者像这样在 AmqpInboundChannelAdapter 中定义一个任务执行器就足够了,而不在流定义中定义通道任务执行器:

@Bean
public AmqpInboundChannelAdapterSMLCSpec 
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue) 
{
  return Amqp.inboundAdapter(connectionFactory, queue)
             .configureContainer(c->c.taskExecutor(taskExecutor));
}

或者可能像选项 1 一样为通道定义任务执行器,但另外在通道适配器上设置 maxConcurrentConsumers,如下所示:

@Bean
public AmqpInboundChannelAdapterSMLCSpec 
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue) 
{
  return Amqp.inboundAdapter(connectionFactory, queue)
             .configureContainer(c->c.maxConcurrentConsumers(10));
}

最佳做法是在 ListenerContainer 上配置 concurrency 并让所有下游进程在容器的这些线程上发生。这样,当由于线程繁忙而不再从队列中轮询消息时,您将获得自然的背压。另一方面,不会因为在侦听器容器之后使用 ExecutorChannel 释放轮询线程而导致消息丢失,并且当前消息将被确认为已消耗,但您可能会在下游失败.