运行 gpiozero 持续监听 uvicorn
Running gpiozero listener continuously with uvicorn
我正在尝试编写一个 python 应用程序,它将在 raspberry pi 上 运行,同时具有套接字连接(带 uvicorn 的套接字)和物理输入侦听器。我打算同时监听套接字连接和 gpio 事件,而不会相互阻塞。这是我目前所拥有的:
api.py
import uvicorn
import asyncio
from interaction.volume import VolumeControl
from system.platform_info import PlatformInfo
from connection.api_socket import app
class Api:
def __init__(self):
pass
def initialize_volume_listener(self):
volume_controller = VolumeControl()
volume_controller.start_listener()
def start(self):
PlatformInfo().print_info()
self.initialize_volume_listener()
uvicorn.run(app, host='127.0.0.1', port=5000, loop="asyncio")
volume_control.py
import asyncio
from gpiozero import Button
from connection.api_socket import volume_up
class VolumeControl:
def __init__(self):
self.volume_up_button = Button(4)
def volume_up(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
future = asyncio.ensure_future(volume_up(None, None))
loop.run_until_complete(future)
loop.close()
def start_listener(self):
self.volume_up_button.when_pressed = self.volume_up
api_socket.py
import socketio
from system.platform_info import PlatformInfo
sio = socketio.AsyncServer(async_mode='asgi', cors_allowed_origins='*')
app = socketio.ASGIApp(sio)
@sio.on('connect')
async def test_connect(sid, environ):
system_info = PlatformInfo().get_info()
current_volume = 35
initial_data = {"system_info": system_info,
"settings": {"volume": current_volume}
}
await sio.emit('initial_data', initial_data, room=sid)
@sio.on('disconnect request')
async def disconnect_request(sid):
await sio.disconnect(sid)
@sio.on('disconnect')
async def test_disconnect(sid):
print('Client disconnected')
await sio.emit('disconnect', {'data': 'Connected', 'count': 0}, room=sid)
@sio.on('volume_up')
async def volume_up(sid, volume=None):
increased_volume = 25
await sio.emit('volume_up', {'volume': increased_volume})
@sio.on('volume_down')
async def volume_down(sid, volume=None):
decreased_volume = 25
await sio.emit('volume_down', {'volume': decreased_volume})
我尝试过使用 asyncio,但我对 python 的异步功能有点陌生。问题是,我无法连续 运行 按钮侦听器,因此当套接字函数在进行时,我将能够同时侦听按钮交互,而不会相互阻塞。按钮侦听器根本不起作用。相反,只要 uvicorn 应用程序启动,我就需要按钮侦听器 运行ning。
如有任何帮助,我们将不胜感激。
谢谢。
@Miguel 谢谢你的回答。正如您所建议的,我已经在 while
循环中启动了 gpio,并在循环内使用了 asyncio.run()
命令来调用相关的 socketio 函数。它按预期工作。旁注:我已经用参数 daemon=True
启动了 gpio 线程。这使我能够在退出主线程(即 uvicorn 服务器)后立即退出 gpio 循环。最终代码如下:
api_socket.py
@sio.on('video_load')
async def load_video(sid, video_number=3):
data = open(os.path.join(os.getcwd(), f'sample_videos/dummy_video_{str(video_number)}.mp4'), 'rb').read()
print('Streaming video...')
await sio.emit('video_load', {'source': data}, room=sid)
nfc_listener.py
class NFCListener:
reading = True
def __init__(self):
GPIO.setmode(GPIO.BOARD)
self.rdr = RFID()
util = self.rdr.util()
util.debug = True
self.listener_thread = threading.Thread(target=self.start_nfc, daemon=True)
def start_nfc(self):
selected_video = None
while self.reading:
self.rdr.wait_for_tag()
(error, data) = self.rdr.request()
if not error:
print("\nCard identified!")
(error, uid) = self.rdr.anticoll()
if not error:
# Print UID
card_uid = str(uid[0])+" "+str(uid[1])+" " + \
str(uid[2])+" "+str(uid[3])+" "+str(uid[4])
print(card_uid)
if card_uid == "166 107 86 20 143":
if selected_video != 2:
selected_video = 2
asyncio.run(load_video(None, selected_video))
else:
if selected_video != 3:
selected_video = 3
asyncio.run(load_video(None, selected_video))
def start_reading(self):
self.listener_thread.start()
gpiozero 创建一个执行回调的新线程(这没有很好地记录)。如果回调应该在主异步循环中执行,那么您需要将控制权交还给主线程。
call_soon_threadsafe 方法可以为您做到这一点。本质上,它将回调添加到等待发生时主异步循环调用的任务列表中。
然而,asyncio 循环对于每个线程都是本地的:参见 get_running_loop
因此,当在主异步线程中创建 gpiozero 对象时,您需要在调用回调时使该循环对象对对象可用。
以下是我如何为调用异步 MQTT 方法的 PIR 执行此操作:
class PIR:
def __init__(self, mqtt, pin):
self.pir = MotionSensor(pin=pin)
self.pir.when_motion = self.motion
# store the mqtt client we'll need to call
self.mqtt = mqtt
# This PIR object is created in the main thread
# so store that loop object
self.loop = asyncio.get_running_loop()
def motion(self):
# motion is called in the gpiozero monitoring thread
# it has to use our stored copy of the loop and then
# tell that loop to call the callback:
self.loop.call_soon_threadsafe(self.mqtt.publish,
f'sensor/gpiod/pir/kitchen', True)
你可能想要这个:
import asyncio
from gpiozero import Button
from connection.api_socket import volume_up
class VolumeControl:
def __init__(self):
self.volume_up_button = Button(4)
self.loop = asyncio.get_running_loop()
def volume_up_cb(self):
self.loop.call_soon_threadsafe(volume_up, None, None)
def start_listener(self):
self.volume_up_button.when_pressed = self.volume_up_cb
更干净 - 并且线程安全! :)
我正在尝试编写一个 python 应用程序,它将在 raspberry pi 上 运行,同时具有套接字连接(带 uvicorn 的套接字)和物理输入侦听器。我打算同时监听套接字连接和 gpio 事件,而不会相互阻塞。这是我目前所拥有的:
api.py
import uvicorn
import asyncio
from interaction.volume import VolumeControl
from system.platform_info import PlatformInfo
from connection.api_socket import app
class Api:
def __init__(self):
pass
def initialize_volume_listener(self):
volume_controller = VolumeControl()
volume_controller.start_listener()
def start(self):
PlatformInfo().print_info()
self.initialize_volume_listener()
uvicorn.run(app, host='127.0.0.1', port=5000, loop="asyncio")
volume_control.py
import asyncio
from gpiozero import Button
from connection.api_socket import volume_up
class VolumeControl:
def __init__(self):
self.volume_up_button = Button(4)
def volume_up(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
future = asyncio.ensure_future(volume_up(None, None))
loop.run_until_complete(future)
loop.close()
def start_listener(self):
self.volume_up_button.when_pressed = self.volume_up
api_socket.py
import socketio
from system.platform_info import PlatformInfo
sio = socketio.AsyncServer(async_mode='asgi', cors_allowed_origins='*')
app = socketio.ASGIApp(sio)
@sio.on('connect')
async def test_connect(sid, environ):
system_info = PlatformInfo().get_info()
current_volume = 35
initial_data = {"system_info": system_info,
"settings": {"volume": current_volume}
}
await sio.emit('initial_data', initial_data, room=sid)
@sio.on('disconnect request')
async def disconnect_request(sid):
await sio.disconnect(sid)
@sio.on('disconnect')
async def test_disconnect(sid):
print('Client disconnected')
await sio.emit('disconnect', {'data': 'Connected', 'count': 0}, room=sid)
@sio.on('volume_up')
async def volume_up(sid, volume=None):
increased_volume = 25
await sio.emit('volume_up', {'volume': increased_volume})
@sio.on('volume_down')
async def volume_down(sid, volume=None):
decreased_volume = 25
await sio.emit('volume_down', {'volume': decreased_volume})
我尝试过使用 asyncio,但我对 python 的异步功能有点陌生。问题是,我无法连续 运行 按钮侦听器,因此当套接字函数在进行时,我将能够同时侦听按钮交互,而不会相互阻塞。按钮侦听器根本不起作用。相反,只要 uvicorn 应用程序启动,我就需要按钮侦听器 运行ning。
如有任何帮助,我们将不胜感激。 谢谢。
@Miguel 谢谢你的回答。正如您所建议的,我已经在 while
循环中启动了 gpio,并在循环内使用了 asyncio.run()
命令来调用相关的 socketio 函数。它按预期工作。旁注:我已经用参数 daemon=True
启动了 gpio 线程。这使我能够在退出主线程(即 uvicorn 服务器)后立即退出 gpio 循环。最终代码如下:
api_socket.py
@sio.on('video_load')
async def load_video(sid, video_number=3):
data = open(os.path.join(os.getcwd(), f'sample_videos/dummy_video_{str(video_number)}.mp4'), 'rb').read()
print('Streaming video...')
await sio.emit('video_load', {'source': data}, room=sid)
nfc_listener.py
class NFCListener:
reading = True
def __init__(self):
GPIO.setmode(GPIO.BOARD)
self.rdr = RFID()
util = self.rdr.util()
util.debug = True
self.listener_thread = threading.Thread(target=self.start_nfc, daemon=True)
def start_nfc(self):
selected_video = None
while self.reading:
self.rdr.wait_for_tag()
(error, data) = self.rdr.request()
if not error:
print("\nCard identified!")
(error, uid) = self.rdr.anticoll()
if not error:
# Print UID
card_uid = str(uid[0])+" "+str(uid[1])+" " + \
str(uid[2])+" "+str(uid[3])+" "+str(uid[4])
print(card_uid)
if card_uid == "166 107 86 20 143":
if selected_video != 2:
selected_video = 2
asyncio.run(load_video(None, selected_video))
else:
if selected_video != 3:
selected_video = 3
asyncio.run(load_video(None, selected_video))
def start_reading(self):
self.listener_thread.start()
gpiozero 创建一个执行回调的新线程(这没有很好地记录)。如果回调应该在主异步循环中执行,那么您需要将控制权交还给主线程。
call_soon_threadsafe 方法可以为您做到这一点。本质上,它将回调添加到等待发生时主异步循环调用的任务列表中。
然而,asyncio 循环对于每个线程都是本地的:参见 get_running_loop
因此,当在主异步线程中创建 gpiozero 对象时,您需要在调用回调时使该循环对象对对象可用。
以下是我如何为调用异步 MQTT 方法的 PIR 执行此操作:
class PIR:
def __init__(self, mqtt, pin):
self.pir = MotionSensor(pin=pin)
self.pir.when_motion = self.motion
# store the mqtt client we'll need to call
self.mqtt = mqtt
# This PIR object is created in the main thread
# so store that loop object
self.loop = asyncio.get_running_loop()
def motion(self):
# motion is called in the gpiozero monitoring thread
# it has to use our stored copy of the loop and then
# tell that loop to call the callback:
self.loop.call_soon_threadsafe(self.mqtt.publish,
f'sensor/gpiod/pir/kitchen', True)
你可能想要这个:
import asyncio
from gpiozero import Button
from connection.api_socket import volume_up
class VolumeControl:
def __init__(self):
self.volume_up_button = Button(4)
self.loop = asyncio.get_running_loop()
def volume_up_cb(self):
self.loop.call_soon_threadsafe(volume_up, None, None)
def start_listener(self):
self.volume_up_button.when_pressed = self.volume_up_cb
更干净 - 并且线程安全! :)