Spring 集成多个消费者未同时处理

Spring Integration Multiple consumers not processing concurrently

我正在使用 Spring 与 ActiveMQ 的集成。我用 maxConcurrentConsumers = 5 定义了一个 DefaultMessageListenerContainer。它在 .在 int-xml:validating-filter 和 int-xml:unmarshalling-transformer 之后,我定义了一个队列通道 actionInstructionTransformed。我有这个队列通道的轮询器。当我启动我的应用程序时,在 ActiveMQ 控制台中,我可以看到一个连接已创建并且在五个会话中。

现在,我得到了一个 @MessageEndpoint 方法注释

@ServiceActivator(inputChannel = "actionInstructionTransformed", poller = @Poller(value = "customPoller")). 

我在方法入口处得到了一条日志语句。每条消息的处理时间很长(几分钟)。在我的日志中,我可以看到 thread-1 开始处理,然后我只能看到 thread-1 输出。只有当 thread-1 处理完 1 条消息时,我才能看到 thread-2 开始处理下一条消息,等等。我的 class 注释 @MessageEndpoint 中没有任何同步块。我还没有设法获得 thread-1thread-2 等并发处理消息。

有没有人经历过类似的事情?

看,你说:

After an int-xml:validating-filter and an int-xml:unmarshalling-transformer, I defined a queue channel actionInstructionTransformed.

现在让我们转到 QueueChannelPollingConsumer definitions

On the other hand, a channel adapter connected to a channel that implements the org.springframework.messaging.PollableChannel interface (e.g. a QueueChannel) will produce an instance of PollingConsumer.

注意 @Poller (PollerMetadata) 有 taskExecutor 选项。

默认情况下,TaskScedhuler 根据 trigger 配置定期询问 QueueChannel 数据。如果那是 PeriodicTriggerfixedRate = false 这样的默认选项,下一次投票真的发生在上一次之后。这就是为什么你只看到一个线程。

因此,请尝试配置 taskExecutor,您来自该队列的消息将并行发送。

DefaultMessageListenerContainer 上的 concurrency 无效。因为最后您将所有这些消息都放到了 QueueChannel。在这里,一个新的线程模型开始基于 @Poller 配置工作。