Python - 多处理 - 队列:即使是同一个对象,我的队列的引用也发生了变化?
Python - multiprocessing - queue: The reference of my queue changed even if it is the same object?
我不久前开始使用 multiprocessing,它正在处理基本示例。之后我尝试实现某种多声音输入程序,并尝试通过队列将输入通量引导到某个处理模块,但目前严重失败。
我将分三点描述我的问题:文件夹结构、进程结构、我尝试过的方法。
文件夹结构
- 根文件夹
- 申请
- start_applicaton.py
- input_cfg.ini
- 核心
- core.py
- gui.py
- audio_recorder.py(使用sounddevice.InputStream)
- x_recorder.py
进程结构
当我启动 运行 我的应用程序时,会调用 gui 并在我按下开始按钮后创建进程。
- 主进程
- audio_recorder_1 进程
- audio_recorder_ 进程
- 申请流程
我试过的
core.py
from multiprocessing import Queue, Process
central_queue = Queue()
...
d = {}
d['output'] = central_queue
o = AudioRecorder('name', **d)
start_application.py
import core
def handle_queue_data():
while True:
print(str(core.central_queue.get()))
if __name__ == "__main__":
Process(target=handle_queue_data, name="syncOutput").start()
audio_recorder.py
class AudioRecorder(object):
def __init__(self, name, **d):
...
self.output_queue = d['output']
def run(self):
queue = Queue()
def callback(indata, frames, time, status):
if status:
print(status, flush=True)
# Push the got data into the queue
queue.put([indata.copy()])
with sd.InputStream(samplerate=self.sample_rate, device=self.device_id, channels=self.channel_id, callback=callback):
while True:
self.output_queue.put(queue.get())
它不工作。调试后,似乎从记录器的 core.py
开始后,队列的引用发生了变化...仅供调试信息:
# in the audio_recorder.py object
centralized_queue = {Queue} <multiprocessing.queues.Queue object at 0x00000000086B3320>
_buffer = {deque} deque([[array([[-0.01989746, -0.02053833],\n [-0.01828003, -0.0196228 ],\n [-0.00634766, -0.00686646],\n ..., \n [-0.01119995, -0.01144409],\n [-0.00900269, -0.00982666],\n [-0.00823975, -0.00888062]], dtype=float32)]])
_close = {Finalize} <Finalize object, callback=_finalize_close, args=[deque([[array([[-0.01989746, -0.02053833],\n [-0.01828003, -0.0196228 ],\n [-0.00634766, -0.00686646],\n ..., \n [-0.01119995, -0.01144409],\n [-0.00900269, -0.00982666],\n [-0
_closed = {bool} False
_ignore_epipe = {bool} False
_joincancelled = {bool} False
_jointhread = {Finalize} <Finalize object, callback=_finalize_join, args=[<weakref at 0x00000000083A2638; to 'Thread' at 0x0000000004DF1B00>], exitprority=-5>
_maxsize = {int} 2147483647
_notempty = {Condition} <Condition(<unlocked _thread.lock object at 0x0000000004738198>, 0)>
_opid = {int} 1320
_reader = {PipeConnection} <multiprocessing.connection.PipeConnection object at 0x00000000086B34A8>
_rlock = {Lock} <Lock(owner=None)>
_sem = {BoundedSemaphore} <BoundedSemaphore(value=2147483645, maxvalue=2147483647)>
_thread = {Thread} <Thread(QueueFeederThread, started daemon 9344)>
_wlock = {NoneType} None
_writer = {PipeConnection} <multiprocessing.connection.PipeConnection object at 0x00000000086B3518>
# in the handle_queue_data
centralized_queue = {Queue} <multiprocessing.queues.Queue object at 0x000000000479DA20>
_buffer = {deque} deque([])
_close = {NoneType} None
_closed = {bool} False
_ignore_epipe = {bool} False
_joincancelled = {bool} False
_jointhread = {NoneType} None
_maxsize = {int} 2147483647
_notempty = {Condition} <Condition(<unlocked _thread.lock object at 0x00000000058C8350>, 0)>
_opid = {int} 7208
_reader = {PipeConnection} <multiprocessing.connection.PipeConnection object at 0x000000000684C438>
_rlock = {Lock} <Lock(owner=None)>
_sem = {BoundedSemaphore} <BoundedSemaphore(value=2147483647, maxvalue=2147483647)>
_thread = {NoneType} None
_wlock = {NoneType} None
_writer = {PipeConnection} <multiprocessing.connection.PipeConnection object at 0x00000000058DE6A0>
之后我也尝试过使用不同的东西,都没有成功,我没有设法传递数据......这里队列可能是一个可变对象吗?或者多处理中存在错误(不太可能)或者与 sounddevice 的结合使队列不稳定?
对不起,我的描述太长了...
在此先感谢您的帮助!
最好的问候,
塞巴斯蒂安
我真的没有使用 multiprocessing
的经验,但据我了解 start_application.py
的模块名称空间中的所有对象对于每个进程都是重复的。
如果我没记错的话,这包括 core
模块。因此,core.central_queue
每个进程都有一个单独的实例。
至少 Windows 似乎是这种情况,Python docs 无论如何都建议 "Explicitly pass resources to child processes"。
您应该使用 if __name__ == '__main__':
块来创建 Queue
的唯一实例和 AudioRecorder
的唯一实例。
然后,您可以使用 Process
的 args
参数将这些唯一实例传递给您的进程(如上面 link 所示)。
除此之外,我真的不知道你想要达到什么目的。你想用随机的可用进程之一处理随机的音频输入块吗?
或者您想为每个进程提供完整的相同音频输入?
在后一种情况下,毕竟每个子进程都应该有一个单独的队列!
sd.InputStream
应该仍然是唯一的。在您的 with
语句中,您应该遍历所有子进程并将当前的音频块分别放入每个进程队列中。
PS: 我刚刚意识到您可能出于某种原因只想启动一个附加进程。在这种情况下,你应该考虑放弃整个 multiprocessing
混乱,只做你需要在 with
语句中做的任何事情。
更新:
如果您想同时使用多个音频设备(因此需要多个 PortAudio 流),您仍然不一定需要 multiprocessing
。你可以有一个包含多个上下文管理器的 with
语句,并在其中进行处理。
根据您想要实现的目标,您可能有一个队列,所有音频回调都写入其中,或者每个回调有一个队列。
如果您有充分的理由使用 multiprocessing
,如果您在主进程中启动所有音频流并在新的子进程中进行处理,它也应该可以正常工作。
我不久前开始使用 multiprocessing,它正在处理基本示例。之后我尝试实现某种多声音输入程序,并尝试通过队列将输入通量引导到某个处理模块,但目前严重失败。 我将分三点描述我的问题:文件夹结构、进程结构、我尝试过的方法。
文件夹结构
- 根文件夹
- 申请
- start_applicaton.py
- input_cfg.ini
- 核心
- core.py
- gui.py
- audio_recorder.py(使用sounddevice.InputStream)
- x_recorder.py
- 申请
进程结构 当我启动 运行 我的应用程序时,会调用 gui 并在我按下开始按钮后创建进程。
- 主进程
- audio_recorder_1 进程
- audio_recorder_ 进程
- 申请流程
我试过的
core.py
from multiprocessing import Queue, Process
central_queue = Queue()
...
d = {}
d['output'] = central_queue
o = AudioRecorder('name', **d)
start_application.py
import core
def handle_queue_data():
while True:
print(str(core.central_queue.get()))
if __name__ == "__main__":
Process(target=handle_queue_data, name="syncOutput").start()
audio_recorder.py
class AudioRecorder(object):
def __init__(self, name, **d):
...
self.output_queue = d['output']
def run(self):
queue = Queue()
def callback(indata, frames, time, status):
if status:
print(status, flush=True)
# Push the got data into the queue
queue.put([indata.copy()])
with sd.InputStream(samplerate=self.sample_rate, device=self.device_id, channels=self.channel_id, callback=callback):
while True:
self.output_queue.put(queue.get())
它不工作。调试后,似乎从记录器的 core.py
开始后,队列的引用发生了变化...仅供调试信息:
# in the audio_recorder.py object
centralized_queue = {Queue} <multiprocessing.queues.Queue object at 0x00000000086B3320>
_buffer = {deque} deque([[array([[-0.01989746, -0.02053833],\n [-0.01828003, -0.0196228 ],\n [-0.00634766, -0.00686646],\n ..., \n [-0.01119995, -0.01144409],\n [-0.00900269, -0.00982666],\n [-0.00823975, -0.00888062]], dtype=float32)]])
_close = {Finalize} <Finalize object, callback=_finalize_close, args=[deque([[array([[-0.01989746, -0.02053833],\n [-0.01828003, -0.0196228 ],\n [-0.00634766, -0.00686646],\n ..., \n [-0.01119995, -0.01144409],\n [-0.00900269, -0.00982666],\n [-0
_closed = {bool} False
_ignore_epipe = {bool} False
_joincancelled = {bool} False
_jointhread = {Finalize} <Finalize object, callback=_finalize_join, args=[<weakref at 0x00000000083A2638; to 'Thread' at 0x0000000004DF1B00>], exitprority=-5>
_maxsize = {int} 2147483647
_notempty = {Condition} <Condition(<unlocked _thread.lock object at 0x0000000004738198>, 0)>
_opid = {int} 1320
_reader = {PipeConnection} <multiprocessing.connection.PipeConnection object at 0x00000000086B34A8>
_rlock = {Lock} <Lock(owner=None)>
_sem = {BoundedSemaphore} <BoundedSemaphore(value=2147483645, maxvalue=2147483647)>
_thread = {Thread} <Thread(QueueFeederThread, started daemon 9344)>
_wlock = {NoneType} None
_writer = {PipeConnection} <multiprocessing.connection.PipeConnection object at 0x00000000086B3518>
# in the handle_queue_data
centralized_queue = {Queue} <multiprocessing.queues.Queue object at 0x000000000479DA20>
_buffer = {deque} deque([])
_close = {NoneType} None
_closed = {bool} False
_ignore_epipe = {bool} False
_joincancelled = {bool} False
_jointhread = {NoneType} None
_maxsize = {int} 2147483647
_notempty = {Condition} <Condition(<unlocked _thread.lock object at 0x00000000058C8350>, 0)>
_opid = {int} 7208
_reader = {PipeConnection} <multiprocessing.connection.PipeConnection object at 0x000000000684C438>
_rlock = {Lock} <Lock(owner=None)>
_sem = {BoundedSemaphore} <BoundedSemaphore(value=2147483647, maxvalue=2147483647)>
_thread = {NoneType} None
_wlock = {NoneType} None
_writer = {PipeConnection} <multiprocessing.connection.PipeConnection object at 0x00000000058DE6A0>
之后我也尝试过使用不同的东西,都没有成功,我没有设法传递数据......这里队列可能是一个可变对象吗?或者多处理中存在错误(不太可能)或者与 sounddevice 的结合使队列不稳定?
对不起,我的描述太长了...
在此先感谢您的帮助!
最好的问候,
塞巴斯蒂安
我真的没有使用 multiprocessing
的经验,但据我了解 start_application.py
的模块名称空间中的所有对象对于每个进程都是重复的。
如果我没记错的话,这包括 core
模块。因此,core.central_queue
每个进程都有一个单独的实例。
至少 Windows 似乎是这种情况,Python docs 无论如何都建议 "Explicitly pass resources to child processes"。
您应该使用 if __name__ == '__main__':
块来创建 Queue
的唯一实例和 AudioRecorder
的唯一实例。
然后,您可以使用 Process
的 args
参数将这些唯一实例传递给您的进程(如上面 link 所示)。
除此之外,我真的不知道你想要达到什么目的。你想用随机的可用进程之一处理随机的音频输入块吗? 或者您想为每个进程提供完整的相同音频输入?
在后一种情况下,毕竟每个子进程都应该有一个单独的队列!
sd.InputStream
应该仍然是唯一的。在您的 with
语句中,您应该遍历所有子进程并将当前的音频块分别放入每个进程队列中。
PS: 我刚刚意识到您可能出于某种原因只想启动一个附加进程。在这种情况下,你应该考虑放弃整个 multiprocessing
混乱,只做你需要在 with
语句中做的任何事情。
更新:
如果您想同时使用多个音频设备(因此需要多个 PortAudio 流),您仍然不一定需要 multiprocessing
。你可以有一个包含多个上下文管理器的 with
语句,并在其中进行处理。
根据您想要实现的目标,您可能有一个队列,所有音频回调都写入其中,或者每个回调有一个队列。
如果您有充分的理由使用 multiprocessing
,如果您在主进程中启动所有音频流并在新的子进程中进行处理,它也应该可以正常工作。