跨进程共享 NumPy 数组变量 - Python 多进程(multiprocessing)

Sharing Numpy array variables across processes - Python Multiprocessing

我正在编写一些使用 Goertzel 方法构建频谱图的代码。计算主要使用 NumPy ndarray 完成。最终的频谱图是一个二维 ndarray(例如 1536 x 828):先创建一个全零的 ndarray,然后用 Goertzel 算法的结果(列向量)逐列更新,该算法共执行 num_windows 次。

我有使用其他编程语言(C/Java)进行多线程/并行处理的经验,但对 Python 不太熟悉。我有一个可以工作的多进程版本代码,但我觉得应该有更优雅、更高效的实现方式。根据我对这段代码以及 Python 多进程的理解,每个进程都使用了某些变量(transformed_cols 和 coefficients 这两个 ndarray)的副本,我认为这是可以避免的。

我认为这段代码适合并行的原因是:虽然所有写入都发生在同一个 ndarray 上,但各次写入的区域互不重叠。

我阅读了其他类似的帖子,但没有找到足以解决我这种情况的方法,因此如果能得到任何帮助,我将不胜感激。我认为可以改进的部分是 apply_async 函数调用,只是不确定该如何改进 :(

顺便一提,与我的串行方案相比,使用以下方案(在我的机器上)速度提高了大约 3-3.5 倍。

def build_specific_spectrogram(signal: np.ndarray, 
                               sample_rate: int, 
                               filterbank: Filterbank,
                               analysis_window: AnalysisWindow,
                               time_spaces: list,
                               num_windows: int) -> np.ndarray:
    """Build a spectrogram for the filterbank selected by ``filterbank.name``.

    Only the ``'goertzel'`` branch is shown. It windows the signal with
    ``build_window_transformed_cols``, then computes one spectrogram column
    per window in a ``multiprocessing`` pool via ``build_goertzel_async``.

    Args:
        signal: Input samples, passed to ``build_window_transformed_cols``.
        sample_rate: Used to scale ``filterbank.band_frequencies`` into
            Goertzel frequency indices.
        filterbank: Provides ``name``, ``num_bands`` and ``band_frequencies``.
        analysis_window: Provides ``data`` and ``window_overlap``.
        time_spaces: Not used in the visible branch.
        num_windows: Number of columns allocated in the result.

    Returns:
        Complex array of shape ``(filterbank.num_bands, num_windows)``.
        NOTE(review): the task loop covers ``range(num_windows - 1)``, so the
        last column keeps its initial zeros -- confirm this is intended.
    """
    # NOTE(review): the first condition and its body were elided when this
    # snippet was posted; as written the `if :` line is not valid Python.
    if :
        ## other spectrograms here
    elif filterbank.name == 'goertzel':
        # Result buffer; lives in the parent process and is filled by callbacks.
        spect = np.zeros((filterbank.num_bands, num_windows), dtype='complex_')
        transformed_cols = build_window_transformed_cols(analysis_window.data, signal, num_windows, analysis_window.window_overlap)

        # Band frequencies scaled by window length / sample rate; consumed as
        # the `coefficients` argument of generalized_goertzel.
        coefficients = filterbank.band_frequencies / sample_rate * transformed_cols.shape[0]
        
        num_processes = mp.cpu_count()
        
        # Callback target: stores one finished column into the result buffer.
        def update_spect(result, index):
            spect[:,index] = result
        
        pool = mp.Pool(processes=num_processes)
                    
        for win_index in range(num_windows-1):
            # Bind the column index so the callback knows where to write.
            func_callback = partial(update_spect, index=win_index)
            # Every task receives the full transformed_cols and coefficients
            # arrays as arguments (Pool serialises task arguments per call).
            pool.apply_async(build_goertzel_async, [win_index, transformed_cols, coefficients], callback=func_callback)
        # No more tasks will be submitted; wait for all workers to finish.
        pool.close()
        pool.join()
        return spect


def build_goertzel_async(win_index, transformed_cols, coefficients):
    """Compute the Goertzel output for a single windowed column.

    Picks column ``win_index`` of ``transformed_cols``, evaluates
    ``generalized_goertzel`` on it with ``coefficients`` and returns the
    resulting vector.
    """
    column = transformed_cols[:, win_index]
    return generalized_goertzel(column, coefficients)[:,]
           

def build_window_transformed_cols(analysis_window_data: np.ndarray, sample_window: np.ndarray, num_windows: int, window_overlap: float) -> np.ndarray:
    """Slice the signal into frames and multiply each by the analysis window.

    Frame ``j`` starts at sample ``j * window_overlap`` and is
    ``len(analysis_window_data)`` samples long. Each frame is multiplied
    element-wise by the analysis window and stored as column ``j``.

    Args:
        analysis_window_data: Window coefficients (1-D).
        sample_window: Signal samples (1-D).
        num_windows: Window count; ``num_windows - 1`` columns are produced.
        window_overlap: Step between consecutive frame starts, in samples.
            Despite the ``float`` annotation it is used directly as a slice
            offset, so it must be an integer value.

    Returns:
        Float array of shape ``(len(analysis_window_data), num_windows - 1)``.
        Frames that run past the end of the signal are zero-padded.
    """
    window = np.asarray(analysis_window_data)
    samples = np.asarray(sample_window)
    window_length = len(window)

    transformed_cols = np.zeros((window_length, num_windows - 1))
    start = 0
    for win_index in range(num_windows - 1):
        frame = samples[start:start + window_length]
        valid = len(frame)
        # Write only the samples that exist. The rest of the column stays
        # zero, so a short tail frame is zero-padded instead of being
        # broadcast (1 sample) or rejected with a shape error (other sizes).
        transformed_cols[:valid, win_index] = frame * window[:valid]
        start += window_overlap
    return transformed_cols

    
def generalized_goertzel(signal_window: np.ndarray, 
                         coefficients: np.ndarray) -> np.ndarray:
    """Evaluate the generalized Goertzel algorithm at the given frequency indices.

    For a signal ``x`` of length ``N`` and each (possibly non-integer) index
    ``k`` in ``coefficients``, the result is
    ``sum(x[n] * exp(-2j * pi * k * n / N) for n in range(N))``.
    For integer ``k`` this is DFT bin ``k``.

    Args:
        signal_window: 1-D sequence of ``N`` samples.
        coefficients: 1-D sequence of frequency indices ``k``.

    Returns:
        1-D ``complex128`` array with one value per entry of ``coefficients``.

    Raises:
        ValueError: If ``signal_window`` is empty while frequencies are requested.
    """
    signal_length = len(signal_window)
    num_freqs = len(coefficients)
    # complex128 is the portable spelling; the 'complex_' alias was dropped in NumPy 2.0.
    powers = np.zeros(num_freqs, dtype=np.complex128)
    if num_freqs == 0:
        return powers
    if signal_length == 0:
        raise ValueError("signal_window must contain at least one sample")

    # Plain Python scalars keep the recurrence state scalar, so the final
    # assignment does not rely on converting 1-element arrays to scalars.
    samples = np.asarray(signal_window).reshape(signal_length).tolist()
    head = samples[:-1]
    last_sample = samples[-1]

    for freq_index in range(num_freqs):
        omega = float(2 * math.pi * (coefficients[freq_index] / signal_length))
        feedback = math.cos(omega) * 2
        rotation = cmath.exp(omega * -1j)
        s_1 = 0.0
        s_2 = 0.0
        # Second-order recurrence over all samples but the last.
        for sample in head:
            s_0 = sample + feedback * s_1 - s_2
            s_2 = s_1
            s_1 = s_0
        # The last sample is handled separately so s_1 still holds the
        # previous state needed for the complex output term.
        s_0 = last_sample + feedback * s_1 - s_2
        value = s_0 - s_1 * rotation
        # Phase correction that makes the result valid for non-integer indices.
        powers[freq_index] = value * cmath.exp(omega * (signal_length - 1) * -1j)
    return powers

提前为没有提供可直接运行的代码而道歉,但那需要完整的代码库,对于一篇 Stack Overflow 帖子来说篇幅太长了。

只是想提供一个可行的答案。非常感谢 @Aaron 的评论和他之前的帖子,帮了大忙。

结构略有变化,但功能相同。执行时间也有所改善:提升幅度没有从单进程到最初的多进程实现那么大,但仍然是不错的改进。

def goertzel_spectrogram_by_multiprocessing(signal: np.ndarray,
                                            filterbank: Filterbank, 
                                            num_windows: int, 
                                            analysis_window_data: list, 
                                            analysis_window_overlap: float,
                                            sample_rate: int) -> np.ndarray:
    """Compute a Goertzel spectrogram with worker processes and shared memory.

    The windowed signal columns, the Goertzel frequency indices and the
    output spectrogram are each placed in a ``SharedMemory`` segment.
    ``mp.cpu_count()`` workers running ``update_goertzel`` take window indices
    from a queue and write their column straight into the shared spectrogram.
    One ``STOPFLAG`` per worker ends the run.

    Args:
        signal: Input samples, forwarded to ``build_window_transformed_cols``.
        filterbank: Provides ``num_bands`` and ``band_frequencies``.
        num_windows: Number of spectrogram columns.
        analysis_window_data: Analysis window coefficients.
        analysis_window_overlap: Step between window starts, forwarded to
            ``build_window_transformed_cols``.
        sample_rate: Used to scale the band frequencies into frequency indices.

    Returns:
        A private ``complex128`` copy of the spectrogram with shape
        ``(filterbank.num_bands, num_windows)``.
    """
    segments = []
    processes = []

    def allocate(shape, dtype):
        # Size the segment from the dtype instead of hard-coded byte counts.
        nbytes = int(np.prod(shape)) * np.dtype(dtype).itemsize
        segment = shared_memory.SharedMemory(create=True, size=nbytes)
        segments.append(segment)
        return segment, np.ndarray(shape, dtype=dtype, buffer=segment.buf)

    try:
        spect_shm, spect = allocate((filterbank.num_bands, num_windows), np.complex128)

        transformed_cols_shm, transformed_cols = allocate((len(analysis_window_data), num_windows), np.float64)
        transformed_cols[:] = build_window_transformed_cols(analysis_window_data, signal, num_windows, analysis_window_overlap)

        coefficients_shm, coefficients = allocate((len(filterbank.band_frequencies),), np.float64)
        coefficients[:] = filterbank.band_frequencies / sample_rate * transformed_cols.shape[0]

        cpu_count = mp.cpu_count()
        # The original passed cpu_count as a second print argument, so the
        # "%d" placeholder was printed literally.
        print('cpu_count = %d' % cpu_count)

        in_q = mp.Queue()
        shm_blocks = {
            'spect': spect_shm,
            'cols': transformed_cols_shm,
            'coefficients': coefficients_shm,
        }
        worker_args = (in_q, shm_blocks, spect.shape, transformed_cols.shape, coefficients.shape)
        processes = [mp.Process(target=update_goertzel, args=worker_args) for _ in range(cpu_count)]

        for p in processes:
            p.start()

        for window_index in range(num_windows):
            in_q.put(window_index)

        # One stop marker per worker so every worker leaves its loop.
        for _ in processes:
            in_q.put(STOPFLAG())

        for p in processes:
            p.join()

        # Copy before cleanup: the shared buffer is invalid once closed/unlinked.
        return np.copy(spect)
    finally:
        # On an error path workers may still be blocked on the queue.
        for p in processes:
            if p.is_alive():
                p.terminate()
                p.join()
        # Always release the segments, even if something above raised.
        for segment in segments:
            segment.close()
            segment.unlink()


class STOPFLAG:
    """Marker type with no state; an instance on the work queue tells a worker to stop."""


def update_goertzel(in_q, shm_names: dict, spect_shape, cols_shape, coefficients_shape):
    """Worker loop: compute spectrogram columns for window indices from a queue.

    Wraps the three shared memory segments as NumPy arrays, then repeatedly
    takes a window index from ``in_q``, computes that column with
    ``build_goertzel`` and stores it in the shared spectrogram. The loop ends
    when a ``STOPFLAG`` instance is received.

    Args:
        in_q: Queue yielding integer window indices and finally a ``STOPFLAG``.
        shm_names: Mapping with keys ``'spect'``, ``'cols'`` and
            ``'coefficients'`` whose values are the shared memory objects
            (each must expose ``.buf``).
        spect_shape: Shape of the complex spectrogram array.
        cols_shape: Shape of the float64 windowed-signal array.
        coefficients_shape: Shape of the float64 frequency-index array.
    """
    spect_shm = shm_names['spect']
    transformed_cols_shm = shm_names['cols']
    coefficients_shm = shm_names['coefficients']
    try:
        # complex128 is the portable spelling of the removed 'complex_' alias.
        spect = np.ndarray(spect_shape, dtype=np.complex128, buffer=spect_shm.buf)
        transformed_cols = np.ndarray(cols_shape, dtype='float64', buffer=transformed_cols_shm.buf)
        coefficients = np.ndarray(coefficients_shape, dtype='float64', buffer=coefficients_shm.buf)
        while True:
            # Block until work arrives. The original `get(1)` set block=1, not
            # a timeout, so it also blocked forever and its `except Empty`
            # branch could never run; shutdown is driven by STOPFLAG alone.
            window_index = in_q.get()
            if isinstance(window_index, STOPFLAG):
                print('Received STOPFLAG, exitting')
                break
            spect[:, window_index] = build_goertzel(window_index, transformed_cols, coefficients)
    finally:
        # Release this process's handles; unlinking is left to the creator.
        for segment in (spect_shm, transformed_cols_shm, coefficients_shm):
            segment.close()


def build_goertzel(win_index, transformed_cols, coefficients):
    """Compute the Goertzel output for a single windowed column.

    Picks column ``win_index`` of ``transformed_cols``, evaluates
    ``generalized_goertzel`` on it with ``coefficients`` and returns the
    resulting vector.
    """
    column = transformed_cols[:, win_index]
    return generalized_goertzel(column, coefficients)[:,]
           

def build_window_transformed_cols(analysis_window_data: np.ndarray,
                                  sample_window: np.ndarray,
                                  num_windows: int,
                                  window_overlap: float) -> np.ndarray:
    """Slice the signal into frames and multiply each by the analysis window.

    Frame ``j`` starts at sample ``j * window_overlap`` and is
    ``len(analysis_window_data)`` samples long. Each frame is multiplied
    element-wise by the analysis window and stored as column ``j``.

    Args:
        analysis_window_data: Window coefficients (1-D).
        sample_window: Signal samples (1-D).
        num_windows: Number of columns in the result.
        window_overlap: Step between consecutive frame starts, in samples.
            Despite the ``float`` annotation it is used directly as a slice
            offset, so it must be an integer value.

    Returns:
        ``float64`` array of shape ``(len(analysis_window_data), num_windows)``.
        Frames that run past the end of the signal are zero-padded.
        NOTE(review): only the first ``num_windows - 1`` columns are filled;
        the last column is left all zeros, as in the original -- confirm
        whether that is intended.
    """
    window = np.asarray(analysis_window_data)
    samples = np.asarray(sample_window)
    window_length = len(window)

    transformed_cols = np.zeros((window_length, num_windows), dtype='float64')
    start = 0
    for win_index in range(num_windows - 1):
        frame = samples[start:start + window_length]
        valid = len(frame)
        # Write only the samples that exist. The rest of the column stays
        # zero, so a short tail frame is zero-padded instead of being
        # broadcast (1 sample) or rejected with a shape error (other sizes).
        transformed_cols[:valid, win_index] = frame * window[:valid]
        start += window_overlap
    return transformed_cols


def generalized_goertzel(signal_window: np.ndarray, 
                         coefficients: np.ndarray) -> np.ndarray:
    """Evaluate the generalized Goertzel algorithm at the given frequency indices.

    For a signal ``x`` of length ``N`` and each (possibly non-integer) index
    ``k`` in ``coefficients``, the result is
    ``sum(x[n] * exp(-2j * pi * k * n / N) for n in range(N))``.
    For integer ``k`` this is DFT bin ``k``.

    Args:
        signal_window: 1-D sequence of ``N`` samples.
        coefficients: 1-D sequence of frequency indices ``k``.

    Returns:
        1-D ``complex128`` array with one value per entry of ``coefficients``.

    Raises:
        ValueError: If ``signal_window`` is empty while frequencies are requested.
    """
    signal_length = len(signal_window)
    num_freqs = len(coefficients)
    # complex128 is the portable spelling; the 'complex_' alias was dropped in NumPy 2.0.
    powers = np.zeros(num_freqs, dtype=np.complex128)
    if num_freqs == 0:
        return powers
    if signal_length == 0:
        raise ValueError("signal_window must contain at least one sample")

    # Plain Python scalars keep the recurrence state scalar, so the final
    # assignment does not rely on converting 1-element arrays to scalars.
    samples = np.asarray(signal_window).reshape(signal_length).tolist()
    head = samples[:-1]
    last_sample = samples[-1]

    for freq_index in range(num_freqs):
        omega = float(2 * math.pi * (coefficients[freq_index] / signal_length))
        feedback = math.cos(omega) * 2
        rotation = cmath.exp(omega * -1j)
        s_1 = 0.0
        s_2 = 0.0
        # Second-order recurrence over all samples but the last.
        for sample in head:
            s_0 = sample + feedback * s_1 - s_2
            s_2 = s_1
            s_1 = s_0
        # The last sample is handled separately so s_1 still holds the
        # previous state needed for the complex output term.
        s_0 = last_sample + feedback * s_1 - s_2
        value = s_0 - s_1 * rotation
        # Phase correction that makes the result valid for non-integer indices.
        powers[freq_index] = value * cmath.exp(omega * (signal_length - 1) * -1j)
    return powers