使用 django 频道和 celery 收听 mqtt 主题
Listen to mqtt topics with django channels and celery
我想要一种将 django 与 mqtt 集成的方法,为此我首先想到的是使用 django-channels 和一个支持 mqtt over web sockets 的 mqtt 代理,这样我就可以在代理之间直接通信和 django-channels.
但是,我没有找到从 django 启动 websocket 客户端的方法,根据这个 link 这是不可能的。
因为我也开始研究任务队列,所以我想知道使用 paho-mqtt 启动 mqtt 客户端然后 运行 在单独的进程中使用 celery 是否是一个好习惯。这个过程然后将代理接收到的消息通过 websockets 转发到 django 通道,这样我也可以与客户端进程通信,在需要时发布数据或停止 mqtt 客户端,所有这些都直接来自 django。
我对这个想法有点怀疑,因为我也读到 celery 中的过程 运行 应该不会花太长时间才能完成,在这种情况下,这正是我想要做的。
所以我的问题是,这是一个多么糟糕的主意?是否有任何其他选项可以直接将 django 与 mqtt 集成?
*注意:我不想在服务器上有一个单独的进程 运行ning,我希望能够从 django 启动和停止进程,以便拥有完整的从 web gui
控制 mqtt 客户端
我找到了一个更好的方法,不需要使用芹菜。
我只是在 app/apps.py 的 ready 方法上启动了一个 mqtt 客户端,所以每次我 运行 应用程序都会启动一个客户端。从这里我可以使用 django-channels 或信号与系统的其他部分通信。
apps.py:
from django.apps import AppConfig
from threading import Thread
import paho.mqtt.client as mqtt
class MqttClient(Thread):
def __init__(self, broker, port, timeout, topics):
super(MqttClient, self).__init__()
self.client = mqtt.Client()
self.broker = broker
self.port = port
self.timeout = timeout
self.topics = topics
self.total_messages = 0
# run method override from Thread class
def run(self):
self.connect_to_broker()
def connect_to_broker(self):
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.connect(self.broker, self.port, self.timeout)
self.client.loop_forever()
# The callback for when a PUBLISH message is received from the server.
def on_message(self, client, userdata, msg):
self.total_messages = self.total_messages + 1
print(str(msg.payload) + "Total: {}".format(self.total_messages))
# The callback for when the client receives a CONNACK response from the server.
def on_connect(self, client, userdata, flags, rc):
# Subscribe to a list of topics using a lock to guarantee that a topic is only subscribed once
for topic in self.topics:
client.subscribe(topic)
class CoreConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'core'
def ready(self):
MqttClient("192.168.0.165", 1883, 60, ["teste/01"]).start()
如果您在 Django 应用程序中使用 ASGI,则可以使用 MQTTAsgi。完全披露我是 MQTTAsgi 的作者。
它是 Django 和 MQTT 的完整协议服务器。
要利用 mqtt 协议服务器,您可以 运行 您的应用程序,首先您需要创建一个 MQTT 消费者:
from mqttasgi.consumers import MqttConsumer
class MyMqttConsumer(MqttConsumer):
async def connect(self):
await self.subscribe('my/testing/topic', 2)
async def receive(self, mqtt_message):
print('Received a message at topic:', mqtt_mesage['topic'])
print('With payload', mqtt_message['payload'])
print('And QOS:', mqtt_message['qos'])
pass
async def disconnect(self):
await self.unsubscribe('my/testing/topic')
那么你应该将这个协议添加到协议路由器中:
application = ProtocolTypeRouter({
'websocket': AllowedHostsOriginValidator(URLRouter([
url('.*', WebsocketConsumer)
])),
'mqtt': MyMqttConsumer,
....
})
然后你可以运行 mqtt 协议服务器*:
mqttasgi -H localhost -p 1883 my_application.asgi:application
*假设代理位于本地主机和端口 1883。
我也想解决这个问题,但没有找到真正适合 Channels 架构的好的解决方案(虽然 MQTTAsgi 接近但它使用 paho-mqtt 并且没有完全使用 Channels-layer 系统)。
我创建了:https://pypi.org/project/chanmqttproxy/
(来源 https://github.com/lbt/channels-mqtt-proxy)
本质上,它是 MQTT 的完全异步通道 3 代理,允许发布和订阅。该文档展示了如何扩展标准频道教程,以便在 MQTT 主题上看到聊天消息 - 并且可以从 MQTT 主题发送到所有 websocket 浏览器客户端。
我不知道这是 OP 想要的,就收听 MQTT 主题而言,但对于一般情况,我认为这是一个很好的解决方案。
我想要一种将 django 与 mqtt 集成的方法,为此我首先想到的是使用 django-channels 和一个支持 mqtt over web sockets 的 mqtt 代理,这样我就可以在代理之间直接通信和 django-channels.
但是,我没有找到从 django 启动 websocket 客户端的方法,根据这个 link 这是不可能的。
因为我也开始研究任务队列,所以我想知道使用 paho-mqtt 启动 mqtt 客户端然后 运行 在单独的进程中使用 celery 是否是一个好习惯。这个过程然后将代理接收到的消息通过 websockets 转发到 django 通道,这样我也可以与客户端进程通信,在需要时发布数据或停止 mqtt 客户端,所有这些都直接来自 django。
我对这个想法有点怀疑,因为我也读到 celery 中的过程 运行 应该不会花太长时间才能完成,在这种情况下,这正是我想要做的。
所以我的问题是,这是一个多么糟糕的主意?是否有任何其他选项可以直接将 django 与 mqtt 集成?
*注意:我不想在服务器上有一个单独的进程 运行ning,我希望能够从 django 启动和停止进程,以便拥有完整的从 web gui
控制 mqtt 客户端我找到了一个更好的方法,不需要使用芹菜。
我只是在 app/apps.py 的 ready 方法上启动了一个 mqtt 客户端,所以每次我 运行 应用程序都会启动一个客户端。从这里我可以使用 django-channels 或信号与系统的其他部分通信。
apps.py:
from django.apps import AppConfig
from threading import Thread
import paho.mqtt.client as mqtt
class MqttClient(Thread):
def __init__(self, broker, port, timeout, topics):
super(MqttClient, self).__init__()
self.client = mqtt.Client()
self.broker = broker
self.port = port
self.timeout = timeout
self.topics = topics
self.total_messages = 0
# run method override from Thread class
def run(self):
self.connect_to_broker()
def connect_to_broker(self):
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.connect(self.broker, self.port, self.timeout)
self.client.loop_forever()
# The callback for when a PUBLISH message is received from the server.
def on_message(self, client, userdata, msg):
self.total_messages = self.total_messages + 1
print(str(msg.payload) + "Total: {}".format(self.total_messages))
# The callback for when the client receives a CONNACK response from the server.
def on_connect(self, client, userdata, flags, rc):
# Subscribe to a list of topics using a lock to guarantee that a topic is only subscribed once
for topic in self.topics:
client.subscribe(topic)
class CoreConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'core'
def ready(self):
MqttClient("192.168.0.165", 1883, 60, ["teste/01"]).start()
如果您在 Django 应用程序中使用 ASGI,则可以使用 MQTTAsgi。完全披露我是 MQTTAsgi 的作者。
它是 Django 和 MQTT 的完整协议服务器。
要利用 mqtt 协议服务器,您可以 运行 您的应用程序,首先您需要创建一个 MQTT 消费者:
from mqttasgi.consumers import MqttConsumer
class MyMqttConsumer(MqttConsumer):
async def connect(self):
await self.subscribe('my/testing/topic', 2)
async def receive(self, mqtt_message):
print('Received a message at topic:', mqtt_mesage['topic'])
print('With payload', mqtt_message['payload'])
print('And QOS:', mqtt_message['qos'])
pass
async def disconnect(self):
await self.unsubscribe('my/testing/topic')
那么你应该将这个协议添加到协议路由器中:
application = ProtocolTypeRouter({
'websocket': AllowedHostsOriginValidator(URLRouter([
url('.*', WebsocketConsumer)
])),
'mqtt': MyMqttConsumer,
....
})
然后你可以运行 mqtt 协议服务器*:
mqttasgi -H localhost -p 1883 my_application.asgi:application
*假设代理位于本地主机和端口 1883。
我也想解决这个问题,但没有找到真正适合 Channels 架构的好的解决方案(虽然 MQTTAsgi 接近但它使用 paho-mqtt 并且没有完全使用 Channels-layer 系统)。
我创建了:https://pypi.org/project/chanmqttproxy/
(来源 https://github.com/lbt/channels-mqtt-proxy)
本质上,它是 MQTT 的完全异步通道 3 代理,允许发布和订阅。该文档展示了如何扩展标准频道教程,以便在 MQTT 主题上看到聊天消息 - 并且可以从 MQTT 主题发送到所有 websocket 浏览器客户端。
我不知道这是 OP 想要的,就收听 MQTT 主题而言,但对于一般情况,我认为这是一个很好的解决方案。