无法在 MDB 中并发处理消息
Unable to process messages concurrently in MDB
我正在尝试创建一个带有 EJB 注释的简单 MDB,以便我可以异步执行任务。
有数以千计的慢任务要执行,硬件有很多处理器和 RAM,所以我需要 运行 以并发方式(在许多线程中)执行它们。它在开始时以这种方式工作(在许多线程中),但是在一些消息之后它“缩小”并且一次只处理一条消息。
这些是一些相关信息:
我正在使用 Java 1.8.0_221 和 WildFly 19.1.0.
这是我的 MDB 消费者:
@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "java:/jms/queue/MessageQueue"),
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue")
})
public class MessageConsumerTest implements MessageListener {
(...)
}
这个消费者有一些注入的依赖。他们中的大多数有一个 @Stateless
注释,一些有一个 @Singleton
注释。对于单例依赖,它们都有 @ConcurrencyManagement(ConcurrencyManagementType.BEAN)
注释。
在 standalone-full.xml
中声明的应用程序数据源有一个 jta="false"
参数。
我的队列是非持久化的(如果 WildFly 停止,生产者会检查并再次重新发送所有待处理的消息),所以这就是我在生产者中所做的:
@Inject
@JMSConnectionFactory("java:jboss/DefaultJMSConnectionFactory")
private JMSContext context;
@Resource(mappedName = "java:/jms/queue/MessageQueue")
private Queue queue;
(...)
context.createProducer().setDeliveryMode(DeliveryMode.NON_PERSISTENT).send(queue, msg);
我尝试更改很多东西(standalone-full.xml
中的池大小、MDB 使用者中的 @ActivationConfigProperty
注释、-D....
参数),但是 none工作了。结果总是一样的:MDB 开始并发处理许多对象,但下降到一个。
改变这种行为的正确方法是什么and/or做更深入的分析?
提前致谢!
更多信息: 我试图在“jboss-cli”应用程序中检查队列 运行ning 命令,结果如下:
一开始,有 3 个“ServerConsumer”(这是预期的行为):
[standalone@localhost:9990 /] jms-queue list-delivering-messages --queue-address=MessageQueue
consumerName=ServerConsumer [id=f3e141c0-b56d-11ea-8030-b8aeed89da5a:f3e1b6f2-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
elements
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378547L
expiration=0
type=3
priority=4
userID=ID:227e8bd1-b56e-11ea-8030-b8aeed89da5a
timestamp=1592929521797L
consumerName=ServerConsumer [id=f3ea1b75-b56d-11ea-8030-b8aeed89da5a:f3ea4287-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
elements
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378523L
expiration=0
type=3
priority=4
userID=ID:f63d0c39-b56d-11ea-8030-b8aeed89da5a
timestamp=1592929447548L
consumerName=ServerConsumer [id=f3e86dc2-b56d-11ea-8030-b8aeed89da5a:f3e93114-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
elements
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378520L
expiration=0
type=3
priority=4
userID=ID:f6170da8-b56d-11ea-8030-b8aeed89da5a
timestamp=1592929447299L
一段时间后,只有 2 个 ServerConsumer
实例,但其中一个似乎是两条消息:
[standalone@localhost:9990 /] jms-queue list-delivering-messages --queue-address=MessageQueue
consumerName=ServerConsumer [id=f3d29bb3-b56d-11ea-8030-b8aeed89da5a:f3d66c45-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
elements
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378580L
expiration=0
type=3
priority=4
userID=ID:6a77afd0-b56e-11ea-8030-b8aeed89da5a
timestamp=1592929642548L
consumerName=ServerConsumer [id=f3e86dc2-b56d-11ea-8030-b8aeed89da5a:f3e93114-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
elements
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378520L
expiration=0
type=3
priority=4
userID=ID:f6170da8-b56d-11ea-8030-b8aeed89da5a
timestamp=1592929447299L
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378565L
expiration=0
type=3
priority=4
userID=ID:34d2c6c7-b56e-11ea-8030-b8aeed89da5a
timestamp=1592929552548L
再过一段时间,只有一个 ServerConsumer
实例,它似乎在处理所有消息。至此,并发就没了!
[standalone@localhost:9990 /] jms-queue list-delivering-messages --queue-address=MessageQueue
consumerName=ServerConsumer [id=f3e86dc2-b56d-11ea-8030-b8aeed89da5a:f3e93114-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
elements
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378520L
expiration=0
type=3
priority=4
userID=ID:f6170da8-b56d-11ea-8030-b8aeed89da5a
timestamp=1592929447299L
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378565L
expiration=0
type=3
priority=4
userID=ID:34d2c6c7-b56e-11ea-8030-b8aeed89da5a
timestamp=1592929552548L
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378610L
expiration=0
type=3
priority=4
userID=ID:9751c5ea-b56e-11ea-8030-b8aeed89da5a
timestamp=1592929717797L
我认为这是由于 MDB 下的 JMS 会话中的消息缓冲所致。尝试将 consumerWindowSize
激活配置 属性 设置为 0
,例如:
@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "java:/jms/queue/MessageQueue"),
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(propertyName = "consumerWindowSize", propertyValue = "0")
})
public class MessageConsumerTest implements MessageListener {
(...)
}
此设置将在 ActiveMQ Artemis documentation 中进一步讨论。
我正在尝试创建一个带有 EJB 注释的简单 MDB,以便我可以异步执行任务。
有数以千计的慢任务要执行,硬件有很多处理器和 RAM,所以我需要 运行 以并发方式(在许多线程中)执行它们。它在开始时以这种方式工作(在许多线程中),但是在一些消息之后它“缩小”并且一次只处理一条消息。
这些是一些相关信息:
我正在使用 Java 1.8.0_221 和 WildFly 19.1.0.
这是我的 MDB 消费者:
@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "java:/jms/queue/MessageQueue"),
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue")
})
public class MessageConsumerTest implements MessageListener {
(...)
}
这个消费者有一些注入的依赖。他们中的大多数有一个
@Stateless
注释,一些有一个@Singleton
注释。对于单例依赖,它们都有@ConcurrencyManagement(ConcurrencyManagementType.BEAN)
注释。在
standalone-full.xml
中声明的应用程序数据源有一个jta="false"
参数。我的队列是非持久化的(如果 WildFly 停止,生产者会检查并再次重新发送所有待处理的消息),所以这就是我在生产者中所做的:
@Inject
@JMSConnectionFactory("java:jboss/DefaultJMSConnectionFactory")
private JMSContext context;
@Resource(mappedName = "java:/jms/queue/MessageQueue")
private Queue queue;
(...)
context.createProducer().setDeliveryMode(DeliveryMode.NON_PERSISTENT).send(queue, msg);
我尝试更改很多东西(standalone-full.xml
中的池大小、MDB 使用者中的 @ActivationConfigProperty
注释、-D....
参数),但是 none工作了。结果总是一样的:MDB 开始并发处理许多对象,但下降到一个。
改变这种行为的正确方法是什么and/or做更深入的分析?
提前致谢!
更多信息: 我试图在“jboss-cli”应用程序中检查队列 运行ning 命令,结果如下:
一开始,有 3 个“ServerConsumer”(这是预期的行为):
[standalone@localhost:9990 /] jms-queue list-delivering-messages --queue-address=MessageQueue
consumerName=ServerConsumer [id=f3e141c0-b56d-11ea-8030-b8aeed89da5a:f3e1b6f2-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
elements
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378547L
expiration=0
type=3
priority=4
userID=ID:227e8bd1-b56e-11ea-8030-b8aeed89da5a
timestamp=1592929521797L
consumerName=ServerConsumer [id=f3ea1b75-b56d-11ea-8030-b8aeed89da5a:f3ea4287-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
elements
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378523L
expiration=0
type=3
priority=4
userID=ID:f63d0c39-b56d-11ea-8030-b8aeed89da5a
timestamp=1592929447548L
consumerName=ServerConsumer [id=f3e86dc2-b56d-11ea-8030-b8aeed89da5a:f3e93114-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
elements
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378520L
expiration=0
type=3
priority=4
userID=ID:f6170da8-b56d-11ea-8030-b8aeed89da5a
timestamp=1592929447299L
一段时间后,只有 2 个 ServerConsumer
实例,但其中一个似乎是两条消息:
[standalone@localhost:9990 /] jms-queue list-delivering-messages --queue-address=MessageQueue
consumerName=ServerConsumer [id=f3d29bb3-b56d-11ea-8030-b8aeed89da5a:f3d66c45-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
elements
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378580L
expiration=0
type=3
priority=4
userID=ID:6a77afd0-b56e-11ea-8030-b8aeed89da5a
timestamp=1592929642548L
consumerName=ServerConsumer [id=f3e86dc2-b56d-11ea-8030-b8aeed89da5a:f3e93114-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
elements
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378520L
expiration=0
type=3
priority=4
userID=ID:f6170da8-b56d-11ea-8030-b8aeed89da5a
timestamp=1592929447299L
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378565L
expiration=0
type=3
priority=4
userID=ID:34d2c6c7-b56e-11ea-8030-b8aeed89da5a
timestamp=1592929552548L
再过一段时间,只有一个 ServerConsumer
实例,它似乎在处理所有消息。至此,并发就没了!
[standalone@localhost:9990 /] jms-queue list-delivering-messages --queue-address=MessageQueue
consumerName=ServerConsumer [id=f3e86dc2-b56d-11ea-8030-b8aeed89da5a:f3e93114-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
elements
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378520L
expiration=0
type=3
priority=4
userID=ID:f6170da8-b56d-11ea-8030-b8aeed89da5a
timestamp=1592929447299L
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378565L
expiration=0
type=3
priority=4
userID=ID:34d2c6c7-b56e-11ea-8030-b8aeed89da5a
timestamp=1592929552548L
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378610L
expiration=0
type=3
priority=4
userID=ID:9751c5ea-b56e-11ea-8030-b8aeed89da5a
timestamp=1592929717797L
我认为这是由于 MDB 下的 JMS 会话中的消息缓冲所致。尝试将 consumerWindowSize
激活配置 属性 设置为 0
,例如:
@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "java:/jms/queue/MessageQueue"),
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(propertyName = "consumerWindowSize", propertyValue = "0")
})
public class MessageConsumerTest implements MessageListener {
(...)
}
此设置将在 ActiveMQ Artemis documentation 中进一步讨论。