How to record audio in Python for an undetermined duration and allow pause and resume?
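I am writing a Python app that records audio as a WAV file until the user presses pause or stop. After pausing, the user should also be able to resume recording. Additionally:
- The app can't know beforehand how long the recording will be
- The app should avoid running out of memory (since the recording could be very long). For example, it could write to the WAV file in real time to avoid holding the growing recording in memory.
What is a good approach to this problem? Could you provide some code snippets for your solution?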
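With python-sounddevice, I could stop() and start() the stream to mimic a 'pause' feature, and I can specify a numpy array as the output for the recording. But:
- I don't know how big to make the array (since I don't know the recording duration)
- What would I do when the array fills up?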
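python-sounddevice and sound-file can support recordings without knowing the size beforehand. But:
- How would I incorporate 'pause' and 'resume' features? Sound-file has only read and write methods.
- Is there a better way to stop the stream than using a KeyboardInterrupt?
- Could I create a separate recording after every 'pause' and combine the WAV files after the user clicks 'stop'?
- I tried using threading.Event() to block the recording thread to mimic a pause feature, but the recording kept writing to the file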
My attempt at the sound-device approach:

    paused = False

    def record(self):
        self.recording = None  # placeholder: should be a numpy.ndarray of the correct size
                               # (not sure the best way to do this without
                               # knowing the recording duration)
        with sd.InputStream(samplerate=44100, device=mic, channels=1,
                            callback=self.callback):
            while self.paused:
                sd.stop()
            sd.rec(out=self.recording)  # but what happens if
                                        # recording is very long
                                        # or numpy array fills up?

    def stop_and_save(self):
        sd.stop()
        scipy.io.wavfile.write("recording.wav", 44100, self.recording)
My attempt at the sound-device and sound-file approach:

    try:
        with sf.SoundFile(args.filename, mode='x', samplerate=args.samplerate,
                          channels=args.channels, subtype=args.subtype) as file:
            with sd.InputStream(samplerate=args.samplerate, device=args.device,
                                channels=args.channels, callback=callback):
                print('press Ctrl+C to stop the recording')
                while True:
                    file.write(q.get())  # but how do you stop writing when 'paused'?
    except KeyboardInterrupt:
        print('\nRecording finished: ' + repr(args.filename))
        parser.exit(0)
    except Exception as e:
        parser.exit(type(e).__name__ + ': ' + str(e))
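I came up with this solution for the pause/resume feature. It uses the sound-device and sound-file approach: the current recording is stopped whenever the user clicks Pause, and a new recording is started upon Resume. Then, after the user clicks Stop, all the WAV files are combined in order.
(Matthias' code also looks like a fine solution that takes better advantage of threads.)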
To start recording audio:

    def record(self):
        with sf.SoundFile(self.filepath,
                          mode='x', samplerate=self.SAMPLE_RATE,
                          channels=self.CHANNELS, subtype=None) as file:
            # Keep a handle so pause/stop can flush or close the file from outside.
            self.sound_file = file
            with sd.InputStream(samplerate=self.SAMPLE_RATE, device=self.mic_id,
                                channels=self.CHANNELS, callback=self.callback):
                logger.info(f"New recording started: {self.sound_file.name}")
                try:
                    # Blocks on the queue and writes each audio block as it arrives.
                    while True:
                        file.write(self.mic_queue.get())
                except RuntimeError as re:
                    # Raised when the file is closed by stop_mic_recording().
                    logger.debug(f"{re}. If recording was stopped by the user, then this can be ignored")
The callback used by record():

    def callback(self, indata, frames, time, status):
        """This is called (from a separate thread) for each audio block."""
        if status:
            print(status, file=sys.stderr)
        # Copy the block: the input buffer is reused after the callback returns.
        self.mic_queue.put(indata.copy())
To pause:

    def pause_recording(self):
        """Mimics a 'pause' functionality by writing the current sound file changes to disk.
        Upon 'resume' a new recording will be made. Note: close() is not called here, because
        that would kill the recording thread
        """
        self.sound_file.flush()
        logger.info(f"'Paused' (flushed) recording: {self.sound_file.name}")
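As an alternative to flushing and starting a new file, pausing can be done by dropping audio blocks before they reach the queue, so a single WAV file is enough. The following is only a minimal sketch of that idea, not the code from this answer: PausableWavWriter and its methods are hypothetical names, it uses the standard-library wave module instead of sound-file, and it assumes the blocks are raw 16-bit PCM bytes (for example what bytes(indata) would give inside an sd.RawInputStream callback opened with dtype='int16').

```python
import queue
import threading
import wave


class PausableWavWriter:
    """Hypothetical helper: drains audio blocks from a queue into one WAV file."""

    _STOP = object()  # sentinel that tells the writer thread to finish

    def __init__(self, path, samplerate=44100, channels=1, sampwidth=2):
        self._queue = queue.Queue()
        self._recording = threading.Event()  # set = recording, clear = paused
        self._recording.set()
        self._wav = wave.open(path, "wb")
        self._wav.setnchannels(channels)
        self._wav.setsampwidth(sampwidth)
        self._wav.setframerate(samplerate)
        self._thread = threading.Thread(target=self._drain, daemon=True)
        self._thread.start()

    def feed(self, block):
        """Call from the audio callback; blocks arriving while paused are dropped."""
        if self._recording.is_set():
            self._queue.put(bytes(block))

    def pause(self):
        self._recording.clear()

    def resume(self):
        self._recording.set()

    def stop(self):
        """Write everything queued so far, then close the file."""
        self._queue.put(self._STOP)
        self._thread.join()
        self._wav.close()

    def _drain(self):
        # Each block is written as soon as it arrives, so memory use stays
        # bounded no matter how long the recording runs.
        while True:
            block = self._queue.get()
            if block is self._STOP:
                break
            self._wav.writeframes(block)
```

Because the pause check happens in feed(), the writer thread never has to be blocked, which is why blocking the writer with an Event alone does not stop data from piling up in the queue. Stopping with a sentinel also avoids relying on KeyboardInterrupt or on closing the file from another thread.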
To resume:

    def resume_recording(self):
        """
        Mimics 'resuming' by starting a new recording, which will be merged with the others
        when the user selects Stop & Save (or deleted upon Stop & Delete)
        Note: get_full_sound_file_name() outputs a new recording with the same base name as the first,
        but appends a `_part2` or `_part3` etc. to the suffix to distinguish it from the first
        and maintain order.
        """
        self.sound_file = self.get_full_sound_file_name()
        self.record()
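The body of get_full_sound_file_name() is not shown here, so the following is only a guess at what such a naming scheme could look like. part_path and part_number are hypothetical helpers. The sketch also illustrates one pitfall of relying on alphabetical order: with ten or more parts, `_part10` sorts before `_part2`, so sorting by the numeric part index is safer.

```python
import re
from pathlib import Path


def part_path(base: Path, part: int) -> Path:
    """Hypothetical naming scheme: rec.wav, rec_part2.wav, rec_part3.wav, ..."""
    if part <= 1:
        return base
    return base.with_name(f"{base.stem}_part{part}{base.suffix}")


def part_number(path: Path) -> int:
    """Extract the numeric part index; the first file has no suffix and counts as 1."""
    match = re.search(r"_part(\d+)$", path.stem)
    return int(match.group(1)) if match else 1


names = [part_path(Path("rec.wav"), n) for n in (1, 2, 10)]
alphabetical = sorted(p.name for p in names)  # puts rec_part10.wav before rec_part2.wav
ordered = sorted(names, key=part_number)      # keeps the recording order
```

Zero-padding the index (for example `_part002`) would be another way to keep plain alphabetical sorting correct.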
To stop recording:

    def stop_mic_recording(self):
        try:
            self.sound_file.flush()
            # Closing the file makes the pending write in record() raise RuntimeError,
            # which ends the recording loop.
            self.sound_file.close()
            logger.info(f"Stopped and closed recording: {self.sound_file.name}")
        except RuntimeError as e:
            logger.info(f"Error stopping/saving {self.sound_file.name}. Make sure the file exists and can be modified")
            logger.info(f"RuntimeError: \n{e}")
To combine the audio (called after stop_mic_recording()):

    def combine_recordings_if_needed(self):
        """
        If recording was paused, combines all sections in alphabetical order into a new audio file
        """
        if self.section_count > 1:  # this is incremented when a recording is paused/resumed
            combined_audio = AudioSegment.empty()
            files_combined = []
            # glob.glob() returns files in arbitrary order, so sort explicitly.
            for rec in sorted(glob.glob(os.path.join(RECORDING_DIR, "*" + self.FILE_EXT))):
                combined_audio = combined_audio + AudioSegment.from_wav(rec)  # this is why alphabetical order is important
                files_combined.append(rec)
            combined_file_name = os.path.join(RECORDING_DIR, self.base_filename + "_combined" + self.FILE_EXT)
            combined_audio.export(out_f=combined_file_name, format="wav")
            logger.info(f"Combined the following recordings into {combined_file_name}:"
                        f"\n {files_combined}")
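Since the original concern was memory use on long recordings, the merge step can also be done by copying one chunk at a time instead of building the whole combined audio in memory. This is a rough sketch using only the standard-library wave module, under the assumption that all parts are uncompressed PCM WAV files with the same channel count, sample width and sample rate. concatenate_wavs and chunk_frames are names made up for this example.

```python
import wave


def concatenate_wavs(part_paths, out_path, chunk_frames=65536):
    """Append each part to out_path in the given order, one chunk at a time."""
    part_paths = [str(p) for p in part_paths]
    if not part_paths:
        raise ValueError("no parts to combine")
    with wave.open(str(out_path), "wb") as out:
        params = None
        for path in part_paths:
            with wave.open(path, "rb") as part:
                fmt = tuple(part.getparams()[:3])  # (nchannels, sampwidth, framerate)
                if params is None:
                    # The first part decides the output format.
                    params = fmt
                    out.setnchannels(fmt[0])
                    out.setsampwidth(fmt[1])
                    out.setframerate(fmt[2])
                elif fmt != params:
                    raise ValueError(f"{path} has a different format: {fmt} != {params}")
                # Copy in fixed-size chunks so only one chunk is in memory at a time.
                while True:
                    frames = part.readframes(chunk_frames)
                    if not frames:
                        break
                    out.writeframes(frames)
```

The caller is responsible for passing the parts in recording order, for example sorted by their part number.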