camel jms组件如何消费队列中的消息

How does camel jms component consume messages from queue

我们的环境由 3 jboss 个服务器(门户、jms、协调)组成。

  1. 协调服务器托管骆驼路线,其中有一条路线从队列 (SLAQueue) 中消耗。
  2. JMS 服务器托管了我们所有的队列。
  3. 最近我们发现了一个错误,其中 TaskQueue 中的一些消息 托管在 JMS 服务器上的数据未传送到门户服务器上的 MDB。出于某种原因,他们 卡住了,当我们重新启动 JMS 服务器时,卡住的消息是 已交付。

为了调查,我们在“org.apache.activemq.artemis”上启用了 TRACE 级别的日志记录。我们注意到我们的 camel jms 组件和 JMS 服务器之间有很多争执。下面列出了一个聊天实例,这样的日志每 1 秒写入一次。

问题:

  1. camel jms 组件使用什么机制从队列中获取消息? JMS 组件是否每秒轮询一次队列(拉)?或者 JMS 组件在消息到达队列时得到通知?
  2. 该机制与J2EE Message driven beans 有区别吗?当消息到达队列时,MDB 会收到通知。
  3. 根据下面的讨论,我认为它正在投票。如果是轮询,可以配置轮询window吗?我尝试了 receiveTimeout 选项,但没有成功。

** camel JMS 组件和 JMS 服务器之间的日志中的示例聊天:**

2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) createdConsumer ActiveMQSession->ClientSessionImpl [name=eebaf2ec-47b3-11ec-ad21-0242ac110003, username=null, closed=false, factory = org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl@413c962e, metaData=(jms-session=,)]@72d15942 consumer=org.apache.activemq.artemis.ra.ActiveMQRAMessageConsumer@1dd2e8b
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) addConsumer(org.apache.activemq.artemis.ra.ActiveMQRAMessageConsumer@1dd2e8b)
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl] (Thread-206 (ActiveMQ-remoting-threads-ActiveMQServerImpl::serverUUID=b5ba8675-1a62-11ec-b397-0242ac110002-1886035738)) RemotingConnectionID=eeb5e9d6-47b3-11ec-ad21-0242ac110003 handling packet PACKET(SessionConsumerFlowCreditMessage)[type=70, channelID=12, responseAsync=false, requiresResponse=false, correlationID=-1, packetObject=SessionConsumerFlowCreditMessage, consumerID=5086, credits=1048576]
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) unlock()
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnection] (Thread-206 (ActiveMQ-remoting-threads-ActiveMQServerImpl::serverUUID=b5ba8675-1a62-11ec-b397-0242ac110002-1886035738)) InVMConnection [serverID=0, id=eeb5e9d6-47b3-11ec-ad21-0242ac110003]::packet sent done
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) unlock()
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) lock()
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) tryLock()
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) getUseTryLock()
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) getUseTryLock()
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) lock()
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) receive org.apache.activemq.artemis.ra.ActiveMQRAMessageConsumer@1dd2e8b timeout=120000
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) checkState()
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl@5e7ed34b{consumerContext=ActiveMQConsumerContext{id=5086}, queueName=jms.queue.SLAQueue}:: receive(120000)
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl@5e7ed34b{consumerContext=ActiveMQConsumerContext{id=5086}, queueName=jms.queue.SLAQueue}::receive(120000, false)
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler] (Thread-1288 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl@775fa129)) ServerSessionPacketHandler::handlePacket,PACKET(SessionConsumerFlowCreditMessage)[type=70, channelID=12, responseAsync=false, requiresResponse=false, correlationID=-1, packetObject=SessionConsumerFlowCreditMessage, consumerID=5086, credits=1048576]
2021-11-17 16:39:24,612 DEBUG [org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl] (Thread-1288 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl@775fa129)) ServerConsumerImpl [id=5086, filter=null, binding=LocalQueueBinding [address=jms.queue.SLAQueue, queue=QueueImpl[name=jms.queue.SLAQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=b5ba8675-1a62-11ec-b397-0242ac110002], temp=false]@3b0e1e9f, filter=null, name=jms.queue.SLAQueue, clusterName=jms.queue.SLAQueueb5ba8675-1a62-11ec-b397-0242ac110002]]::FlowControl::Received 1048576 credits, previous value = 0 currentValue = 1048576
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl] (Thread-1288 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl@775fa129)) ServerConsumerImpl [id=5086, filter=null, binding=LocalQueueBinding [address=jms.queue.SLAQueue, queue=QueueImpl[name=jms.queue.SLAQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=b5ba8675-1a62-11ec-b397-0242ac110002], temp=false]@3b0e1e9f, filter=null, name=jms.queue.SLAQueue, clusterName=jms.queue.SLAQueueb5ba8675-1a62-11ec-b397-0242ac110002]]::calling promptDelivery from receiving credits
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler] (Thread-1288 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl@775fa129)) ServerSessionPacketHandler::scheduling response::null
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler] (Thread-1288 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl@775fa129)) ServerSessionPacketHandler::regular response sent::null

是的,JMS 消费者正在轮询。默认情况下,Camel 在底层使用 Spring 的 DefaultMessageListenerContainer

Spring-Listener 本身直接使用 JMS API。

Spring 使用默认的 1 秒超时来拉取消息。我不知道你是否真的可以用 Camel 的 receiveTimeout 选项来改变它。