如何使用 Python 异步地 运行 终端命令?
How can I run a terminal command asynchronously using Python?
我正在 Linux 环境中工作,尝试使用 Python 脚本自动执行这 2 个 Linux 命令:
- sudo rfcomm 连接 0 XX:XX:XX:XX:XX:XX
- hcitool rssi XX:XX:XX:XX:XX:XX
命令 1 连接到我的蓝牙设备,命令 2 return只要设备连接就接收信号强度指示 (RSSI)。因此脚本必须能够在没有任何人为干预的情况下连接并提取 RSSI。我正在使用 subprocess 模块 运行ning 这些命令。
问题: 当命令 1 运行 建立连接时,它显示“已连接 /dev/rfcomm0 到 XX:XX:XX:XX:XX:XX 在频道 1
按 CTRL-C 挂断",但它不会 return。 一旦连接丢失,它只会 returns,此时提取 RSSI 是不可能的。目前 python 脚本不会在 运行ning 命令 1 之后继续到下一行代码,所以我无法在之后 运行 命令 2。这里是我的代码:
def get_rssi():
while True:
p1 = subprocess.run(['hcitool', 'rssi', shimmer_id], capture_output=True, text=True)
if p1.stderr:
print('stderr:', p1.stderr)
reconnect()
if p1.stdout:
print('stdout:', p1.stdout)
time.sleep(0.3)
def reconnect():
reconnected = False
print('[-] lost connection, reconnecting...')
while reconnected is False:
p2 = subprocess.run(['sudo', 'rfcomm', 'connect', '0', shimmer_id], capture_output=True, text=True)
# If the connection on the previous line is successful, nothing is executed from here onward! The script only continues once the above line returns (i.e. when the connection is lost)
if p2.stderr:
if 'Host is down' in p2.stderr.decode():
print('Host is down')
time.sleep(0.3)
reconnect()
else:
reconnected = True
get_rssi()
我尝试过线程和多处理,但它们没有帮助,因为命令 1 从不“加入”python 线程术语。我知道我可能需要 运行 命令 1 异步 ,但我正在努力在 python 中正确实现它,即使在参考 asyncio 文档时也是如此。非常感谢任何帮助!
您似乎正在尝试建立 RFCOMM 连接。
rfcomm
和 hcitool
早在 2017 年就被 BlueZ 项目 deprecated 了,所以我不建议使用它们来启动项目。
BlueZ 的 API 旨在通过 Python 等语言访问,这些记录在:https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/doc and the official examples are at: https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/test
但是,RFCOMM(或串行端口配置文件)连接可以使用标准 Python 版本中的套接字库完成。在
有例子
上面的答案中也有使用 Bluedot 库来做同样事情的例子。
我正在 Linux 环境中工作,尝试使用 Python 脚本自动执行这 2 个 Linux 命令:
- sudo rfcomm 连接 0 XX:XX:XX:XX:XX:XX
- hcitool rssi XX:XX:XX:XX:XX:XX
命令 1 连接到我的蓝牙设备,命令 2 return只要设备连接就接收信号强度指示 (RSSI)。因此脚本必须能够在没有任何人为干预的情况下连接并提取 RSSI。我正在使用 subprocess 模块 运行ning 这些命令。
问题: 当命令 1 运行 建立连接时,它显示“已连接 /dev/rfcomm0 到 XX:XX:XX:XX:XX:XX 在频道 1 按 CTRL-C 挂断",但它不会 return。 一旦连接丢失,它只会 returns,此时提取 RSSI 是不可能的。目前 python 脚本不会在 运行ning 命令 1 之后继续到下一行代码,所以我无法在之后 运行 命令 2。这里是我的代码:
def get_rssi():
while True:
p1 = subprocess.run(['hcitool', 'rssi', shimmer_id], capture_output=True, text=True)
if p1.stderr:
print('stderr:', p1.stderr)
reconnect()
if p1.stdout:
print('stdout:', p1.stdout)
time.sleep(0.3)
def reconnect():
reconnected = False
print('[-] lost connection, reconnecting...')
while reconnected is False:
p2 = subprocess.run(['sudo', 'rfcomm', 'connect', '0', shimmer_id], capture_output=True, text=True)
# If the connection on the previous line is successful, nothing is executed from here onward! The script only continues once the above line returns (i.e. when the connection is lost)
if p2.stderr:
if 'Host is down' in p2.stderr.decode():
print('Host is down')
time.sleep(0.3)
reconnect()
else:
reconnected = True
get_rssi()
我尝试过线程和多处理,但它们没有帮助,因为命令 1 从不“加入”python 线程术语。我知道我可能需要 运行 命令 1 异步 ,但我正在努力在 python 中正确实现它,即使在参考 asyncio 文档时也是如此。非常感谢任何帮助!
您似乎正在尝试建立 RFCOMM 连接。
rfcomm
和 hcitool
早在 2017 年就被 BlueZ 项目 deprecated 了,所以我不建议使用它们来启动项目。
BlueZ 的 API 旨在通过 Python 等语言访问,这些记录在:https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/doc and the official examples are at: https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/test
但是,RFCOMM(或串行端口配置文件)连接可以使用标准 Python 版本中的套接字库完成。在
上面的答案中也有使用 Bluedot 库来做同样事情的例子。