如何将 microbit 与 BLE 连接并监听按钮按下事件?
How do I connect microbit with BLE and listen for button press events?
11/28/2021 编辑:
如果您需要使用低功耗蓝牙将 microbit 连接到计算机,并在单击按钮时执行操作。直接跳转并按照下面 @ukBaz 的回答进行操作。
注意:该解决方案在 GNU/Linux 上完美运行,但在 Windows 上可能效果不佳。
下面是post的原题。我不会编辑它来隐藏我的错误。
总结:我有一个 microbit 连接到一个 rpi-zero。我对 microbit 进行了编码,当按下 A button
时,它将通过 uart.write
将数据发送到 rpi-zero。
在此测试中,microbit 将 uart.write("Test")
,将“测试”字写入 rpi-0。
我的最终目标是使用 rpi-zero 的 BLE 功能充当控制设备,并从 microbit 按钮发送指令。
我发现这个 GATT Server Code 写在 python 中用于 rpi。 运行 完全没有问题。
下面的代码将用于监听microbit uart服务并检查接收到的数据是否为"Test"
:
import serial
serialPort = serial.Serial(port = "/dev/ttyACM0", baudrate=115200, bytesize=8, timeout=0.5, stopbits=serial.STOPBITS_ONE)
serialString = " "
(serialPort.in_waiting > 0)
while True:
serialString = serialPort.readline()
if serialString == b'Test':
print("Yes")
else:
print("F")
但真正的问题是当我尝试将此循环代码实现到 GATT 服务器代码中时。
我似乎无法理解如何将此值传递给 self.send_tx
此外,似乎GATT服务器代码中已经有一个全局循环。因此,我尝试对 运行 这两个函数同时使用线程,但是当我添加 self.send_tx("Test")
时,它只会抛出错误 Self is not defined
.
很抱歉,我对编码一窍不通,有人知道解决这个问题的方法吗?谢谢
完整代码如下:
import sys
import threading
import dbus, dbus.mainloop.glib
import serial
from gi.repository import GLib
from example_advertisement import Advertisement
from example_advertisement import register_ad_cb, register_ad_error_cb
from example_gatt_server import Service, Characteristic
from example_gatt_server import register_app_cb, register_app_error_cb
BLUEZ_SERVICE_NAME = 'org.bluez'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
UART_SERVICE_UUID = '6e400001-b5a3-f393-e0a9-e50e24dcca9e'
UART_RX_CHARACTERISTIC_UUID = '6e400002-b5a3-f393-e0a9-e50e24dcca9e'
UART_TX_CHARACTERISTIC_UUID = '6e400003-b5a3-f393-e0a9-e50e24dcca9e'
LOCAL_NAME = 'rpi-gatt-server'
mainloop = None
serialPort = serial.Serial(port = "/dev/ttyACM0", baudrate=115200, bytesize=8, timeout=0.8, stopbits=serial.STOPBITS_ONE)
serialString = " "
(serialPort.in_waiting > 0)
class TxCharacteristic(Characteristic):
def __init__(self, bus, index, service):
Characteristic.__init__(self, bus, index, UART_TX_CHARACTERISTIC_UUID,
['notify'], service)
self.notifying = False
GLib.io_add_watch(sys.stdin, GLib.IO_IN, self.on_console_input)
def on_console_input(self, fd, condition):
s = fd.readline()
if s.isspace():
pass
else:
self.send_tx(s)
return True
def send_tx(self, s):
if not self.notifying:
return
value = []
for c in s:
value.append(dbus.Byte(c.encode()))
self.PropertiesChanged(GATT_CHRC_IFACE, {'Value': value}, [])
def StartNotify(self):
if self.notifying:
print("yes")
return
self.notifying = True
def StopNotify(self):
if not self.notifying:
print("no")
return
self.notifying = False
class RxCharacteristic(Characteristic):
def __init__(self, bus, index, service):
Characteristic.__init__(self, bus, index, UART_RX_CHARACTERISTIC_UUID,
['write'], service)
def WriteValue(self, value, options):
print('remote: {}'.format(bytearray(value).decode()))
class UartService(Service):
def __init__(self, bus, index):
Service.__init__(self, bus, index, UART_SERVICE_UUID, True)
self.add_characteristic(TxCharacteristic(bus, 0, self))
self.add_characteristic(RxCharacteristic(bus, 1, self))
class Application(dbus.service.Object):
def __init__(self, bus):
self.path = '/'
self.services = []
dbus.service.Object.__init__(self, bus, self.path)
def get_path(self):
return dbus.ObjectPath(self.path)
def add_service(self, service):
self.services.append(service)
@dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
def GetManagedObjects(self):
response = {}
for service in self.services:
response[service.get_path()] = service.get_properties()
chrcs = service.get_characteristics()
for chrc in chrcs:
response[chrc.get_path()] = chrc.get_properties()
return response
class UartApplication(Application):
def __init__(self, bus):
Application.__init__(self, bus)
self.add_service(UartService(bus, 0))
class UartAdvertisement(Advertisement):
def __init__(self, bus, index):
Advertisement.__init__(self, bus, index, 'peripheral')
self.add_service_uuid(UART_SERVICE_UUID)
self.add_local_name(LOCAL_NAME)
self.include_tx_power = True
def find_adapter(bus):
remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
DBUS_OM_IFACE)
objects = remote_om.GetManagedObjects()
for o, props in objects.items():
if LE_ADVERTISING_MANAGER_IFACE in props and GATT_MANAGER_IFACE in props:
return o
print('Skip adapter:', o)
return None
def check():
while True:
serialString = serialPort.readline()
if serialString == b'Test':
print("Okay, Test")
self.send_tx("Test")
else:
print("No")
def main():
global mainloop
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
adapter = find_adapter(bus)
if not adapter:
print('BLE adapter not found')
return
service_manager = dbus.Interface(
bus.get_object(BLUEZ_SERVICE_NAME, adapter),
GATT_MANAGER_IFACE)
ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
LE_ADVERTISING_MANAGER_IFACE)
app = UartApplication(bus)
adv = UartAdvertisement(bus, 0)
mainloop = GLib.MainLoop()
service_manager.RegisterApplication(app.get_path(), {},
reply_handler=register_app_cb,
error_handler=register_app_error_cb)
ad_manager.RegisterAdvertisement(adv.get_path(), {},
reply_handler=register_ad_cb,
error_handler=register_ad_error_cb)
try:
mainloop.run()
except KeyboardInterrupt:
adv.Release()
if __name__ == '__main__':
p1 = threading.Thread(target=main)
p2 = threading.Thread(target=check)
p1.start()
p2.start()
我认为这可能是 XY problem。我的理解是,您想通过低功耗蓝牙 (BLE) 从 micro:bit 向 RPi 发送按钮按下。如果是这样的话,那么有一个更有效的方法来做到这一点。
在micro:bit的Bluetooth Profile中有一个按钮服务和一个按钮状态特征可以使用。他们的 UUID 是:
BTN_SRV = 'E95D9882-251D-470A-A062-FA1922DFA9A8'
BTN_A_STATE = 'E95DDA90-251D-470A-A062-FA1922DFA9A8'
您需要在 micro:bit 上设置 GATT 服务器。最有效的方法是使用 https://makecode.microbit.org/#editor 创建以下内容:
如果 left-hand 菜单上没有蓝牙,请单击屏幕右上角的齿轮,select Extensions
,然后单击 select Bluetooth
替换 Radio
扩展。
RPi 上的 GATT 客户端代码可以通过为 D-Bus 绑定使用 pydbus 库来简化。
使用蓝牙,而不是有一个 while
循环并不断轮询 micro:bit,有一个事件循环订阅来自(在这种情况下)按钮 A 的通知会更有效特征。每次按下或释放 micro:bit 按钮 A 时都会调用我命名为 btn_handler
的函数。
下面的代码不会在 micro:bit 和树莓派之间进行初始配对。由于配对是一个 one-off 配置步骤,所以我手动进行。
这是 RPi 的示例 python 代码,它响应按下 micro:bit 上的按钮 A...
from time import sleep
import pydbus
from gi.repository import GLib
DEVICE_ADDR = 'DE:82:35:E7:CE:BE' # micro:bit address
BTN_A_STATE = 'E95DDA90-251D-470A-A062-FA1922DFA9A8'
# DBus object paths
BLUEZ_SERVICE = 'org.bluez'
ADAPTER_PATH = '/org/bluez/hci0'
device_path = f"{ADAPTER_PATH}/dev_{DEVICE_ADDR.replace(':', '_')}"
# setup dbus
bus = pydbus.SystemBus()
mngr = bus.get(BLUEZ_SERVICE, '/')
adapter = bus.get(BLUEZ_SERVICE, ADAPTER_PATH)
device = bus.get(BLUEZ_SERVICE, device_path)
device.Connect()
while not device.ServicesResolved:
sleep(0.5)
def get_characteristic_path(dev_path, uuid):
"""Look up DBus path for characteristic UUID"""
mng_objs = mngr.GetManagedObjects()
for path in mng_objs:
chr_uuid = mng_objs[path].get('org.bluez.GattCharacteristic1', {}).get('UUID')
if path.startswith(dev_path) and chr_uuid == uuid.casefold():
return path
# Characteristic DBus information
btn_a_path = get_characteristic_path(device._path, BTN_A_STATE)
btn_a = bus.get(BLUEZ_SERVICE, btn_a_path)
# Read button A without event loop notifications
print(btn_a.ReadValue({}))
# Enable eventloop for notifications
def btn_handler(iface, prop_changed, prop_removed):
"""Notify event handler for button press"""
if 'Value' in prop_changed:
new_value = prop_changed['Value']
print(f"Button A state: {new_value}")
print(f'As byte: {bytes(new_value)}')
print(f'As bytearray: {bytearray(new_value)}')
print(f'As int: {int(new_value[0])}')
print(f'As bool: {bool(new_value[0])}')
mainloop = GLib.MainLoop()
btn_a.onPropertiesChanged = btn_handler
btn_a.StartNotify()
try:
mainloop.run()
except KeyboardInterrupt:
mainloop.quit()
btn_a.StopNotify()
device.Disconnect()
我创建了一个 Python 包,以便更轻松地连接到 micro:bit 的蓝牙服务:kaspersmicrobit. It works under linux and windows and comes with extensive documentation。
监听按钮事件可以这样进行:
def pressed(button):
print(f"button {button} pressed")
with KaspersMicrobit(MICROBIT_BLUETOOTH_ADDRESS) as microbit:
# listen for button events
microbit.buttons.on_button_a(press=pressed)
microbit.buttons.on_button_b(press=pressed)
time.sleep(15)
可以这样监听UART数据:
def print_received_string(string: str):
print(f"Received from microbit: '{string}'")
with KaspersMicrobit(MICROBIT_BLUETOOTH_ADDRESS) as microbit:
# listen for strings sent by the microbit
microbit.uart.receive_string(print_received_string)
# send a string to the microbit
microbit.uart.send_string("Hi, from python!\n")
time.sleep(25)
11/28/2021 编辑:
如果您需要使用低功耗蓝牙将 microbit 连接到计算机,并在单击按钮时执行操作。直接跳转并按照下面 @ukBaz 的回答进行操作。
注意:该解决方案在 GNU/Linux 上完美运行,但在 Windows 上可能效果不佳。
下面是post的原题。我不会编辑它来隐藏我的错误。
总结:我有一个 microbit 连接到一个 rpi-zero。我对 microbit 进行了编码,当按下 A button
时,它将通过 uart.write
将数据发送到 rpi-zero。
在此测试中,microbit 将 uart.write("Test")
,将“测试”字写入 rpi-0。
我的最终目标是使用 rpi-zero 的 BLE 功能充当控制设备,并从 microbit 按钮发送指令。
我发现这个 GATT Server Code 写在 python 中用于 rpi。 运行 完全没有问题。
下面的代码将用于监听microbit uart服务并检查接收到的数据是否为"Test"
:
import serial
serialPort = serial.Serial(port = "/dev/ttyACM0", baudrate=115200, bytesize=8, timeout=0.5, stopbits=serial.STOPBITS_ONE)
serialString = " "
(serialPort.in_waiting > 0)
while True:
serialString = serialPort.readline()
if serialString == b'Test':
print("Yes")
else:
print("F")
但真正的问题是当我尝试将此循环代码实现到 GATT 服务器代码中时。
我似乎无法理解如何将此值传递给 self.send_tx
此外,似乎GATT服务器代码中已经有一个全局循环。因此,我尝试对 运行 这两个函数同时使用线程,但是当我添加 self.send_tx("Test")
时,它只会抛出错误 Self is not defined
.
很抱歉,我对编码一窍不通,有人知道解决这个问题的方法吗?谢谢
完整代码如下:
import sys
import threading
import dbus, dbus.mainloop.glib
import serial
from gi.repository import GLib
from example_advertisement import Advertisement
from example_advertisement import register_ad_cb, register_ad_error_cb
from example_gatt_server import Service, Characteristic
from example_gatt_server import register_app_cb, register_app_error_cb
BLUEZ_SERVICE_NAME = 'org.bluez'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
UART_SERVICE_UUID = '6e400001-b5a3-f393-e0a9-e50e24dcca9e'
UART_RX_CHARACTERISTIC_UUID = '6e400002-b5a3-f393-e0a9-e50e24dcca9e'
UART_TX_CHARACTERISTIC_UUID = '6e400003-b5a3-f393-e0a9-e50e24dcca9e'
LOCAL_NAME = 'rpi-gatt-server'
mainloop = None
serialPort = serial.Serial(port = "/dev/ttyACM0", baudrate=115200, bytesize=8, timeout=0.8, stopbits=serial.STOPBITS_ONE)
serialString = " "
(serialPort.in_waiting > 0)
class TxCharacteristic(Characteristic):
def __init__(self, bus, index, service):
Characteristic.__init__(self, bus, index, UART_TX_CHARACTERISTIC_UUID,
['notify'], service)
self.notifying = False
GLib.io_add_watch(sys.stdin, GLib.IO_IN, self.on_console_input)
def on_console_input(self, fd, condition):
s = fd.readline()
if s.isspace():
pass
else:
self.send_tx(s)
return True
def send_tx(self, s):
if not self.notifying:
return
value = []
for c in s:
value.append(dbus.Byte(c.encode()))
self.PropertiesChanged(GATT_CHRC_IFACE, {'Value': value}, [])
def StartNotify(self):
if self.notifying:
print("yes")
return
self.notifying = True
def StopNotify(self):
if not self.notifying:
print("no")
return
self.notifying = False
class RxCharacteristic(Characteristic):
def __init__(self, bus, index, service):
Characteristic.__init__(self, bus, index, UART_RX_CHARACTERISTIC_UUID,
['write'], service)
def WriteValue(self, value, options):
print('remote: {}'.format(bytearray(value).decode()))
class UartService(Service):
def __init__(self, bus, index):
Service.__init__(self, bus, index, UART_SERVICE_UUID, True)
self.add_characteristic(TxCharacteristic(bus, 0, self))
self.add_characteristic(RxCharacteristic(bus, 1, self))
class Application(dbus.service.Object):
def __init__(self, bus):
self.path = '/'
self.services = []
dbus.service.Object.__init__(self, bus, self.path)
def get_path(self):
return dbus.ObjectPath(self.path)
def add_service(self, service):
self.services.append(service)
@dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
def GetManagedObjects(self):
response = {}
for service in self.services:
response[service.get_path()] = service.get_properties()
chrcs = service.get_characteristics()
for chrc in chrcs:
response[chrc.get_path()] = chrc.get_properties()
return response
class UartApplication(Application):
def __init__(self, bus):
Application.__init__(self, bus)
self.add_service(UartService(bus, 0))
class UartAdvertisement(Advertisement):
def __init__(self, bus, index):
Advertisement.__init__(self, bus, index, 'peripheral')
self.add_service_uuid(UART_SERVICE_UUID)
self.add_local_name(LOCAL_NAME)
self.include_tx_power = True
def find_adapter(bus):
remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
DBUS_OM_IFACE)
objects = remote_om.GetManagedObjects()
for o, props in objects.items():
if LE_ADVERTISING_MANAGER_IFACE in props and GATT_MANAGER_IFACE in props:
return o
print('Skip adapter:', o)
return None
def check():
while True:
serialString = serialPort.readline()
if serialString == b'Test':
print("Okay, Test")
self.send_tx("Test")
else:
print("No")
def main():
global mainloop
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
adapter = find_adapter(bus)
if not adapter:
print('BLE adapter not found')
return
service_manager = dbus.Interface(
bus.get_object(BLUEZ_SERVICE_NAME, adapter),
GATT_MANAGER_IFACE)
ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
LE_ADVERTISING_MANAGER_IFACE)
app = UartApplication(bus)
adv = UartAdvertisement(bus, 0)
mainloop = GLib.MainLoop()
service_manager.RegisterApplication(app.get_path(), {},
reply_handler=register_app_cb,
error_handler=register_app_error_cb)
ad_manager.RegisterAdvertisement(adv.get_path(), {},
reply_handler=register_ad_cb,
error_handler=register_ad_error_cb)
try:
mainloop.run()
except KeyboardInterrupt:
adv.Release()
if __name__ == '__main__':
p1 = threading.Thread(target=main)
p2 = threading.Thread(target=check)
p1.start()
p2.start()
我认为这可能是 XY problem。我的理解是,您想通过低功耗蓝牙 (BLE) 从 micro:bit 向 RPi 发送按钮按下。如果是这样的话,那么有一个更有效的方法来做到这一点。
在micro:bit的Bluetooth Profile中有一个按钮服务和一个按钮状态特征可以使用。他们的 UUID 是:
BTN_SRV = 'E95D9882-251D-470A-A062-FA1922DFA9A8'
BTN_A_STATE = 'E95DDA90-251D-470A-A062-FA1922DFA9A8'
您需要在 micro:bit 上设置 GATT 服务器。最有效的方法是使用 https://makecode.microbit.org/#editor 创建以下内容:
如果 left-hand 菜单上没有蓝牙,请单击屏幕右上角的齿轮,select Extensions
,然后单击 select Bluetooth
替换 Radio
扩展。
RPi 上的 GATT 客户端代码可以通过为 D-Bus 绑定使用 pydbus 库来简化。
使用蓝牙,而不是有一个 while
循环并不断轮询 micro:bit,有一个事件循环订阅来自(在这种情况下)按钮 A 的通知会更有效特征。每次按下或释放 micro:bit 按钮 A 时都会调用我命名为 btn_handler
的函数。
下面的代码不会在 micro:bit 和树莓派之间进行初始配对。由于配对是一个 one-off 配置步骤,所以我手动进行。
这是 RPi 的示例 python 代码,它响应按下 micro:bit 上的按钮 A...
from time import sleep
import pydbus
from gi.repository import GLib
DEVICE_ADDR = 'DE:82:35:E7:CE:BE' # micro:bit address
BTN_A_STATE = 'E95DDA90-251D-470A-A062-FA1922DFA9A8'
# DBus object paths
BLUEZ_SERVICE = 'org.bluez'
ADAPTER_PATH = '/org/bluez/hci0'
device_path = f"{ADAPTER_PATH}/dev_{DEVICE_ADDR.replace(':', '_')}"
# setup dbus
bus = pydbus.SystemBus()
mngr = bus.get(BLUEZ_SERVICE, '/')
adapter = bus.get(BLUEZ_SERVICE, ADAPTER_PATH)
device = bus.get(BLUEZ_SERVICE, device_path)
device.Connect()
while not device.ServicesResolved:
sleep(0.5)
def get_characteristic_path(dev_path, uuid):
"""Look up DBus path for characteristic UUID"""
mng_objs = mngr.GetManagedObjects()
for path in mng_objs:
chr_uuid = mng_objs[path].get('org.bluez.GattCharacteristic1', {}).get('UUID')
if path.startswith(dev_path) and chr_uuid == uuid.casefold():
return path
# Characteristic DBus information
btn_a_path = get_characteristic_path(device._path, BTN_A_STATE)
btn_a = bus.get(BLUEZ_SERVICE, btn_a_path)
# Read button A without event loop notifications
print(btn_a.ReadValue({}))
# Enable eventloop for notifications
def btn_handler(iface, prop_changed, prop_removed):
"""Notify event handler for button press"""
if 'Value' in prop_changed:
new_value = prop_changed['Value']
print(f"Button A state: {new_value}")
print(f'As byte: {bytes(new_value)}')
print(f'As bytearray: {bytearray(new_value)}')
print(f'As int: {int(new_value[0])}')
print(f'As bool: {bool(new_value[0])}')
mainloop = GLib.MainLoop()
btn_a.onPropertiesChanged = btn_handler
btn_a.StartNotify()
try:
mainloop.run()
except KeyboardInterrupt:
mainloop.quit()
btn_a.StopNotify()
device.Disconnect()
我创建了一个 Python 包,以便更轻松地连接到 micro:bit 的蓝牙服务:kaspersmicrobit. It works under linux and windows and comes with extensive documentation。
监听按钮事件可以这样进行:
def pressed(button):
print(f"button {button} pressed")
with KaspersMicrobit(MICROBIT_BLUETOOTH_ADDRESS) as microbit:
# listen for button events
microbit.buttons.on_button_a(press=pressed)
microbit.buttons.on_button_b(press=pressed)
time.sleep(15)
可以这样监听UART数据:
def print_received_string(string: str):
print(f"Received from microbit: '{string}'")
with KaspersMicrobit(MICROBIT_BLUETOOTH_ADDRESS) as microbit:
# listen for strings sent by the microbit
microbit.uart.receive_string(print_received_string)
# send a string to the microbit
microbit.uart.send_string("Hi, from python!\n")
time.sleep(25)