如何使用 Django Channels 多线程 AsyncConsumer

How to multithread AsyncConsumer with Django Channels

我使用 Django Channels 已经有一个星期了,runworker 并行性让我感到困扰。

例如,我有这个 MQTT 客户端,它在收到消息时在频道中发布,基本。

async def treat_message(msg):
    channel_layer = get_channel_layer()
    payload = json.loads(msg.payload, encoding="utf-8")

    await channel_layer.send("mqtt", {
        "type": "value.change",
        "message": payload
    })

这个送的不错。我可以发送多少,就发送到redis队列。到频道 mqtt.

然后我 运行 工作人员将重定向 mqtt 队列中的消息:

python manage.py runworker mqtt
2018-09-12 16:33:42,232 - INFO - runworker - Running worker for channels ['mqtt']

这就是问题开始的地方。下面是AsyncConsumer读取数据的内容:

class MQTTConsumer(AsyncConsumer):
    async def value_change(self, event):
        await asyncio.sleep(5)
        print("I received changes : {}".format(event["message"]))

为了模拟任务的业务,我放了个sleep。这就是我要去的地方:异步消费者不是多线程的!当我向通道发送两条消息时,消费者需要 10 秒来处理第二条消息,如果是多线程则需要 5 秒。如下图。

2018-09-12 16:45:25,271 - INFO - runworker - Running worker for channels ['mqtt']
2018-09-12 16:45:32,559 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:37,561 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:42,563 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:47,565 - INFO - mqtt - I received changes : {'oui': 'non'}

任何有关该主题的情报都会有很大帮助,在此先感谢!

编辑:我发现管理它的唯一方法是创建一个执行程序,其中包含异步执行它的工作人员。但我不确定它的部署效率

def handle_mqtt(event):
    time.sleep(3)
    logger.info("I received changes : {}".format(event["message"]))


class MQTTConsumer(AsyncConsumer):
    def __init__(self, scope):
        super().__init__(scope)
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    async def value_change(self, event):
        loop = asyncio.get_event_loop()
        future = loop.run_in_executor(self.executor, handle_mqtt, event)

这是目前的设计

Yes, that is the intended design, as it's the safest way (it prevents race conditions if you're not aware of it). If you are happy to run messages in parallel, just spin off your own coroutines whenever you need them (using asyncio.create_task), making sure that you clean them up and wait for them on shutdown. It's quite a lot of overhead, so hopefully we'll ship an opt-in mode for that in the consumer in future, but for now all we ship with is the safe option.

https://github.com/django/channels/issues/1203