客户端工具上定义的主题未使用 ActiveMQ Artemis 消息 MQTT.fx

ActiveMQ Artemis messages not being consumed by topics defined on client tool MQTT.fx

我的用例保证使用发布-订阅模型,因此我使用了在我的 Spring 引导客户端中定义的主题。我已经使用工具 MQTT.fx 和两个 Spring 启动客户端来测试东西。两个 Spring 引导客户端能够相互通信,但 MQTT.fx 客户端仅连接到代理,不产生或使用消息。两个 Spring 引导客户端都连接到端口 61616,MQTT.fx 客户端连接到端口 1883.

对于我的项目,我需要让 MQTT.fx 客户端正常工作,因为它高度代表了旨在与服务器一起工作的微控制器。 MQTT.fx 上的故障是微控制器上的故障。

我的broker.xml文件中的acceptor配置如下所示。

<acceptors>
     <!-- Acceptor for every supported protocol -->
     <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>

     <!-- AMQP Acceptor.  Listens on default AMQP port for AMQP traffic.-->
     <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>

     <!-- STOMP Acceptor. -->
     <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>

     <!-- HornetQ Compatibility Acceptor.  Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
     <acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>

     <!-- MQTT Acceptor -->
     <acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
</acceptors>

Spring引导客户端通信时的日志如下。首先,一条消息被发送到主题 server.weatherForecast,随后,经过一些处理,一个响应被发送到 AMEBAA000105.device.weatherForecast.

2021-10-04 14:14:04,860 [AUDIT](Thread-8 (activemq-netty-threads)) 

AMQ601715: User admin(admins)@127.0.0.1:1125 successfully authenticated
2021-10-04 14:14:04,860 [AUDIT](Thread-8 (activemq-netty-threads)) AMQ601267: User admin(admins)@127.0.0.1:1125 is creating a core session on target resource ActiveMQServerImpl::name=0.0.0.0 [with parameters: [3b141c41-24ef-11ec-aa52-00155d831300, null, ****, 102400, RemotingConnectionImpl [ID=6280c69f, clientID=null, nodeID=b719c384-1d0a-11ec-8b7c-00155d831300, transportConnection=org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection@417b6fef[ID=6280c69f, local= /127.0.0.1:61616, remote=/127.0.0.1:1125]], true, true, false, false, null, org.apache.activemq.artemis.core.protocol.core.impl.CoreSessionCallback@103d417f, true, OperationContextImpl [61828916] [minimalStore=9223372036854775807, storeLineUp=0, stored=0, minimalReplicated=9223372036854775807, replicationLineUp=0, replicated=0, paged=0, minimalPage=9223372036854775807, pageLineUp=0, errorCode=-1, errorMessage=null, executorsPending=0, executor=OrderedExecutor(tasks=[])], {}]]
2021-10-04 14:14:04,864 [AUDIT](Thread-7 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl@b62d79)) AMQ601500: User admin(admins)@127.0.0.1:1125 is sending a message CoreMessage[messageID=528267,durable=true,userID=3b146a62-24ef-11ec-aa52-00155d831300,priority=4, timestamp=Mon Oct 04 14:14:04 IST 2021,expiration=0, durable=true, address=jms.topic.server.weatherForecast,size=314,properties=TypedProperties[__AMQ_CID=3b13ce1f-24ef-11ec-aa52-00155d831300]]@1994364957, with Context: RoutingContextImpl(Address=null, routingType=null, PreviousAddress=null previousRoute:null, reusable=null, version=0)
..................................................

2021-10-04 14:14:04,865 [AUDIT](Thread-7 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl@b62d79)) AMQ601501: User admin(admins)@127.0.0.1:1110 is consuming a message from 41207748-6ed3-42d2-b75e-044805212686: Reference[528267]:RELIABLE:CoreMessage[messageID=528267,durable=true,userID=3b146a62-24ef-11ec-aa52-00155d831300,priority=4, timestamp=Mon Oct 04 14:14:04 IST 2021,expiration=0, durable=true, address=jms.topic.server.weatherForecast,size=314,properties=TypedProperties[__AMQ_CID=3b13ce1f-24ef-11ec-aa52-00155d831300]]@1994364957
2021-10-04 14:14:04,868 [AUDIT](Thread-4 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl@b62d79)) AMQ601502: User admin(admins)@127.0.0.1:1110 is acknowledging a message from 41207748-6ed3-42d2-b75e-044805212686: CoreMessage[messageID=528267,durable=true,userID=3b146a62-24ef-11ec-aa52-00155d831300,priority=4, timestamp=Mon Oct 04 14:14:04 IST 2021,expiration=0, durable=true, address=jms.topic.server.weatherForecast,size=314,properties=TypedProperties[__AMQ_CID=3b13ce1f-24ef-11ec-aa52-00155d831300]]@1994364957
2021-10-04 14:14:08,059 [AUDIT](Thread-8 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl@b62d79)) AMQ601500: User admin(admins)@127.0.0.1:1110 is sending a message CoreMessage[messageID=528349,durable=true,userID=3cfc1623-24ef-11ec-aa52-00155d831300,priority=4, timestamp=Mon Oct 04 14:14:08 IST 2021,expiration=0, durable=true, address=null,size=190,properties=TypedProperties[__AMQ_CID=d04d0e87-24ee-11ec-aa52-00155d831300]]@600964226, with Context: RoutingContextImpl(Address=jms.topic.AMEBAA000105.device.weatherForecast, routingType=null, PreviousAddress=jms.topic.AMEBAA000105.device.weatherForecast previousRoute:null, reusable=null, version=0)
..................................................

使用MQTT.fx时的日志如下。最初订阅 AMEBAA000105.device.weatherForecast,然后向 server.weatherForecast 发送消息。 Spring 用于接收这些消息的引导客户端未接收到任何内容,因此不会向 AMEBAA000105.device.weatherForecast 发送任何内容。当另一个 Spring 引导客户端触发从 Spring 引导客户端向 AMEBAA000105.device.weatherForecast 发布消息时,MQTT.fx 不会捕获该消息。

2021-10-04 14:24:35,443 [AUDIT](Thread-17 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl@b62d79)) AMQ601265: User admin(admins)@192.168.0.107:18640 is creating a core consumer on target resource ServerSessionImpl() [with parameters: [534062, MQTT_FX_Client.AMEBAA000105.device.weatherForecast, null, 0, false, false, -1]]
2021-10-04 14:24:43,719 [AUDIT](Thread-10 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl@b62d79)) AMQ601500: User admin(admins)@192.168.0.107:18640 is sending a message CoreMessage[messageID=534380,durable=false,userID=null,priority=0, timestamp=0,expiration=0, durable=false, address=server.weatherForecast,size=200,properties=TypedProperties[mqtt.message.retain=false,mqtt.qos.level=0]]@1214406991, with Context: RoutingContextImpl(Address=server.weatherForecast, routingType=null, PreviousAddress=null previousRoute:null, reusable=null, version=0)

以上日志为审计日志。在 artemis.log 文件中,MQTT.fx 一直在 ping 代理。

2021-10-04 14:30:43,727 TRACE [org.apache.activemq.artemis.core.protocol.mqtt] MQTT(MQTT_FX_Client): OUT >> PINGRESP
2021-10-04 14:31:43,728 TRACE [org.apache.activemq.artemis.core.protocol.mqtt] MQTT(MQTT_FX_Client): OUT >> PINGRESP
2021-10-04 14:32:43,729 TRACE [org.apache.activemq.artemis.core.protocol.mqtt] MQTT(MQTT_FX_Client): OUT >> PINGRESP
2021-10-04 14:33:43,731 TRACE [org.apache.activemq.artemis.core.protocol.mqtt] MQTT(MQTT_FX_Client): OUT >> PINGRESP
2021-10-04 14:34:43,732 TRACE [org.apache.activemq.artemis.core.protocol.mqtt] MQTT(MQTT_FX_Client): OUT >> PINGRESP
2021-10-04 14:35:43,733 TRACE [org.apache.activemq.artemis.core.protocol.mqtt] MQTT(MQTT_FX_Client): OUT >> PINGRESP

artemis.log 文件中的 MQTT.fx 订阅或发布时的日志如下

2021-10-04 14:22:27,702 TRACE [org.apache.activemq.artemis.core.protocol.mqtt] MQTT(MQTT_FX_Client): IN << SUBSCRIBE(3)
AMEBAA000105.device.weatherForecast : AT_MOST_ONCE
2021-10-04 14:24:43,718 TRACE [org.apache.activemq.artemis.core.protocol.mqtt] MQTT(MQTT_FX_Client): IN << PUBLISH(-1) topic=server.weatherForecast, qos=AT_MOST_ONCE, retain=false, dup=false, payload={"serialNumber" : "AMEBAA000105"}

非常感谢任何帮助。

consumeWeatherForecastRequest 方法的签名更改为 private void consumeWeatherForecastRequest(String incomingMessage) 应该可以解决您的问题。

您说当您测试 Spring 引导客户端时,您会向 server.weatherForecast 发送一条消息,并在经过一些处理后将响应发送到 AMEBAA000105.device.weatherForecast。但是,不是日志显示的内容。日志表明您分别向 jms.topic.server.weatherForecastjms.topic.AMEBAA000105.device.weatherForecast 发送了一条消息。注意 jms.topic. 前缀。

我相信这是问题的根源,因为根据日志,MQTT 实际上 使用 server.weatherForecastAMEBAA000105.device.weatherForecast。由于这两个客户端没有使用匹配的名称,因此它们永远无法与您当前的配置一起使用。

我的猜测是您在 Spring 引导应用程序中使用旧版 Artemis 1.x 客户端,这就是使用 jms.topic. 前缀的原因。您应该移动到更新的客户端(例如与您正在使用的代理版本相匹配的客户端)或配置 anycastPrefixmulticastPrefix 以支持旧版 1.x 客户端。默认 broker.xml 有一条注释解释了如何做到这一点:

<!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add
           "anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url.
           See https://issues.apache.org/jira/browse/ARTEMIS-1644 for more information. -->

此评论默认出现在 acceptors 块中,但您似乎已将其删除,因为它不在您粘贴的 XML 块中。

如果要确认客户端使用的是哪个库,最简单的方法就是检查客户端的环境。但是,如果这不起作用并且您可以访问代理日志,那么您可以 enable TRACE logging 获取 org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl,然后查看 CreateSessionMessage 中传递的 version。如果它 < 131 则它不是 2.18.0 客户端。