如何使用 HiveMQ mqtt 客户端启用订阅 topics/messages 的并行 consuming/processing
How to enable parallel consuming/processing of subscribed topics/messages with HiveMQ mqtt client
我们目前正在从旧版本的 Eclipe Paho MQTT 客户端切换到版本 1.2 的 HiveMQ MQTT 客户端。
https://github.com/hivemq/hivemq-mqtt-client
目前正在使用客户端的 Aync- 版本,它需要一个消费者函数作为回调。
我们的一个 MQTT 客户端应用程序必须 process/consumer 关于许多不同主题的大量消息,并且一条消息的处理不必等待前一条消息完成。
我们不确定实现仅使用一个客户端实例并行处理消息的最佳方法是什么。
在上面的文档中有一个可选的执行器可以定义
client.subscribeWith()
.topicFilter("test/topic")
.qos(MqttQos.EXACTLY_ONCE)
.callback(System.out::println)
.executor(executor) // optional
.send();
当没有定义执行器时,AsyncClient 应该如何表现?
然后一切都以阻塞的方式串行处理?
这似乎以某种方式违背了使用回调定义异步的目的....
在我们的旧实现中,我们使用 共享订阅(这是 HiveMQ 3 中的非标准功能,现在是 MQTT 5 的标准功能)和多个客户端实例不断等待相同的主题交替处理。
但是考虑到 HiveMQ CLient API(遗憾的是缺少更多解释或示例)我们希望找到一种更优雅和简单的方法来实现并行处理或者线程池什么的!
感谢任何帮助!
通常只有在将应用程序扩展到多台机器时才需要共享订阅。
如果您可以并行处理消息,那么就没有理由在一台机器上使用共享订阅。
如果以后消息负载会增加,以后还是可以选择共享订阅横向扩展到多台机器。
由于 MQTT 提供排序保证,因此 HiveMQ MQTT 客户端会连续调用相同的回调。并行执行不同订阅的多个回调。
对于单个回调,只有您的应用程序可以选择打散排序。
为此,您只需将回调中的消息交给并行工作人员即可。
我现在正在处理同样的问题。据我所知,可以对 Qos 0 进行并行处理,因为不需要 PUBACK 但是...对于 Qos 1 和 2 问题更多复杂。
经纪人不会发送消息,除非前一个消息没有被 PUBACKed。当然,客户端仍然可以在开始处理消息之前尽快发送 PUBACK。但是,如果使用 Qos 1 订阅,则使用它是有原因的 - 订阅者不想丢失消息。
如果您立即 PUBACK 消息,您的订阅者可能会在处理过程中失败,并且永远不会再收到该消息。在那种情况下,接收、处理、PUBACK 更安全。这是我们回到串行处理的地方。
目前我还没有找到解决这个问题的办法。带有队列预取的 RabbitMQ 可以处理这个问题,但我想使用 MQTT 来解决它:)
我们目前正在从旧版本的 Eclipe Paho MQTT 客户端切换到版本 1.2 的 HiveMQ MQTT 客户端。 https://github.com/hivemq/hivemq-mqtt-client
目前正在使用客户端的 Aync- 版本,它需要一个消费者函数作为回调。
我们的一个 MQTT 客户端应用程序必须 process/consumer 关于许多不同主题的大量消息,并且一条消息的处理不必等待前一条消息完成。 我们不确定实现仅使用一个客户端实例并行处理消息的最佳方法是什么。
在上面的文档中有一个可选的执行器可以定义
client.subscribeWith()
.topicFilter("test/topic")
.qos(MqttQos.EXACTLY_ONCE)
.callback(System.out::println)
.executor(executor) // optional
.send();
当没有定义执行器时,AsyncClient 应该如何表现? 然后一切都以阻塞的方式串行处理? 这似乎以某种方式违背了使用回调定义异步的目的....
在我们的旧实现中,我们使用 共享订阅(这是 HiveMQ 3 中的非标准功能,现在是 MQTT 5 的标准功能)和多个客户端实例不断等待相同的主题交替处理。
但是考虑到 HiveMQ CLient API(遗憾的是缺少更多解释或示例)我们希望找到一种更优雅和简单的方法来实现并行处理或者线程池什么的!
感谢任何帮助!
通常只有在将应用程序扩展到多台机器时才需要共享订阅。 如果您可以并行处理消息,那么就没有理由在一台机器上使用共享订阅。 如果以后消息负载会增加,以后还是可以选择共享订阅横向扩展到多台机器。
由于 MQTT 提供排序保证,因此 HiveMQ MQTT 客户端会连续调用相同的回调。并行执行不同订阅的多个回调。 对于单个回调,只有您的应用程序可以选择打散排序。 为此,您只需将回调中的消息交给并行工作人员即可。
我现在正在处理同样的问题。据我所知,可以对 Qos 0 进行并行处理,因为不需要 PUBACK 但是...对于 Qos 1 和 2 问题更多复杂。
经纪人不会发送消息,除非前一个消息没有被 PUBACKed。当然,客户端仍然可以在开始处理消息之前尽快发送 PUBACK。但是,如果使用 Qos 1 订阅,则使用它是有原因的 - 订阅者不想丢失消息。
如果您立即 PUBACK 消息,您的订阅者可能会在处理过程中失败,并且永远不会再收到该消息。在那种情况下,接收、处理、PUBACK 更安全。这是我们回到串行处理的地方。
目前我还没有找到解决这个问题的办法。带有队列预取的 RabbitMQ 可以处理这个问题,但我想使用 MQTT 来解决它:)