Spring 集成:AmqpInboundChannelAdapter 上的 TaskExecutor 和 MaxConcurrentConsumers
Spring Integration: TaskExecutor and MaxConcurrentConsumers on AmqpInboundChannelAdapter
我的 Spring 集成应用程序使用来自 RabbitMQ 的消息,将它们转换为 SOAP 消息并执行 Web 服务请求。
每秒可以从队列中获取很多 (10 – 50) 条消息。
或者在应用程序重新启动后,RabbitMQ 队列中可能有数千条消息。
在并行线程中处理最多 10 条消息的最佳方案是什么(消息排序很好,但不是必需的功能,如果 Web 服务响应业务失败,那么失败的消息应该重试直到成功) .
Amqp 侦听器不应从队列中消耗更多消息,因为任务执行器中没有可用的繁忙线程。
我可以在这样的通道中定义一个 ThreadExecutor:
@Bean
public AmqpInboundChannelAdapterSMLCSpec
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue)
{
return Amqp.inboundAdapter(connectionFactory, queue);
}
IntegrationFlow integrationFlow = IntegrationFlows
.from(amqpInboundChannelAdapter)
.channel(c -> c.executor(exportFlowsExecutor))
.transform(businessObjectToSoapRequestTransformer)
.handle(webServiceOutboundGatewayFactory.getObject())
.get();
或者像这样在 AmqpInboundChannelAdapter 中定义一个任务执行器就足够了,而不在流定义中定义通道任务执行器:
@Bean
public AmqpInboundChannelAdapterSMLCSpec
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue)
{
return Amqp.inboundAdapter(connectionFactory, queue)
.configureContainer(c->c.taskExecutor(taskExecutor));
}
或者可能像选项 1 一样为通道定义任务执行器,但另外在通道适配器上设置 maxConcurrentConsumers,如下所示:
@Bean
public AmqpInboundChannelAdapterSMLCSpec
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue)
{
return Amqp.inboundAdapter(connectionFactory, queue)
.configureContainer(c->c.maxConcurrentConsumers(10));
}
最佳做法是在 ListenerContainer
上配置 concurrency
并让所有下游进程在容器的这些线程上发生。这样,当由于线程繁忙而不再从队列中轮询消息时,您将获得自然的背压。另一方面,不会因为在侦听器容器之后使用 ExecutorChannel
释放轮询线程而导致消息丢失,并且当前消息将被确认为已消耗,但您可能会在下游失败.
我的 Spring 集成应用程序使用来自 RabbitMQ 的消息,将它们转换为 SOAP 消息并执行 Web 服务请求。
每秒可以从队列中获取很多 (10 – 50) 条消息。 或者在应用程序重新启动后,RabbitMQ 队列中可能有数千条消息。
在并行线程中处理最多 10 条消息的最佳方案是什么(消息排序很好,但不是必需的功能,如果 Web 服务响应业务失败,那么失败的消息应该重试直到成功) .
Amqp 侦听器不应从队列中消耗更多消息,因为任务执行器中没有可用的繁忙线程。 我可以在这样的通道中定义一个 ThreadExecutor:
@Bean
public AmqpInboundChannelAdapterSMLCSpec
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue)
{
return Amqp.inboundAdapter(connectionFactory, queue);
}
IntegrationFlow integrationFlow = IntegrationFlows
.from(amqpInboundChannelAdapter)
.channel(c -> c.executor(exportFlowsExecutor))
.transform(businessObjectToSoapRequestTransformer)
.handle(webServiceOutboundGatewayFactory.getObject())
.get();
或者像这样在 AmqpInboundChannelAdapter 中定义一个任务执行器就足够了,而不在流定义中定义通道任务执行器:
@Bean
public AmqpInboundChannelAdapterSMLCSpec
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue)
{
return Amqp.inboundAdapter(connectionFactory, queue)
.configureContainer(c->c.taskExecutor(taskExecutor));
}
或者可能像选项 1 一样为通道定义任务执行器,但另外在通道适配器上设置 maxConcurrentConsumers,如下所示:
@Bean
public AmqpInboundChannelAdapterSMLCSpec
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue)
{
return Amqp.inboundAdapter(connectionFactory, queue)
.configureContainer(c->c.maxConcurrentConsumers(10));
}
最佳做法是在 ListenerContainer
上配置 concurrency
并让所有下游进程在容器的这些线程上发生。这样,当由于线程繁忙而不再从队列中轮询消息时,您将获得自然的背压。另一方面,不会因为在侦听器容器之后使用 ExecutorChannel
释放轮询线程而导致消息丢失,并且当前消息将被确认为已消耗,但您可能会在下游失败.