如何在单独的 Python 多处理进程中从数据流中获取最新值?
How can I get the most recent value from a data stream in a separate Python multiprocessing process?
我有一个从传感器读取的进程和另一个从图表读取的进程。但是,绘图仪可能比传感器读数慢,尤其是当添加了许多传感器时。
我看到将信息从传感器传递给绘图仪的两个选项:管道和 mp.Value。据我所知,管道应该更快,但我担心绘图器开始延迟的问题:如果传感器采样速度是绘图器的 n 倍,那么对于每个绘图器时间步长,我们只进行 1/n 时间步长未来(例如,如果快两倍,则在 20 秒之后,绘图仪只显示 10 秒)。我可以看到传感器在添加新值之前轮询管道并删除所有值,但这在计算上听起来很昂贵。 mp.Value 路由确实需要更明确的锁定,我认为它不如 Pipe class 快,尽管我不确定。
处理此多处理以避免此处出现问题的最佳方法是什么?
编辑澄清:我不在乎绘图员是否获得了所有信息。使用最新的值就可以了,这就是标题说“仅管道最后一个值”的原因。绘图员的主要要求只是让绘图不被延迟,即使我们通过丢弃数据有效地进行了下采样。传感器的采样速度确实需要比绘图仪读取的速度更快,因为数据也在被记录和处理,我们不想对该信息进行下采样。
要获得最新的传感器值,您实际上需要传感器进程等待,直到绘图器准备好发送数据。有几种方法可以做到这一点,但我认为实际使用 two 单向 (duplex=False
) 管道是最好的方法,因为你不需要涉及任何额外的线程或信号量。在此设置中,第一个管道正常发送数据 sensor->grapher,但第二个管道只是表示 grapher 已准备好立即接受数据。用散文表达有点尴尬所以这里是伪代码:
def grapher():
while True:
data = pipe_to_grapher.recv()
graph(data)
pipe_to_sensor.send(None) # Can be any value
def sensor():
while True:
data = sense()
if pipe_to_sensor.poll():
pipe_to_grapher.send(data) # Freshest possible
pipe_to_sensor.recv() # Clear the pipe
record(data)
请注意,如果 poll()
returns False
,传感器可以直接通过,因为这表明绘图仪尚未准备好接收数据。您还可以轻松地扩展系统以使用特殊值将有关一个进程的状态的某些信息传递给另一个进程,例如关闭命令。
(预编辑答案如下)
这个问题似乎是关于应用 backpressure to your sensor data flow. It sounds like a multiprocessing.Queue
might be a good solution for your specific case. Internally it uses a pipe, so it will have similar performance characteristics, but it also has specifically the method put()
的,它可以与 maxsize
参数一起使用,您可以将参数设置为较小的数字,例如 1,这样传感器进程将等到绘图器进程在返回以获取更多数据之前检索了一个项目。
如果传感器有自己的缓冲区需要清除,您可以改用 put_nowait()
并捕获 Full
错误,作为绘图器无法绘制该数据的指示,并且它应该被丢弃。这节省了酸洗和发送数据的开销,但会导致非常快速地轮询传感器,这可能是开销本身的来源,具体取决于 device/drivers/api.
我有一个从传感器读取的进程和另一个从图表读取的进程。但是,绘图仪可能比传感器读数慢,尤其是当添加了许多传感器时。
我看到将信息从传感器传递给绘图仪的两个选项:管道和 mp.Value。据我所知,管道应该更快,但我担心绘图器开始延迟的问题:如果传感器采样速度是绘图器的 n 倍,那么对于每个绘图器时间步长,我们只进行 1/n 时间步长未来(例如,如果快两倍,则在 20 秒之后,绘图仪只显示 10 秒)。我可以看到传感器在添加新值之前轮询管道并删除所有值,但这在计算上听起来很昂贵。 mp.Value 路由确实需要更明确的锁定,我认为它不如 Pipe class 快,尽管我不确定。
处理此多处理以避免此处出现问题的最佳方法是什么?
编辑澄清:我不在乎绘图员是否获得了所有信息。使用最新的值就可以了,这就是标题说“仅管道最后一个值”的原因。绘图员的主要要求只是让绘图不被延迟,即使我们通过丢弃数据有效地进行了下采样。传感器的采样速度确实需要比绘图仪读取的速度更快,因为数据也在被记录和处理,我们不想对该信息进行下采样。
要获得最新的传感器值,您实际上需要传感器进程等待,直到绘图器准备好发送数据。有几种方法可以做到这一点,但我认为实际使用 two 单向 (duplex=False
) 管道是最好的方法,因为你不需要涉及任何额外的线程或信号量。在此设置中,第一个管道正常发送数据 sensor->grapher,但第二个管道只是表示 grapher 已准备好立即接受数据。用散文表达有点尴尬所以这里是伪代码:
def grapher():
while True:
data = pipe_to_grapher.recv()
graph(data)
pipe_to_sensor.send(None) # Can be any value
def sensor():
while True:
data = sense()
if pipe_to_sensor.poll():
pipe_to_grapher.send(data) # Freshest possible
pipe_to_sensor.recv() # Clear the pipe
record(data)
请注意,如果 poll()
returns False
,传感器可以直接通过,因为这表明绘图仪尚未准备好接收数据。您还可以轻松地扩展系统以使用特殊值将有关一个进程的状态的某些信息传递给另一个进程,例如关闭命令。
(预编辑答案如下)
这个问题似乎是关于应用 backpressure to your sensor data flow. It sounds like a multiprocessing.Queue
might be a good solution for your specific case. Internally it uses a pipe, so it will have similar performance characteristics, but it also has specifically the method put()
的,它可以与 maxsize
参数一起使用,您可以将参数设置为较小的数字,例如 1,这样传感器进程将等到绘图器进程在返回以获取更多数据之前检索了一个项目。
如果传感器有自己的缓冲区需要清除,您可以改用 put_nowait()
并捕获 Full
错误,作为绘图器无法绘制该数据的指示,并且它应该被丢弃。这节省了酸洗和发送数据的开销,但会导致非常快速地轮询传感器,这可能是开销本身的来源,具体取决于 device/drivers/api.