是否可以在使用 Redis 后端的 Celery Worker 中使用 mqttasgi 发送 MQTT 消息
Is it possible to send an MQTT message with mqttasgi inside a Celery Worker that use Redis Backend
我在 Django 中使用 mqttasgi 库接收大量消息,并使用 REDIS 队列处理它们,我想将此信息发布回另一个主题。这可能吗?如果是,我该怎么做?
目前我只是将发布功能覆盖到我的消费者中,如下所示。
from mqttasgi.consumers import MqttConsumer
from mqtt_handler.tasks import processmqttmessage
import json
class MyMqttConsumer(MqttConsumer):
async def connect(self):
await self.subscribe('application/5/device/+/event/up', 2)
async def receive(self, mqtt_message):
print('Received a message at topic:', mqtt_message['topic'])
print('With payload', mqtt_message['payload'])
print('And QOS:', mqtt_message['qos'])
print(type(mqtt_message['payload']))
dictresult = json.loads(mqtt_message['payload'])
print(type(dictresult))
print(dictresult)
jsonresult = json.dumps(dictresult)
print(type(jsonresult))
print(jsonresult)
processmqttmessage.delay(jsonresult)
print("test")
pass
async def publish(self, topic, payload, qos=1, retain=False):
await self.send({
'type': 'mqtt.pub',
'mqtt': {
'topic': topic,
'payload': payload,
'qos': qos,
'retain': retain,
}
})
async def disconnect(self):
await self.unsubscribe('application/5/device/+/event/up')
我希望现在能够从我的任务进程内部发布 mqttmessage。
谢谢。
Pd:@Santiago Ivulich 也许你可以帮助我。
是的,这是可能的,没有必要覆盖基础消费者的发布。我会向 return 推荐需要发布回 MQTTAsgi 的结果,以维护单个 MQTT 连接。为此,您可以在通道层中使用一个组,以便将需要发送的内容发送回 mqttasgi。
from mqttasgi.consumers import MqttConsumer
from mqtt_handler.tasks import processmqttmessage
import json
class MyMqttConsumer(MqttConsumer):
async def connect(self):
await self.subscribe('application/5/device/+/event/up', 2)
# Subscribe consumer to channel layer group.
await self.channel_layer.group_add("my.group", self.channel_name)
async def receive(self, mqtt_message):
print('Received a message at topic:', mqtt_message['topic'])
print('With payload', mqtt_message['payload'])
print('And QOS:', mqtt_message['qos'])
print(type(mqtt_message['payload']))
dictresult = json.loads(mqtt_message['payload'])
print(type(dictresult))
print(dictresult)
jsonresult = json.dumps(dictresult)
print(type(jsonresult))
print(jsonresult)
processmqttmessage.delay(jsonresult)
print("test")
pass
async def publish_results(self, event):
data = event['text']
self.publish('my/publish/topic', data, qos=2, retain=False)
async def disconnect(self):
await self.unsubscribe('application/5/device/+/event/up')
并且来自 celery 任务:
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
def processmqttmessage():
...
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)("my.group",
{"type": "publish.results", "text":"Hi from outside of the consumer"})
如果多个消费者将同时 运行 您可以通过编程方式为组生成一个名称并将其作为参数传递给任务。
重要提示:确保您在 celery 和 mqttasgi 项目中使用相同的后端通道。
我在 Django 中使用 mqttasgi 库接收大量消息,并使用 REDIS 队列处理它们,我想将此信息发布回另一个主题。这可能吗?如果是,我该怎么做? 目前我只是将发布功能覆盖到我的消费者中,如下所示。
from mqttasgi.consumers import MqttConsumer
from mqtt_handler.tasks import processmqttmessage
import json
class MyMqttConsumer(MqttConsumer):
async def connect(self):
await self.subscribe('application/5/device/+/event/up', 2)
async def receive(self, mqtt_message):
print('Received a message at topic:', mqtt_message['topic'])
print('With payload', mqtt_message['payload'])
print('And QOS:', mqtt_message['qos'])
print(type(mqtt_message['payload']))
dictresult = json.loads(mqtt_message['payload'])
print(type(dictresult))
print(dictresult)
jsonresult = json.dumps(dictresult)
print(type(jsonresult))
print(jsonresult)
processmqttmessage.delay(jsonresult)
print("test")
pass
async def publish(self, topic, payload, qos=1, retain=False):
await self.send({
'type': 'mqtt.pub',
'mqtt': {
'topic': topic,
'payload': payload,
'qos': qos,
'retain': retain,
}
})
async def disconnect(self):
await self.unsubscribe('application/5/device/+/event/up')
我希望现在能够从我的任务进程内部发布 mqttmessage。
谢谢。
Pd:@Santiago Ivulich 也许你可以帮助我。
是的,这是可能的,没有必要覆盖基础消费者的发布。我会向 return 推荐需要发布回 MQTTAsgi 的结果,以维护单个 MQTT 连接。为此,您可以在通道层中使用一个组,以便将需要发送的内容发送回 mqttasgi。
from mqttasgi.consumers import MqttConsumer
from mqtt_handler.tasks import processmqttmessage
import json
class MyMqttConsumer(MqttConsumer):
async def connect(self):
await self.subscribe('application/5/device/+/event/up', 2)
# Subscribe consumer to channel layer group.
await self.channel_layer.group_add("my.group", self.channel_name)
async def receive(self, mqtt_message):
print('Received a message at topic:', mqtt_message['topic'])
print('With payload', mqtt_message['payload'])
print('And QOS:', mqtt_message['qos'])
print(type(mqtt_message['payload']))
dictresult = json.loads(mqtt_message['payload'])
print(type(dictresult))
print(dictresult)
jsonresult = json.dumps(dictresult)
print(type(jsonresult))
print(jsonresult)
processmqttmessage.delay(jsonresult)
print("test")
pass
async def publish_results(self, event):
data = event['text']
self.publish('my/publish/topic', data, qos=2, retain=False)
async def disconnect(self):
await self.unsubscribe('application/5/device/+/event/up')
并且来自 celery 任务:
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
def processmqttmessage():
...
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)("my.group",
{"type": "publish.results", "text":"Hi from outside of the consumer"})
如果多个消费者将同时 运行 您可以通过编程方式为组生成一个名称并将其作为参数传递给任务。
重要提示:确保您在 celery 和 mqttasgi 项目中使用相同的后端通道。