如何在 python 中录制不确定持续时间的音频并允许暂停和恢复功能?

How to record audio in python for undetermined duration AND allow for pause and resume features?

我正在编写一个 Python 应用程序来将音频录制为 WAV 文件,直到用户按下 pausestop .暂停音频后,用户还应该能够恢复 录音。另外:

解决这个问题的好方法是什么?您能否为您的解决方案提供一些代码片段?

使用 python-sounddevice,我可以 stop()start() 流来模仿 'pause' 功能。我可以指定一个 numpy 数组作为记录的输出。但是:

python-sounddevice and sound-file可以支持事先不知道大小的录音。但是:


我尝试sound-device方法

paused = False

def record():
    self.recording = ? # create numpy.ndarray of the correct size 
                       # (not sure the best way to do this without 
                       # knowing the recording duration)
    with sd.InputStream(samplerate=44100, device=mic, channels=1, 
        callback=self.callback):

            while self.paused:
            sd.stop()
        sd.rec(out=recording) # but what happens if 
                              # recording is very long
                              # or numpy array fills up?

def stop_and_save():
    sd.stop()
    scipy.io.wavfile.write("recording.wav", 44100, self.recording)


sound-devicesound-file方法:

with sf.SoundFile(args.filename, mode='x', samplerate=args.samplerate,
                      channels=args.channels, subtype=args.subtype) as file:
        with sd.InputStream(samplerate=args.samplerate, device=args.device,
                            channels=args.channels, callback=callback):
            print('press Ctrl+C to stop the recording')
            while True:
                file.write(q.get())  # but how do you stop writing when 'paused'?

except KeyboardInterrupt:
    print('\nRecording finished: ' + repr(args.filename))
    parser.exit(0)
except Exception as e:
    parser.exit(type(e).__name__ + ': ' + str(e))

我想到了 pause/resume 功能的这个解决方案,它利用 sound-devicesound-file 方法,只要用户单击 [=37=,当前录制就会停止]暂停 并在恢复 时开始新的录制。然后,在用户点击停止后,所有的WAV文件按顺序合并。

Matthias' code 看起来也是一个很好的解决方案,可以更好地利用线程。)


开始录音:

    def record(self):
        try:
            with sf.SoundFile(self.filepath,
                                       mode='x', samplerate=self.SAMPLE_RATE,
                                       channels=self.CHANNELS, subtype=None) as file:
                with sd.InputStream(samplerate=self.SAMPLE_RATE, device=self.mic_id,
                                           channels=self.CHANNELS, callback=self.callback):
                    logger.info(f"New recording started: {self.sound_file.name}")
                    try:
                        while True:
                            file.write(self.mic_queue.get())

                    except RuntimeError as re:
                        logger.debug(f"{re}. If recording was stopped by the user, then this can be ignored")

回调 record():


    def callback(self, indata, frames, time, status):
        """This is called (from a separate thread) for each audio block."""
        if status:
            print(status, file=sys.stderr)
        self.mic_queue.put(indata.copy())

暂停

    def pause_recording(self):
        """Mimics a 'pause' functionality by writing the current sound file changes to disk.
        Upon 'resume' a new recording will be made. Note: close() is not called here, because
        that would kill the recording thread
        """
        self.sound_file.flush()
        logger.info(f"'Paused' (closed) recording: {self.sound_file.name}")

恢复

    def resume_recording(self):
        """
        Mimics 'resuming' by starting a new recording, which will be merged with the others
        when the user selects Stop & Save (or deleted upon Stop & Delete)
        Note: get_full_sound_file_name() outputs a new recording with the same base name as the first, but appends a `_part2` or `_part3` etc. to the suffix to distinguish it from the first and maintain order.
        """
        self.sound_file = self.get_full_sound_file_name()
        self.record()

停止记录:

    def stop_mic_recording(self):
        try:
            self.sound_file.flush()
            self.sound_file.close()
            logger.info(f"Stopped and closed recording: {self.sound_file.name}")

        except RuntimeError as e:
            logger.info(f"Error stopping/saving {self.sound_file.name}. Make sure the file exists and can be modified")
            logger.info(f"RunTimeError: \n{e}")

合并音频(在stop_recording()之后调用):

   def combine_recordings_if_needed(self):
        """
        If recording was paused, combines all sections in alphabetical order into a new audio file
        """
        if self.section_count > 1:   # this is incremented when a recording is paused/resumed
            combined_audio = AudioSegment.empty()
            files_combined = []
            for rec in glob.glob(os.path.join(RECORDING_DIR, "*" + self.FILE_EXT)):
                combined_audio = combined_audio + AudioSegment.from_wav(rec) # this is why alphabetical order is important
                files_combined.append(rec)

            combined_file_name = os.path.join(RECORDING_DIR, self.base_filename + "_combined" + self.FILE_EXT)
            combined_audio.export(out_f=combined_file_name, format="wav")
            logger.info(f"Combined the following recordings into {combined_file_name}:"
                        f"\n {files_combined}")