跨进程共享 Numpy 数组变量 - Python 多处理
Sharing Numpy array variables across processes - Python Multiprocessing
我正在编写一些使用 Goertzel 方法构建频谱图的代码。计算主要使用 Numpy ndarray 执行。最终的频谱图是一个 2D ndarray(例如 1536 x 828),它从一个初始的空/全零 ndarray 开始构建,然后用 Goertzel 算法的结果(列向量)逐列更新,该算法共执行 num_windows 次。
我有使用其他编程语言(C/Java)进行多线程/并行处理的经验,但对 Python 不太熟悉。我已经有一个可以工作的多进程版本,但我觉得应该有更优雅/更高效的做法。根据我对这段代码以及 Python 多进程的理解,每个进程中都会使用某些变量(transformed_cols 和 coefficients 这两个 ndarray)的副本,我认为这是可以避免的。
我认为这段代码适合并行化的原因是:虽然所有写入都发生在同一个 ndarray 上,但各次写入的区域互不重叠。
通过阅读其他类似的帖子,我没能找到足够贴合我的情况、能够解决我的问题的方法,因此任何帮助我都将不胜感激。我认为可以改进的部分是 apply_async 函数调用,只是不确定该如何改进 :(
就其价值而言,与我的串行解决方案相比,我发现使用以下解决方案(在我的机器上)速度提高了大约 3-3.5 倍
# Build a spectrogram whose kind is selected by `filterbank.name`.
# NOTE: this listing is an excerpt: indentation was lost and the first branch
# (the bare `if :` and its body) was elided by the author, so only the
# 'goertzel' branch can be read here.
# NOTE(review): `time_spaces` is not used in the visible part of the function.
def build_specific_spectrogram(signal: np.ndarray,
sample_rate: int,
filterbank: Filterbank,
analysis_window: AnalysisWindow,
time_spaces: list,
num_windows: int) -> np.ndarray:
if :
## other spectrograms here
elif filterbank.name == 'goertzel':
# Result matrix: one row per filterbank band, one column per window.
spect = np.zeros((filterbank.num_bands, num_windows), dtype='complex_')
# One windowed signal segment per column.
transformed_cols = build_window_transformed_cols(analysis_window.data, signal, num_windows, analysis_window.window_overlap)
# Band frequencies rescaled by (window length / sample_rate).
coefficients = filterbank.band_frequencies / sample_rate * transformed_cols.shape[0]
num_processes = mp.cpu_count()
# Callback that stores one finished column into `spect`.
def update_spect(result, index):
spect[:,index] = result
pool = mp.Pool(processes=num_processes)
# NOTE(review): the loop stops at num_windows-1 while `spect` has num_windows
# columns, so the last column of `spect` is never written and stays zero.
for win_index in range(num_windows-1):
func_callback = partial(update_spect, index=win_index)
# Each task is handed the full `transformed_cols` and `coefficients` arrays as
# arguments. NOTE(review): no error_callback is given and the AsyncResult is
# discarded, so an exception raised in a worker would go unnoticed.
pool.apply_async(build_goertzel_async, [win_index, transformed_cols, coefficients], callback=func_callback)
# Stop accepting tasks and wait for all submitted tasks to finish.
pool.close()
pool.join()
return spect
def build_goertzel_async(win_index, transformed_cols, coefficients):
    """Run the Goertzel transform on a single column of the window matrix.

    Args:
        win_index: Index of the column (window) to process.
        transformed_cols: 2-D array holding one windowed signal segment per
            column.
        coefficients: Frequency coefficients forwarded unchanged to
            ``generalized_goertzel``.

    Returns:
        The values produced by ``generalized_goertzel`` for that column.
    """
    signal_window = transformed_cols[:, win_index]
    window_powers = generalized_goertzel(signal_window, coefficients)
    # Full-range slice kept from the original: hands back every element.
    return window_powers[:,]
def build_window_transformed_cols(analysis_window_data: np.ndarray, sample_window: np.ndarray, num_windows: int, window_overlap: float) -> np.ndarray:
    """Slice the signal into windows and apply the analysis window to each.

    Column ``j`` of the result is
    ``sample_window[j * hop : j * hop + L] * analysis_window_data`` where
    ``L = len(analysis_window_data)`` and ``hop = window_overlap``.

    Args:
        analysis_window_data: Analysis window samples (length ``L``).
        sample_window: The signal to cut into windows.
        num_windows: Window count; ``num_windows - 1`` columns are produced.
        window_overlap: Step, in samples, between the starts of consecutive
            windows. Must be a whole number (integral floats are accepted).

    Returns:
        Array of shape ``(L, num_windows - 1)``.

    Raises:
        TypeError: If ``window_overlap`` is not a whole number of samples.
        ValueError: If a window reaches past the end of ``sample_window``
            (raised by the column assignment).
    """
    # Slice bounds must be integers; a float step would make the second
    # slice fail, so normalise it up front.
    hop = int(window_overlap)
    if hop != window_overlap:
        raise TypeError("window_overlap must be a whole number of samples")

    window = np.asarray(analysis_window_data)
    samples = np.asarray(sample_window)
    window_length = len(window)

    transformed_cols = np.zeros((window_length, num_windows - 1))
    for win_index in range(num_windows - 1):
        start = win_index * hop
        segment = samples[start:start + window_length]
        # Vectorised element-wise product instead of a per-sample Python loop.
        transformed_cols[:, win_index] = segment * window[:len(segment)]
    return transformed_cols
def generalized_goertzel(signal_window: np.ndarray,
                         coefficients: np.ndarray) -> np.ndarray:
    """Evaluate the generalized Goertzel algorithm on one signal window.

    For every entry ``k`` of ``coefficients`` the result is
    ``sum(x[n] * exp(-2j * pi * k * n / N) for n in range(N))`` where ``x``
    is ``signal_window`` and ``N`` its length. ``k`` may be non-integer; for
    an integer ``k`` the value equals DFT bin ``k``.

    Args:
        signal_window: Samples of a single window (flattened to 1-D).
        coefficients: Frequency indices expressed in DFT-bin units.

    Returns:
        1-D ``complex128`` array with one value per coefficient.

    Raises:
        ValueError: If ``signal_window`` is empty.
    """
    # Work on plain scalars: the former (N, 1) reshape turned every state
    # value into a 1-element array, which NumPy no longer wants to convert
    # implicitly when storing into ``powers``.
    samples = np.ravel(signal_window, order='F').tolist()
    signal_length = len(samples)
    if signal_length == 0:
        raise ValueError("signal_window must contain at least one sample")
    leading, last = samples[:-1], samples[-1]

    num_freqs = len(coefficients)
    # np.complex128 instead of the 'complex_' alias removed in NumPy 2.0.
    powers = np.zeros(num_freqs, dtype=np.complex128)
    for freq_index in range(num_freqs):
        omega = 2 * math.pi * (coefficients[freq_index] / signal_length)
        feedback = math.cos(omega) * 2
        rotation = cmath.exp(omega * -1j)

        # Second-order recursion over all samples but the last one.
        s_1 = 0
        s_2 = 0
        for sample in leading:
            s_0 = sample + feedback * s_1 - s_2
            s_2 = s_1
            s_1 = s_0
        # The final sample is applied without shifting the state.
        s_0 = last + feedback * s_1 - s_2

        # Phase correction so the result is referenced to sample 0.
        phase_fix = cmath.exp(omega * (signal_length - 1) * -1j)
        powers[freq_index] = (s_0 - s_1 * rotation) * phase_fix
    return powers
提前为未能提供可直接运行的代码致歉——要做到这一点需要完整的代码库,而这对一篇 Stack Overflow 帖子来说太长了。
只是想提供一个有效的答案。非常感谢@Aarons 的评论和他之前的 post,帮了大忙。
结构略有变化,但功能相同。执行时间也有所改善:提升幅度不如从单进程到最初的多进程实现那么大,但仍然是不错的改进。
def goertzel_spectrogram_by_multiprocessing(signal: np.ndarray,
                                            filterbank: Filterbank,
                                            num_windows: int,
                                            analysis_window_data: list,
                                            analysis_window_overlap: float,
                                            sample_rate: int) -> np.ndarray:
    """Compute a Goertzel spectrogram with worker processes and shared memory.

    The result matrix, the windowed-signal matrix and the coefficient vector
    live in shared-memory segments. One worker per CPU pulls window indices
    from a queue and writes its column of the result in place.

    Args:
        signal: Input samples.
        filterbank: Provides ``num_bands`` and ``band_frequencies``.
        num_windows: Number of spectrogram columns.
        analysis_window_data: Analysis window samples.
        analysis_window_overlap: Step between consecutive windows, forwarded
            to ``build_window_transformed_cols``.
        sample_rate: Sampling rate of ``signal``.

    Returns:
        A private copy of the ``(num_bands, num_windows)`` complex
        spectrogram (the shared segment is destroyed before returning).

    Raises:
        RuntimeError: If any worker process exits with a non-zero code.
    """
    # Derive element sizes from the dtypes instead of hard-coding 16 and 8.
    complex_size = np.dtype(np.complex128).itemsize
    float_size = np.dtype(np.float64).itemsize
    num_bands = filterbank.num_bands
    window_length = len(analysis_window_data)
    num_coefficients = len(filterbank.band_frequencies)

    segments = []
    processes = []
    try:
        spect_shm = shared_memory.SharedMemory(create=True, size=num_bands * num_windows * complex_size)
        segments.append(spect_shm)
        spect = np.ndarray((num_bands, num_windows), dtype=np.complex128, buffer=spect_shm.buf)

        transformed_cols_shm = shared_memory.SharedMemory(create=True, size=window_length * num_windows * float_size)
        segments.append(transformed_cols_shm)
        transformed_cols = np.ndarray((window_length, num_windows), dtype=np.float64, buffer=transformed_cols_shm.buf)
        transformed_cols[:] = build_window_transformed_cols(analysis_window_data, signal, num_windows, analysis_window_overlap)

        coefficients_shm = shared_memory.SharedMemory(create=True, size=num_coefficients * float_size)
        segments.append(coefficients_shm)
        coefficients = np.ndarray((num_coefficients,), dtype=np.float64, buffer=coefficients_shm.buf)
        coefficients[:] = filterbank.band_frequencies / sample_rate * transformed_cols.shape[0]

        cpu_count = mp.cpu_count()
        # Apply the format; passing cpu_count as a second print() argument
        # printed the literal "%d".
        print('cpu_count = %d' % cpu_count)

        in_q = mp.Queue()
        # Despite the name, the values are the SharedMemory objects; the
        # worker reads their ``.buf``.
        shm_names = {
            'spect': spect_shm,
            'cols': transformed_cols_shm,
            'coefficients': coefficients_shm
        }
        worker_args = (in_q, shm_names, spect.shape, transformed_cols.shape, coefficients.shape)
        processes = [mp.Process(target=update_goertzel, args=worker_args) for _ in range(cpu_count)]
        for p in processes:
            p.start()

        for window_index in range(num_windows):
            in_q.put(window_index)
        # One stop marker per worker so every worker terminates.
        for _ in processes:
            in_q.put(STOPFLAG())

        for p in processes:
            p.join()

        # A crashed worker leaves its columns unwritten; fail loudly instead
        # of returning a partially filled spectrogram.
        failed = [p.exitcode for p in processes if p.exitcode != 0]
        if failed:
            raise RuntimeError('%d worker process(es) failed, exit codes: %s' % (len(failed), failed))

        # Copy before the segments are closed: the arrays above only view
        # shared memory and become invalid afterwards.
        return np.copy(spect)
    finally:
        # On an error path workers may still be blocked on the queue.
        for p in processes:
            if p.is_alive():
                p.terminate()
        # Always release the segments, even on failure, so none are leaked.
        for shm in segments:
            shm.close()
            shm.unlink()
class STOPFLAG:
    """Marker type: an instance placed on the work queue tells a worker to stop."""
def update_goertzel(in_q, shm_names: dict, spect_shape, cols_shape, coefficients_shape):
    """Worker loop: compute spectrogram columns for queued window indices.

    Args:
        in_q: Queue yielding window indices, terminated by a ``STOPFLAG``
            instance.
        shm_names: Mapping with keys ``'spect'``, ``'cols'`` and
            ``'coefficients'`` whose values expose the shared buffers
            through a ``.buf`` attribute.
        spect_shape: Shape of the shared result matrix.
        cols_shape: Shape of the shared windowed-signal matrix.
        coefficients_shape: Shape of the shared coefficient vector.
    """
    # Re-create array views on top of the shared buffers (no data is copied).
    # np.complex128 replaces the 'complex_' alias removed in NumPy 2.0.
    spect = np.ndarray(spect_shape, dtype=np.complex128, buffer=shm_names['spect'].buf)
    transformed_cols = np.ndarray(cols_shape, dtype=np.float64, buffer=shm_names['cols'].buf)
    coefficients = np.ndarray(coefficients_shape, dtype=np.float64, buffer=shm_names['coefficients'].buf)

    while True:
        # Block until an item arrives. The former ``get(1)`` passed 1 as
        # ``block`` (not ``timeout``), so it already blocked forever and its
        # ``except Empty`` branch could never run. Blocking is also the safe
        # choice: termination is driven by the STOPFLAG marker, so a slow
        # producer cannot make the worker quit early.
        window_index = in_q.get()
        if isinstance(window_index, STOPFLAG):
            print('Received STOPFLAG, exitting')
            break
        # Each index owns exactly one column, so workers never overlap.
        spect[:, window_index] = build_goertzel(window_index, transformed_cols, coefficients)
def build_goertzel(win_index, transformed_cols, coefficients):
    """Run the Goertzel transform on a single column of the window matrix.

    Args:
        win_index: Index of the column (window) to process.
        transformed_cols: 2-D array holding one windowed signal segment per
            column.
        coefficients: Frequency coefficients forwarded unchanged to
            ``generalized_goertzel``.

    Returns:
        The values produced by ``generalized_goertzel`` for that column.
    """
    signal_window = transformed_cols[:, win_index]
    window_powers = generalized_goertzel(signal_window, coefficients)
    # Full-range slice kept from the original: hands back every element.
    return window_powers[:,]
def build_window_transformed_cols(analysis_window_data: np.ndarray,
                                  sample_window: np.ndarray,
                                  num_windows: int,
                                  window_overlap: float) -> np.ndarray:
    """Slice the signal into windows and apply the analysis window to each.

    Column ``j`` (for ``j < num_windows - 1``) of the result is
    ``sample_window[j * hop : j * hop + L] * analysis_window_data`` where
    ``L = len(analysis_window_data)`` and ``hop = window_overlap``. The last
    column is left at zero: the array has ``num_windows`` columns but only
    the first ``num_windows - 1`` are filled.

    Args:
        analysis_window_data: Analysis window samples (length ``L``).
        sample_window: The signal to cut into windows.
        num_windows: Number of columns of the returned array.
        window_overlap: Step, in samples, between the starts of consecutive
            windows. Must be a whole number (integral floats are accepted).

    Returns:
        ``float64`` array of shape ``(L, num_windows)``.

    Raises:
        TypeError: If ``window_overlap`` is not a whole number of samples.
        ValueError: If a window reaches past the end of ``sample_window``
            (raised by the column assignment).
    """
    # Slice bounds must be integers; a float step would make the second
    # slice fail, so normalise it up front.
    hop = int(window_overlap)
    if hop != window_overlap:
        raise TypeError("window_overlap must be a whole number of samples")

    window = np.asarray(analysis_window_data)
    samples = np.asarray(sample_window)
    window_length = len(window)

    transformed_cols = np.zeros((window_length, num_windows), dtype=np.float64)
    for win_index in range(num_windows - 1):
        start = win_index * hop
        segment = samples[start:start + window_length]
        # Vectorised element-wise product instead of a per-sample Python loop.
        transformed_cols[:, win_index] = segment * window[:len(segment)]
    return transformed_cols
def generalized_goertzel(signal_window: np.ndarray,
                         coefficients: np.ndarray) -> np.ndarray:
    """Evaluate the generalized Goertzel algorithm on one signal window.

    For every entry ``k`` of ``coefficients`` the result is
    ``sum(x[n] * exp(-2j * pi * k * n / N) for n in range(N))`` where ``x``
    is ``signal_window`` and ``N`` its length. ``k`` may be non-integer; for
    an integer ``k`` the value equals DFT bin ``k``.

    Args:
        signal_window: Samples of a single window (flattened to 1-D).
        coefficients: Frequency indices expressed in DFT-bin units.

    Returns:
        1-D ``complex128`` array with one value per coefficient.

    Raises:
        ValueError: If ``signal_window`` is empty.
    """
    # Work on plain scalars: the former (N, 1) reshape turned every state
    # value into a 1-element array, which NumPy no longer wants to convert
    # implicitly when storing into ``powers``.
    samples = np.ravel(signal_window, order='F').tolist()
    signal_length = len(samples)
    if signal_length == 0:
        raise ValueError("signal_window must contain at least one sample")
    leading, last = samples[:-1], samples[-1]

    num_freqs = len(coefficients)
    # np.complex128 instead of the 'complex_' alias removed in NumPy 2.0.
    powers = np.zeros(num_freqs, dtype=np.complex128)
    for freq_index in range(num_freqs):
        omega = 2 * math.pi * (coefficients[freq_index] / signal_length)
        feedback = math.cos(omega) * 2
        rotation = cmath.exp(omega * -1j)

        # Second-order recursion over all samples but the last one.
        s_1 = 0
        s_2 = 0
        for sample in leading:
            s_0 = sample + feedback * s_1 - s_2
            s_2 = s_1
            s_1 = s_0
        # The final sample is applied without shifting the state.
        s_0 = last + feedback * s_1 - s_2

        # Phase correction so the result is referenced to sample 0.
        phase_fix = cmath.exp(omega * (signal_length - 1) * -1j)
        powers[freq_index] = (s_0 - s_1 * rotation) * phase_fix
    return powers
我正在编写一些使用 Goertzel 方法构建频谱图的代码。计算主要使用 Numpy ndarray 执行。最终的频谱图是一个 2D ndarray(例如 1536 x 828),它从一个初始的空/全零 ndarray 开始构建,然后用 Goertzel 算法的结果(列向量)逐列更新,该算法共执行 num_windows 次。
我有使用其他编程语言(C/Java)进行多线程/并行处理的经验,但对 Python 不太熟悉。我已经有一个可以工作的多进程版本,但我觉得应该有更优雅/更高效的做法。根据我对这段代码以及 Python 多进程的理解,每个进程中都会使用某些变量(transformed_cols 和 coefficients 这两个 ndarray)的副本,我认为这是可以避免的。
我认为这段代码适合并行化的原因是:虽然所有写入都发生在同一个 ndarray 上,但各次写入的区域互不重叠。
通过阅读其他类似的帖子,我没能找到足够贴合我的情况、能够解决我的问题的方法,因此任何帮助我都将不胜感激。我认为可以改进的部分是 apply_async 函数调用,只是不确定该如何改进 :(
就其价值而言,与我的串行解决方案相比,我发现使用以下解决方案(在我的机器上)速度提高了大约 3-3.5 倍
# Build a spectrogram whose kind is selected by `filterbank.name`.
# NOTE: this listing is an excerpt: indentation was lost and the first branch
# (the bare `if :` and its body) was elided by the author, so only the
# 'goertzel' branch can be read here.
# NOTE(review): `time_spaces` is not used in the visible part of the function.
def build_specific_spectrogram(signal: np.ndarray,
sample_rate: int,
filterbank: Filterbank,
analysis_window: AnalysisWindow,
time_spaces: list,
num_windows: int) -> np.ndarray:
if :
## other spectrograms here
elif filterbank.name == 'goertzel':
# Result matrix: one row per filterbank band, one column per window.
spect = np.zeros((filterbank.num_bands, num_windows), dtype='complex_')
# One windowed signal segment per column.
transformed_cols = build_window_transformed_cols(analysis_window.data, signal, num_windows, analysis_window.window_overlap)
# Band frequencies rescaled by (window length / sample_rate).
coefficients = filterbank.band_frequencies / sample_rate * transformed_cols.shape[0]
num_processes = mp.cpu_count()
# Callback that stores one finished column into `spect`.
def update_spect(result, index):
spect[:,index] = result
pool = mp.Pool(processes=num_processes)
# NOTE(review): the loop stops at num_windows-1 while `spect` has num_windows
# columns, so the last column of `spect` is never written and stays zero.
for win_index in range(num_windows-1):
func_callback = partial(update_spect, index=win_index)
# Each task is handed the full `transformed_cols` and `coefficients` arrays as
# arguments. NOTE(review): no error_callback is given and the AsyncResult is
# discarded, so an exception raised in a worker would go unnoticed.
pool.apply_async(build_goertzel_async, [win_index, transformed_cols, coefficients], callback=func_callback)
# Stop accepting tasks and wait for all submitted tasks to finish.
pool.close()
pool.join()
return spect
def build_goertzel_async(win_index, transformed_cols, coefficients):
    """Run the Goertzel transform on a single column of the window matrix.

    Args:
        win_index: Index of the column (window) to process.
        transformed_cols: 2-D array holding one windowed signal segment per
            column.
        coefficients: Frequency coefficients forwarded unchanged to
            ``generalized_goertzel``.

    Returns:
        The values produced by ``generalized_goertzel`` for that column.
    """
    signal_window = transformed_cols[:, win_index]
    window_powers = generalized_goertzel(signal_window, coefficients)
    # Full-range slice kept from the original: hands back every element.
    return window_powers[:,]
def build_window_transformed_cols(analysis_window_data: np.ndarray, sample_window: np.ndarray, num_windows: int, window_overlap: float) -> np.ndarray:
    """Slice the signal into windows and apply the analysis window to each.

    Column ``j`` of the result is
    ``sample_window[j * hop : j * hop + L] * analysis_window_data`` where
    ``L = len(analysis_window_data)`` and ``hop = window_overlap``.

    Args:
        analysis_window_data: Analysis window samples (length ``L``).
        sample_window: The signal to cut into windows.
        num_windows: Window count; ``num_windows - 1`` columns are produced.
        window_overlap: Step, in samples, between the starts of consecutive
            windows. Must be a whole number (integral floats are accepted).

    Returns:
        Array of shape ``(L, num_windows - 1)``.

    Raises:
        TypeError: If ``window_overlap`` is not a whole number of samples.
        ValueError: If a window reaches past the end of ``sample_window``
            (raised by the column assignment).
    """
    # Slice bounds must be integers; a float step would make the second
    # slice fail, so normalise it up front.
    hop = int(window_overlap)
    if hop != window_overlap:
        raise TypeError("window_overlap must be a whole number of samples")

    window = np.asarray(analysis_window_data)
    samples = np.asarray(sample_window)
    window_length = len(window)

    transformed_cols = np.zeros((window_length, num_windows - 1))
    for win_index in range(num_windows - 1):
        start = win_index * hop
        segment = samples[start:start + window_length]
        # Vectorised element-wise product instead of a per-sample Python loop.
        transformed_cols[:, win_index] = segment * window[:len(segment)]
    return transformed_cols
def generalized_goertzel(signal_window: np.ndarray,
                         coefficients: np.ndarray) -> np.ndarray:
    """Evaluate the generalized Goertzel algorithm on one signal window.

    For every entry ``k`` of ``coefficients`` the result is
    ``sum(x[n] * exp(-2j * pi * k * n / N) for n in range(N))`` where ``x``
    is ``signal_window`` and ``N`` its length. ``k`` may be non-integer; for
    an integer ``k`` the value equals DFT bin ``k``.

    Args:
        signal_window: Samples of a single window (flattened to 1-D).
        coefficients: Frequency indices expressed in DFT-bin units.

    Returns:
        1-D ``complex128`` array with one value per coefficient.

    Raises:
        ValueError: If ``signal_window`` is empty.
    """
    # Work on plain scalars: the former (N, 1) reshape turned every state
    # value into a 1-element array, which NumPy no longer wants to convert
    # implicitly when storing into ``powers``.
    samples = np.ravel(signal_window, order='F').tolist()
    signal_length = len(samples)
    if signal_length == 0:
        raise ValueError("signal_window must contain at least one sample")
    leading, last = samples[:-1], samples[-1]

    num_freqs = len(coefficients)
    # np.complex128 instead of the 'complex_' alias removed in NumPy 2.0.
    powers = np.zeros(num_freqs, dtype=np.complex128)
    for freq_index in range(num_freqs):
        omega = 2 * math.pi * (coefficients[freq_index] / signal_length)
        feedback = math.cos(omega) * 2
        rotation = cmath.exp(omega * -1j)

        # Second-order recursion over all samples but the last one.
        s_1 = 0
        s_2 = 0
        for sample in leading:
            s_0 = sample + feedback * s_1 - s_2
            s_2 = s_1
            s_1 = s_0
        # The final sample is applied without shifting the state.
        s_0 = last + feedback * s_1 - s_2

        # Phase correction so the result is referenced to sample 0.
        phase_fix = cmath.exp(omega * (signal_length - 1) * -1j)
        powers[freq_index] = (s_0 - s_1 * rotation) * phase_fix
    return powers
提前为未能提供可直接运行的代码致歉——要做到这一点需要完整的代码库,而这对一篇 Stack Overflow 帖子来说太长了。
只是想提供一个有效的答案。非常感谢@Aarons 的评论和他之前的 post,帮了大忙。
结构略有变化,但功能相同。执行时间也有所改善:提升幅度不如从单进程到最初的多进程实现那么大,但仍然是不错的改进。
def goertzel_spectrogram_by_multiprocessing(signal: np.ndarray,
                                            filterbank: Filterbank,
                                            num_windows: int,
                                            analysis_window_data: list,
                                            analysis_window_overlap: float,
                                            sample_rate: int) -> np.ndarray:
    """Compute a Goertzel spectrogram with worker processes and shared memory.

    The result matrix, the windowed-signal matrix and the coefficient vector
    live in shared-memory segments. One worker per CPU pulls window indices
    from a queue and writes its column of the result in place.

    Args:
        signal: Input samples.
        filterbank: Provides ``num_bands`` and ``band_frequencies``.
        num_windows: Number of spectrogram columns.
        analysis_window_data: Analysis window samples.
        analysis_window_overlap: Step between consecutive windows, forwarded
            to ``build_window_transformed_cols``.
        sample_rate: Sampling rate of ``signal``.

    Returns:
        A private copy of the ``(num_bands, num_windows)`` complex
        spectrogram (the shared segment is destroyed before returning).

    Raises:
        RuntimeError: If any worker process exits with a non-zero code.
    """
    # Derive element sizes from the dtypes instead of hard-coding 16 and 8.
    complex_size = np.dtype(np.complex128).itemsize
    float_size = np.dtype(np.float64).itemsize
    num_bands = filterbank.num_bands
    window_length = len(analysis_window_data)
    num_coefficients = len(filterbank.band_frequencies)

    segments = []
    processes = []
    try:
        spect_shm = shared_memory.SharedMemory(create=True, size=num_bands * num_windows * complex_size)
        segments.append(spect_shm)
        spect = np.ndarray((num_bands, num_windows), dtype=np.complex128, buffer=spect_shm.buf)

        transformed_cols_shm = shared_memory.SharedMemory(create=True, size=window_length * num_windows * float_size)
        segments.append(transformed_cols_shm)
        transformed_cols = np.ndarray((window_length, num_windows), dtype=np.float64, buffer=transformed_cols_shm.buf)
        transformed_cols[:] = build_window_transformed_cols(analysis_window_data, signal, num_windows, analysis_window_overlap)

        coefficients_shm = shared_memory.SharedMemory(create=True, size=num_coefficients * float_size)
        segments.append(coefficients_shm)
        coefficients = np.ndarray((num_coefficients,), dtype=np.float64, buffer=coefficients_shm.buf)
        coefficients[:] = filterbank.band_frequencies / sample_rate * transformed_cols.shape[0]

        cpu_count = mp.cpu_count()
        # Apply the format; passing cpu_count as a second print() argument
        # printed the literal "%d".
        print('cpu_count = %d' % cpu_count)

        in_q = mp.Queue()
        # Despite the name, the values are the SharedMemory objects; the
        # worker reads their ``.buf``.
        shm_names = {
            'spect': spect_shm,
            'cols': transformed_cols_shm,
            'coefficients': coefficients_shm
        }
        worker_args = (in_q, shm_names, spect.shape, transformed_cols.shape, coefficients.shape)
        processes = [mp.Process(target=update_goertzel, args=worker_args) for _ in range(cpu_count)]
        for p in processes:
            p.start()

        for window_index in range(num_windows):
            in_q.put(window_index)
        # One stop marker per worker so every worker terminates.
        for _ in processes:
            in_q.put(STOPFLAG())

        for p in processes:
            p.join()

        # A crashed worker leaves its columns unwritten; fail loudly instead
        # of returning a partially filled spectrogram.
        failed = [p.exitcode for p in processes if p.exitcode != 0]
        if failed:
            raise RuntimeError('%d worker process(es) failed, exit codes: %s' % (len(failed), failed))

        # Copy before the segments are closed: the arrays above only view
        # shared memory and become invalid afterwards.
        return np.copy(spect)
    finally:
        # On an error path workers may still be blocked on the queue.
        for p in processes:
            if p.is_alive():
                p.terminate()
        # Always release the segments, even on failure, so none are leaked.
        for shm in segments:
            shm.close()
            shm.unlink()
class STOPFLAG:
    """Marker type: an instance placed on the work queue tells a worker to stop."""
def update_goertzel(in_q, shm_names: dict, spect_shape, cols_shape, coefficients_shape):
    """Worker loop: compute spectrogram columns for queued window indices.

    Args:
        in_q: Queue yielding window indices, terminated by a ``STOPFLAG``
            instance.
        shm_names: Mapping with keys ``'spect'``, ``'cols'`` and
            ``'coefficients'`` whose values expose the shared buffers
            through a ``.buf`` attribute.
        spect_shape: Shape of the shared result matrix.
        cols_shape: Shape of the shared windowed-signal matrix.
        coefficients_shape: Shape of the shared coefficient vector.
    """
    # Re-create array views on top of the shared buffers (no data is copied).
    # np.complex128 replaces the 'complex_' alias removed in NumPy 2.0.
    spect = np.ndarray(spect_shape, dtype=np.complex128, buffer=shm_names['spect'].buf)
    transformed_cols = np.ndarray(cols_shape, dtype=np.float64, buffer=shm_names['cols'].buf)
    coefficients = np.ndarray(coefficients_shape, dtype=np.float64, buffer=shm_names['coefficients'].buf)

    while True:
        # Block until an item arrives. The former ``get(1)`` passed 1 as
        # ``block`` (not ``timeout``), so it already blocked forever and its
        # ``except Empty`` branch could never run. Blocking is also the safe
        # choice: termination is driven by the STOPFLAG marker, so a slow
        # producer cannot make the worker quit early.
        window_index = in_q.get()
        if isinstance(window_index, STOPFLAG):
            print('Received STOPFLAG, exitting')
            break
        # Each index owns exactly one column, so workers never overlap.
        spect[:, window_index] = build_goertzel(window_index, transformed_cols, coefficients)
def build_goertzel(win_index, transformed_cols, coefficients):
    """Run the Goertzel transform on a single column of the window matrix.

    Args:
        win_index: Index of the column (window) to process.
        transformed_cols: 2-D array holding one windowed signal segment per
            column.
        coefficients: Frequency coefficients forwarded unchanged to
            ``generalized_goertzel``.

    Returns:
        The values produced by ``generalized_goertzel`` for that column.
    """
    signal_window = transformed_cols[:, win_index]
    window_powers = generalized_goertzel(signal_window, coefficients)
    # Full-range slice kept from the original: hands back every element.
    return window_powers[:,]
def build_window_transformed_cols(analysis_window_data: np.ndarray,
                                  sample_window: np.ndarray,
                                  num_windows: int,
                                  window_overlap: float) -> np.ndarray:
    """Slice the signal into windows and apply the analysis window to each.

    Column ``j`` (for ``j < num_windows - 1``) of the result is
    ``sample_window[j * hop : j * hop + L] * analysis_window_data`` where
    ``L = len(analysis_window_data)`` and ``hop = window_overlap``. The last
    column is left at zero: the array has ``num_windows`` columns but only
    the first ``num_windows - 1`` are filled.

    Args:
        analysis_window_data: Analysis window samples (length ``L``).
        sample_window: The signal to cut into windows.
        num_windows: Number of columns of the returned array.
        window_overlap: Step, in samples, between the starts of consecutive
            windows. Must be a whole number (integral floats are accepted).

    Returns:
        ``float64`` array of shape ``(L, num_windows)``.

    Raises:
        TypeError: If ``window_overlap`` is not a whole number of samples.
        ValueError: If a window reaches past the end of ``sample_window``
            (raised by the column assignment).
    """
    # Slice bounds must be integers; a float step would make the second
    # slice fail, so normalise it up front.
    hop = int(window_overlap)
    if hop != window_overlap:
        raise TypeError("window_overlap must be a whole number of samples")

    window = np.asarray(analysis_window_data)
    samples = np.asarray(sample_window)
    window_length = len(window)

    transformed_cols = np.zeros((window_length, num_windows), dtype=np.float64)
    for win_index in range(num_windows - 1):
        start = win_index * hop
        segment = samples[start:start + window_length]
        # Vectorised element-wise product instead of a per-sample Python loop.
        transformed_cols[:, win_index] = segment * window[:len(segment)]
    return transformed_cols
def generalized_goertzel(signal_window: np.ndarray,
                         coefficients: np.ndarray) -> np.ndarray:
    """Evaluate the generalized Goertzel algorithm on one signal window.

    For every entry ``k`` of ``coefficients`` the result is
    ``sum(x[n] * exp(-2j * pi * k * n / N) for n in range(N))`` where ``x``
    is ``signal_window`` and ``N`` its length. ``k`` may be non-integer; for
    an integer ``k`` the value equals DFT bin ``k``.

    Args:
        signal_window: Samples of a single window (flattened to 1-D).
        coefficients: Frequency indices expressed in DFT-bin units.

    Returns:
        1-D ``complex128`` array with one value per coefficient.

    Raises:
        ValueError: If ``signal_window`` is empty.
    """
    # Work on plain scalars: the former (N, 1) reshape turned every state
    # value into a 1-element array, which NumPy no longer wants to convert
    # implicitly when storing into ``powers``.
    samples = np.ravel(signal_window, order='F').tolist()
    signal_length = len(samples)
    if signal_length == 0:
        raise ValueError("signal_window must contain at least one sample")
    leading, last = samples[:-1], samples[-1]

    num_freqs = len(coefficients)
    # np.complex128 instead of the 'complex_' alias removed in NumPy 2.0.
    powers = np.zeros(num_freqs, dtype=np.complex128)
    for freq_index in range(num_freqs):
        omega = 2 * math.pi * (coefficients[freq_index] / signal_length)
        feedback = math.cos(omega) * 2
        rotation = cmath.exp(omega * -1j)

        # Second-order recursion over all samples but the last one.
        s_1 = 0
        s_2 = 0
        for sample in leading:
            s_0 = sample + feedback * s_1 - s_2
            s_2 = s_1
            s_1 = s_0
        # The final sample is applied without shifting the state.
        s_0 = last + feedback * s_1 - s_2

        # Phase correction so the result is referenced to sample 0.
        phase_fix = cmath.exp(omega * (signal_length - 1) * -1j)
        powers[freq_index] = (s_0 - s_1 * rotation) * phase_fix
    return powers