将 asyncio 与来自外部库的非异步回调方法一起使用

Use asyncio with non asynchrone callback method from external library

我正在使用 gpiozero python 库来处理 Raspberry Pi 上的简单 GPIO 设备(我在这里使用 MotionSensor 作为示例):

import asyncio
from gpiozero import MotionSensor


class MotionSensorHandler():
    __whenMotionCallback = None

    def __init__(self, pin, whenMotionCallback):
        # whenMotionCallback is an async function
        self.__whenMotionCallback = whenMotionCallback

        # Just init the sensor with gpiozero lib
        motionSensor = MotionSensor(pin)

        # Method to call when motion is detected
        motionSensor.when_motion = self.whenMotion

    async def whenMotion(self):
        await self.__whenMotionCallback()

我的问题是我试图给一个 async 函数回调 motionSensor.when_motion

所以我得到的错误是 whenMotion 函数是 async 但从来没有 await 但我实际上不能等待它:

# will not work because MotionSensor() is not using asyncio
motionSensor.when_motion = await self.whenMotion

你知道如何将我的 async 函数分配给 none 函数吗?

如果您使用协程执行此操作,则需要获取并 运行 事件循环。我假设您使用的是 python 3.7,在这种情况下,您可以执行以下操作:

import asyncio
from gpiozero import MotionSensor


class MotionSensorHandler():
    __whenMotionCallback = None

    def __init__(self, pin, whenMotionCallback):
        # whenMotionCallback is an async function
        self.__whenMotionCallback = whenMotionCallback

        # Just init the sensor with gpiozero lib
        motionSensor = MotionSensor(pin)

        # Method to call when motion is detected
        loop = asyncio.get_event_loop()
        motionSensor.when_motion = loop.run_until_complete(self.whenMotion())
        loop.close()

    async def whenMotion(self):
        await self.__whenMotionCallback()

如果您使用的是 python 3.8,您可以只使用 asyncio.run 而不是所有显式获取和 运行 事件循环。

鉴于这是 运行 在一个循环中并且 when_motion 不需要 return 值,您可以这样做:

        ...
        motionSensor.when_motion = self.whenMotion

    def whenMotion(self):
        asyncio.ensure_future(self.__whenMotionCallback())

这将在事件循环中安排异步回调并使库的调用代码保持同步。

所以经过研究我发现我必须创建一个新的异步循环来以非异步方法执行异步脚本。所以现在我的 whenMotion() 方法不再是 async 而是使用 ensure_future().

执行一个
import asyncio
from gpiozero import MotionSensor


class MotionSensorHandler():
    __whenMotionCallback = None

    def __init__(self, pin, whenMotionCallback):
        # whenMotionCallback is an async function
        self.__whenMotionCallback = whenMotionCallback

        # Just init the sensor with gpiozero lib
        motionSensor = MotionSensor(pin)

        # Method to call when motion is detected
        motionSensor.when_motion = self.whenMotion

    def whenMotion(self):
        # Create new asyncio loop
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        future = asyncio.ensure_future(self.__executeWhenMotionCallback()) # Execute async method
        loop.run_until_complete(future)
        loop.close()

    async def __executeWhenMotionCallback(self):
        await self.__whenMotionCallback()

当设置 when_motion 属性 时,gpiozero 会创建一个新线程来执行回调(这没有很好地记录)。如果回调应该在主异步循环中执行,那么您需要将控制权交还给主线程。

call_soon_threadsafe 方法可以为您做到这一点。本质上,它将回调添加到等待发生时主异步循环调用的任务列表中。

然而,asyncio 循环对于每个线程都是本地的:参见 get_running_loop

因此,当在主异步线程中创建 gpiozero 对象时,您需要在调用回调时使该循环对象对对象可用。

以下是我如何为调用 asyncio MQTT 方法的 PIR 执行此操作:

class PIR:
    def __init__(self, mqtt, pin):
        self.pir = MotionSensor(pin=pin)
        self.pir.when_motion = self.motion
        # store the mqtt client we'll need to call
        self.mqtt = mqtt
        # This PIR object is created in the main thread
        # so store that loop object
        self.loop = asyncio.get_running_loop()

    def motion(self):
        # motion is called in the gpiozero monitoring thread
        # it has to use our stored copy of the loop and then
        # tell that loop to call the callback:
        self.loop.call_soon_threadsafe(self.mqtt.publish,
                                       f'sensor/gpiod/pir/kitchen', True)