等待函数,直到它在 python 异步 IO 中收到回调
Await for function until it receives a callback in python async IO
我使用 Bleak 库在 python 中创建了一个 GATT 客户端。我订阅了一个通知特性,当我在通知处理函数中获得回调后,我现在写了一个 characteristic.Until,在我开始通知后,我用了一个 asyncio.sleep 5秒,然后然后我会写一篇关于这个特性的文章。但这是非常脆弱的,无法控制代码,所以我想等到我在通知处理程序函数中收到回调,当我收到响应时,我想进行写入。我是 python 异步 IO 的新手,我不确定如何实现。感谢您的时间和努力。
代码如下:
import logging
import asyncio
import platform
from bleak import BleakClient
from bleak import _logger as logger
import time
from concurrent.futures import ThreadPoolExecutor
CHARACTERISTIC_UUID = "0000c305-0000-1000-8000-00805f9b34fb"
CHARACTERISTIC_UUID_WR = "0000c303-0000-1000-8000-00805f9b34fb"
CHARACTERISTIC_UUID_RD = "0000c301-0000-1000-8000-00805f9b34fb"
_executor = ThreadPoolExecutor(1)
async def notification_handler(sender, data):
"""Simple notification handler which prints the data received."""
print("{0}: {1}".format(sender, data))
print(hexdump(data, 16))
def hexdump(src, length=16):
result = []
digits = 4 if isinstance(src, str) else 2
for i in range(0, len(src), length):
s = src[i:i + length]
hexa = " ".join(map("{0:0>2X}".format, src))
text = "".join([chr(x) if 0x20 <= x < 0x7F else "." for x in s])
result.append("%04X %-*s %s" % (i, length * (digits + 1), hexa, text))
return "\n".join(result)
async def run(address, debug=False):
if debug:
import sys
l = logging.getLogger("asyncio")
l.setLevel(logging.DEBUG)
h = logging.StreamHandler(sys.stdout)
h.setLevel(logging.DEBUG)
l.addHandler(h)
logger.addHandler(h)
async with BleakClient(address) as client:
x = await client.is_connected()
logger.info("Connected: {0}".format(x))
bleDevice = client
await client.start_notify(CHARACTERISTIC_UUID, notification_handler)
# before: asyncio.sleep(5)
# now: wait here until I the notification_hanlder get a callback, then do the write
await client.write_gatt_char(CHARACTERISTIC_UUID_WR, [0xA6, 0x00, 0x02, 0xD0, 0x01, 0x77])
await client.write_gatt_char(CHARACTERISTIC_UUID_WR, [0xA6])
await client.write_gatt_char(CHARACTERISTIC_UUID_WR, [0xA7])
if __name__ == "__main__":
import os
os.environ["PYTHONASYNCIODEBUG"] = str(1)
address = (
"48:23:35:00:14:be" # <--- Change to your device's address here if you are using Windows or Linux
if platform.system() != "Darwin"
else "B9EA5233-37EF-4DD6-87A8-2A875E821C46" # <--- Change to your device's address here if you are using macOS
)
# for i in range(5):
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(run(address, True))
我将从旁注开始。您似乎选择了一些自定义 UUID 值,这些值在 Bluetooth SIG 批准的 UUID 的保留范围内。有一篇关于此的有用文章:https://www.novelbits.io/uuid-for-custom-services-and-characteristics/
关于你的主要问题。我自己也只是在学习asyncio,不过看了看,有东西运行。
您的 write_gatt_char
命令在客户端初始化时执行,而不是在收到通知时执行。一旦完成写入,它就会退出。
我已将您的示例修改为使用 BBC micro:bit。当发送按钮“A”通知时,这会使 Bleak 客户端向 micro:bit 发送一封随机信件。当按钮“B”通知与客户端断开连接时。
我已将 write_gatt_char
移到通知代码中并使用 create_task
来执行写入。
为了让客户端保持活动状态,直到 micro:bit 上的按钮“B”被按下,我在客户端中放置了一个 while 循环,并在完成时停止睡眠。怀疑这不是最好的方法。
这是对我有用的例子:
import logging
import asyncio
import platform
from random import randint
from bleak import BleakClient
from bleak import _logger as logger
BTN_A_UUID = "E95DDA90-251D-470A-A062-FA1922DFA9A8"
BTN_B_UUID = "E95DDA91-251D-470A-A062-FA1922DFA9A8"
LED_TXT_UUID = "E95D93EE-251D-470A-A062-FA1922DFA9A8"
async def run(address, debug=False):
if debug:
import sys
l = logging.getLogger("asyncio")
l.setLevel(logging.DEBUG)
h = logging.StreamHandler(sys.stdout)
h.setLevel(logging.DEBUG)
l.addHandler(h)
logger.addHandler(h)
async with BleakClient(address) as client:
x = await client.is_connected()
logger.info("Connected: {0}".format(x))
def btn_a_handler(sender, data):
"""Simple notification handler for btn a events."""
print("{0}: {1}".format(sender, data))
# Pick random letter to send
if int.from_bytes(data, byteorder='little', signed=False) > 0:
letter = [randint(99, 122)]
loop.create_task(write_txt(letter))
def btn_b_handler(sender, data):
"""Simple notification handler for btn b events."""
print("{0}: {1}".format(sender, data))
if int.from_bytes(data, byteorder='little', signed=False) > 0:
loop.create_task(client.disconnect())
async def write_txt(data):
await client.write_gatt_char(LED_TXT_UUID,
data)
await client.start_notify(BTN_A_UUID, btn_a_handler)
await client.start_notify(BTN_B_UUID, btn_b_handler)
while await client.is_connected():
await asyncio.sleep(1)
if __name__ == "__main__":
address = ("E9:06:4D:45:FC:8D")
loop = asyncio.get_event_loop()
# loop.set_debug(True)
loop.run_until_complete(run(address, True))
我使用 Bleak 库在 python 中创建了一个 GATT 客户端。我订阅了一个通知特性,当我在通知处理函数中获得回调后,我现在写了一个 characteristic.Until,在我开始通知后,我用了一个 asyncio.sleep 5秒,然后然后我会写一篇关于这个特性的文章。但这是非常脆弱的,无法控制代码,所以我想等到我在通知处理程序函数中收到回调,当我收到响应时,我想进行写入。我是 python 异步 IO 的新手,我不确定如何实现。感谢您的时间和努力。
代码如下:
import logging
import asyncio
import platform
from bleak import BleakClient
from bleak import _logger as logger
import time
from concurrent.futures import ThreadPoolExecutor
CHARACTERISTIC_UUID = "0000c305-0000-1000-8000-00805f9b34fb"
CHARACTERISTIC_UUID_WR = "0000c303-0000-1000-8000-00805f9b34fb"
CHARACTERISTIC_UUID_RD = "0000c301-0000-1000-8000-00805f9b34fb"
_executor = ThreadPoolExecutor(1)
async def notification_handler(sender, data):
"""Simple notification handler which prints the data received."""
print("{0}: {1}".format(sender, data))
print(hexdump(data, 16))
def hexdump(src, length=16):
result = []
digits = 4 if isinstance(src, str) else 2
for i in range(0, len(src), length):
s = src[i:i + length]
hexa = " ".join(map("{0:0>2X}".format, src))
text = "".join([chr(x) if 0x20 <= x < 0x7F else "." for x in s])
result.append("%04X %-*s %s" % (i, length * (digits + 1), hexa, text))
return "\n".join(result)
async def run(address, debug=False):
if debug:
import sys
l = logging.getLogger("asyncio")
l.setLevel(logging.DEBUG)
h = logging.StreamHandler(sys.stdout)
h.setLevel(logging.DEBUG)
l.addHandler(h)
logger.addHandler(h)
async with BleakClient(address) as client:
x = await client.is_connected()
logger.info("Connected: {0}".format(x))
bleDevice = client
await client.start_notify(CHARACTERISTIC_UUID, notification_handler)
# before: asyncio.sleep(5)
# now: wait here until I the notification_hanlder get a callback, then do the write
await client.write_gatt_char(CHARACTERISTIC_UUID_WR, [0xA6, 0x00, 0x02, 0xD0, 0x01, 0x77])
await client.write_gatt_char(CHARACTERISTIC_UUID_WR, [0xA6])
await client.write_gatt_char(CHARACTERISTIC_UUID_WR, [0xA7])
if __name__ == "__main__":
import os
os.environ["PYTHONASYNCIODEBUG"] = str(1)
address = (
"48:23:35:00:14:be" # <--- Change to your device's address here if you are using Windows or Linux
if platform.system() != "Darwin"
else "B9EA5233-37EF-4DD6-87A8-2A875E821C46" # <--- Change to your device's address here if you are using macOS
)
# for i in range(5):
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(run(address, True))
我将从旁注开始。您似乎选择了一些自定义 UUID 值,这些值在 Bluetooth SIG 批准的 UUID 的保留范围内。有一篇关于此的有用文章:https://www.novelbits.io/uuid-for-custom-services-and-characteristics/
关于你的主要问题。我自己也只是在学习asyncio,不过看了看,有东西运行。
您的 write_gatt_char
命令在客户端初始化时执行,而不是在收到通知时执行。一旦完成写入,它就会退出。
我已将您的示例修改为使用 BBC micro:bit。当发送按钮“A”通知时,这会使 Bleak 客户端向 micro:bit 发送一封随机信件。当按钮“B”通知与客户端断开连接时。
我已将 write_gatt_char
移到通知代码中并使用 create_task
来执行写入。
为了让客户端保持活动状态,直到 micro:bit 上的按钮“B”被按下,我在客户端中放置了一个 while 循环,并在完成时停止睡眠。怀疑这不是最好的方法。
这是对我有用的例子:
import logging
import asyncio
import platform
from random import randint
from bleak import BleakClient
from bleak import _logger as logger
BTN_A_UUID = "E95DDA90-251D-470A-A062-FA1922DFA9A8"
BTN_B_UUID = "E95DDA91-251D-470A-A062-FA1922DFA9A8"
LED_TXT_UUID = "E95D93EE-251D-470A-A062-FA1922DFA9A8"
async def run(address, debug=False):
if debug:
import sys
l = logging.getLogger("asyncio")
l.setLevel(logging.DEBUG)
h = logging.StreamHandler(sys.stdout)
h.setLevel(logging.DEBUG)
l.addHandler(h)
logger.addHandler(h)
async with BleakClient(address) as client:
x = await client.is_connected()
logger.info("Connected: {0}".format(x))
def btn_a_handler(sender, data):
"""Simple notification handler for btn a events."""
print("{0}: {1}".format(sender, data))
# Pick random letter to send
if int.from_bytes(data, byteorder='little', signed=False) > 0:
letter = [randint(99, 122)]
loop.create_task(write_txt(letter))
def btn_b_handler(sender, data):
"""Simple notification handler for btn b events."""
print("{0}: {1}".format(sender, data))
if int.from_bytes(data, byteorder='little', signed=False) > 0:
loop.create_task(client.disconnect())
async def write_txt(data):
await client.write_gatt_char(LED_TXT_UUID,
data)
await client.start_notify(BTN_A_UUID, btn_a_handler)
await client.start_notify(BTN_B_UUID, btn_b_handler)
while await client.is_connected():
await asyncio.sleep(1)
if __name__ == "__main__":
address = ("E9:06:4D:45:FC:8D")
loop = asyncio.get_event_loop()
# loop.set_debug(True)
loop.run_until_complete(run(address, True))