Python 无广告扫描并退出事件循环

Python Bleak scan for advertisements and exit event loop

我继承了一些利用 Python Bleak 扫描特定设备发出的广告的代码。每当检测到来自我们正在寻找的蓝牙 mac 地址和服务 ID 的广告并且提取的有效负载信息中的某个条件为真时,我们想要终止并 return。在附带的代码中,我屏蔽了蓝牙和服务 ID:s.

对事件循环不太熟悉,有没有办法在定时器用完之前退出?我想可能有更好的方法来解决这个问题。

示例代码:

import asyncio
import struct

from bleak import BleakScanner

timeout_seconds = 10
address_to_look_for = 'masked'
service_id_to_look_for = 'masked'

def detection_callback(device, advertisement_data):
    if device.address == address_to_look_for:
        byte_data = advertisement_data.service_data.get(service_id_to_look_for)
        num_to_test = struct.unpack_from('<I', byte_data, 0)
        if num_to_test == 1:
            print('here we want to terminate')
        

async def run():
    scanner = BleakScanner()
    scanner.register_detection_callback(detection_callback)
    await scanner.start()
    await asyncio.sleep(timeout_seconds)
    await scanner.stop()

if __name__=='__main__':    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())

我相信有很多方法可以做到这一点。一个小的 mod 到你的代码而不是在你停止扫描之前让 asyncio.sleep 整个周期,你可以有一个 while 循环,它在时间过去或设备发现事件结束。

例如:

import asyncio
import struct

from bleak import BleakScanner

timeout_seconds = 20
address_to_look_for = 'F1:D9:3B:39:4D:A2'
service_id_to_look_for = '0000feaa-0000-1000-8000-00805f9b34fb'


class MyScanner:
    def __init__(self):
        self._scanner = BleakScanner()
        self._scanner.register_detection_callback(self.detection_callback)
        self.scanning = asyncio.Event()

    def detection_callback(self, device, advertisement_data):
        # Looking for:
        # AdvertisementData(service_data={
        # '0000feaa-0000-1000-8000-00805f9b34fb': b'\x00\xf6\x00\x00\x00Jupiter\x00\x00\x00\x00\x00\x0b'},
        # service_uuids=['0000feaa-0000-1000-8000-00805f9b34fb'])
        if device.address == address_to_look_for:
            byte_data = advertisement_data.service_data.get(service_id_to_look_for)
            num_to_test, = struct.unpack_from('<I', byte_data, 0)
            if num_to_test == 62976:
                print('\t\tDevice found so we terminate')
                self.scanning.clear()

    async def run(self):
        await self._scanner.start()
        self.scanning.set()
        end_time = loop.time() + timeout_seconds
        while self.scanning.is_set():
            if loop.time() > end_time:
                self.scanning.clear()
                print('\t\tScan has timed out so we terminate')
            await asyncio.sleep(0.1)
        await self._scanner.stop()


if __name__ == '__main__':
    my_scanner = MyScanner()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(my_scanner.run())