使用 django 频道和 celery 收听 mqtt 主题

Listen to mqtt topics with django channels and celery

我想要一种将 django 与 mqtt 集成的方法,为此我首先想到的是使用 django-channels 和一个支持 mqtt over web sockets 的 mqtt 代理,这样我就可以在代理之间直接通信和 django-channels.

但是,我没有找到从 django 启动 websocket 客户端的方法,根据这个 link 这是不可能的。

因为我也开始研究任务队列,所以我想知道使用 paho-mqtt 启动 mqtt 客户端然后 运行 在单独的进程中使用 celery 是否是一个好习惯。这个过程然后将代理接收到的消息通过 websockets 转发到 django 通道,这样我也可以与客户端进程通信,在需要时发布数据或停止 mqtt 客户端,所有这些都直接来自 django。

我对这个想法有点怀疑,因为我也读到 celery 中的过程 运行 应该不会花太长时间才能完成,在这种情况下,这正是我想要做的。

所以我的问题是,这是一个多么糟糕的主意?是否有任何其他选项可以直接将 django 与 mqtt 集成?

*注意:我不想在服务器上有一个单独的进程 运行ning,我希望能够从 django 启动和停止进程,以便拥有完整的从 web gui

控制 mqtt 客户端

我找到了一个更好的方法,不需要使用芹菜。

我只是在 app/apps.py 的 ready 方法上启动了一个 mqtt 客户端,所以每次我 运行 应用程序都会启动一个客户端。从这里我可以使用 django-channels 或信号与系统的其他部分通信。

apps.py:

from django.apps import AppConfig
from threading import Thread
import paho.mqtt.client as mqtt


class MqttClient(Thread):
    def __init__(self, broker, port, timeout, topics):
    super(MqttClient, self).__init__()
    self.client = mqtt.Client()
    self.broker = broker
    self.port = port
    self.timeout = timeout
    self.topics = topics
    self.total_messages = 0

#  run method override from Thread class
def run(self):
    self.connect_to_broker()

def connect_to_broker(self):
    self.client.on_connect = self.on_connect
    self.client.on_message = self.on_message
    self.client.connect(self.broker, self.port, self.timeout)
    self.client.loop_forever()

# The callback for when a PUBLISH message is received from the server.
def on_message(self, client, userdata, msg):
    self.total_messages = self.total_messages + 1
    print(str(msg.payload) + "Total: {}".format(self.total_messages))

# The callback for when the client receives a CONNACK response from the server.
def on_connect(self, client, userdata, flags, rc):
    #  Subscribe to a list of topics using a lock to guarantee that a topic is only subscribed once
    for topic in self.topics:
        client.subscribe(topic)


class CoreConfig(AppConfig):
    default_auto_field = 'django.db.models.BigAutoField'
    name = 'core'

def ready(self):
    MqttClient("192.168.0.165", 1883, 60, ["teste/01"]).start()

如果您在 Django 应用程序中使用 ASGI,则可以使用 MQTTAsgi。完全披露我是 MQTTAsgi 的作者。

它是 Django 和 MQTT 的完整协议服务器。

要利用 mqtt 协议服务器,您可以 运行 您的应用程序,首先您需要创建一个 MQTT 消费者:

from mqttasgi.consumers import MqttConsumer
class MyMqttConsumer(MqttConsumer):

    async def connect(self):
        await self.subscribe('my/testing/topic', 2)

    async def receive(self, mqtt_message):
        print('Received a message at topic:', mqtt_mesage['topic'])
        print('With payload', mqtt_message['payload'])
        print('And QOS:', mqtt_message['qos'])
        pass

    async def disconnect(self):
        await self.unsubscribe('my/testing/topic')

那么你应该将这个协议添加到协议路由器中:

application = ProtocolTypeRouter({
      'websocket': AllowedHostsOriginValidator(URLRouter([
          url('.*', WebsocketConsumer)
      ])),
      'mqtt': MyMqttConsumer,
      ....
    })

然后你可以运行 mqtt 协议服务器*:

mqttasgi -H localhost -p 1883 my_application.asgi:application

*假设代理位于本地主机和端口 1883。

我也想解决这个问题,但没有找到真正适合 Channels 架构的好的解决方案(虽然 MQTTAsgi 接近但它使用 paho-mqtt 并且没有完全使用 Channels-layer 系统)。

我创建了:https://pypi.org/project/chanmqttproxy/

(来源 https://github.com/lbt/channels-mqtt-proxy

本质上,它是 MQTT 的完全异步通道 3 代理,允许发布和订阅。该文档展示了如何扩展标准频道教程,以便在 MQTT 主题上看到聊天消息 - 并且可以从 MQTT 主题发送到所有 websocket 浏览器客户端。

我不知道这是 OP 想要的,就收听 MQTT 主题而言,但对于一般情况,我认为这是一个很好的解决方案。