Python 使用 Bluez-obex 和 pydbus 的 opp obex 服务器?
Python opp obex server using Bluez-obex and pydbus?
对于我的项目,我需要通过蓝牙下载 .zip 文件。我为此使用 python 和 obex。我有一个可用的 python opp 客户端实现(向 ukBaz 求助),但我仍在使用 os 启动服务器,所以我的服务器程序的其余部分不知道何时收到文件。程序的其余部分完成了,但是没有服务器的输出,我无法触发解压缩功能到运行。我已经阅读了 bluez-obex API 文档,似乎没有关于启动服务器的任何内容。还有另一种方法吗?我的代码如下。
os.system("sudo obexpushd -B -o /home/pi/Desktop/ -n")
我不熟悉 obexpushd
以及使用 obexd
.
提供的功能给你带来的好处
我的理解是,您需要创建一个 obex-agent 到 accept/reject 的蓝牙对象推送请求 obexd
。
监控 InterfacesAdded
DBus 信号应该会为您提供传输开始时需要知道的信息。然后,您可以监视 PropertiesChanged
信号以了解状态何时更改。
我能够将照片从 phone 推送到 Raspberry Pi,并在 RPi 上使用以下 运行(我的 phone 和 RPi 已经配对):
from gi.repository import GLib
import pydbus
BUS_NAME = 'org.bluez.obex'
PATH = '/org/bluez/obex'
AGENT_MANAGER_INTERFACE = 'org.bluez.obex.AgentManager1'
AGENT_INTERFACE = 'org.bluez.obex.Agent1'
SESSION_INTERFACE = 'org.bluez.obex.Session1'
TRANSFER_INTERFACE = 'org.bluez.obex.Transfer1'
ses_bus = pydbus.SessionBus()
def transfer_status_handler(iface, props_changed, props_removed):
if iface == TRANSFER_INTERFACE:
status = props_changed.get('Status')
if status == 'complete':
print('Transfer complete')
elif status == 'queued':
print('Still queued')
elif status == 'active':
print('transferring')
elif status == 'suspended':
print('Suspended')
elif status == 'error':
print('error')
def iface_added_handler(dbus_path, interfaces):
if SESSION_INTERFACE in interfaces and 'server' in dbus_path:
print('Server session added')
elif TRANSFER_INTERFACE in interfaces and 'server' in dbus_path:
print('Transfer started')
transfer = ses_bus.get(BUS_NAME, dbus_path)
transfer.onPropertiesChanged = transfer_status_handler
class Agent:
"""
<node>
<interface name='org.bluez.obex.Agent1'>
<method name='Release'>
</method>
<method name='AuthorizePush'>
<arg type='s' name='path' direction='out'/>
<arg type='o' name='transfer' direction='in'/>
</method>
<method name='Cancel'>
</method>
</interface>
</node>
"""
def AuthorizePush(self, path):
print('Authorize Push', path)
transfer = ses_bus.get(BUS_NAME, path)
props = transfer.GetAll(TRANSFER_INTERFACE)
print(props)
return props.get('Name')
def Cancel(self):
print('Authorization Cancelled')
def Release(self):
pass
if __name__ == '__main__':
obex_mngr = ses_bus.get('org.bluez.obex', '/')
obex_mngr.onInterfacesAdded = iface_added_handler
mainloop = GLib.MainLoop()
ses_bus.register_object('/test/agent', Agent(), None)
print('Agent created')
agnt_mngr = ses_bus.get(BUS_NAME, PATH)
agnt_mngr.RegisterAgent('/test/agent')
print('Agent registered')
try:
mainloop.run()
except KeyboardInterrupt:
mainloop.quit()
此代码只接受由它发出的任何请求。这可能是一件坏事,如果文件应该被接受,人们可能想添加一些检查(或提示用户)。
对于我的项目,我需要通过蓝牙下载 .zip 文件。我为此使用 python 和 obex。我有一个可用的 python opp 客户端实现(向 ukBaz 求助),但我仍在使用 os 启动服务器,所以我的服务器程序的其余部分不知道何时收到文件。程序的其余部分完成了,但是没有服务器的输出,我无法触发解压缩功能到运行。我已经阅读了 bluez-obex API 文档,似乎没有关于启动服务器的任何内容。还有另一种方法吗?我的代码如下。
os.system("sudo obexpushd -B -o /home/pi/Desktop/ -n")
我不熟悉 obexpushd
以及使用 obexd
.
我的理解是,您需要创建一个 obex-agent 到 accept/reject 的蓝牙对象推送请求 obexd
。
监控 InterfacesAdded
DBus 信号应该会为您提供传输开始时需要知道的信息。然后,您可以监视 PropertiesChanged
信号以了解状态何时更改。
我能够将照片从 phone 推送到 Raspberry Pi,并在 RPi 上使用以下 运行(我的 phone 和 RPi 已经配对):
from gi.repository import GLib
import pydbus
BUS_NAME = 'org.bluez.obex'
PATH = '/org/bluez/obex'
AGENT_MANAGER_INTERFACE = 'org.bluez.obex.AgentManager1'
AGENT_INTERFACE = 'org.bluez.obex.Agent1'
SESSION_INTERFACE = 'org.bluez.obex.Session1'
TRANSFER_INTERFACE = 'org.bluez.obex.Transfer1'
ses_bus = pydbus.SessionBus()
def transfer_status_handler(iface, props_changed, props_removed):
if iface == TRANSFER_INTERFACE:
status = props_changed.get('Status')
if status == 'complete':
print('Transfer complete')
elif status == 'queued':
print('Still queued')
elif status == 'active':
print('transferring')
elif status == 'suspended':
print('Suspended')
elif status == 'error':
print('error')
def iface_added_handler(dbus_path, interfaces):
if SESSION_INTERFACE in interfaces and 'server' in dbus_path:
print('Server session added')
elif TRANSFER_INTERFACE in interfaces and 'server' in dbus_path:
print('Transfer started')
transfer = ses_bus.get(BUS_NAME, dbus_path)
transfer.onPropertiesChanged = transfer_status_handler
class Agent:
"""
<node>
<interface name='org.bluez.obex.Agent1'>
<method name='Release'>
</method>
<method name='AuthorizePush'>
<arg type='s' name='path' direction='out'/>
<arg type='o' name='transfer' direction='in'/>
</method>
<method name='Cancel'>
</method>
</interface>
</node>
"""
def AuthorizePush(self, path):
print('Authorize Push', path)
transfer = ses_bus.get(BUS_NAME, path)
props = transfer.GetAll(TRANSFER_INTERFACE)
print(props)
return props.get('Name')
def Cancel(self):
print('Authorization Cancelled')
def Release(self):
pass
if __name__ == '__main__':
obex_mngr = ses_bus.get('org.bluez.obex', '/')
obex_mngr.onInterfacesAdded = iface_added_handler
mainloop = GLib.MainLoop()
ses_bus.register_object('/test/agent', Agent(), None)
print('Agent created')
agnt_mngr = ses_bus.get(BUS_NAME, PATH)
agnt_mngr.RegisterAgent('/test/agent')
print('Agent registered')
try:
mainloop.run()
except KeyboardInterrupt:
mainloop.quit()
此代码只接受由它发出的任何请求。这可能是一件坏事,如果文件应该被接受,人们可能想添加一些检查(或提示用户)。