Qpid 客户端 Apache Artemis 2.14.0 高可用性无法正常工作

Qpid client Apache Artemis 2.14.0 High availability not working

使用 Qpid 客户端连接 Apache Artemis 代理以实现高可用性。

代理实例在两个节点中运行,配置在 broker.xml

中列出了复制

代理实例在 node1(主)和 node2(从)上启动,运行 没有任何问题。

camel qpid jms 客户端配置为 URL 作为 failover:(amqp://localhost:5672,amqp://localhost:5673),在执行 camel 客户端时上下文启动没有任何问题,并且还注意到 Broker UI 控制台中的连接列为 AMQP 协议。 [下面提供了配置详细信息]

使用以下配置一切正常。

为了测试高可用性,我停止了节点 1 上的代理实例,并期望 Qpid Camel 客户端自动发现节点 2 代理并处理消息。但是它没有按预期连接。

但是当我使用带有 URL 的 aretmis-jms camel 客户端(包括 tcp 方案连接)时,我能够成功验证高可用性,当节点 1 中的代理 运行 由于某种原因客户端停止时自动发现 node2 上的代理。另外当node1启动后,备份客户端自动连接到node1。

Qpid 客户端无法发现备份代理。以下配置中的任何问题

master: 详细配置参考

<ha-policy>
   <replication>
      <master>
         <check-for-live-server>true</check-for-live-server>
      </master>
   </replication>
</ha-policy>

奴隶:

<ha-policy>
   <replication>
      <slave>
         <allow-failback>true</allow-failback>
      </slave>
   </replication>
</ha-policy>

客户使用骆驼

<bean id="jmsampqConnectionFactory" class="org.apache.qpid.jms.JmsConnectionFactory">
       <property name="remoteURI" value="failover:(ampq://localhost:5672,ampq://localhost:5673)" />
       <property name="username" value="user"/>
       <property name="password" value="pass"/>
   </bean>

  <bean id="jmsPooledConnectionFactory" class="org.messaginghub.pooled.jms.JmsPoolConnectionFactory" init-method="start" destroy-method="stop">
    <property name="maxConnections" value="5" />
    <property name="connectionFactory" ref="jmsamqpConnectionFactory" />
  </bean>

  <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="jmsPooledConnectionFactory" />
    <property name="concurrentConsumers" value="5" />
  </bean>

   <bean id="jms" class="org.apache.camel.component.amqp.AMQPComponent">
    <property name="configuration" ref="jmsConfig" />
  </bean>
  
   <bean id="CustomBean1" class="org.specific.process.class" /> 
   <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <route>
      <from uri="jms:queue:enterprise1.queue" />
      <convertBodyTo type="java.lang.String" />
      <bean ref="CustomBean1" method="processCamelExchangeData" />
    </route>
    </camelContext>

maven 依赖

   <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-amqp-starter</artifactId>
      <version>2.23.0</version>
    </dependency>
    <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-jms-client</artifactId>
       <version>0.54.0</version>
     </dependency> 
     <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>artemis-jms-client</artifactId>
      <version>2.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.messaginghub</groupId>
      <artifactId>pooled-jms</artifactId>
      <version>1.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-amqp</artifactId>
      <version>2.23.0</version> 
    </dependency>

日志:

2020-09-17 20:15:18,684: main DEBUG (FailoverProvider.java:153) - Initiating initial connection attempt task
2020-09-17 20:15:18,690: main DEBUG (AbstractJmsListeningContainer.java:382) - Established shared JMS Connection
2020-09-17 20:15:18,692: main DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@3962ec84
2020-09-17 20:15:18,692: main DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@147e0734
2020-09-17 20:15:18,693: main DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@2bdab835
2020-09-17 20:15:18,694: main DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@7b8aebd0
2020-09-17 20:15:18,695: main DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@55222ee9
2020-09-17 20:15:18,743: FailoverProvider: async work thread DEBUG (FailoverProvider.java:744) - Connection attempt:[1] to: amqp://localhost:5672 in-progress
2020-09-17 20:15:19,102: AmqpProvider :(1):[amqp://localhost:5672] DEBUG (SaslMechanismFinder.java:106) - Best match for SASL auth was: SASL-PLAIN
2020-09-17 20:15:19,119: FailoverProvider: async work thread DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsConnectionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1, configuredURI = failover:(amqp://localhost:5672,amqp://localhost:5673), connectedURI = null } (1)
2020-09-17 20:15:19,162: AmqpProvider :(1):[amqp://localhost:5672] DEBUG (AmqpConnectionBuilder.java:84) - AmqpConnection { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1 } is now open:
2020-09-17 20:15:19,164: AmqpProvider :(1):[amqp://localhost:5672] DEBUG (FailoverProvider.java:884) - Processing alternates uris:URI Pool { [amqp://localhost:5673, amqp://localhost:5672] } with new set: [amqp://localhost:61617?amqp.vhost=localhost]
2020-09-17 20:15:19,164: AmqpProvider :(1):[amqp://localhost:5672] DEBUG (FailoverProvider.java:899) - Replacing uris:URI Pool { [amqp://localhost:5673, amqp://localhost:5672] } with new set: [amqp://localhost:5672, amqp://localhost:61617?amqp.vhost=localhost]
2020-09-17 20:15:19,164: AmqpProvider :(1):[amqp://localhost:5672] DEBUG (FailoverProvider.java:913) - Processing alternates done new uris:URI Pool { [amqp://localhost:5672, amqp://localhost:61617?amqp.vhost=localhost] }
2020-09-17 20:15:19,165: AmqpProvider :(1):[amqp://localhost:5672] INFO (JmsConnection.java:1339) - Connection ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1 connected to server: amqp://localhost:5672
2020-09-17 20:15:19,168: main DEBUG (JmsConsumer.java:106) - Started listener container org.apache.camel.component.jms.DefaultJmsMessageListenerContainer@5e9bf744 on destination enterprise1.queue
2020-09-17 20:15:19,168: main INFO (DefaultCamelContext.java:4013) - Route: route1 started and consuming from: jms://queue:enterprise1.queue
2020-09-17 20:15:19,169: main DEBUG (DefaultCamelContext.java:3989) - Route: route2 >>> EventDrivenConsumerRoute[jmsTopic://topic:enterprise2.topic -> Pipeline[[Channel[convertBodyTo[java.lang.String]], Channel[org.apache.camel.component.bean.BeanProcessor@69d103f0]]]]
2020-09-17 20:15:19,169: main DEBUG (DefaultCamelContext.java:3993) - Starting consumer (order: 1001) on route: route2
2020-09-17 20:15:19,216: Camel (camel) thread #4 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsSessionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:1 } (2)
2020-09-17 20:15:19,216: Camel (camel) thread #5 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsSessionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:2 } (3)
2020-09-17 20:15:19,216: Camel (camel) thread #6 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsSessionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:3 } (4)
2020-09-17 20:15:19,217: Camel (camel) thread #7 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsSessionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:4 } (5)
2020-09-17 20:15:19,238: Camel (camel) thread #8 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsSessionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:5 } (6)
2020-09-17 20:15:19,361: Camel (camel) thread #4 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsConsumerInfo: { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:1:1, destination = enterprise1.queue } (7)
2020-09-17 20:15:19,362: main DEBUG (DefaultManagementAgent.java:470) - Registered MBean with ObjectName: org.apache.camel:context=camel,type=consumers,name=JmsConsumer(0x1132baa3)
2020-09-17 20:15:19,362: main DEBUG (DefaultConsumer.java:144) - Starting consumer: Consumer[jmsTopic://topic:enterprise2.topic]
2020-09-17 20:15:19,364: main DEBUG (FailoverProvider.java:153) - Initiating initial connection attempt task
2020-09-17 20:15:19,365: main DEBUG (AbstractJmsListeningContainer.java:382) - Established shared JMS Connection
...
2020-09-17 20:15:22,486: Camel (camel) thread #7 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: start -> JmsConsumerInfo: { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:4:1, destination = enterprise1.queue } (40)
2020-09-17 20:15:22,490: Camel (camel) thread #5 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: message pull -> ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:2:1 (41)
2020-09-17 20:15:22,491: Camel (camel) thread #4 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: message pull -> ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:1:1 (42)
...

客户端正在从其连接的 Artemis 服务器接收已知代理 URI 的更新,该服务器似乎只知道通告的服务器 运行 端口 61617。这导致客户端用日志中显示的代理返回的新集替换 URI:

Replacing uris:URI Pool { [amqp://localhost:5673, amqp://localhost:5672] } with new set: [amqp://localhost:5672, amqp://localhost:61617?amqp.vhost=localhost]

因此,如果要重新连接的正确服务器是端口 5673 上的原始备用服务器,那么它将永远不会尝试该服务器,因为第一个代理告诉它它发送备用服务器的唯一已知代理。您可以通过将下面的故障转移 URI 选项设置为 ADDIGNORE.[= 来更改客户端行为以不替换其已知主机的原始配置。 12=]

failover.amqpOpenServerListAction 控制当来自远程对等方的连接开放帧向客户端提供故障转移主机列表时故障转移传输的行为方式。此选项接受三个值之一; REPLACE、ADD 或 IGNORE(默认为 REPLACE)。如果配置了 REPLACE,则除当前服务器的故障转移 URI 之外的所有故障转移 URI 都将替换为远程对等方提供的故障转移 URI。如果配置了 ADD,则远程提供的 URI 将添加到现有的故障转移 URI 集,de-duplication。如果配置了 IGNORE,那么来自远程的任何更新都会被丢弃,并且不会对正在使用的故障转移 URI 集进行任何更改。

有关详细信息,请参阅 Qpid JMS 客户端 configuration page