无法在 MDB 中并发处理消息

Unable to process messages concurrently in MDB

我正在尝试创建一个带有 EJB 注释的简单 MDB,以便我可以异步执行任务。

有数以千计的慢任务要执行,硬件有很多处理器和 RAM,所以我需要 运行 以并发方式(在许多线程中)执行它们。它在开始时以这种方式工作(在许多线程中),但是在一些消息之后它“缩小”并且一次只处理一条消息。

这些是一些相关信息:

  1. 我正在使用 Java 1.8.0_221 和 WildFly 19.1.0.

  2. 这是我的 MDB 消费者:

@MessageDriven(activationConfig = {
        @ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "java:/jms/queue/MessageQueue"),
        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue")
})
public class MessageConsumerTest implements MessageListener {
  (...)
}
  1. 这个消费者有一些注入的依赖。他们中的大多数有一个 @Stateless 注释,一些有一个 @Singleton 注释。对于单例依赖,它们都有 @ConcurrencyManagement(ConcurrencyManagementType.BEAN) 注释。

  2. standalone-full.xml 中声明的应用程序数据源有一个 jta="false" 参数。

  3. 我的队列是非持久化的(如果 WildFly 停止,生产者会检查并再次重新发送所有待处理的消息),所以这就是我在生产者中所做的:

@Inject
@JMSConnectionFactory("java:jboss/DefaultJMSConnectionFactory")
private JMSContext context;

@Resource(mappedName = "java:/jms/queue/MessageQueue")
private Queue queue;

(...)

context.createProducer().setDeliveryMode(DeliveryMode.NON_PERSISTENT).send(queue, msg);

我尝试更改很多东西(standalone-full.xml 中的池大小、MDB 使用者中的 @ActivationConfigProperty 注释、-D.... 参数),但是 none工作了。结果总是一样的:MDB 开始并发处理许多对象,但下降到一个。

改变这种行为的正确方法是什么and/or做更深入的分析?

提前致谢!

更多信息: 我试图在“jboss-cli”应用程序中检查队列 运行ning 命令,结果如下:

一开始,有 3 个“ServerConsumer”(这是预期的行为):

[standalone@localhost:9990 /] jms-queue list-delivering-messages --queue-address=MessageQueue
  consumerName=ServerConsumer [id=f3e141c0-b56d-11ea-8030-b8aeed89da5a:f3e1b6f2-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
  elements
      durable=false
      address=jms.queue.MessageQueue
      __AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
      _AMQ_ROUTING_TYPE=1
      messageID=253403378547L
      expiration=0
      type=3
      priority=4
      userID=ID:227e8bd1-b56e-11ea-8030-b8aeed89da5a
      timestamp=1592929521797L
  consumerName=ServerConsumer [id=f3ea1b75-b56d-11ea-8030-b8aeed89da5a:f3ea4287-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
  elements
      durable=false
      address=jms.queue.MessageQueue
      __AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
      _AMQ_ROUTING_TYPE=1
      messageID=253403378523L
      expiration=0
      type=3
      priority=4
      userID=ID:f63d0c39-b56d-11ea-8030-b8aeed89da5a
      timestamp=1592929447548L
  consumerName=ServerConsumer [id=f3e86dc2-b56d-11ea-8030-b8aeed89da5a:f3e93114-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
  elements
      durable=false
      address=jms.queue.MessageQueue
      __AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
      _AMQ_ROUTING_TYPE=1
      messageID=253403378520L
      expiration=0
      type=3
      priority=4
      userID=ID:f6170da8-b56d-11ea-8030-b8aeed89da5a
      timestamp=1592929447299L

一段时间后,只有 2 个 ServerConsumer 实例,但其中一个似乎是两条消息:

[standalone@localhost:9990 /] jms-queue list-delivering-messages --queue-address=MessageQueue
  consumerName=ServerConsumer [id=f3d29bb3-b56d-11ea-8030-b8aeed89da5a:f3d66c45-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
  elements
      durable=false
      address=jms.queue.MessageQueue
      __AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
      _AMQ_ROUTING_TYPE=1
      messageID=253403378580L
      expiration=0
      type=3
      priority=4
      userID=ID:6a77afd0-b56e-11ea-8030-b8aeed89da5a
      timestamp=1592929642548L
  consumerName=ServerConsumer [id=f3e86dc2-b56d-11ea-8030-b8aeed89da5a:f3e93114-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
  elements
      durable=false
      address=jms.queue.MessageQueue
      __AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
      _AMQ_ROUTING_TYPE=1
      messageID=253403378520L
      expiration=0
      type=3
      priority=4
      userID=ID:f6170da8-b56d-11ea-8030-b8aeed89da5a
      timestamp=1592929447299L
      durable=false
      address=jms.queue.MessageQueue
      __AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
      _AMQ_ROUTING_TYPE=1
      messageID=253403378565L
      expiration=0
      type=3
      priority=4
      userID=ID:34d2c6c7-b56e-11ea-8030-b8aeed89da5a
      timestamp=1592929552548L

再过一段时间,只有一个 ServerConsumer 实例,它似乎在处理所有消息。至此,并发就没了!

[standalone@localhost:9990 /] jms-queue list-delivering-messages --queue-address=MessageQueue
  consumerName=ServerConsumer [id=f3e86dc2-b56d-11ea-8030-b8aeed89da5a:f3e93114-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
  elements
      durable=false
      address=jms.queue.MessageQueue
      __AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
      _AMQ_ROUTING_TYPE=1
      messageID=253403378520L
      expiration=0
      type=3
      priority=4
      userID=ID:f6170da8-b56d-11ea-8030-b8aeed89da5a
      timestamp=1592929447299L
      durable=false
      address=jms.queue.MessageQueue
      __AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
      _AMQ_ROUTING_TYPE=1
      messageID=253403378565L
      expiration=0
      type=3
      priority=4
      userID=ID:34d2c6c7-b56e-11ea-8030-b8aeed89da5a
      timestamp=1592929552548L
      durable=false
      address=jms.queue.MessageQueue
      __AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
      _AMQ_ROUTING_TYPE=1
      messageID=253403378610L
      expiration=0
      type=3
      priority=4
      userID=ID:9751c5ea-b56e-11ea-8030-b8aeed89da5a
      timestamp=1592929717797L

我认为这是由于 MDB 下的 JMS 会话中的消息缓冲所致。尝试将 consumerWindowSize 激活配置 属性 设置为 0,例如:

@MessageDriven(activationConfig = {
        @ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "java:/jms/queue/MessageQueue"),
        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
        @ActivationConfigProperty(propertyName = "consumerWindowSize", propertyValue = "0")
})
public class MessageConsumerTest implements MessageListener {
  (...)
}

此设置将在 ActiveMQ Artemis documentation 中进一步讨论。