Artemis Qpid consumer is not able to connect to the broker

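Below is the Qpid client configuration, which uses a Camel context to connect to the Artemis broker.

The context file starts up as expected, but in this setup the Qpid client is unable to connect to the Apache Artemis 2.14.0 broker.

The Qpid client cannot connect to the broker server, and threads end up blocked. I am not sure what is actually happening on the client side.

However, we noticed that another service, which uses a single Spring Camel context file containing all the queues and references, works as expected.

The producer pushes message data using producerTemplate through a CamelContextAware implementation.

Is it possible that the producer and the consumer on the queue are conflicting here?

All I can see is some blocked threads on the consumer side.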

common-context.xml

...
<bean id="jmsConnectionFactory" class="org.apache.qpid.jms.JmsConnectionFactory">
    <property name="remoteURI" value="failover:(amqp://host1:5672,amqp://host2:5672)"/>
</bean>
<bean id="pooledConnectionFactory" class="org.messaginghub.pooled.jms.JmsPoolConnectionFactory"
      init-method="start" destroy-method="stop">
    <property name="maxConnections" value="3"/>
    <property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="pooledConnectionFactory"/>
    <property name="concurrentConsumers" value="3"/>
</bean>
<bean id="env-queue" class="org.apache.camel.component.amqp.AMQPComponent">
    <property name="configuration" ref="jmsConfig"/>
</bean>
...

service-context.xml

Note: this XML file contains two Camel contexts, and all of the contexts start. Camel version 2.20.0 is used.

...
<import resource="classpath:/common-context.xml"/>
...
<route id="messageProcessor" shutdownRoute="Default" shutdownRunningTask="CompleteAllTasks">
    <from uri="env-queue:queue:order-demo-queue" id="OrderInfo"/>
    <camel:setProperty propertyName="orderType">
       <camel:jsonpath trim="true">$.orderType</camel:jsonpath>
    </camel:setProperty>
...

Logs:

2020-10-05 23:25:21,078: Thread-1 DEBUG (DefaultCamelContext.java:3993) - Starting consumer (order: 1000) on route: messageProcessor
2020-10-05 23:25:21,079: Thread-1 DEBUG (DefaultManagementAgent.java:470) - Registered MBean with ObjectName: org.apache.camel:context=orderventprocesor,type=consumers,name=JmsConsumer(0x3fed52a3)
2020-10-05 23:25:21,079: Thread-1 DEBUG (DefaultConsumer.java:144) - Starting consumer: Consumer[env-queue://queue:order-demo-queue]
2020-10-05 23:25:21,138: Thread-1 DEBUG (FailoverProvider.java:153) - Initiating initial connection attempt task
2020-10-05 23:25:21,140: FailoverProvider: async work thread DEBUG (FailoverProvider.java:744) - Connection attempt:[1] to: amqp://host1:5672 in-progress
2020-10-05 23:25:21,145: Thread-1 DEBUG (AbstractJmsListeningContainer.java:382) - Established shared JMS Connection
2020-10-05 23:25:21,145: Thread-1 DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@51524bce
2020-10-05 23:25:21,146: Thread-1 DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@708b22ca
2020-10-05 23:25:21,147: Thread-1 DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@314f82d7
2020-10-05 23:25:21,148: Thread-1 DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@73719eb3
2020-10-05 23:25:21,148: Thread-1 DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@30beb728
2020-10-05 23:25:21,155: FailoverProvider: async work thread INFO (FailoverProvider.java:751) - Connection attempt:[1] to: amqp://host1:5672 failed
2020-10-05 23:25:21,156: FailoverProvider: async work thread DEBUG (FailoverProvider.java:744) - Connection attempt:[1] to: amqp://host2:5672 in-progress
2020-10-05 23:25:21,157: FailoverProvider: async work thread INFO (FailoverProvider.java:751) - Connection attempt:[1] to: amqp://host2:5672 failed
2020-10-05 23:25:21,167: FailoverProvider: async work thread DEBUG (FailoverProvider.java:744) - Connection attempt:[2] to: amqp://host1:5672 in-progress

Thread dump (shown below):

Camel (orderventprocesor) thread #5 - JmsConsumer[order-demo-queue]" #45 daemon prio=5 os_prio=0 tid=0x00007faf395b3000 nid=0x58e3 waiting for monitor entry [0x00007faf1290c000]
INFO   | jvm 1    | 2020/10/06 00:01:38 |    java.lang.Thread.State: BLOCKED (on object monitor)
INFO   | jvm 1    | 2020/10/06 00:01:38 |       at org.springframework.jms.listener.AbstractJmsListeningContainer.getSharedConnection(AbstractJmsListeningContainer.java:492)
INFO   | jvm 1    | 2020/10/06 00:01:38 |       - waiting to lock <0x00000007ba976500> (a java.lang.Object)
INFO   | jvm 1    | 2020/10/06 00:01:38 |       at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.initResourcesIfNecessary(DefaultMessageListenerContainer.java:1170)
INFO   | jvm 1    | 2020/10/06 00:01:38 |       at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1149)
INFO   | jvm 1    | 2020/10/06 00:01:38 |       at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1142)
INFO   | jvm 1    | 2020/10/06 00:01:38 |       at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1039)
INFO   | jvm 1    | 2020/10/06 00:01:38 |       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
INFO   | jvm 1    | 2020/10/06 00:01:38 |       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
INFO   | jvm 1    | 2020/10/06 00:01:38 |       at java.lang.Thread.run(Thread.java:748)

Update:

When I changed the remoteURI to a plain amqp://localhost:5672, I saw an exception related to netty-resolver:
 java.lang.NoClassDefFoundError: "io/netty/resolver/AddressResolverGroup"

After adding the netty-resolver 4.1.51 jar, I now see the following exception.

2020-10-09 15:11:04.944 Camel (orderventprocesor) thread #6 - JmsConsumer[order-demo-queue] ERROR JmsConnection:165 - Failed to connect to remote at: amqp://localhost:5672
2020-10-09 15:11:04.946 Camel (orderventprocesor) thread #6 - JmsConsumer[order-demo-queue] ERROR DefaultJmsMessageListenerContainer:934 - Could not refresh JMS Connection for destination 'order-demo-queue' - retrying using FixedBackOff{interval=5000, currentAttempts=0, maxAttempts=unlimited}. Cause: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
javax.jms.JMSException: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
    at org.apache.qpid.jms.provider.ProviderException.toJMSException(ProviderException.java:34)
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80)
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112)
    at org.apache.qpid.jms.JmsConnection.connect(JmsConnection.java:176)
    at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:212)
    at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:199)
    at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createProviderConnection(JmsPoolConnectionFactory.java:651)
    at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.makeObject(JmsPoolConnectionFactory.java:108)
    at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.makeObject(JmsPoolConnectionFactory.java:105)
    at org.apache.commons.pool2.impl.GenericKeyedObjectPool.create(GenericKeyedObjectPool.java:1041)
    at org.apache.commons.pool2.impl.GenericKeyedObjectPool.addObject(GenericKeyedObjectPool.java:1221)
    at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createJmsPoolConnection(JmsPoolConnectionFactory.java:705)
    at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createConnection(JmsPoolConnectionFactory.java:237)
    at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createConnection(JmsPoolConnectionFactory.java:232)
    at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:180)
    at org.springframework.jms.listener.AbstractJmsListeningContainer.createSharedConnection(AbstractJmsListeningContainer.java:413)
    at org.springframework.jms.listener.AbstractJmsListeningContainer.refreshSharedConnection(AbstractJmsListeningContainer.java:398)
    at org.springframework.jms.listener.DefaultMessageListenerContainer.refreshConnectionUntilSuccessful(DefaultMessageListenerContainer.java:915)
    at org.springframework.jms.listener.DefaultMessageListenerContainer.recoverAfterListenerSetupFailure(DefaultMessageListenerContainer.java:890)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1061)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.qpid.jms.provider.exceptions.ProviderIOException: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
    at org.apache.qpid.jms.provider.exceptions.ProviderExceptionSupport.createOrPassthroughFatal(ProviderExceptionSupport.java:46)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.connect(AmqpProvider.java:308)
    at org.apache.qpid.jms.JmsConnection.connect(JmsConnection.java:162)
    ... 19 more
Caused by: java.lang.AbstractMethodError: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:84)
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:58)
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:47)
    at io.netty.channel.MultithreadEventLoopGroup.<init>(MultithreadEventLoopGroup.java:50)
    at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:70)
    at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:65)
    at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:56)
    at org.apache.qpid.jms.transports.netty.NettyTcpTransport.connect(NettyTcpTransport.java:151)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.connect(AmqpProvider.java:230)
    ... 20 more
2020-10-09 15:11:09.949 Camel (orderventprocesor) thread #6 - JmsConsumer[order-demo-queue] ERROR JmsConnection:165 - Failed to connect to remote at: amqp://localhost:5672
2020-10-09 15:11:09.950 Camel (orderventprocesor) thread #6 - JmsConsumer[order-demo-queue] ERROR DefaultJmsMessageListenerContainer:934 - Could not refresh JMS Connection for destination 'order-demo-queue' - retrying using FixedBackOff{interval=5000, currentAttempts=1, maxAttempts=unlimited}. Cause: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
javax.jms.JMSException: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
    at org.apache.qpid.jms.provider.ProviderException.toJMSException(ProviderException.java:34)
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80)
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112)
    at org.apache.qpid.jms.JmsConnection.connect(JmsConnection.java:176)
    at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:212)
    at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:199)
    at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createProviderConnection(JmsPoolConnectionFactory.java:651)
    at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.makeObject(JmsPoolConnectionFactory.java:108)
    at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.makeObject(JmsPoolConnectionFactory.java:105)
    at org.apache.commons.pool2.impl.GenericKeyedObjectPool.create(GenericKeyedObjectPool.java:1041)
    at org.apache.commons.pool2.impl.GenericKeyedObjectPool.addObject(GenericKeyedObjectPool.java:1221)
    at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createJmsPoolConnection(JmsPoolConnectionFactory.java:705)
    at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createConnection(JmsPoolConnectionFactory.java:237)
    at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createConnection(JmsPoolConnectionFactory.java:232)
    at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:180)
    at org.springframework.jms.listener.AbstractJmsListeningContainer.createSharedConnection(AbstractJmsListeningContainer.java:413)
    at org.springframework.jms.listener.AbstractJmsListeningContainer.refreshSharedConnection(AbstractJmsListeningContainer.java:398)
    at org.springframework.jms.listener.DefaultMessageListenerContainer.refreshConnectionUntilSuccessful(DefaultMessageListenerContainer.java:915)
    at org.springframework.jms.listener.DefaultMessageListenerContainer.recoverAfterListenerSetupFailure(DefaultMessageListenerContainer.java:890)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1061)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.qpid.jms.provider.exceptions.ProviderIOException: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
    at org.apache.qpid.jms.provider.exceptions.ProviderExceptionSupport.createOrPassthroughFatal(ProviderExceptionSupport.java:46)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.connect(AmqpProvider.java:308)
    at org.apache.qpid.jms.JmsConnection.connect(JmsConnection.java:162)
    ... 19 more
Caused by: java.lang.AbstractMethodError: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:84)
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:58)
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:47)
    at io.netty.channel.MultithreadEventLoopGroup.<init>(MultithreadEventLoopGroup.java:50)
    at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:70)
    at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:65)
    at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:56)
    at org.apache.qpid.jms.transports.netty.NettyTcpTransport.connect(NettyTcpTransport.java:151)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.connect(AmqpProvider.java:230)
    ... 20 more

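The Qpid client is attempting to connect using the configured failover mechanism. It appears that the client cannot connect, which could be because no broker service is running or the broker is simply unreachable, or possibly because of a misconfiguration, such as an SSL connection to a non-SSL acceptor or the reverse. If it is not yet connected, the client blocks on connection start, as required by the JMS specification. You need to dig deeper into why your broker cannot be reached.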
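One way to rule out the "not running or unreachable" case is a plain TCP probe against each broker address. The following is a minimal sketch using only the JDK; the class name `BrokerPortProbe` and the 2000 ms timeout are illustrative assumptions, and the host names and port are taken from the failover URI in the question.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper (not part of Qpid or Camel): checks whether a TCP port accepts connections. */
final class BrokerPortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers connection refused, connect timeout, and unresolvable host names.
            System.err.println(host + ":" + port + " not reachable: " + e);
            return false;
        }
    }

    public static void main(String[] args) {
        // host1, host2 and port 5672 come from the failover URI in the question.
        for (String host : new String[] {"host1", "host2"}) {
            System.out.println(host + ":5672 reachable = " + isReachable(host, 5672, 2000));
        }
    }
}
```

A successful probe only shows that something is listening on the port. It does not prove that the listener is an AMQP acceptor or that the SSL settings on both sides match, so those still need to be checked separately.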

When the failover configuration is used for amqp, the client alternates between the hosts and does not log any exception.

I updated the ConnectionFactory remoteURI to use a single host (amqp://localhost:5672) and noticed that the exception originates in io.netty.resolver.
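A `NoClassDefFoundError` for `io/netty/resolver/AddressResolverGroup` means that class could not be loaded at runtime. A quick check along these lines can confirm whether a class is visible and which jar supplies it. This is only a sketch: `ClasspathCheck` is a made-up name, and it assumes the check runs with the same class loader as the application.

```java
import java.security.CodeSource;

/** Hypothetical diagnostic: reports whether a class can be loaded and where it comes from. */
final class ClasspathCheck {

    /** Returns the location the class was loaded from, or null if it cannot be loaded. */
    static String locate(String className) {
        try {
            // initialize=false: load the class without running its static initializers.
            Class<?> type = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            // JDK classes may have no code source, so fall back to a placeholder.
            return (source == null || source.getLocation() == null)
                    ? "(JDK runtime)"
                    : source.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // Class name taken from the NoClassDefFoundError in the question.
        String name = "io.netty.resolver.AddressResolverGroup";
        String location = locate(name);
        System.out.println(name + " -> " + (location == null ? "NOT on classpath" : location));
    }
}
```

In a container with several class loaders the result depends on where the check runs, so it is most useful when executed from inside the affected application.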

I added the following dependencies, and the connection is now established as expected.

    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-resolver</artifactId>
      <version>4.1.51.Final</version>
    </dependency>

    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-transport-native-epoll</artifactId>
      <version>4.1.51.Final</version>
    </dependency>
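The `AbstractMethodError` in `MultithreadEventExecutorGroup.newChild` from the question is the kind of error that usually appears when Netty jars from different releases end up on one classpath. That is an inference from the stack trace, not something confirmed in this thread. A rough way to spot such a mix is to group the Netty jars on the classpath by version. The sketch below assumes Maven-style file names such as `netty-resolver-4.1.51.Final.jar` and that the jars are listed in `java.class.path`; `NettyVersionCheck` is a hypothetical name.

```java
import java.io.File;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical diagnostic: groups Netty jars found in a classpath string by version. */
final class NettyVersionCheck {

    // Assumes Maven-style names: netty-<module>-<version>.jar (jars with classifiers are skipped).
    private static final Pattern NETTY_JAR =
            Pattern.compile("(netty-[a-z0-9-]+?)-(\\d+(?:\\.\\d+)*(?:\\.[A-Za-z0-9]+)?)\\.jar");

    /** Maps each Netty version found in the classpath to the artifact names using it. */
    static Map<String, Set<String>> versionsOnClasspath(String classpath, String separator) {
        Map<String, Set<String>> byVersion = new TreeMap<>();
        for (String entry : classpath.split(Pattern.quote(separator))) {
            Matcher m = NETTY_JAR.matcher(new File(entry).getName());
            if (m.matches()) {
                byVersion.computeIfAbsent(m.group(2), v -> new TreeSet<>()).add(m.group(1));
            }
        }
        return byVersion;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> found =
                versionsOnClasspath(System.getProperty("java.class.path"), File.pathSeparator);
        found.forEach((version, artifacts) -> System.out.println(version + " -> " + artifacts));
        if (found.size() > 1) {
            System.out.println("More than one Netty version found; consider aligning them.");
        }
    }
}
```

If more than one version shows up, pinning all `io.netty` artifacts to a single release in the build (for example through dependency management) is a common way to resolve it.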