MQTT Retained messages 在同时从不同的应用程序订阅相同主题时未收到
MQTT Retained messages not received when subscribing same topic from different applications at the same time
TL;DR
当多个客户端同时订阅同一主题树时,并非所有客户端都按预期收到保留的消息!
详细/用例
在实际项目中,多个应用程序订阅(几乎同时,因为它们是并行启动的)同一个 MQTT 主题(带通配符)。该主题包含大约 500 条保留的消息(每条消息都在自己的子主题级别),所有应用程序都希望收到这些消息(它们订阅 QoS 1)。
除了 "configuration" 消息之外,数据主题也使用相同的 MQTT 连接订阅。不需要持久状态(这里也不需要)。因此应用程序实例连接 cleanSession=true
.
根据我的理解,如果每个应用程序实例都与固定的 clientId 连接就足够了,因为 cleanSession=true 应该避免任何状态处理。但要真正确保没有状态被视为每个连接都会生成 unique MQTT clientId
。
观察行为
不幸的是,并非所有应用程序实例都能获得保留的消息。有些应用程序实例根本无法从主题中获得任何消息——无论订阅持续多长时间。我首先认为 maxInflight(客户端)或 max_queued_messages(服务器端)配置可能是原因,但在将两者都增加到 500,000 后我想这不是失败背后的原因。
重现为测试
因此,我创建了 this github 项目并进行了重现。重现MqttSubscriptionTest
中有一个单元测试class,测试方法multiThreadSubscriptionTest
。执行此测试时,一些 (1000) 保留的消息将首先在 @BeforeClass
方法中发布。之后,将实例化并执行实现了IMqttMessageListener
和Runnable
接口的MqttSubscriber
class的10个实例。每个 MqttSubscriber 实例都将在自己的线程中执行,并带有自己的 MqttClient 实例,并将使用保留的消息订阅主题树。这将记录到控制台,如下所示:
----------- perform subscriptions
Subscriber-3 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-0 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-2 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-4 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-5 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-6 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-1 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-7 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-8 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-9 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
测试将等待一段时间,然后验证订阅。预计每个订阅者收到 1000 条保留消息:
----------- validate subscriptions
Subscriber-4: receivedMessages=1000; duration=372ms; succeeded=true
Subscriber-0: receivedMessages=1000; duration=265ms; succeeded=true
Subscriber-5: receivedMessages=1000; duration=475ms; succeeded=true
Subscriber-7: receivedMessages=0; duration=0ms; succeeded=false
Subscriber-6: receivedMessages=1000; duration=473ms; succeeded=true
Subscriber-8: receivedMessages=0; duration=0ms; succeeded=false
Subscriber-9: receivedMessages=1000; duration=346ms; succeeded=true
Subscriber-3: receivedMessages=1000; duration=243ms; succeeded=true
Subscriber-1: receivedMessages=1000; duration=470ms; succeeded=true
Subscriber-2: receivedMessages=1000; duration=357ms; succeeded=true
大多数订阅者在很短的时间内(大约数百毫秒)收到了预期的 1000 条消息。但是有些(这里 Subscriber-7/8)没有收到一条消息(持续时间为 0,因为它们从未完成)。如果给订阅者更多时间来接收消息,情况也不会好转。他们只是不会得到它们。
我不知道为什么会这样。 MQTT 代理或客户端不会显示任何错误消息。如果您能提供任何帮助,这对我来说非常有用,因为我依赖于保留消息的可靠传递。
在 GitHub 上重复:FrVaBe/MQTT/mqtt-client-showcase/
- 我使用本地 EMQ 和 HiveMQ 代理进行了测试。如果你想 运行 测试你需要 运行 你机器上的代理 localhost:1883 或更改测试中的配置 class.
- 我使用 Eclipse Paho Java 客户端 MQTT
- 我订阅了
cleanSession=true
,因为我不想有任何状态(连接用于订阅几个主题,不想传递错过的消息)
您不能在连接期间使用 cleanSession(true)
,请参阅此处的说明:http://www.steves-internet-guide.com/mqtt-retained-messages-example/
我用mosquito做测试,它的内部消息队列默认只有100个。
HiveMQ 有最大队列消息数。
Emq 配置有类似的东西 http://emqtt.io/docs/v2/config.html#mqtt-message-queue.
我修复了代码库中的示例代码。适合我。
编辑:刚刚用 Emq (docker run --rm -ti --name emq -p 18083:18083 -p 1883:1883 quodt/emq-docker:latest
) 测试了它并且工作正常。
基本上就是cleanSession:必须是false。
除此之外,测试代码中的等待状态很糟糕。它们在我的机器上太短了。使用锁存器或其他真正的同步机制。
HiveMQ 的人很友好地看了一下这个问题。他们怀疑 Paho 客户端的原因以及订阅中 IMqttMessageListener
的使用。假定的竞争条件存在一个已描述的问题 #432。
经验教训:最好使用 MqttCallback
而不是 IMqttMessageListener
TL;DR
当多个客户端同时订阅同一主题树时,并非所有客户端都按预期收到保留的消息!
详细/用例
在实际项目中,多个应用程序订阅(几乎同时,因为它们是并行启动的)同一个 MQTT 主题(带通配符)。该主题包含大约 500 条保留的消息(每条消息都在自己的子主题级别),所有应用程序都希望收到这些消息(它们订阅 QoS 1)。
除了 "configuration" 消息之外,数据主题也使用相同的 MQTT 连接订阅。不需要持久状态(这里也不需要)。因此应用程序实例连接 cleanSession=true
.
根据我的理解,如果每个应用程序实例都与固定的 clientId 连接就足够了,因为 cleanSession=true 应该避免任何状态处理。但要真正确保没有状态被视为每个连接都会生成 unique MQTT clientId
。
观察行为
不幸的是,并非所有应用程序实例都能获得保留的消息。有些应用程序实例根本无法从主题中获得任何消息——无论订阅持续多长时间。我首先认为 maxInflight(客户端)或 max_queued_messages(服务器端)配置可能是原因,但在将两者都增加到 500,000 后我想这不是失败背后的原因。
重现为测试
因此,我创建了 this github 项目并进行了重现。重现MqttSubscriptionTest
中有一个单元测试class,测试方法multiThreadSubscriptionTest
。执行此测试时,一些 (1000) 保留的消息将首先在 @BeforeClass
方法中发布。之后,将实例化并执行实现了IMqttMessageListener
和Runnable
接口的MqttSubscriber
class的10个实例。每个 MqttSubscriber 实例都将在自己的线程中执行,并带有自己的 MqttClient 实例,并将使用保留的消息订阅主题树。这将记录到控制台,如下所示:
----------- perform subscriptions
Subscriber-3 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-0 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-2 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-4 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-5 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-6 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-1 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-7 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-8 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-9 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
测试将等待一段时间,然后验证订阅。预计每个订阅者收到 1000 条保留消息:
----------- validate subscriptions
Subscriber-4: receivedMessages=1000; duration=372ms; succeeded=true
Subscriber-0: receivedMessages=1000; duration=265ms; succeeded=true
Subscriber-5: receivedMessages=1000; duration=475ms; succeeded=true
Subscriber-7: receivedMessages=0; duration=0ms; succeeded=false
Subscriber-6: receivedMessages=1000; duration=473ms; succeeded=true
Subscriber-8: receivedMessages=0; duration=0ms; succeeded=false
Subscriber-9: receivedMessages=1000; duration=346ms; succeeded=true
Subscriber-3: receivedMessages=1000; duration=243ms; succeeded=true
Subscriber-1: receivedMessages=1000; duration=470ms; succeeded=true
Subscriber-2: receivedMessages=1000; duration=357ms; succeeded=true
大多数订阅者在很短的时间内(大约数百毫秒)收到了预期的 1000 条消息。但是有些(这里 Subscriber-7/8)没有收到一条消息(持续时间为 0,因为它们从未完成)。如果给订阅者更多时间来接收消息,情况也不会好转。他们只是不会得到它们。
我不知道为什么会这样。 MQTT 代理或客户端不会显示任何错误消息。如果您能提供任何帮助,这对我来说非常有用,因为我依赖于保留消息的可靠传递。
在 GitHub 上重复:FrVaBe/MQTT/mqtt-client-showcase/
- 我使用本地 EMQ 和 HiveMQ 代理进行了测试。如果你想 运行 测试你需要 运行 你机器上的代理 localhost:1883 或更改测试中的配置 class.
- 我使用 Eclipse Paho Java 客户端 MQTT
- 我订阅了
cleanSession=true
,因为我不想有任何状态(连接用于订阅几个主题,不想传递错过的消息)
您不能在连接期间使用 cleanSession(true)
,请参阅此处的说明:http://www.steves-internet-guide.com/mqtt-retained-messages-example/
我用mosquito做测试,它的内部消息队列默认只有100个。
HiveMQ 有最大队列消息数。
Emq 配置有类似的东西 http://emqtt.io/docs/v2/config.html#mqtt-message-queue.
我修复了代码库中的示例代码。适合我。
编辑:刚刚用 Emq (docker run --rm -ti --name emq -p 18083:18083 -p 1883:1883 quodt/emq-docker:latest
) 测试了它并且工作正常。
基本上就是cleanSession:必须是false。 除此之外,测试代码中的等待状态很糟糕。它们在我的机器上太短了。使用锁存器或其他真正的同步机制。
HiveMQ 的人很友好地看了一下这个问题。他们怀疑 Paho 客户端的原因以及订阅中 IMqttMessageListener
的使用。假定的竞争条件存在一个已描述的问题 #432。
经验教训:最好使用 MqttCallback
而不是 IMqttMessageListener