如何使用 dbus API 连接到蓝牙配置文件
How to connect to a bluetooth profile using dbus APIs
我有一个 python3 脚本,它使用旧式蓝牙成功打开到服务器的 RFCOMM 套接字。我正在尝试使用 dbus 完成同样的事情,这就是我读到的你现在应该在 Linux 上使用蓝牙的方式。 (这是对用 C 编写的 Linux 应用程序进行重大更改的概念验证。)
当我 运行 下面的脚本时,我看到了这个:
connecting...
ex from ConnectProfile(): g-io-error-quark: GDBus.Error:org.bluez.Error.NotAvailable: Operation currently not available (36)
onPropertiesChanged( org.bluez.Device1 {'Connected': True} [] )
onPropertiesChanged( org.bluez.Device1 {'ServicesResolved': True} [] )
onPropertiesChanged( org.bluez.Device1 {'ServicesResolved': False, 'Connected': False} [] )
请注意,属性 更改发生在对 ConnectProfile 的调用失败之后。我看到建议我应该从 属性-changed 回调中打开一个 RFCOMM 套接字,利用连接打开的时刻。但是服务器端(我在 github 上使用了出色的 bluez-rfcomm-example)dbus/bluez 负责创建套接字:您只需传递一个文件描述符。我希望 ConnectProfile 的工作方式类似,但找不到任何示例。
我应该如何修改我的 new_style() 函数以便它给我一个可用的套接字?
谢谢,
--埃里克
#!/usr/bin/env python3
# for new_style()
from pydbus import SystemBus
from gi.repository import GLib
# for old_style()
import bluetooth
PROFILE = 'b079b640-35fe-11e5-a432-0002a5d5c51b'
ADDR = 'AA:BB:CC:DD:EE:FF'
# Works fine. But you're supposed to use dbus these days
def old_style():
service_matches = bluetooth.find_service(uuid=PROFILE, address=ADDR)
if len(service_matches):
first_match = service_matches[0]
port = first_match['port']
host = first_match['host']
sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
sock.connect((host, port))
while True:
data = input()
if not data:
break
sock.send(data)
sock.close()
# Does not work. First an exception fires:
# g-io-error-quark: GDBus.Error:org.bluez.Error.NotAvailable: Operation currently not available (36)
# then onPropertiesChanged lists stuff -- after the failure, not during the connection attempt.
def new_style():
nucky = SystemBus().get('org.bluez', '/org/bluez/hci0/dev_' + ADDR.replace(':', '_'))
# Callback: (s, a{sv}, as)
nucky.onPropertiesChanged = lambda p1, p2, p3: print('onPropertiesChanged(', p1, p2, p3, ')')
def try_connect():
print('connecting...')
try:
nucky.ConnectProfile(PROFILE)
except Exception as ex:
print('ex from ConnectProfile():', ex)
GLib.timeout_add( 250, try_connect )
GLib.MainLoop().run()
if False:
old_style()
else:
new_style()
(稍后添加)
让我澄清一下我的问题。在 Linux 框中,我正在 运行 设置一个 bluez-rfcomm-example 服务器,我将其修改为使用自定义服务 UUID。它可能会创建一个服务记录,但在客户端 (Android) 端,这三行 Java 足以连接到它的套接字(假设服务器有蓝牙 mac AA:BB:CC:DD:EE:FF 并且两者配对):
BluetoothDevice remote = BluetoothAdapter.getDefaultAdapter().getRemoteDevice( "AA:BB:CC:DD:EE:FF" );
BluetoothSocket socket = remote.createRfcommSocketToServiceRecord( MY_SERVICE_UUID );
socket.connect();
有没有一种方法可以在 Linux 上使用 dbus/bluez 做到这一点,而且非常接近这个简单?我假设 Device1/ConnectProfile(UUID) 是我想要的——它与 createRfcommSocketToServiceRecord() 相同——但这个假设可能是完全错误的!这甚至可以从 Linux 使用 blues/dbus 实现吗?还是我应该坚持使用旧方法?
谢谢,对于模糊的初始问题感到抱歉。
--埃里克
有一篇比较 pybluez 和使用 Python 3 个套接字的好(如果现在有点旧)博客:
https://blog.kevindoran.co/bluetooth-programming-with-python-3/
如果您想使用 BlueZ D-Bus API,那么关键文档是:
https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/doc/profile-api.txt
BlueZ 示例位于:
https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/test/test-profile
使用 pydbus 创建它有一些问题,记录在:https://github.com/LEW21/pydbus/issues/54
下面的代码用于获取和使用 rfcomm 套接字来连接到由 UUID 指定的远程服务。我接受的答案由 ukBaz 提供,包括我需要的所有内容,但我没有足够的背景知识来立即理解它。我是对的,调用 ConnectProfile() 是开始的方式,但错过了两件事:
- 出于两个原因,有必要在调用方提供配置文件。首先,它提供了一个回调函数,您可以通过回调函数获取套接字。但是没有它——没有具体的 NewConnection 方法——连接失败(ConnectProfile() return 是一个错误。)
- 我需要在后台线程上调用 ConnectProfile()。回调将进入 glib 循环的主线程,因此 ConnectProfile() 不会 return 直到连接成功或失败,不得阻塞该线程!
不同的蓝牙连接类型可能需要略微不同的机制,但无论如何对于 RFCOMM 套接字连接来说,这就是诀窍。
#!/usr/bin/env python3
import socket, threading
from pydbus import SystemBus
from gi.repository import GLib
import dbus, dbus.service, dbus.mainloop.glib
CUSTOM_UUID = 'b079b640-35fe-11e5-a432-0002a5d5c51b'
ADDR = 'AA:BB:CC:DD:EE:FF'
PATH = '/org/neednt/match/remote'
class Profile(dbus.service.Object):
@dbus.service.method("org.bluez.Profile1",
in_signature="oha{sv}", out_signature="")
def NewConnection(self, path, fd, properties):
None
print('NewConnection: fd:', fd);
try:
self.socket = socket.socket(fileno=fd.take())
print('got socket:', self.socket)
self.socket.send(b"You there?")
except Exception as ex:
print('ex:', ex)
def connect_thread_main():
print('connect_thread_main()...')
SystemBus().get('org.bluez', '/org/bluez/hci0/dev_' + ADDR.replace(':', '_')) \
.ConnectProfile(CUSTOM_UUID)
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
Profile(bus, PATH) # added by side-effect apparently
dbus.Interface(bus.get_object("org.bluez","/org/bluez"),
"org.bluez.ProfileManager1") \
.RegisterProfile(PATH, CUSTOM_UUID, {})
threading.Thread(target=connect_thread_main).start()
GLib.MainLoop().run()
我有一个 python3 脚本,它使用旧式蓝牙成功打开到服务器的 RFCOMM 套接字。我正在尝试使用 dbus 完成同样的事情,这就是我读到的你现在应该在 Linux 上使用蓝牙的方式。 (这是对用 C 编写的 Linux 应用程序进行重大更改的概念验证。)
当我 运行 下面的脚本时,我看到了这个:
connecting...
ex from ConnectProfile(): g-io-error-quark: GDBus.Error:org.bluez.Error.NotAvailable: Operation currently not available (36)
onPropertiesChanged( org.bluez.Device1 {'Connected': True} [] )
onPropertiesChanged( org.bluez.Device1 {'ServicesResolved': True} [] )
onPropertiesChanged( org.bluez.Device1 {'ServicesResolved': False, 'Connected': False} [] )
请注意,属性 更改发生在对 ConnectProfile 的调用失败之后。我看到建议我应该从 属性-changed 回调中打开一个 RFCOMM 套接字,利用连接打开的时刻。但是服务器端(我在 github 上使用了出色的 bluez-rfcomm-example)dbus/bluez 负责创建套接字:您只需传递一个文件描述符。我希望 ConnectProfile 的工作方式类似,但找不到任何示例。
我应该如何修改我的 new_style() 函数以便它给我一个可用的套接字?
谢谢,
--埃里克
#!/usr/bin/env python3
# for new_style()
from pydbus import SystemBus
from gi.repository import GLib
# for old_style()
import bluetooth
PROFILE = 'b079b640-35fe-11e5-a432-0002a5d5c51b'
ADDR = 'AA:BB:CC:DD:EE:FF'
# Works fine. But you're supposed to use dbus these days
def old_style():
service_matches = bluetooth.find_service(uuid=PROFILE, address=ADDR)
if len(service_matches):
first_match = service_matches[0]
port = first_match['port']
host = first_match['host']
sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
sock.connect((host, port))
while True:
data = input()
if not data:
break
sock.send(data)
sock.close()
# Does not work. First an exception fires:
# g-io-error-quark: GDBus.Error:org.bluez.Error.NotAvailable: Operation currently not available (36)
# then onPropertiesChanged lists stuff -- after the failure, not during the connection attempt.
def new_style():
nucky = SystemBus().get('org.bluez', '/org/bluez/hci0/dev_' + ADDR.replace(':', '_'))
# Callback: (s, a{sv}, as)
nucky.onPropertiesChanged = lambda p1, p2, p3: print('onPropertiesChanged(', p1, p2, p3, ')')
def try_connect():
print('connecting...')
try:
nucky.ConnectProfile(PROFILE)
except Exception as ex:
print('ex from ConnectProfile():', ex)
GLib.timeout_add( 250, try_connect )
GLib.MainLoop().run()
if False:
old_style()
else:
new_style()
(稍后添加)
让我澄清一下我的问题。在 Linux 框中,我正在 运行 设置一个 bluez-rfcomm-example 服务器,我将其修改为使用自定义服务 UUID。它可能会创建一个服务记录,但在客户端 (Android) 端,这三行 Java 足以连接到它的套接字(假设服务器有蓝牙 mac AA:BB:CC:DD:EE:FF 并且两者配对):
BluetoothDevice remote = BluetoothAdapter.getDefaultAdapter().getRemoteDevice( "AA:BB:CC:DD:EE:FF" );
BluetoothSocket socket = remote.createRfcommSocketToServiceRecord( MY_SERVICE_UUID );
socket.connect();
有没有一种方法可以在 Linux 上使用 dbus/bluez 做到这一点,而且非常接近这个简单?我假设 Device1/ConnectProfile(UUID) 是我想要的——它与 createRfcommSocketToServiceRecord() 相同——但这个假设可能是完全错误的!这甚至可以从 Linux 使用 blues/dbus 实现吗?还是我应该坚持使用旧方法?
谢谢,对于模糊的初始问题感到抱歉。
--埃里克
有一篇比较 pybluez 和使用 Python 3 个套接字的好(如果现在有点旧)博客: https://blog.kevindoran.co/bluetooth-programming-with-python-3/
如果您想使用 BlueZ D-Bus API,那么关键文档是: https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/doc/profile-api.txt
BlueZ 示例位于: https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/test/test-profile
使用 pydbus 创建它有一些问题,记录在:https://github.com/LEW21/pydbus/issues/54
下面的代码用于获取和使用 rfcomm 套接字来连接到由 UUID 指定的远程服务。我接受的答案由 ukBaz 提供,包括我需要的所有内容,但我没有足够的背景知识来立即理解它。我是对的,调用 ConnectProfile() 是开始的方式,但错过了两件事:
- 出于两个原因,有必要在调用方提供配置文件。首先,它提供了一个回调函数,您可以通过回调函数获取套接字。但是没有它——没有具体的 NewConnection 方法——连接失败(ConnectProfile() return 是一个错误。)
- 我需要在后台线程上调用 ConnectProfile()。回调将进入 glib 循环的主线程,因此 ConnectProfile() 不会 return 直到连接成功或失败,不得阻塞该线程!
不同的蓝牙连接类型可能需要略微不同的机制,但无论如何对于 RFCOMM 套接字连接来说,这就是诀窍。
#!/usr/bin/env python3
import socket, threading
from pydbus import SystemBus
from gi.repository import GLib
import dbus, dbus.service, dbus.mainloop.glib
CUSTOM_UUID = 'b079b640-35fe-11e5-a432-0002a5d5c51b'
ADDR = 'AA:BB:CC:DD:EE:FF'
PATH = '/org/neednt/match/remote'
class Profile(dbus.service.Object):
@dbus.service.method("org.bluez.Profile1",
in_signature="oha{sv}", out_signature="")
def NewConnection(self, path, fd, properties):
None
print('NewConnection: fd:', fd);
try:
self.socket = socket.socket(fileno=fd.take())
print('got socket:', self.socket)
self.socket.send(b"You there?")
except Exception as ex:
print('ex:', ex)
def connect_thread_main():
print('connect_thread_main()...')
SystemBus().get('org.bluez', '/org/bluez/hci0/dev_' + ADDR.replace(':', '_')) \
.ConnectProfile(CUSTOM_UUID)
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
Profile(bus, PATH) # added by side-effect apparently
dbus.Interface(bus.get_object("org.bluez","/org/bluez"),
"org.bluez.ProfileManager1") \
.RegisterProfile(PATH, CUSTOM_UUID, {})
threading.Thread(target=connect_thread_main).start()
GLib.MainLoop().run()