Python 使用 pyxs Xenstore 客户端监视 GPIO 引脚的守护进程
Python daemon to monitor GPIO pins using pyxs Xenstore client
我正在使用 pyxs
Python Xenstore 客户端模块编写一个 Upstart 守护程序,它监视盒子上 GPIO 控制器上的一堆输出引脚。 daemon的基本结构,启动后就是导出相关的pin,为pin添加对应的Xenstore路径,为每个Xenstore路径添加和监控watch。 watches 部分是线程化的——对于每个 watch,都会创建一个线程,并使用目标 worker 方法来监视 watch 的变化。根据 PyXS 文档,您基本上必须执行以下操作:
# monitor is a pyxs.client.Client.Monitor object, and watch adds a
# watch to the given path
monitor.watch(path, path_token)
# wait for events on the watched path - returns a pair if there is an
# event, the first is the event path and the second is the path token
monitor.wait(sleep=...)
我的问题是,如果未指定 sleep=<time>
参数,对 wait
的调用是否会阻塞 - 从 PyXS 文档中不清楚是否是这种情况。
代码大致是这样的:
from pyxs.client import Client
from pyxs.exceptions import PyXSError
from threading import Thread
...
class gpiod(object):
def __init__(self,...):
...
# stores pin numbers using descriptive labels as keys
self._gpio_pins = {}
# stores Xenstore paths for the pins, using the pin labels as keys
self._xenstore_paths = {}
self._xenstore_client = Client()
self._xenstore_monitor = self._xenstore_client.monitor()
# stores threads that monitor the watches added for the paths
self._xenstore_watchers = {}
self.start()
self.run()
def _watch_xenstore_path(self, watch_path):
"""
A worker method that is a target for a watch thread.
"""
while True:
try:
path_change = self._xenstore_monitor.wait(sleep=1)
while not path_change:
path_change = self._xenstore_monitor.wait(sleep=1)
if path_change[0] == watch_path:
# write changed value to the pin
except PyXSError as e:
# log the error
def start(self):
# load config
...
# export relevant GPIO output pins (using system calls to sysfs)
...
# create Xenstore paths using _xenstore_client
...
# set watches on the paths
for path in self._xenstore_paths:
self._xenstore_monitor.watch(wpath=path, token=path)
self._xenstore_watchers.update(
{pin_label: Thread(target=self._watch_xenstore_path, args=(path,))}
)
def run(self):
for watcher in self._xenstore_watchers:
watcher.start()
来自 pyxs
文档:
wait(sleep=None)
Waits for any of the watched paths to generate an event, which is a (path, token) pair, where the first element is event path, i.e. the
actual path that was modified and second element is a token, passed to
the watch().
Parameters: sleep (float) – number of seconds to sleep between event checks.
这意味着,sleep
参数实际上是对 wait
方法进行偶数检查,如果您从 github 中查看 code:
def wait(self, sleep=None):
"""Waits for any of the watched paths to generate an event,
which is a ``(path, token)`` pair, where the first element
is event path, i.e. the actual path that was modified and
second element is a token, passed to the :meth:`watch`.
:param float sleep: number of seconds to sleep between event
checks.
"""
while True:
if self.client.events:
packet = self.client.events.popleft()
return Event(*packet.payload.split("\x00")[:-1])
# Executing a noop, hopefuly we'll get some events queued
# in the meantime. Note: I know it sucks, but it seems like
# there's no other way ...
self.client.execute_command(Op.DEBUG, "")
if sleep is not None: #sleep here is to provide tap gap between event check
time.sleep(sleep)
您会看到 sleep
参数仅用于提供事件检查之间的抽头间隙,其中 while True
循环一直持续到您得到 un event
或者如果您想要:检查event
的速度有多快,sleep
越小,常规检查越快。
所以,总而言之:
对 wait
方法的调用已经阻塞,sleep
只是为了在 event
检查之间给出时间间隔。
我正在使用 pyxs
Python Xenstore 客户端模块编写一个 Upstart 守护程序,它监视盒子上 GPIO 控制器上的一堆输出引脚。 daemon的基本结构,启动后就是导出相关的pin,为pin添加对应的Xenstore路径,为每个Xenstore路径添加和监控watch。 watches 部分是线程化的——对于每个 watch,都会创建一个线程,并使用目标 worker 方法来监视 watch 的变化。根据 PyXS 文档,您基本上必须执行以下操作:
# monitor is a pyxs.client.Client.Monitor object, and watch adds a
# watch to the given path
monitor.watch(path, path_token)
# wait for events on the watched path - returns a pair if there is an
# event, the first is the event path and the second is the path token
monitor.wait(sleep=...)
我的问题是,如果未指定 sleep=<time>
参数,对 wait
的调用是否会阻塞 - 从 PyXS 文档中不清楚是否是这种情况。
代码大致是这样的:
from pyxs.client import Client
from pyxs.exceptions import PyXSError
from threading import Thread
...
class gpiod(object):
def __init__(self,...):
...
# stores pin numbers using descriptive labels as keys
self._gpio_pins = {}
# stores Xenstore paths for the pins, using the pin labels as keys
self._xenstore_paths = {}
self._xenstore_client = Client()
self._xenstore_monitor = self._xenstore_client.monitor()
# stores threads that monitor the watches added for the paths
self._xenstore_watchers = {}
self.start()
self.run()
def _watch_xenstore_path(self, watch_path):
"""
A worker method that is a target for a watch thread.
"""
while True:
try:
path_change = self._xenstore_monitor.wait(sleep=1)
while not path_change:
path_change = self._xenstore_monitor.wait(sleep=1)
if path_change[0] == watch_path:
# write changed value to the pin
except PyXSError as e:
# log the error
def start(self):
# load config
...
# export relevant GPIO output pins (using system calls to sysfs)
...
# create Xenstore paths using _xenstore_client
...
# set watches on the paths
for path in self._xenstore_paths:
self._xenstore_monitor.watch(wpath=path, token=path)
self._xenstore_watchers.update(
{pin_label: Thread(target=self._watch_xenstore_path, args=(path,))}
)
def run(self):
for watcher in self._xenstore_watchers:
watcher.start()
来自 pyxs
文档:
wait(sleep=None)
Waits for any of the watched paths to generate an event, which is a (path, token) pair, where the first element is event path, i.e. the actual path that was modified and second element is a token, passed to the watch(). Parameters: sleep (float) – number of seconds to sleep between event checks.
这意味着,sleep
参数实际上是对 wait
方法进行偶数检查,如果您从 github 中查看 code:
def wait(self, sleep=None):
"""Waits for any of the watched paths to generate an event,
which is a ``(path, token)`` pair, where the first element
is event path, i.e. the actual path that was modified and
second element is a token, passed to the :meth:`watch`.
:param float sleep: number of seconds to sleep between event
checks.
"""
while True:
if self.client.events:
packet = self.client.events.popleft()
return Event(*packet.payload.split("\x00")[:-1])
# Executing a noop, hopefuly we'll get some events queued
# in the meantime. Note: I know it sucks, but it seems like
# there's no other way ...
self.client.execute_command(Op.DEBUG, "")
if sleep is not None: #sleep here is to provide tap gap between event check
time.sleep(sleep)
您会看到 sleep
参数仅用于提供事件检查之间的抽头间隙,其中 while True
循环一直持续到您得到 un event
或者如果您想要:检查event
的速度有多快,sleep
越小,常规检查越快。
所以,总而言之:
对 wait
方法的调用已经阻塞,sleep
只是为了在 event
检查之间给出时间间隔。