主动 BLE 扫描 (BlueZ) - DBus 问题

Active BLE Scanning (BlueZ) - Issue with DBus

我已经启动了一个项目,我需要主动(始终)扫描 BLE 设备。我在 Linux,使用 Bluez 5.49,我使用 Python 与 dbus 1.10.20 通信)。 我能够开始扫描,使用 bluetoothctl 停止扫描并通过 DBus(BlueZ 接口的 GetManagedObjects())获取 BLE 广告数据。我遇到的问题是,当我让扫描持续数小时时,dbus-deamon 开始占用越来越多的 RAM,我无法找到如何“刷新”dbus 从 BlueZ 收集的内容。最终 RAM 变满了,Linux 不高兴。

所以我尽量不一直扫描,这样可能会让垃圾收集器进行清理。没用。

我编辑了 /etc/dbus-1/system.d/bluetooth.conf 以删除我不需要的任何界面

<policy user="root">
    <allow own="org.bluez"/>
    <allow send_destination="org.bluez"/>
</policy>

这减缓了 RAM 的积累,但没有解决问题。

我找到了一种方法来检查哪个连接有字节等待并确认它来自 blueZ

Connection :1.74 with pid 3622 '/usr/libexec/bluetooth/bluetoothd --experimental ' (org.bluez):
        IncomingBytes=1253544
        PeakIncomingBytes=1313072
        OutgoingBytes=0
        PeakOutgoingBytes=210

最后,我发现有人需要读取 DBus 中等待的内容才能释放内存。所以我发现了这个:

我收到了 BlueZ 发送过来的数据,但内存仍在累积。

我知道释放 dbus 的唯一方法是重新启动 linux。这并不理想。

我对 DBus 的理解即将结束,这就是我今天来到这里的原因。 如果您有任何见解可以帮助我从 BlueZ 消息中释放 dbus,将不胜感激。

提前致谢

编辑 添加我用来读取已发现设备的 DBus 代码:

#!/usr/bin/python3

import dbus

BLUEZ_SERVICE_NAME = "org.bluez"
DBUS_OM_IFACE = "org.freedesktop.DBus.ObjectManager"
DEVICES_IFACE = "org.bluez.Device1"

def main_loop(subproc):
    devinfo = None
    objects = None

    dbussys = dbus.SystemBus()
    dbusconnection = dbussys.get_object(BLUEZ_SERVICE_NAME, "/")
    bluezInterface = dbus.Interface(dbusconnection, DBUS_OM_IFACE)
    
        
    while True:
        try:
            objects = bluezInterface.GetManagedObjects()
        except dbus.DBusException as err:
            print("dbus Error : " + str(err))
            pass

        all_devices = (str(path) for path, interfaces in objects.items() if DEVICES_IFACE in interfaces.keys())

        for path, interfaces in objects.items():
            if "org.bluez.Adapter1" not in interfaces.keys():
                continue

            device_list = [d for d in all_devices if d.startswith(path + "/")]

            for dev_path in device_list:
                properties = objects[dev_path][DEVICES_IFACE]
                
                if "ServiceData" in properties.keys() and "Name" in properties.keys() and "RSSI" in properties.keys():
                    #[... Do someting...] 

确实,当您停止发现时,Bluez 会刷新内存。因此,为了连续扫描,您需要始终启动和停止发现。我发现 6 秒,等待 1 秒,然后再次开始发现 6 秒……依此类推。如果您检查日志,您会看到它在停止发现时删除了很多东西。

我无法真正准确地重现您的错误,但我的系统不满意 运行 如此快速的 while 循环重复从 GetManagedObjects 获取数据。 下面是我 运行 基于您的代码并进行了一些重构的代码...

import dbus

BLUEZ_SERVICE_NAME = "org.bluez"
DBUS_OM_IFACE = "org.freedesktop.DBus.ObjectManager"
ADAPTER_IFACE = "org.bluez.Adapter1"
DEVICES_IFACE = "org.bluez.Device1"

def main_loop():
    devinfo = None
    objects = None

    dbussys = dbus.SystemBus()
    dbusconnection = dbussys.get_object(BLUEZ_SERVICE_NAME, "/")
    bluezInterface = dbus.Interface(dbusconnection, DBUS_OM_IFACE)

    while True:
        objects = bluezInterface.GetManagedObjects()
        for path in objects:
            name = objects[path].get(DEVICES_IFACE, {}).get('Name')
            rssi = objects[path].get(DEVICES_IFACE, {}).get('RSSI')
            service_data = objects[path].get(DEVICES_IFACE, {}).get('ServiceData')
            if all((name, rssi, service_data)):
                print(f'{name} @ {rssi} = {service_data}')
                #[... Do someting...] 

if __name__ == '__main__':
    main_loop()

我不确定您在更广泛的项目中尝试做什么,但如果我可以提出一些建议...

一种更典型的扫描 service/manufacturer 数据的方法是订阅 D-Bus 中的信号,当感兴趣的事情发生时触发回调。

下面是我用来查找 iBeacon 和 Eddystone 信标的一些代码。这使用 GLib 事件循环运行,这可能是您已经排除的但在资源上更有效。

它确实使用了不同的 Python dbus 绑定,因为我发现 pydbus 更“pythonic”。

我留下了处理信标的代码,因为它可能是一个有用的参考。

import argparse
from gi.repository import GLib
from pydbus import SystemBus
import uuid

DEVICE_INTERFACE = 'org.bluez.Device1'

remove_list = set()


def stop_scan():
    """Stop device discovery and quit event loop"""
    adapter.StopDiscovery()
    mainloop.quit()


def clean_beacons():
    """
    BlueZ D-Bus API does not show duplicates. This is a
    workaround that removes devices that have been found
    during discovery
    """
    not_found = set()
    for rm_dev in remove_list:
        try:
            adapter.RemoveDevice(rm_dev)
        except GLib.Error as err:
            not_found.add(rm_dev)
    for lost in not_found:
        remove_list.remove(lost)


def process_eddystone(data):
    """Print Eddystone data in human readable format"""
    _url_prefix_scheme = ['http://www.', 'https://www.',
                          'http://', 'https://', ]
    _url_encoding = ['.com/', '.org/', '.edu/', '.net/', '.info/',
                     '.biz/', '.gov/', '.com', '.org', '.edu',
                     '.net', '.info', '.biz', '.gov']
    tx_pwr = int.from_bytes([data[1]], 'big', signed=True)
    # Eddystone UID Beacon format
    if data[0] == 0x00:
        namespace_id = int.from_bytes(data[2:12], 'big')
        instance_id = int.from_bytes(data[12:18], 'big')
        print(f'\t\tEddystone UID: {namespace_id} - {instance_id} \u2197 {tx_pwr}')
    # Eddystone URL beacon format
    elif data[0] == 0x10:
        prefix = data[2]
        encoded_url = data[3:]
        full_url = _url_prefix_scheme[prefix]
        for letter in encoded_url:
            if letter < len(_url_encoding):
                full_url += _url_encoding[letter]
            else:
                full_url += chr(letter)
        print(f'\t\tEddystone URL: {full_url} \u2197 {tx_pwr}')


def process_ibeacon(data, beacon_type='iBeacon'):
    """Print iBeacon data in human readable format"""
    print('DATA:', data)
    beacon_uuid = uuid.UUID(bytes=bytes(data[2:18]))
    major = int.from_bytes(bytearray(data[18:20]), 'big', signed=False)
    minor = int.from_bytes(bytearray(data[20:22]), 'big', signed=False)
    tx_pwr = int.from_bytes([data[22]], 'big', signed=True)
    print(f'\t\t{beacon_type}: {beacon_uuid} - {major} - {minor} \u2197 {tx_pwr}')


def ble_16bit_match(uuid_16, srv_data):
    """Expand 16 bit UUID to full 128 bit UUID"""
    uuid_128 = f'0000{uuid_16}-0000-1000-8000-00805f9b34fb'
    return uuid_128 == list(srv_data.keys())[0]


def on_iface_added(owner, path, iface, signal, interfaces_and_properties):
    """
    Event handler for D-Bus interface added.
    Test to see if it is a new Bluetooth device
    """
    iface_path, iface_props = interfaces_and_properties
    if DEVICE_INTERFACE in iface_props:
        on_device_found(iface_path, iface_props[DEVICE_INTERFACE])


def on_device_found(device_path, device_props):
    """
    Handle new Bluetooth device being discover.
    If it is a beacon of type iBeacon, Eddystone, AltBeacon
    then process it
    """
    address = device_props.get('Address')
    address_type = device_props.get('AddressType')
    name = device_props.get('Name')
    alias = device_props.get('Alias')
    paired = device_props.get('Paired')
    trusted = device_props.get('Trusted')
    rssi = device_props.get('RSSI')
    service_data = device_props.get('ServiceData')
    manufacturer_data = device_props.get('ManufacturerData')
    if address.casefold() == '00:c3:f4:f1:58:69':
        print('Found mac address of interest')
    if service_data and ble_16bit_match('feaa', service_data):
        process_eddystone(service_data['0000feaa-0000-1000-8000-00805f9b34fb'])
        remove_list.add(device_path)
    elif manufacturer_data:
        for mfg_id in manufacturer_data:
            # iBeacon 0x004c
            if mfg_id == 0x004c and manufacturer_data[mfg_id][0] == 0x02:
                process_ibeacon(manufacturer_data[mfg_id])
                remove_list.add(device_path)
            # AltBeacon 0xacbe
            elif mfg_id == 0xffff and manufacturer_data[mfg_id][0:2] == [0xbe, 0xac]:
                process_ibeacon(manufacturer_data[mfg_id], beacon_type='AltBeacon')
                remove_list.add(device_path)
    clean_beacons()


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-d', '--duration', type=int, default=0,
                        help='Duration of scan [0 for continuous]')
    args = parser.parse_args()
    bus = SystemBus()
    adapter = bus.get('org.bluez', '/org/bluez/hci0')

    bus.subscribe(iface='org.freedesktop.DBus.ObjectManager',
                  signal='InterfacesAdded',
                  signal_fired=on_iface_added)

    mainloop = GLib.MainLoop()


    if args.duration > 0:
        GLib.timeout_add_seconds(args.duration, stop_scan)
    adapter.SetDiscoveryFilter({'DuplicateData': GLib.Variant.new_boolean(False)})
    adapter.StartDiscovery()

    try:
        print('\n\tUse CTRL-C to stop discovery\n')
        mainloop.run()
    except KeyboardInterrupt:
        stop_scan()