如何使用 pynput 和 winsound "continue" 来自回调函数的 for 循环?

How to "continue" a for loop from a callback function with pynput and winsound?

我需要让我的程序播放目录中的 wav 文件,并在用户按 F9 时跳到下一个文件。我发现我需要将 winsound 设置为异步模式以使其在上一个文件仍在播放时跳到下一个文件,但同时我需要 运行 一个后台进程,如 time.sleep(5) 否则程序将立即退出。

我有一个通过 wav 文件到 运行 的 for 循环,但我不能直接从 on_press() 函数调用继续,所以我尝试使用条件全局变量来睡眠或不睡觉。因此,我可以使用 PlaySound(None, winsound.Purge) 停止当前播放的 wav 文件,但我无法继续 for 循环的下一次迭代或获取我的 time.sleep(5)有条件地 运行 - 它总是 运行 并且我总是必须等待那 5 秒才能开始下一次迭代。我使用回调的方式有问题还是我应该使用不同的输入库或其他东西?

from pynput import keyboard
import winsound
import os

mySkip = 0
currentIdx = 0

def clamp(n, minn, maxn):
    return max(min(maxn, n), minn)

def on_press(key):
    global mySkip
    global currentIdx
    if key == keyboard.Key.f9:
        winsound.Beep(5500, 200)
        mySkip = 1

        currentIdx = clamp(currentIdx + 1, 0, 3)
        winsound.PlaySound(None, winsound.SND_PURGE)

listener = keyboard.Listener(on_press=on_press)
listener.start()

WAVDir = "C:/Users/me/PycharmProjects/projectName/WAV"
wavList = os.listdir(WAVDir) #have 4 files in here, "test0.wav" to "test4.wav"

for idx, i in enumerate(wavList):
    soundPath = WAVDir + "/test" + str(currentIdx) + ".wav"

    print(soundPath)
    winsound.PlaySound(soundPath, winsound.SND_ASYNC)

    if mySkip == 0:
        time.sleep(5)

    currentIdx += 1
    mySkip = 0

这是我使用轮询想出的解决方案 - 这从来都不是我最喜欢做的事情,但是 winsound 在线程方面是出了名的不合作,我相信线程是解决问题的方法:

def main():

    from pathlib import Path
    from threading import Event, Thread
    from pynput import keyboard
    from collections import namedtuple
    import wave

    skip_sound = Event()

    def play_sound_thread(path, max_duration_seconds):
        from winsound import PlaySound, SND_ASYNC
        PlaySound(path, SND_ASYNC)
        elapsed_seconds = 0
        poll_interval = 1 / 8
        while not skip_sound.is_set() and elapsed_seconds < max_duration_seconds:
            skip_sound.wait(poll_interval)
            elapsed_seconds += poll_interval

    def on_press(key):
        if key is keyboard.Key.f9:
            nonlocal skip_sound
            skip_sound.set()

    Wave = namedtuple("Wave", ["path", "file_name", "duration_seconds"])

    waves = []

    wav_dir = Path("./wavs")

    for wav_path in wav_dir.glob("*.wav"):
        wav_path_as_str = str(wav_path)
        file_name = wav_path.name
        with wave.open(wav_path_as_str, "rb") as wave_read:
            sample_rate = wave_read.getframerate()
            number_of_samples = wave_read.getnframes()
        duration_seconds = number_of_samples / sample_rate

        wav = Wave(
            path=wav_path_as_str,
            file_name=file_name,
            duration_seconds=duration_seconds
        )

        waves.append(wav)

    listener = keyboard.Listener(on_press=on_press)
    listener.start()

    for wav in waves:
        print(f"Playing {wav.file_name}")

        thread = Thread(
            target=play_sound_thread,
            args=(wav.path, wav.duration_seconds),
            daemon=True
        )
        thread.start()
        thread.join()

        if skip_sound.is_set():
            print(f"Skipping {wav.file_name}")
            skip_sound.clear()

    return 0


if __name__ == "__main__":
    import sys
    sys.exit(main())

工作原理:

每次需要播放声音时,都会生成一个新线程,并在该线程中完成 winsound.PlaySound 内容。 PlaySound 接收到 SND_ASYNC 参数,所以声音是异步播放的,这个操作本身不会阻塞线程中的执行。然而,线程的执行随后被一个循环阻塞,该循环在经过一定时间(声音有足够的时间完成播放)或设置 skip_sound 事件时终止。

on_press 回调在按下 f9 键时设置 skip_sound 事件。

我创建了一个名为 Wavenamedtuple,它的作用就像一个简单的数据桶。我们需要的任何单个波形文件的所有相关信息都包含在其中一个文件中。

主 for 循环(第二个,第一个只是编译 Wave 个对象的列表),我们遍历 Wave 个对象。然后我们为当前 Wave 对象生成一个新线程。然后我们启动线程开始播放声音,并且 thread.join 它让主线程等待这个子线程完成 - 基本上阻塞直到子线程完成。当 on_press 回调被触发时,它会提前终止子线程,因此 join 操作将不再阻塞,允许我们在必要时重置我们的 skip_sound 事件并且继续下一次迭代。

我喜欢这个解决方案的地方:

  • 有效

我不喜欢这个解决方案的地方:

  • 就像我说的,play_sound_thread 函数播放声音 异步,然后轮询直到 skip_sound 事件 已触发,或经过了一定时间。投票是粗暴的。我只是武断地决定以 1/8 秒的间隔进行轮询。
  • 为了确定轮询的最长持续时间,您 需要打开wave文件,解析,分 按采样率采样。基本打开一样感觉很脏 文件两次 - 一次在 wave.open 中,另一次在 winsound.PlaySound.