Implementing the Python Multiprocessing Module for Real-Time Speaker Identification

I'm developing a real-time speaker identification algorithm. My idea is to run three tasks, writeAudio(), detectionBlock(), and identificationBlock(), in parallel using the multiprocessing module.

In practice, the writeAudio() function uses PyAudio to capture a continuous recording and save 0.5-second audio files to a local directory; the detectionBlock() function takes the two oldest 0.5-second files from that directory and uses a voice activity detection (VAD) model to determine whether the audio is speech or noise; and the identificationBlock() function takes a single 3-second audio file (saved to a separate directory from the 0.5-second segments) and uses a voice recognition (VR) model to determine the identity of the speaker.

My hope is that I can apply multiprocessing here to sidestep the Global Interpreter Lock (GIL) and run these three functions simultaneously as Process objects. Currently, the program doesn't start running the detectionBlock() and identificationBlock() functions until writeAudio() has finished recording.

Here is the code for the current multiprocessing implementation:

from multiprocessing import Process
import time

# Perform Parallel Processing with the Multiprocessing Module
    def parallelProcessing(self):
        
        # Define Individual Functions as Process() Objects
        # Note: the target must be the function object itself (no parentheses);
        # calling it here would run it in the parent process and block
        rec = Process(target=self.writeAudio) # Cog 1
        vad = Process(target=self.detectionBlock) # Cog 2
        si = Process(target=self.identificationBlock) # Cog 3
        
        cogs = [rec, vad, si] # List of worker processes
        
        # Run All Three Cogs in Parallel
        rec.start() # Start Cog 1
        
        time.sleep(3) # Wait 3 sec to start speech detection & identification

        vad.start() # Start Cog 2
        si.start() # Start Cog 3
        
        for cog in cogs:
            cog.join() # Wait for processes to complete before continuing
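
As an aside on the pattern itself: even with the targets passed uncalled, detectionBlock() and identificationBlock() each process only one file per call, so as long-running processes they would need loops of their own. A minimal runnable sketch of that shape (the run_*_once stand-ins, the *_loop wrappers, and the Event-based shutdown are placeholder names for illustration, not part of the actual code):

import multiprocessing as mp
import time

def run_detection_once():
    pass # Stand-in for the body of detectionBlock()

def run_identification_once():
    pass # Stand-in for the body of identificationBlock()

def detection_loop(stop_event):
    # Re-run the single-shot detection step until signalled to stop
    while not stop_event.is_set():
        run_detection_once()
        time.sleep(0.1) # Avoid busy-spinning while no files are queued

def identification_loop(stop_event):
    # Same looping pattern for the identification step
    while not stop_event.is_set():
        run_identification_once()
        time.sleep(0.1)

if __name__ == '__main__': # Guard is required where processes are spawned (e.g. on Windows)
    stop_event = mp.Event()
    workers = [mp.Process(target=detection_loop, args=(stop_event,)),
               mp.Process(target=identification_loop, args=(stop_event,))]
    for w in workers:
        w.start()

    time.sleep(30) # Stand-in for the recording phase
    stop_event.set() # Signal both children to exit their loops
    for w in workers:
        w.join()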

I've never applied multiprocessing before, so I'm wondering whether this is feasible, perhaps with a different implementation approach. I appreciate the help.

Edit:

For added clarity, I've added simplified versions of the functions below.

# Speech Detection Sequence
    def detectionBlock(self):
        
        # Create VoiceActivityDetectionModel() Class Object
        vad = VoiceActivityDetectionModel()
        
        # Run Speech Detection on Oldest Audio Segments in Directory
        files = self.getListDir() # List of audiofiles
        index = 0 # First file in list
        
        path_1 = os.path.join(self.VAD_audio_path, files[index])
        path_2 = os.path.join(self.VAD_audio_path, files[index+1])
        
        label_1, _, _ = vad.detectFromAudiofile(path_1) # VAD classifier for first segment
        label_2, _, _ = vad.detectFromAudiofile(path_2) # VAD classifier for second segment
        
        if (label_1 == 'speech') and (label_2 == 'speech'):
            self.combineAudio(index) # Generate 3-sec recording for SI if 
                                     # speech is detected in both audiofiles
        else:
            self.deleteAudio() # Remove oldest audio segment

# Speaker Identification Sequence
    def identificationBlock(self):
        
        # Create EnsemblePredictions() Class Object
        ep = EnsemblePredictions()
        
        # Run Speaker Identification on Oldest Audio Segment in Directory
        files = self.getListDir(audio_type='SI')
        index = 0 # First file in list
        
        if files:
            filepath = os.path.join(self.SI_audio_path, files[index])
        
            speaker, prob_list = ep.predict(filepath, first_session=False) # SI classifier
            time_stamp = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime()) # Time of identification
        
            self.speakerDiarization(speaker=speaker, prob_list=prob_list, time_stamp=time_stamp) # Save results
        
            # Remove 3-Second Audio Segment from Directory
            self.deleteAudio(audio_type='SI')

# Audio Recording Sequence
    def writeAudio(self):
        
        # Instantiate Recording System Variables
        FORMAT = pyaudio.paFloat32 # 32 bits per sample
        CHANNELS = 1 # Mono
        RATE = 16000 # Sampling Rate
        CHUNK = int(self.VAD_audio_length*RATE) # Frames per buffer (one 0.5-sec segment)
        
        # Initialize Recording
        p = pyaudio.PyAudio() # Create interface to PortAudio

        input('Press ENTER to Begin Recording') # Wait for keypress to record

        if keyboard.is_pressed('Enter'):
            stream = p.open(format=FORMAT,
                            channels=CHANNELS,
                            rate=RATE,
                            frames_per_buffer=CHUNK,
                            input=True)
            
            print()
            print('Hold SPACE to Finish Recording')
            
            while True:
                # End Process with Manual User Interrupt
                if keyboard.is_pressed('Space'):
                    break 
                
                # Generate Audio Recording
                data = stream.read(CHUNK) # Read 0.5-second segment from audio stream
                data = np.frombuffer(data, dtype=np.float32) # Convert to NumPy array
                
                filename = 'VAD_segment_' + str(self.VAD_audio_count) + '.wav'
                
                sf.write(os.path.join(self.VAD_audio_path, filename), data, RATE)
        
                # Adjust Segment Count
                self.VAD_audio_count += 1 # Increment segment count
                
            # Stop & Close Stream
            stream.stop_stream()
            stream.close()
        
            # Terminate PortAudio Interface
            p.terminate()

Here's the example I mentioned in the comments. I haven't actually run all of its components, so treat it as pseudocode, but I believe it should be a good starting point. The main improvement is some simplification provided by pastream, which claims to be an essentially GIL-free iteration on portaudio. The benefit here is less overhead and an easier way to move data into at least the first stage of the pipeline, the one that detects audio. If things slow down you may need some additional complexity to drop frames, but if I understand the pastream docs correctly, this structure should generally work.

import pastream
import multiprocessing as mp
from queue import Empty # Python 3 module name ("Queue" in Python 2)

class ExitFlag: pass

def voice_identification(rx_q: mp.Queue):
    while True:
        try:
            received = rx_q.get(timeout=1)
            # If voice_identification is too slow you may want to `get` until
            #   the queue is empty to drop all but the most recent frame. This way
            #   you won't have an infinitely growing queue.
        except Empty:
            continue # Nothing arrived yet; retry instead of touching an unbound `received`
        if isinstance(received, ExitFlag):
            break
        
        else:
            print(identify(received)) #identify audio
    print("identifier process exiting")

if __name__ == "__main__":
    tx_q = mp.Queue()
    identifier_p = mp.Process(target=voice_identification, args=(tx_q,))
    identifier_p.start()
    
    samplerate = 44100
    stream = pastream.InputStream()
    # 3-second chunks every half second (hop = chunksize - overlap)
    for chunk in stream.chunks(chunksize=3*samplerate, overlap=(samplerate//2)*5):
        if detect_audio(chunk): #detect audio
            tx_q.put(chunk)
        if exit_key_down(): #however you want to detect this, it's good to ensure smooth shutdown of child
            tx_q.put(ExitFlag())
            identifier_p.join()
            break
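
To expand on the frame-dropping comment in voice_identification: here is a minimal sketch of a drain-to-latest helper (get_latest is a made-up name, not a queue or pastream API). It blocks for the first item and then discards everything but the newest, so the consumer never works on stale frames and the queue can't grow without bound:

from queue import Empty

def get_latest(rx_q):
    # Block until at least one item arrives, then drain the queue,
    # keeping only the most recent item
    received = rx_q.get()
    while True:
        try:
            received = rx_q.get_nowait()
        except Empty:
            return received

Calling received = get_latest(rx_q) in place of the plain get trades skipped chunks for a bounded queue whenever identification runs slower than the stream.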