如何安全终止外部 PY 程序并释放临时资源?
How do I safely terminate the external PY program and free temporary resources?
我基于 Bleak 第三方库写了一个 BLE 程序,在界面中读取蓝牙数据。关闭界面后,BLE 程序无法终止。这是什么原因?
我上传了一个动画 GIF 来说明问题:
程序流程包括:创建 UI 界面,UI 界面将外部 PY 文件中的值读入 BLE 程序,关闭 UI 界面。
BLE.py代码:
import numpy as np
import sys
import time
import asyncio
import logging
from bleak import BleakClient
logger = logging.getLogger()
async def run_ble_client(address: str, char_uuid: str, queue: asyncio.Queue):
    """Connect to a BLE device and push every notification onto ``queue``.

    Each queued item is a ``(timestamp, data)`` tuple, where ``timestamp``
    comes from ``time.time()`` at the moment the notification is handled.

    Args:
        address: Address handed to ``BleakClient``.
        char_uuid: UUID of the characteristic to subscribe to.
        queue: Queue shared with the consumer coroutine.
    """

    async def enqueue_notification(_sender, payload):
        # Stamp the payload on arrival and hand it to the consumer.
        arrival = time.time()
        await queue.put((arrival, payload))

    async with BleakClient(address) as client:
        logger.info(f"Connected: {client.is_connected}")
        await client.start_notify(char_uuid, enqueue_notification)

        # Park this coroutine indefinitely; notifications keep arriving
        # through the callback while it sleeps.
        while True:
            await asyncio.sleep(3600.0)

        # NOTE(review): assuming these lines sat after the loop in the
        # original layout, they are unreachable -- the loop above has no
        # exit, so the consumer never receives the ``None`` sentinel.
        # await client.stop_notify(char_uuid)
        await queue.put((time.time(), None))
async def run_queue_consumer(queue: asyncio.Queue):
    """Drain ``queue`` and print each notification as comma-separated fields.

    Items are ``(timestamp, data)`` tuples. A ``data`` value of ``None`` is
    the producer's exit signal and ends the coroutine.

    Args:
        queue: Queue filled by the BLE client coroutine.
    """
    while True:
        # Use await asyncio.wait_for(queue.get(), timeout=1.0) if you want a
        # timeout for getting data.
        _received_at, payload = await queue.get()

        if payload is not None:
            # The payload is decoded as UTF-8 text and split on commas.
            fields = str(payload, "utf-8").split(',')
            print(fields)
            print(queue.full())
            continue

        logger.info(
            "Got message from client about disconnection. Exiting consumer loop..."
        )
        return
async def main(address: str, char_uuid: str):
    """Run the BLE client and the queue consumer concurrently.

    Args:
        address: Address of the BLE device to connect to.
        char_uuid: UUID of the characteristic whose notifications are read.
    """
    channel = asyncio.Queue()
    # Both coroutines share one queue; gather() returns once both finish.
    await asyncio.gather(
        run_ble_client(address, char_uuid, channel),
        run_queue_consumer(channel),
    )
    logger.info("Main method done.")
主要代码:
import ble
from PyQt5 import QtCore, QtWidgets
import threading
import EMG_NUM
class UIForm:  # PEP 8: `CamelNames` for classes
    """Populates a plain ``QWidget`` with this application's layout."""

    def setupUI(self, form, **kwargs):
        """Size ``form``, create the grid container and set the window title.

        Args:
            form: The top-level widget to configure.
            **kwargs: Accepted but not used.
        """
        form.resize(820, 454)
        form.setObjectName("Form")

        # Container widget that hosts the grid layout.
        container = self.gridLayoutWidget = QtWidgets.QWidget(form)
        container.setGeometry(QtCore.QRect(550, 270, 261, 121))
        container.setObjectName("gridLayoutWidget")

        grid = self.gridLayout = QtWidgets.QGridLayout(container)
        grid.setContentsMargins(0, 0, 0, 0)
        grid.setObjectName("gridLayout")

        self.retranslateUi(form)

    def retranslateUi(self, form):
        """Apply the (translatable) window title to ``form``."""
        translate = QtCore.QCoreApplication.translate
        title = translate("Form", "xxxx")
        form.setWindowTitle(title)
def main():
    """Create the Qt application, show the form and run the event loop."""
    application = QtWidgets.QApplication([])

    window = QtWidgets.QWidget()  # PEP 8: `lower_case_names` for variables
    builder = UIForm()
    builder.setupUI(window)
    window.show()

    # Returns only after the Qt event loop has finished.
    application.exec()
if __name__ == "__main__":
    # Launch the BLE worker before the GUI so that everything it creates
    # already exists when the window appears.
    thread_ble = threading.Thread(target=ble.ble)
    thread_ble.start()

    # Runs the GUI; control comes back here once main() returns.
    main()

    # NOTE(review): nothing above tells the BLE worker to stop, so this
    # join() presumably waits for as long as ble.ble keeps running.
    thread_ble.join()
我使用了线程,但无法成功终止线程。你能告诉我如何终止线程吗?
@furas 不好意思,我最近在研究如何提高 BLE 的吞吐量,没注意到你的评论。
解决方案是修改 BLE 代码:
在 BLE 的 main 函数中添加判断,如 furas 所说,使用全局变量 running = True。
通过查看 BLE 消息来检查是否仍处于连接状态;如果已断开连接,就跳出循环。这样就可以安全地关闭蓝牙了。
我花了几天时间熟悉 Bleak 第三方库的用法,把代码改成了下面这样。(完整代码有几千行,这里就不贴了,只举一个简单的例子。)
Main.py
...
if __name__ == "__main__":
    # Start the BLE worker before the GUI; ble.ble keeps looping until
    # EMG_NUM.Kill becomes true.
    thread_ble = threading.Thread(target=ble.ble)
    thread_ble.start()
    try:
        # Runs the GUI; control comes back here once main() returns.
        main()
    finally:
        # Raise the stop flag even when main() fails with an exception.
        # Otherwise the non-daemon worker thread would never be told to
        # stop and would keep the process alive.
        EMG_NUM.Kill = True
        thread_ble.join()
EMG_NUM.py
# Flags shared between the GUI script and the BLE worker.
# Kill: set to True by the GUI script to ask the BLE worker to stop.
Kill = False
# Connect: written by the BLE code from BleakClient.is_connected.
Connect = False
BLE.py
...
async def main(address: str, char_uuid: str):
    """Stay connected to the BLE device until it drops or a stop is requested.

    The loop checks once per second and ends when either the client reports
    that it is no longer connected or ``EMG_NUM.Kill`` has been set. Leaving
    the ``async with`` block then closes the BLE client.

    Args:
        address: Address handed to ``BleakClient``.
        char_uuid: UUID of the characteristic to subscribe to.
    """
    async with BleakClient(address) as client:
        EMG_NUM.Connect = client.is_connected
        ...
        await client.start_notify(char_uuid, callback_handler)
        while EMG_NUM.Connect:
            if not client.is_connected:
                # Link lost: publish the new state and stop polling.
                EMG_NUM.Connect = False
                ...
                break
            if EMG_NUM.Kill:
                # Stop requested from outside (the GUI has been closed).
                break
            await asyncio.sleep(1.0)
# Entry point run in the worker thread (threading.Thread(target=ble.ble)).
# Keeps looping until EMG_NUM.Kill becomes true; the loop body is elided
# in this excerpt.
def ble():
while EMG_NUM.Kill == False:
try:
...
我基于 Bleak 第三方库写了一个 BLE 程序,在界面中读取蓝牙数据。关闭界面后,BLE 程序无法终止。这是什么原因?
我上传了一个动画 GIF 来说明问题:
程序流程包括:创建 UI 界面,UI 界面将外部 PY 文件中的值读入 BLE 程序,关闭 UI 界面。
BLE.py代码:
import numpy as np
import sys
import time
import asyncio
import logging
from bleak import BleakClient
logger = logging.getLogger()
async def run_ble_client(address: str, char_uuid: str, queue: asyncio.Queue):
    """Connect to a BLE device and push every notification onto ``queue``.

    Each queued item is a ``(timestamp, data)`` tuple, where ``timestamp``
    comes from ``time.time()`` at the moment the notification is handled.

    Args:
        address: Address handed to ``BleakClient``.
        char_uuid: UUID of the characteristic to subscribe to.
        queue: Queue shared with the consumer coroutine.
    """

    async def enqueue_notification(_sender, payload):
        # Stamp the payload on arrival and hand it to the consumer.
        arrival = time.time()
        await queue.put((arrival, payload))

    async with BleakClient(address) as client:
        logger.info(f"Connected: {client.is_connected}")
        await client.start_notify(char_uuid, enqueue_notification)

        # Park this coroutine indefinitely; notifications keep arriving
        # through the callback while it sleeps.
        while True:
            await asyncio.sleep(3600.0)

        # NOTE(review): assuming these lines sat after the loop in the
        # original layout, they are unreachable -- the loop above has no
        # exit, so the consumer never receives the ``None`` sentinel.
        # await client.stop_notify(char_uuid)
        await queue.put((time.time(), None))
async def run_queue_consumer(queue: asyncio.Queue):
    """Drain ``queue`` and print each notification as comma-separated fields.

    Items are ``(timestamp, data)`` tuples. A ``data`` value of ``None`` is
    the producer's exit signal and ends the coroutine.

    Args:
        queue: Queue filled by the BLE client coroutine.
    """
    while True:
        # Use await asyncio.wait_for(queue.get(), timeout=1.0) if you want a
        # timeout for getting data.
        _received_at, payload = await queue.get()

        if payload is not None:
            # The payload is decoded as UTF-8 text and split on commas.
            fields = str(payload, "utf-8").split(',')
            print(fields)
            print(queue.full())
            continue

        logger.info(
            "Got message from client about disconnection. Exiting consumer loop..."
        )
        return
async def main(address: str, char_uuid: str):
    """Run the BLE client and the queue consumer concurrently.

    Args:
        address: Address of the BLE device to connect to.
        char_uuid: UUID of the characteristic whose notifications are read.
    """
    channel = asyncio.Queue()
    # Both coroutines share one queue; gather() returns once both finish.
    await asyncio.gather(
        run_ble_client(address, char_uuid, channel),
        run_queue_consumer(channel),
    )
    logger.info("Main method done.")
主要代码:
import ble
from PyQt5 import QtCore, QtWidgets
import threading
import EMG_NUM
class UIForm:  # PEP 8: `CamelNames` for classes
    """Populates a plain ``QWidget`` with this application's layout."""

    def setupUI(self, form, **kwargs):
        """Size ``form``, create the grid container and set the window title.

        Args:
            form: The top-level widget to configure.
            **kwargs: Accepted but not used.
        """
        form.resize(820, 454)
        form.setObjectName("Form")

        # Container widget that hosts the grid layout.
        container = self.gridLayoutWidget = QtWidgets.QWidget(form)
        container.setGeometry(QtCore.QRect(550, 270, 261, 121))
        container.setObjectName("gridLayoutWidget")

        grid = self.gridLayout = QtWidgets.QGridLayout(container)
        grid.setContentsMargins(0, 0, 0, 0)
        grid.setObjectName("gridLayout")

        self.retranslateUi(form)

    def retranslateUi(self, form):
        """Apply the (translatable) window title to ``form``."""
        translate = QtCore.QCoreApplication.translate
        title = translate("Form", "xxxx")
        form.setWindowTitle(title)
def main():
    """Create the Qt application, show the form and run the event loop."""
    application = QtWidgets.QApplication([])

    window = QtWidgets.QWidget()  # PEP 8: `lower_case_names` for variables
    builder = UIForm()
    builder.setupUI(window)
    window.show()

    # Returns only after the Qt event loop has finished.
    application.exec()
if __name__ == "__main__":
    # Launch the BLE worker before the GUI so that everything it creates
    # already exists when the window appears.
    thread_ble = threading.Thread(target=ble.ble)
    thread_ble.start()

    # Runs the GUI; control comes back here once main() returns.
    main()

    # NOTE(review): nothing above tells the BLE worker to stop, so this
    # join() presumably waits for as long as ble.ble keeps running.
    thread_ble.join()
我使用了线程,但无法成功终止线程。你能告诉我如何终止线程吗?
@furas 不好意思,我最近在研究如何提高 BLE 的吞吐量,没注意到你的评论。
解决方案是修改 BLE 代码:
在 BLE 的 main 函数中添加判断,如 furas 所说,使用全局变量 running = True。
通过查看 BLE 消息来检查是否仍处于连接状态;如果已断开连接,就跳出循环。这样就可以安全地关闭蓝牙了。
我花了几天时间熟悉 Bleak 第三方库的用法,把代码改成了下面这样。(完整代码有几千行,这里就不贴了,只举一个简单的例子。)
Main.py
...
if __name__ == "__main__":
    # Start the BLE worker before the GUI; ble.ble keeps looping until
    # EMG_NUM.Kill becomes true.
    thread_ble = threading.Thread(target=ble.ble)
    thread_ble.start()
    try:
        # Runs the GUI; control comes back here once main() returns.
        main()
    finally:
        # Raise the stop flag even when main() fails with an exception.
        # Otherwise the non-daemon worker thread would never be told to
        # stop and would keep the process alive.
        EMG_NUM.Kill = True
        thread_ble.join()
EMG_NUM.py
# Flags shared between the GUI script and the BLE worker.
# Kill: set to True by the GUI script to ask the BLE worker to stop.
Kill = False
# Connect: written by the BLE code from BleakClient.is_connected.
Connect = False
BLE.py
...
async def main(address: str, char_uuid: str):
    """Stay connected to the BLE device until it drops or a stop is requested.

    The loop checks once per second and ends when either the client reports
    that it is no longer connected or ``EMG_NUM.Kill`` has been set. Leaving
    the ``async with`` block then closes the BLE client.

    Args:
        address: Address handed to ``BleakClient``.
        char_uuid: UUID of the characteristic to subscribe to.
    """
    async with BleakClient(address) as client:
        EMG_NUM.Connect = client.is_connected
        ...
        await client.start_notify(char_uuid, callback_handler)
        while EMG_NUM.Connect:
            if not client.is_connected:
                # Link lost: publish the new state and stop polling.
                EMG_NUM.Connect = False
                ...
                break
            if EMG_NUM.Kill:
                # Stop requested from outside (the GUI has been closed).
                break
            await asyncio.sleep(1.0)
def ble():
while EMG_NUM.Kill == False:
try:
...