Synchronize output of two independent processes in Python
I'm working on multiprocessing code that computes some parameters in two parallel processes; the parameters are then used in the main process as they become available.
In particular I work in the 3D audio field (I'm developing a 3D audio simulator with audio obstacles): the two processes compute the interpolated HRIR for a given sound source position, and the cutoff frequency (obstacle simulation) for given source and obstacle positions.
The "structure" is the following; to make things clearer:
- The main process spawns 2 child processes and initializes them with the sequence of source and object positions to simulate
- The main process goes on doing its own thing (DSP stuff and so on...)
- Whenever the two child processes finish working on a single position, they output the HRIR and the cutoff value. The two parameters are updated in the main process, which keeps doing its own DSP stuff, but with the two new parameters
Here is my code so far:
import time
import numpy as np
from classes.convolutioner import Convolutioner
import librosa
from classes.HRIR_interpreter_min_phase_linear_interpolation import HRIR_interpreter_min_phase_linear_interpolation
from classes.object_renderer import ObjectRenderer
import multiprocessing
from multiprocessing import Queue

# Useful video https://www.youtube.com/watch?v=sp7EhjLkFY4&t=17s&ab_channel=codebasics

def compute_hrir(queue_source_position, queue_computed_hrirs):
    print('computing hrir')
    SOFA_filename = '../HRTF_data/HUTUBS_min_phase.sofa'
    # loading the simulated dataset using the support class HRIRInterpreter
    HRIRInterpreter = HRIR_interpreter_min_phase_linear_interpolation(SOFA_filename=SOFA_filename)
    # Keep running forever, waiting for new positions
    while True:
        time.sleep(1)
        if queue_source_position.empty() is False:
            position = queue_source_position.get()
            required_IR = HRIRInterpreter.get_interpolated_IR(position[0], position[1], 1)
            queue_computed_hrirs.put(required_IR)

def compute_cutoff(queue_source_position, queue_object_position, queue_computed_cutoff):
    print('computing cutoff')
    cutoff = 20000
    object_renderer = ObjectRenderer()
    while True:
        time.sleep(1)
        if queue_object_position.empty() is False:
            print('inside object position update')
            object_positions = queue_object_position.get()
            object_renderer.update_object_position(object_positions)
        if queue_source_position.empty() is False:
            print('inside source position update')
            source_position = queue_source_position.get()
            cutoff = object_renderer.get_cutoff(azimuth=source_position[0], elevation=source_position[1])
            queue_computed_cutoff.put(cutoff)

if __name__ == '__main__':
    i = 0
    source_position = np.zeros(2)
    current_cutoff = 0
    object_positions = np.array([(20, 0), (40, 0), (180, 0), (225, 0)])

    queue_source_position_hrir_calculator = Queue()
    queue_source_position_cutoff_calculator = Queue()
    queue_object_position = Queue()
    queue_computed_hrirs = Queue()
    queue_computed_cutoff = Queue()

    while i < 100:
        queue_source_position_hrir_calculator.put(source_position)
        queue_source_position_cutoff_calculator.put(source_position)
        queue_object_position.put(object_positions)
        source_position[0] = source_position[0] + 10
        i = i + 1

    p1 = multiprocessing.Process(target=compute_hrir, args=(queue_source_position_hrir_calculator, queue_computed_hrirs))
    p2 = multiprocessing.Process(target=compute_cutoff, args=(queue_source_position_cutoff_calculator, queue_object_position, queue_computed_cutoff))
    p1.start()
    p2.start()

    while True:
        if queue_computed_hrirs is not False:
            current_hrir = queue_computed_hrirs.get()
            print('current_hrir:')
        if queue_computed_cutoff is not False:
            current_cutoff = queue_computed_cutoff.get()
            print('current cutoff:', current_cutoff)
        print('doing stuff with the new values of current_hrir and current_cutoff')
        time.sleep(1)
The two child processes create two different objects that encapsulate all the logic my computations need (the objects' code has been removed for the clarity of my question; it has been tested and works perfectly in single-process mode).
I would like the compute_hrir(...) and compute_cutoff(...) functions to run in parallel, with their outputs synchronized, while my main process keeps doing its own thing and uses the two new parameters as they become available.
To make things clearer, here is an example:
- I have a sequence of sound source positions (expressed in degrees) simulating a circular movement of the source around my head, say a vector like [0°, 10°, 20°, 30°, 50°, 60°]
- I have a vector of obstacles (again expressed in degrees around my head), say [10°, 30°]
- My compute_hrir(...) function takes the source position vector ([0°, 10°, 20°, 30°, 50°, 60°]) as input and computes the HRIR for each position at a given rate
- My compute_cutoff(...) function takes the source position vector and the obstacle vector as input and outputs the computed cutoff values
- The two processes run in parallel, but their outputs must be synchronized: compute_hrir(...) outputs the HRIR for the source at 0° while compute_cutoff(...) outputs the cutoff for the source at 0° and obstacles at [10°, 30°]. Then, a moment later, compute_hrir(...) computes the HRIR for the source at 10° while compute_cutoff(...) outputs the cutoff for the source at 10° and obstacles at [10°, 30°], and so on...
How can I synchronize the outputs of the two child processes? At the moment my code runs them independently, and I get "misaligned" outputs (e.g., the HRIR for position 0° but the cutoff for position 20°).
Note: I know that none of the processes ever ends, because of the while(True) conditions. That is intended: the audio output should run until the user force-stops the simulation.
As I mentioned, Queue.empty() is unreliable; you need to use blocking reads in the child processes. To keep them from looping forever, I have added code that puts an extra None value on each input queue as an "end of messages" indicator. A child process can test for this value: when it receives the special indicator it can either return, or, if you want to keep looping indefinitely, stop reading from the queue and keep using the last value read. I have included code that does both; the code that returns is commented out.
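The "end of messages" pattern can be sketched in miniature with a plain queue (the names here are illustrative, not taken from the code below):

```python
import queue

# Minimal sketch of the sentinel pattern: the producer puts None after the
# real items; the consumer does blocking get() calls (no unreliable empty()
# checks) and stops as soon as it sees None.
def consume_until_sentinel(q):
    results = []
    while True:
        item = q.get()      # blocking read
        if item is None:    # sentinel: no more messages
            break
        results.append(item)
    return results

q = queue.Queue()
for position in (0, 10, 20):
    q.put(position)
q.put(None)                 # end-of-messages indicator

print(consume_until_sentinel(q))  # -> [0, 10, 20]
```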
The main process starts a thread that performs the synchronization. It reads one message from each output queue and writes the combined values as a single tuple to a new queue.Queue instance. The main process does a blocking get on that queue just to obtain the first pair of values. From then on it does non-blocking gets with queue.Queue.get_nowait, which raises a queue.Empty exception if the queue is empty. That is the only reliable way to test whether a queue is empty. But there is really no need to use this method in the child processes, since their input queues will never stay empty until the final "end of messages" indicator value arrives.
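The non-blocking poll looks like this in isolation (a standalone sketch with placeholder values):

```python
import queue

output_queue = queue.Queue()
current_pair = ('last_hrir', 20000)   # last values obtained via a blocking get()

# queue.Queue.get_nowait() raises queue.Empty instead of blocking; catching
# that exception is the reliable way to ask "is there a new value right now?"
try:
    current_pair = output_queue.get_nowait()
except queue.Empty:
    pass                              # nothing new yet: keep the last pair

print(current_pair)                   # -> ('last_hrir', 20000); the queue was empty
```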
I have also made a few corrections. For example, there seemed to be a race condition where the main process puts a value on the queue_source_position_hrir_calculator and queue_source_position_cutoff_calculator queues and then modifies that same value. I believe this is because a separate (feeder) thread actually performs the put operation, and the loop modifies source_position before that thread has finished putting it.
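The race is easy to see in isolation: every queued reference points at the same mutable array, so mutating it after "sending" can change what the consumer receives. Creating a fresh array per iteration (or putting a copy) avoids this, which is what the corrected code below does:

```python
import numpy as np

# Reusing one mutable array: the same object is stored three times, so later
# mutations leak into items that were already "sent".
shared = np.zeros(2)
unsafe = []
for _ in range(3):
    unsafe.append(shared)        # same object appended three times
    shared[0] += 10
print([p[0] for p in unsafe])    # -> [30.0, 30.0, 30.0]

# Safe variant: snapshot the value before mutating it.
shared = np.zeros(2)
safe = []
for _ in range(3):
    safe.append(shared.copy())   # independent snapshot per iteration
    shared[0] += 10
print([p[0] for p in safe])      # -> [0.0, 10.0, 20.0]
```

Here is the complete corrected code: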
import time
import numpy as np
from classes.convolutioner import Convolutioner
import librosa
from classes.HRIR_interpreter_min_phase_linear_interpolation import HRIR_interpreter_min_phase_linear_interpolation
from classes.object_renderer import ObjectRenderer
import multiprocessing
from multiprocessing import Queue
import threading
import queue

# Useful video https://www.youtube.com/watch?v=sp7EhjLkFY4&t=17s&ab_channel=codebasics

def compute_hrir(queue_source_position, queue_computed_hrirs):
    print('computing hrir')
    SOFA_filename = '../HRTF_data/HUTUBS_min_phase.sofa'
    # loading the simulated dataset using the support class HRIRInterpreter
    HRIRInterpreter = HRIR_interpreter_min_phase_linear_interpolation(SOFA_filename=SOFA_filename)
    # Keep running forever, waiting for new positions
    eof_source_position = False
    # Un-comment the following line to return when there are no more messages:
    #while not eof_source_position:
    while True:
        time.sleep(1)
        if not eof_source_position:
            position = queue_source_position.get()
            if position is None:  # end of messages indicator
                eof_source_position = True
            else:
                required_IR = HRIRInterpreter.get_interpolated_IR(position[0], position[1], 1)
                queue_computed_hrirs.put(required_IR)
    queue_computed_hrirs.put(None)  # end of messages indicator

def compute_cutoff(queue_source_position, queue_computed_cutoff):
    print('computing cutoff')
    cutoff = 20000
    object_renderer = ObjectRenderer()
    object_positions = np.array([(20, 0), (40, 0), (180, 0), (225, 0)])
    eof_source_position = False
    # Un-comment the following line to return when there are no more messages:
    #while not eof_source_position:
    while True:
        time.sleep(1)
        object_renderer.update_object_position(object_positions)
        if not eof_source_position:
            print('inside source position update')
            source_position = queue_source_position.get()
            if source_position is None:  # end of messages indicator
                eof_source_position = True
            else:
                cutoff = object_renderer.get_cutoff(azimuth=source_position[0], elevation=source_position[1])
                queue_computed_cutoff.put(cutoff)
    queue_computed_cutoff.put(None)  # end of messages indicator

def process_output(queue_computed_hrirs, queue_computed_cutoff, output_queue):
    # Synchronizing thread: pair up one HRIR with one cutoff and forward
    # the pair as a single tuple.
    while True:
        current_hrir = queue_computed_hrirs.get()
        if current_hrir is None:  # end of messages indicator
            break
        current_cutoff = queue_computed_cutoff.get()
        output_queue.put((current_hrir, current_cutoff))
    output_queue.put(None)  # end of messages indicator

def process_block(current_hrir, current_cutoff):
    print('doing stuff with the new values of current_hrir and current_cutoff', current_hrir, current_cutoff)
    ...

if __name__ == '__main__':
    current_cutoff = 0

    queue_source_position_hrir_calculator = Queue()
    queue_source_position_cutoff_calculator = Queue()
    queue_computed_hrirs = Queue()
    queue_computed_cutoff = Queue()

    i = 0.0
    for _ in range(100):
        # build a fresh array each iteration instead of mutating a shared one,
        # to avoid a race with the queue's feeder thread
        source_position = np.array([i, 0.0])
        queue_source_position_hrir_calculator.put(source_position)
        queue_source_position_cutoff_calculator.put(source_position)
        i += 10
    queue_source_position_hrir_calculator.put(None)  # "end of messages" indicator
    queue_source_position_cutoff_calculator.put(None)  # "end of messages" indicator

    p1 = multiprocessing.Process(target=compute_hrir, args=(queue_source_position_hrir_calculator, queue_computed_hrirs))
    p2 = multiprocessing.Process(target=compute_cutoff, args=(queue_source_position_cutoff_calculator, queue_computed_cutoff))
    output_queue = queue.Queue()
    t = threading.Thread(target=process_output, args=(queue_computed_hrirs, queue_computed_cutoff, output_queue))
    t.start()
    p1.start()
    p2.start()

    # get the first set of values:
    current_hrir, current_cutoff = output_queue.get()
    while True:
        process_block(current_hrir, current_cutoff)
        # any new values?
        try:
            tpl = output_queue.get_nowait()
            if tpl is None:  # end of messages indicator
                break
        except queue.Empty:
            pass
        else:
            current_hrir, current_cutoff = tpl
        # don't sleep too long, so you can keep up with the rate at which the queue is filled
        time.sleep(.8)

    p1.join()
    p2.join()
    t.join()