使用 pexpect 检测和处理来自低功耗蓝牙的通知
Using pexpect to detect and handle notifications from Bluetooth LE
我一直在编写 python 代码来读取 Bluetooth LE 接收到的 Raspberry Pi 3 Model B 上的值。
我可以通过以下方式读取正确的值:
child.sendline("char-read-hnd handle")
child.expect("Characteristic value/descripto: ",timeout=5)
我现在想做的是随时检查通知,所以我有一个线程可以搜索预期的模式 "Notification handle=",如下所示:
def run():
patterns = ['Notification handle=','Indication handle=']
while True:
try:
matched_pattern_index = child.expect(patterns,timeout=1)
if matched_pattern_index in {0,1}:
print("Received Notification")
handleNotification()
except pexpect.TIMEOUT:
pass
现在在我的主要代码中,我总是做一些 child.sendline 来检查新值以及 child.expect 。问题是我上面对 pexpect.expect 的 Thread 调用阻塞了代码中的其他 pexpect.expect。
我已经尝试做第二个 child 类似于第一个在 Thread 内部工作,但结果是一样的。
所以有人知道我如何实现这一目标吗?
如有任何帮助,我们将不胜感激。
提前致谢
我正在考虑继承 pexpect.spawn
并提供一个 expect_before(p,c)
方法来保存模式列表 p
和回调函数 c
,然后覆盖 expect(p2)
在调用真正的 spawn.expect
函数之前将列表 p
添加到该调用的列表 p2
上。
如果真正的函数return是一个匹配索引i
,它在列表p
的大小之内,我们可以调用函数c[i]
并再次循环。当索引超出该列表时,我们将其调整为来自调用的列表 p2
和 return 中的索引。这是一个近似值:
#!/usr/bin/python
#
from __future__ import print_function
import sys, pexpect
class Myspawn(pexpect.spawn):
def __init__(self, *args, **kwargs):
pexpect.spawn.__init__(self, *args, **kwargs)
def expect_before(self, patterns, callbacks):
self.eb_pat = patterns
self.eb_cb = callbacks
def expect(self, patlist, *args, **kwargs):
numbefore = len(self.eb_pat)
while True:
rc = pexpect.spawn.expect(self, self.eb_pat+patlist, *args, **kwargs)
if rc>=numbefore:
break
self.eb_cb[rc]() # call callback
return rc-numbefore
def test():
child = Myspawn("sudo bluetoothctl", logfile=sys.stdout)
patterns = ['Device FC:A8:9A:..:..:..', 'Device C8:FD:41:..:..:..']
callbacks = [lambda :handler1(), lambda :handler2()]
child.expect_before(patterns, callbacks)
child.sendline('scan on')
while True:
child.expect(['[bluetooth].*?#'], timeout=5)
我一直在编写 python 代码来读取 Bluetooth LE 接收到的 Raspberry Pi 3 Model B 上的值。 我可以通过以下方式读取正确的值:
child.sendline("char-read-hnd handle")
child.expect("Characteristic value/descripto: ",timeout=5)
我现在想做的是随时检查通知,所以我有一个线程可以搜索预期的模式 "Notification handle=",如下所示:
def run():
patterns = ['Notification handle=','Indication handle=']
while True:
try:
matched_pattern_index = child.expect(patterns,timeout=1)
if matched_pattern_index in {0,1}:
print("Received Notification")
handleNotification()
except pexpect.TIMEOUT:
pass
现在在我的主要代码中,我总是做一些 child.sendline 来检查新值以及 child.expect 。问题是我上面对 pexpect.expect 的 Thread 调用阻塞了代码中的其他 pexpect.expect。 我已经尝试做第二个 child 类似于第一个在 Thread 内部工作,但结果是一样的。 所以有人知道我如何实现这一目标吗?
如有任何帮助,我们将不胜感激。 提前致谢
我正在考虑继承 pexpect.spawn
并提供一个 expect_before(p,c)
方法来保存模式列表 p
和回调函数 c
,然后覆盖 expect(p2)
在调用真正的 spawn.expect
函数之前将列表 p
添加到该调用的列表 p2
上。
如果真正的函数return是一个匹配索引i
,它在列表p
的大小之内,我们可以调用函数c[i]
并再次循环。当索引超出该列表时,我们将其调整为来自调用的列表 p2
和 return 中的索引。这是一个近似值:
#!/usr/bin/python
#
from __future__ import print_function
import sys, pexpect
class Myspawn(pexpect.spawn):
def __init__(self, *args, **kwargs):
pexpect.spawn.__init__(self, *args, **kwargs)
def expect_before(self, patterns, callbacks):
self.eb_pat = patterns
self.eb_cb = callbacks
def expect(self, patlist, *args, **kwargs):
numbefore = len(self.eb_pat)
while True:
rc = pexpect.spawn.expect(self, self.eb_pat+patlist, *args, **kwargs)
if rc>=numbefore:
break
self.eb_cb[rc]() # call callback
return rc-numbefore
def test():
child = Myspawn("sudo bluetoothctl", logfile=sys.stdout)
patterns = ['Device FC:A8:9A:..:..:..', 'Device C8:FD:41:..:..:..']
callbacks = [lambda :handler1(), lambda :handler2()]
child.expect_before(patterns, callbacks)
child.sendline('scan on')
while True:
child.expect(['[bluetooth].*?#'], timeout=5)