主动 BLE 扫描 (BlueZ) - DBus 问题
Active BLE Scanning (BlueZ) - Issue with DBus
我已经启动了一个项目,我需要主动(始终)扫描 BLE 设备。我在 Linux,使用 Bluez 5.49,我使用 Python 与 dbus 1.10.20 通信)。
我能够开始扫描,使用 bluetoothctl 停止扫描并通过 DBus(BlueZ 接口的 GetManagedObjects())获取 BLE 广告数据。我遇到的问题是,当我让扫描持续数小时时,dbus-deamon 开始占用越来越多的 RAM,我无法找到如何“刷新”dbus 从 BlueZ 收集的内容。最终 RAM 变满了,Linux 不高兴。
所以我尽量不一直扫描,这样可能会让垃圾收集器进行清理。没用。
我编辑了 /etc/dbus-1/system.d/bluetooth.conf 以删除我不需要的任何界面
<policy user="root">
<allow own="org.bluez"/>
<allow send_destination="org.bluez"/>
</policy>
这减缓了 RAM 的积累,但没有解决问题。
我找到了一种方法来检查哪个连接有字节等待并确认它来自 blueZ
Connection :1.74 with pid 3622 '/usr/libexec/bluetooth/bluetoothd --experimental ' (org.bluez):
IncomingBytes=1253544
PeakIncomingBytes=1313072
OutgoingBytes=0
PeakOutgoingBytes=210
最后,我发现有人需要读取 DBus 中等待的内容才能释放内存。所以我发现了这个:
我收到了 BlueZ 发送过来的数据,但内存仍在累积。
我知道释放 dbus 的唯一方法是重新启动 linux。这并不理想。
我对 DBus 的理解即将结束,这就是我今天来到这里的原因。
如果您有任何见解可以帮助我从 BlueZ 消息中释放 dbus,将不胜感激。
提前致谢
编辑 添加我用来读取已发现设备的 DBus 代码:
#!/usr/bin/python3
import dbus
BLUEZ_SERVICE_NAME = "org.bluez"
DBUS_OM_IFACE = "org.freedesktop.DBus.ObjectManager"
DEVICES_IFACE = "org.bluez.Device1"
def main_loop(subproc):
devinfo = None
objects = None
dbussys = dbus.SystemBus()
dbusconnection = dbussys.get_object(BLUEZ_SERVICE_NAME, "/")
bluezInterface = dbus.Interface(dbusconnection, DBUS_OM_IFACE)
while True:
try:
objects = bluezInterface.GetManagedObjects()
except dbus.DBusException as err:
print("dbus Error : " + str(err))
pass
all_devices = (str(path) for path, interfaces in objects.items() if DEVICES_IFACE in interfaces.keys())
for path, interfaces in objects.items():
if "org.bluez.Adapter1" not in interfaces.keys():
continue
device_list = [d for d in all_devices if d.startswith(path + "/")]
for dev_path in device_list:
properties = objects[dev_path][DEVICES_IFACE]
if "ServiceData" in properties.keys() and "Name" in properties.keys() and "RSSI" in properties.keys():
#[... Do someting...]
确实,当您停止发现时,Bluez 会刷新内存。因此,为了连续扫描,您需要始终启动和停止发现。我发现 6 秒,等待 1 秒,然后再次开始发现 6 秒……依此类推。如果您检查日志,您会看到它在停止发现时删除了很多东西。
我无法真正准确地重现您的错误,但我的系统不满意 运行 如此快速的 while 循环重复从 GetManagedObjects 获取数据。
下面是我 运行 基于您的代码并进行了一些重构的代码...
import dbus
BLUEZ_SERVICE_NAME = "org.bluez"
DBUS_OM_IFACE = "org.freedesktop.DBus.ObjectManager"
ADAPTER_IFACE = "org.bluez.Adapter1"
DEVICES_IFACE = "org.bluez.Device1"
def main_loop():
devinfo = None
objects = None
dbussys = dbus.SystemBus()
dbusconnection = dbussys.get_object(BLUEZ_SERVICE_NAME, "/")
bluezInterface = dbus.Interface(dbusconnection, DBUS_OM_IFACE)
while True:
objects = bluezInterface.GetManagedObjects()
for path in objects:
name = objects[path].get(DEVICES_IFACE, {}).get('Name')
rssi = objects[path].get(DEVICES_IFACE, {}).get('RSSI')
service_data = objects[path].get(DEVICES_IFACE, {}).get('ServiceData')
if all((name, rssi, service_data)):
print(f'{name} @ {rssi} = {service_data}')
#[... Do someting...]
if __name__ == '__main__':
main_loop()
我不确定您在更广泛的项目中尝试做什么,但如果我可以提出一些建议...
一种更典型的扫描 service/manufacturer 数据的方法是订阅 D-Bus 中的信号,当感兴趣的事情发生时触发回调。
下面是我用来查找 iBeacon 和 Eddystone 信标的一些代码。这使用 GLib 事件循环运行,这可能是您已经排除的但在资源上更有效。
它确实使用了不同的 Python dbus 绑定,因为我发现 pydbus
更“pythonic”。
我留下了处理信标的代码,因为它可能是一个有用的参考。
import argparse
from gi.repository import GLib
from pydbus import SystemBus
import uuid
DEVICE_INTERFACE = 'org.bluez.Device1'
remove_list = set()
def stop_scan():
"""Stop device discovery and quit event loop"""
adapter.StopDiscovery()
mainloop.quit()
def clean_beacons():
"""
BlueZ D-Bus API does not show duplicates. This is a
workaround that removes devices that have been found
during discovery
"""
not_found = set()
for rm_dev in remove_list:
try:
adapter.RemoveDevice(rm_dev)
except GLib.Error as err:
not_found.add(rm_dev)
for lost in not_found:
remove_list.remove(lost)
def process_eddystone(data):
"""Print Eddystone data in human readable format"""
_url_prefix_scheme = ['http://www.', 'https://www.',
'http://', 'https://', ]
_url_encoding = ['.com/', '.org/', '.edu/', '.net/', '.info/',
'.biz/', '.gov/', '.com', '.org', '.edu',
'.net', '.info', '.biz', '.gov']
tx_pwr = int.from_bytes([data[1]], 'big', signed=True)
# Eddystone UID Beacon format
if data[0] == 0x00:
namespace_id = int.from_bytes(data[2:12], 'big')
instance_id = int.from_bytes(data[12:18], 'big')
print(f'\t\tEddystone UID: {namespace_id} - {instance_id} \u2197 {tx_pwr}')
# Eddystone URL beacon format
elif data[0] == 0x10:
prefix = data[2]
encoded_url = data[3:]
full_url = _url_prefix_scheme[prefix]
for letter in encoded_url:
if letter < len(_url_encoding):
full_url += _url_encoding[letter]
else:
full_url += chr(letter)
print(f'\t\tEddystone URL: {full_url} \u2197 {tx_pwr}')
def process_ibeacon(data, beacon_type='iBeacon'):
"""Print iBeacon data in human readable format"""
print('DATA:', data)
beacon_uuid = uuid.UUID(bytes=bytes(data[2:18]))
major = int.from_bytes(bytearray(data[18:20]), 'big', signed=False)
minor = int.from_bytes(bytearray(data[20:22]), 'big', signed=False)
tx_pwr = int.from_bytes([data[22]], 'big', signed=True)
print(f'\t\t{beacon_type}: {beacon_uuid} - {major} - {minor} \u2197 {tx_pwr}')
def ble_16bit_match(uuid_16, srv_data):
"""Expand 16 bit UUID to full 128 bit UUID"""
uuid_128 = f'0000{uuid_16}-0000-1000-8000-00805f9b34fb'
return uuid_128 == list(srv_data.keys())[0]
def on_iface_added(owner, path, iface, signal, interfaces_and_properties):
"""
Event handler for D-Bus interface added.
Test to see if it is a new Bluetooth device
"""
iface_path, iface_props = interfaces_and_properties
if DEVICE_INTERFACE in iface_props:
on_device_found(iface_path, iface_props[DEVICE_INTERFACE])
def on_device_found(device_path, device_props):
"""
Handle new Bluetooth device being discover.
If it is a beacon of type iBeacon, Eddystone, AltBeacon
then process it
"""
address = device_props.get('Address')
address_type = device_props.get('AddressType')
name = device_props.get('Name')
alias = device_props.get('Alias')
paired = device_props.get('Paired')
trusted = device_props.get('Trusted')
rssi = device_props.get('RSSI')
service_data = device_props.get('ServiceData')
manufacturer_data = device_props.get('ManufacturerData')
if address.casefold() == '00:c3:f4:f1:58:69':
print('Found mac address of interest')
if service_data and ble_16bit_match('feaa', service_data):
process_eddystone(service_data['0000feaa-0000-1000-8000-00805f9b34fb'])
remove_list.add(device_path)
elif manufacturer_data:
for mfg_id in manufacturer_data:
# iBeacon 0x004c
if mfg_id == 0x004c and manufacturer_data[mfg_id][0] == 0x02:
process_ibeacon(manufacturer_data[mfg_id])
remove_list.add(device_path)
# AltBeacon 0xacbe
elif mfg_id == 0xffff and manufacturer_data[mfg_id][0:2] == [0xbe, 0xac]:
process_ibeacon(manufacturer_data[mfg_id], beacon_type='AltBeacon')
remove_list.add(device_path)
clean_beacons()
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-d', '--duration', type=int, default=0,
help='Duration of scan [0 for continuous]')
args = parser.parse_args()
bus = SystemBus()
adapter = bus.get('org.bluez', '/org/bluez/hci0')
bus.subscribe(iface='org.freedesktop.DBus.ObjectManager',
signal='InterfacesAdded',
signal_fired=on_iface_added)
mainloop = GLib.MainLoop()
if args.duration > 0:
GLib.timeout_add_seconds(args.duration, stop_scan)
adapter.SetDiscoveryFilter({'DuplicateData': GLib.Variant.new_boolean(False)})
adapter.StartDiscovery()
try:
print('\n\tUse CTRL-C to stop discovery\n')
mainloop.run()
except KeyboardInterrupt:
stop_scan()
我已经启动了一个项目,我需要主动(始终)扫描 BLE 设备。我在 Linux,使用 Bluez 5.49,我使用 Python 与 dbus 1.10.20 通信)。 我能够开始扫描,使用 bluetoothctl 停止扫描并通过 DBus(BlueZ 接口的 GetManagedObjects())获取 BLE 广告数据。我遇到的问题是,当我让扫描持续数小时时,dbus-deamon 开始占用越来越多的 RAM,我无法找到如何“刷新”dbus 从 BlueZ 收集的内容。最终 RAM 变满了,Linux 不高兴。
所以我尽量不一直扫描,这样可能会让垃圾收集器进行清理。没用。
我编辑了 /etc/dbus-1/system.d/bluetooth.conf 以删除我不需要的任何界面
<policy user="root">
<allow own="org.bluez"/>
<allow send_destination="org.bluez"/>
</policy>
这减缓了 RAM 的积累,但没有解决问题。
我找到了一种方法来检查哪个连接有字节等待并确认它来自 blueZ
Connection :1.74 with pid 3622 '/usr/libexec/bluetooth/bluetoothd --experimental ' (org.bluez):
IncomingBytes=1253544
PeakIncomingBytes=1313072
OutgoingBytes=0
PeakOutgoingBytes=210
最后,我发现有人需要读取 DBus 中等待的内容才能释放内存。所以我发现了这个:
我收到了 BlueZ 发送过来的数据,但内存仍在累积。
我知道释放 dbus 的唯一方法是重新启动 linux。这并不理想。
我对 DBus 的理解即将结束,这就是我今天来到这里的原因。 如果您有任何见解可以帮助我从 BlueZ 消息中释放 dbus,将不胜感激。
提前致谢
编辑 添加我用来读取已发现设备的 DBus 代码:
#!/usr/bin/python3
import dbus
BLUEZ_SERVICE_NAME = "org.bluez"
DBUS_OM_IFACE = "org.freedesktop.DBus.ObjectManager"
DEVICES_IFACE = "org.bluez.Device1"
def main_loop(subproc):
devinfo = None
objects = None
dbussys = dbus.SystemBus()
dbusconnection = dbussys.get_object(BLUEZ_SERVICE_NAME, "/")
bluezInterface = dbus.Interface(dbusconnection, DBUS_OM_IFACE)
while True:
try:
objects = bluezInterface.GetManagedObjects()
except dbus.DBusException as err:
print("dbus Error : " + str(err))
pass
all_devices = (str(path) for path, interfaces in objects.items() if DEVICES_IFACE in interfaces.keys())
for path, interfaces in objects.items():
if "org.bluez.Adapter1" not in interfaces.keys():
continue
device_list = [d for d in all_devices if d.startswith(path + "/")]
for dev_path in device_list:
properties = objects[dev_path][DEVICES_IFACE]
if "ServiceData" in properties.keys() and "Name" in properties.keys() and "RSSI" in properties.keys():
#[... Do someting...]
确实,当您停止发现时,Bluez 会刷新内存。因此,为了连续扫描,您需要始终启动和停止发现。我发现 6 秒,等待 1 秒,然后再次开始发现 6 秒……依此类推。如果您检查日志,您会看到它在停止发现时删除了很多东西。
我无法真正准确地重现您的错误,但我的系统不满意 运行 如此快速的 while 循环重复从 GetManagedObjects 获取数据。 下面是我 运行 基于您的代码并进行了一些重构的代码...
import dbus
BLUEZ_SERVICE_NAME = "org.bluez"
DBUS_OM_IFACE = "org.freedesktop.DBus.ObjectManager"
ADAPTER_IFACE = "org.bluez.Adapter1"
DEVICES_IFACE = "org.bluez.Device1"
def main_loop():
devinfo = None
objects = None
dbussys = dbus.SystemBus()
dbusconnection = dbussys.get_object(BLUEZ_SERVICE_NAME, "/")
bluezInterface = dbus.Interface(dbusconnection, DBUS_OM_IFACE)
while True:
objects = bluezInterface.GetManagedObjects()
for path in objects:
name = objects[path].get(DEVICES_IFACE, {}).get('Name')
rssi = objects[path].get(DEVICES_IFACE, {}).get('RSSI')
service_data = objects[path].get(DEVICES_IFACE, {}).get('ServiceData')
if all((name, rssi, service_data)):
print(f'{name} @ {rssi} = {service_data}')
#[... Do someting...]
if __name__ == '__main__':
main_loop()
我不确定您在更广泛的项目中尝试做什么,但如果我可以提出一些建议...
一种更典型的扫描 service/manufacturer 数据的方法是订阅 D-Bus 中的信号,当感兴趣的事情发生时触发回调。
下面是我用来查找 iBeacon 和 Eddystone 信标的一些代码。这使用 GLib 事件循环运行,这可能是您已经排除的但在资源上更有效。
它确实使用了不同的 Python dbus 绑定,因为我发现 pydbus
更“pythonic”。
我留下了处理信标的代码,因为它可能是一个有用的参考。
import argparse
from gi.repository import GLib
from pydbus import SystemBus
import uuid
DEVICE_INTERFACE = 'org.bluez.Device1'
remove_list = set()
def stop_scan():
"""Stop device discovery and quit event loop"""
adapter.StopDiscovery()
mainloop.quit()
def clean_beacons():
"""
BlueZ D-Bus API does not show duplicates. This is a
workaround that removes devices that have been found
during discovery
"""
not_found = set()
for rm_dev in remove_list:
try:
adapter.RemoveDevice(rm_dev)
except GLib.Error as err:
not_found.add(rm_dev)
for lost in not_found:
remove_list.remove(lost)
def process_eddystone(data):
"""Print Eddystone data in human readable format"""
_url_prefix_scheme = ['http://www.', 'https://www.',
'http://', 'https://', ]
_url_encoding = ['.com/', '.org/', '.edu/', '.net/', '.info/',
'.biz/', '.gov/', '.com', '.org', '.edu',
'.net', '.info', '.biz', '.gov']
tx_pwr = int.from_bytes([data[1]], 'big', signed=True)
# Eddystone UID Beacon format
if data[0] == 0x00:
namespace_id = int.from_bytes(data[2:12], 'big')
instance_id = int.from_bytes(data[12:18], 'big')
print(f'\t\tEddystone UID: {namespace_id} - {instance_id} \u2197 {tx_pwr}')
# Eddystone URL beacon format
elif data[0] == 0x10:
prefix = data[2]
encoded_url = data[3:]
full_url = _url_prefix_scheme[prefix]
for letter in encoded_url:
if letter < len(_url_encoding):
full_url += _url_encoding[letter]
else:
full_url += chr(letter)
print(f'\t\tEddystone URL: {full_url} \u2197 {tx_pwr}')
def process_ibeacon(data, beacon_type='iBeacon'):
"""Print iBeacon data in human readable format"""
print('DATA:', data)
beacon_uuid = uuid.UUID(bytes=bytes(data[2:18]))
major = int.from_bytes(bytearray(data[18:20]), 'big', signed=False)
minor = int.from_bytes(bytearray(data[20:22]), 'big', signed=False)
tx_pwr = int.from_bytes([data[22]], 'big', signed=True)
print(f'\t\t{beacon_type}: {beacon_uuid} - {major} - {minor} \u2197 {tx_pwr}')
def ble_16bit_match(uuid_16, srv_data):
"""Expand 16 bit UUID to full 128 bit UUID"""
uuid_128 = f'0000{uuid_16}-0000-1000-8000-00805f9b34fb'
return uuid_128 == list(srv_data.keys())[0]
def on_iface_added(owner, path, iface, signal, interfaces_and_properties):
"""
Event handler for D-Bus interface added.
Test to see if it is a new Bluetooth device
"""
iface_path, iface_props = interfaces_and_properties
if DEVICE_INTERFACE in iface_props:
on_device_found(iface_path, iface_props[DEVICE_INTERFACE])
def on_device_found(device_path, device_props):
"""
Handle new Bluetooth device being discover.
If it is a beacon of type iBeacon, Eddystone, AltBeacon
then process it
"""
address = device_props.get('Address')
address_type = device_props.get('AddressType')
name = device_props.get('Name')
alias = device_props.get('Alias')
paired = device_props.get('Paired')
trusted = device_props.get('Trusted')
rssi = device_props.get('RSSI')
service_data = device_props.get('ServiceData')
manufacturer_data = device_props.get('ManufacturerData')
if address.casefold() == '00:c3:f4:f1:58:69':
print('Found mac address of interest')
if service_data and ble_16bit_match('feaa', service_data):
process_eddystone(service_data['0000feaa-0000-1000-8000-00805f9b34fb'])
remove_list.add(device_path)
elif manufacturer_data:
for mfg_id in manufacturer_data:
# iBeacon 0x004c
if mfg_id == 0x004c and manufacturer_data[mfg_id][0] == 0x02:
process_ibeacon(manufacturer_data[mfg_id])
remove_list.add(device_path)
# AltBeacon 0xacbe
elif mfg_id == 0xffff and manufacturer_data[mfg_id][0:2] == [0xbe, 0xac]:
process_ibeacon(manufacturer_data[mfg_id], beacon_type='AltBeacon')
remove_list.add(device_path)
clean_beacons()
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-d', '--duration', type=int, default=0,
help='Duration of scan [0 for continuous]')
args = parser.parse_args()
bus = SystemBus()
adapter = bus.get('org.bluez', '/org/bluez/hci0')
bus.subscribe(iface='org.freedesktop.DBus.ObjectManager',
signal='InterfacesAdded',
signal_fired=on_iface_added)
mainloop = GLib.MainLoop()
if args.duration > 0:
GLib.timeout_add_seconds(args.duration, stop_scan)
adapter.SetDiscoveryFilter({'DuplicateData': GLib.Variant.new_boolean(False)})
adapter.StartDiscovery()
try:
print('\n\tUse CTRL-C to stop discovery\n')
mainloop.run()
except KeyboardInterrupt:
stop_scan()