等待函数,直到它在 python 异步 IO 中收到回调

Await for function until it receives a callback in python async IO

我使用 Bleak 库在 python 中创建了一个 GATT 客户端。我订阅了一个通知特性,当我在通知处理函数中获得回调后,我现在写了一个 characteristic.Until,在我开始通知后,我用了一个 asyncio.sleep 5秒,然后然后我会写一篇关于这个特性的文章。但这是非常脆弱的,无法控制代码,所以我想等到我在通知处理程序函数中收到回调,当我收到响应时,我想进行写入。我是 python 异步 IO 的新手,我不确定如何实现。感谢您的时间和努力。

代码如下:


import logging
import asyncio
import platform

from bleak import BleakClient
from bleak import _logger as logger
import time

from concurrent.futures import ThreadPoolExecutor

CHARACTERISTIC_UUID = "0000c305-0000-1000-8000-00805f9b34fb"
CHARACTERISTIC_UUID_WR = "0000c303-0000-1000-8000-00805f9b34fb"
CHARACTERISTIC_UUID_RD = "0000c301-0000-1000-8000-00805f9b34fb"

_executor = ThreadPoolExecutor(1)

async def notification_handler(sender, data):
    """Simple notification handler which prints the data received."""
    print("{0}: {1}".format(sender, data))
    print(hexdump(data, 16))


def hexdump(src, length=16):
        result = []
        digits = 4 if isinstance(src, str) else 2
        for i in range(0, len(src), length):
            s = src[i:i + length]
            hexa = " ".join(map("{0:0>2X}".format, src))
            text = "".join([chr(x) if 0x20 <= x < 0x7F else "." for x in s])
            result.append("%04X   %-*s   %s" % (i, length * (digits + 1), hexa, text))
        return "\n".join(result)

async def run(address, debug=False):
    if debug:
        import sys

        l = logging.getLogger("asyncio")
        l.setLevel(logging.DEBUG)
        h = logging.StreamHandler(sys.stdout)
        h.setLevel(logging.DEBUG)
        l.addHandler(h)
        logger.addHandler(h)



    async with BleakClient(address) as client:
        x = await client.is_connected()
        logger.info("Connected: {0}".format(x))

        bleDevice = client
        await client.start_notify(CHARACTERISTIC_UUID, notification_handler)


        # before: asyncio.sleep(5)


        # now: wait here until I the notification_hanlder get a callback, then do the write

        await client.write_gatt_char(CHARACTERISTIC_UUID_WR, [0xA6, 0x00, 0x02, 0xD0, 0x01, 0x77])
        
        await client.write_gatt_char(CHARACTERISTIC_UUID_WR, [0xA6])

        await client.write_gatt_char(CHARACTERISTIC_UUID_WR, [0xA7])

if __name__ == "__main__":
    import os

    os.environ["PYTHONASYNCIODEBUG"] = str(1)
    address = (
        "48:23:35:00:14:be"  # <--- Change to your device's address here if you are using Windows or Linux
        if platform.system() != "Darwin"
        else "B9EA5233-37EF-4DD6-87A8-2A875E821C46"  # <--- Change to your device's address here if you are using macOS
    )

    # for i in range(5):
    loop = asyncio.get_event_loop()
    loop.set_debug(True)
    loop.run_until_complete(run(address, True))

我将从旁注开始。您似乎选择了一些自定义 UUID 值,这些值在 Bluetooth SIG 批准的 UUID 的保留范围内。有一篇关于此的有用文章:https://www.novelbits.io/uuid-for-custom-services-and-characteristics/

关于你的主要问题。我自己也只是在学习asyncio,不过看了看,有东西运行。 您的 write_gatt_char 命令在客户端初始化时执行,而不是在收到通知时执行。一旦完成写入,它就会退出。

我已将您的示例修改为使用 BBC micro:bit。当发送按钮“A”通知时,这会使 Bleak 客户端向 micro:bit 发送一封随机信件。当按钮“B”通知与客户端断开连接时。 我已将 write_gatt_char 移到通知代码中并使用 create_task 来执行写入。 为了让客户端保持活动状态,直到 micro:bit 上的按钮“B”被按下,我在客户端中放置了一个 while 循环,并在完成时停止睡眠。怀疑这不是最好的方法。

这是对我有用的例子:

import logging
import asyncio
import platform
from random import randint

from bleak import BleakClient
from bleak import _logger as logger


BTN_A_UUID = "E95DDA90-251D-470A-A062-FA1922DFA9A8"
BTN_B_UUID = "E95DDA91-251D-470A-A062-FA1922DFA9A8"
LED_TXT_UUID = "E95D93EE-251D-470A-A062-FA1922DFA9A8"



async def run(address, debug=False):
    if debug:
        import sys

        l = logging.getLogger("asyncio")
        l.setLevel(logging.DEBUG)
        h = logging.StreamHandler(sys.stdout)
        h.setLevel(logging.DEBUG)
        l.addHandler(h)
        logger.addHandler(h)


    async with BleakClient(address) as client:
        x = await client.is_connected()
        logger.info("Connected: {0}".format(x))

        def btn_a_handler(sender, data):
            """Simple notification handler for btn a events."""
            print("{0}: {1}".format(sender, data))
            # Pick random letter to send
            if int.from_bytes(data, byteorder='little', signed=False) > 0:
                letter = [randint(99, 122)]
                loop.create_task(write_txt(letter))

        def btn_b_handler(sender, data):
            """Simple notification handler for btn b events."""
            print("{0}: {1}".format(sender, data))
            if int.from_bytes(data, byteorder='little', signed=False) > 0:
                loop.create_task(client.disconnect())

        async def write_txt(data):
            await client.write_gatt_char(LED_TXT_UUID,
                                         data)

        await client.start_notify(BTN_A_UUID, btn_a_handler)
        await client.start_notify(BTN_B_UUID, btn_b_handler)

        while await client.is_connected():
            await asyncio.sleep(1)

if __name__ == "__main__":
    address = ("E9:06:4D:45:FC:8D")

    loop = asyncio.get_event_loop()
    # loop.set_debug(True)
    loop.run_until_complete(run(address, True))