每次用户按键时如何录制音频?

How to record audio each time user presses a key?

当且仅当用户按下 ctrl 键并在用户按下 ctrl+c 键时关闭录音循环时,如何不确定地录制用户的音频? 到目前为止基于在一些在线示例中构建此脚本:

from pynput import keyboard
import time, os
import pyaudio
import wave
import sched
import sys
from playsound import playsound


CHUNK = 8192
FORMAT = pyaudio.paInt16
CHANNELS = 2
RATE = 44100
WAVE_OUTPUT_FILENAME = "mic.wav"

p = pyaudio.PyAudio()
frames = []

def callback(in_data, frame_count, time_info, status):
    frames.append(in_data)
    return (in_data, pyaudio.paContinue)

class MyListener(keyboard.Listener):
    def __init__(self):
        super(MyListener, self).__init__(self.on_press, self.on_release)
        self.key_pressed = None
        self.wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
        self.wf.setnchannels(CHANNELS)
        self.wf.setsampwidth(p.get_sample_size(FORMAT))
        self.wf.setframerate(RATE)
    def on_press(self, key):

        try:
            if key.ctrl:
                self.key_pressed = True
            return True
        except AttributeError:
            sys.exit()





    def on_release(self, key):

        if key.ctrl:
            self.key_pressed = False
        return True


listener = MyListener()


listener.start()
started = False
stream = None

def recorder():
    global started, p, stream, frames

    while True:

        try:
            if listener.key_pressed and not started:
                # Start the recording
                try:
                    stream = p.open(format=FORMAT,
                                    channels=CHANNELS,
                                    rate=RATE,
                                    input=True,
                                    frames_per_buffer=CHUNK,
                                    stream_callback = callback)
                    print("Stream active:", stream.is_active())
                    started = True
                    print("start Stream")
                except KeyboardInterrupt:
                    print('\nRecording finished: ' + repr(WAVE_OUTPUT_FILENAME))
                    quit()

            elif not listener.key_pressed and started:

                print("Stop recording")
                listener.wf.writeframes(b''.join(frames))
                listener.wf.close()
                print("You should have a wav file in the current directory")
                print('-> Playing recorded sound...')
                playsound(str(os.getcwd())+'/mic.wav')
                os.system('python "/Users/user/rec.py"')

        except KeyboardInterrupt:
            print('\nRecording finished: ' + repr(WAVE_OUTPUT_FILENAME))
            quit()
        except AttributeError:
            quit()





print ("-> Press and hold the 'ctrl' key to record your audio")
print ("-> Release the 'ctrl' key to end recording")


recorder()

问题是效率真的很低,比如电脑开始发热。我发现让程序保持 运行 并录制不同音频样本的唯一方法是:os.system('python "/Users/user/rec.py"')。为了完成程序,我尝试使用以下方法捕获异常:

except AttributeError:
       sys.exit()

或用户输入:

if key.ctrl_c:
   sys.exit()

基于 pyinput docs,我试图有效地使用监听器。但是,对于这种特定情况,建议使用这些侦听器的方式是什么?

试试这个:

import sys
try:
   #Your code here

except KeyboardInterrupt:
    sys.exit()

我真的不建议使用 ctrl + c 做任何事情而不是打扰。另外,我不建议一直按某个按钮来继续录制。我的建议是一键录音,一键停止;这就是为什么在下面的代码中我使用 sstart 记录和 qquit。它是完全可配置的,您可以根据自己的喜好进行更改:

安装依赖项

pip install pyaudio numpy scipy 
sudo pip install keyboard

录音机

一个简单的 class 从你的麦克风录音:

import pyaudio
import keyboard
import numpy as np
from scipy.io import wavfile


class Recorder():
    def __init__(self, filename):
        self.audio_format = pyaudio.paInt16
        self.channels = 1
        self.sample_rate = 44100
        self.chunk = int(0.03*self.sample_rate)
        self.filename = filename
        self.START_KEY = 's'
        self.STOP_KEY = 'q'


    def record(self):
        recorded_data = []
        p = pyaudio.PyAudio()

        stream = p.open(format=self.audio_format, channels=self.channels,
                        rate=self.sample_rate, input=True,
                        frames_per_buffer=self.chunk)
        while(True):
            data = stream.read(self.chunk)
            recorded_data.append(data)
            if keyboard.is_pressed(self.STOP_KEY):
                print("Stop recording")
                # stop and close the stream
                stream.stop_stream()
                stream.close()
                p.terminate()
                #convert recorded data to numpy array
                recorded_data = [np.frombuffer(frame, dtype=np.int16) for frame in recorded_data]
                wav = np.concatenate(recorded_data, axis=0)
                wavfile.write(self.filename, self.sample_rate, wav)
                print("You should have a wav file in the current directory")
                break


    def listen(self):
        print(f"Press `{self.START_KEY}` to start and `{self.STOP_KEY}` to quit!")
        while True:
            if keyboard.is_pressed(self.START_KEY):
                self.record()
                break

要使用此 class,只需像这样调用 listen() 方法:

recorder = Recorded("mic.wav") #name of output file
recorder.listen()

至于您的计算机似乎非常努力地工作的主要问题,那是因为您使用了一个 while 循环来不断检查记录密钥何时被释放。在这个循环中,计算机将尽可能快地循环,从不休息。

更好的解决方案是使用事件驱动编程,让 OS 定期通知您事件,并在事件发生时检查您是否想做任何事情。这听起来可能很复杂,但幸运的是 pynput 为您完成了大部分艰苦的工作。

如果您跟踪记录或回放的状态,则在下次控制键按下事件发生时开始新记录也相当简单,而无需递归调用整个新过程的“技巧”对于每个新录音。键盘侦听器内的事件循环将继续进行,直到回调函数之一 returns False 或引发 self.stopException().

我创建了一个简单的 listener class 类似于您最初的尝试,它调用记录器或播放器实例(我稍后会谈到)来启动和停止。我也必须同意 Anwarvic 的观点,<ctl-c> 应该被保留作为停止脚本的紧急方式,所以我将停止命令更改为字母 q.

class listener(keyboard.Listener):
    def __init__(self, recorder, player):
        super().__init__(on_press = self.on_press, on_release = self.on_release)
        self.recorder = recorder
        self.player = player
    
    def on_press(self, key):
        if key is None: #unknown event
            pass
        elif isinstance(key, keyboard.Key): #special key event
            if key.ctrl and self.player.playing == 0:
                self.recorder.start()
        elif isinstance(key, keyboard.KeyCode): #alphanumeric key event
            if key.char == 'q': #press q to quit
                if self.recorder.recording:
                    self.recorder.stop()
                return False #this is how you stop the listener thread
            if key.char == 'p' and not self.recorder.recording:
                self.player.start()
                
    def on_release(self, key):
        if key is None: #unknown event
            pass
        elif isinstance(key, keyboard.Key): #special key event
            if key.ctrl:
                self.recorder.stop()
        elif isinstance(key, keyboard.KeyCode): #alphanumeric key event
            pass

if __name__ == '__main__':
    r = recorder("mic.wav")
    p = player("mic.wav")
    l = listener(r, p)
    print('hold ctrl to record, press p to playback, press q to quit')
    l.start() #keyboard listener is a thread so we start it here
    l.join() #wait for the tread to terminate so the program doesn't instantly close

有了这个结构,我们就需要一个记录器class,它具有启动和停止功能,不会阻止(异步)侦听器线程继续接收关键事件。 PyAudio 的文档为异步输出提供了一个很好的示例,因此我只是将其应用于输入。稍微重新安排一下,还有一个标志让我们的听众知道我们稍后录音,我们有一个录音机 class:

class recorder:
    def __init__(self, 
                 wavfile, 
                 chunksize=8192, 
                 dataformat=pyaudio.paInt16, 
                 channels=2, 
                 rate=44100):
        self.filename = wavfile
        self.chunksize = chunksize
        self.dataformat = dataformat
        self.channels = channels
        self.rate = rate
        self.recording = False
        self.pa = pyaudio.PyAudio()

    def start(self):
        #we call start and stop from the keyboard listener, so we use the asynchronous 
        # version of pyaudio streaming. The keyboard listener must regain control to 
        # begin listening again for the key release.
        if not self.recording:
            self.wf = wave.open(self.filename, 'wb')
            self.wf.setnchannels(self.channels)
            self.wf.setsampwidth(self.pa.get_sample_size(self.dataformat))
            self.wf.setframerate(self.rate)
            
            def callback(in_data, frame_count, time_info, status):
                #file write should be able to keep up with audio data stream (about 1378 Kbps)
                self.wf.writeframes(in_data) 
                return (in_data, pyaudio.paContinue)
            
            self.stream = self.pa.open(format = self.dataformat,
                                       channels = self.channels,
                                       rate = self.rate,
                                       input = True,
                                       stream_callback = callback)
            self.stream.start_stream()
            self.recording = True
            print('recording started')
    
    def stop(self):
        if self.recording:         
            self.stream.stop_stream()
            self.stream.close()
            self.wf.close()
            
            self.recording = False
            print('recording finished')

最后,我们创建了一个音频播放器,当您按下 p 时可以播放音频。我将 PyAudio 示例放入一个线程中,该线程在您每次按下按钮时都会创建,以便可以创建多个彼此重叠的播放器。我们还会跟踪有多少玩家在玩,所以我们不会在文件已被玩家使用时尝试录制。 (我还在顶部包含了我的导入)

from threading import Thread, Lock
from pynput import keyboard
import pyaudio
import wave

class player:
    def __init__(self, wavfile):
        self.wavfile = wavfile
        self.playing = 0 #flag so we don't try to record while the wav file is in use
        self.lock = Lock() #muutex so incrementing and decrementing self.playing is safe
    
    #contents of the run function are processed in another thread so we use the blocking
    # version of pyaudio play file example: http://people.csail.mit.edu/hubert/pyaudio/#play-wave-example
    def run(self):
        with self.lock:
            self.playing += 1
        with wave.open(self.wavfile, 'rb') as wf:
            p = pyaudio.PyAudio()
            stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                            channels=wf.getnchannels(),
                            rate=wf.getframerate(),
                            output=True)
            data = wf.readframes(8192)
            while data != b'':
                stream.write(data)
                data = wf.readframes(8192)

            stream.stop_stream()
            stream.close()
            p.terminate()
            wf.close()
        with self.lock:
            self.playing -= 1
        
    def start(self):
        Thread(target=self.run).start()

我不能保证它完全没有错误,但如果您对它的工作原理/如何让它工作有任何疑问,请随时发表评论。