如何异步处理来自键盘热键的回调?

How to async handle callback from keyboard hotkeys?

需要注册全局热键。例如,f4f8。使用 keyboard 库,而第一个回调没有 return,下一个不会调用。

换句话说,日志是这样的

pressed f4
end for f4
pressed f8
end for f8

但我想点赞

pressed f4
pressed f8
end for f4
end for f8

演示代码

# pip install keyboard
from keyboard import add_hotkey, wait
from time import sleep

def on_callback(key):
    print('pressed', key)
    sleep(5) # emulate long run task
    print('end for', key)

add_hotkey("f4", lambda: on_callback("f4"))
add_hotkey("f8", lambda: on_callback("f8"))

wait('esc')

我尝试使用 asyncio,但没有任何改变

pressed f4
end for f4
pressed f8
end for f8
from keyboard import add_hotkey, wait
import asyncio

async def on_callback(key):
    print('pressed', key)
    await asyncio.sleep(5) # emulate long run task
    print('end for', key)

add_hotkey("f4", lambda: asyncio.run(on_callback("f4")))
add_hotkey("f8", lambda: asyncio.run(on_callback("f8")))

wait('esc')

更新 1

键盘库的开发人员 gave advise 使用 call_later 函数为每个 callback 创建新线程,它的工作方式如我所愿。

但是有没有办法在同一个线程中完成这样的任务(使用asyncio)?我没有成功。

# example with 'call_later' function
from keyboard import add_hotkey, wait, call_later
from time import sleep

def on_callback(key):
    print('pressed', key)
    sleep(5) # emulate long run task
    print('end for', key)

add_hotkey("f4", lambda: call_later(on_callback, args=("f4",)))
add_hotkey("f8", lambda: call_later(on_callback, args=("f8",)))

wait('esc')

更新 2

现在看起来像下面(github 上的完整代码)。我似乎为了等待 http 请求而创建新线程的操作过于繁重。因此我想在当前线程中使用 asyncio,同时继续处理其他热键。

from googleapiclient.discovery import build
from os import getenv
from settings import get_settings
from loguru import logger
import keyboard

class ScriptService():

    def __init__(self):
        # ...
        self._script = AppsScript(id)
        self._hotkeys = values["hotkeys"]

    def _register_hotkeys(self):
        self._add_hotkey(self._hotkeys["reload"], self._on_reload)
        for item in self._hotkeys["goofy"]:
            k, f = item["keys"], item["function"]
            self._add_hotkey(k, self._on_callback, args=(f, k))

    def _add_hotkey(self, keys, callback, args=()):
        # lambda bug: https://github.com/boppreh/keyboard/issues/493
        keyboard.add_hotkey(keys, lambda: keyboard.call_later(callback, args))

    def _on_callback(self, function, keys):
        response = self._script.run(function)

class AppsScript():

    def __init__(self, id: str):
        self._name = getenv("API_SERVICE_NAME")
        self._version = getenv("API_VERSION")
        self._id = id

    def run(self, function: str):
        body = {"function": function}
        with build(self._name, self._version, credentials=get_credentials()) as service:
            # http request
            return service.scripts().run(scriptId=self._id, body=body).execute()

不幸的是,none 您正在使用的库实际上是可等待的,因此将它们与 asyncio 一起使用将是一个挑战。您可以从 google 库中提取实际的 http 调用,然后稍后使用与 asyncio 兼容的库实现您的 own 客户端,但这需要做很多工作才能避免开销启动一个新线程。

幸运的是,已经有一种方法可以避免昂贵的旋转线程:使用工作线程池。在这种方法中,我们不是为每个回调立即启动一个新线程,而是将 task 添加到由我们提前启动的线程池服务的任务队列中。这样我们只为启动一个线程付费,之后我们只为将请求序列化到线程付费——这不是什么都没有,但它比启动一个线程要少。

虽然可以让 asyncio 管理线程池,但在这种情况下它根本没有任何优势,因为您的代码中没有任何 else 是可等待的。 (如果你确实想这样做,你会使用 loop.run_in_exeuctor(), taking care not to re-create the pool as noted in this question。)

这是一些虚拟代码,需要根据您的 类:

from threading import Thread
from queue import Queue
from time import sleep
from random import randint


def process(task):
    print(task["name"])
    sleep(3 + randint(0, 100) / 100)
    print(f"Task {task['name']} done")


class WorkerThread(Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue
        print("Started worker thread")
        self._open = True

    def run(self):
        while self._open:
            task = self.queue.get()
            process(task)
            self.queue.task_done()

    def close(self):
        print("Closing", self)
        self._open = False


task_queue = Queue()
THREADS = 6
worker_threads = [WorkerThread(task_queue) for _ in range(THREADS)]
for worker in worker_threads:
    worker.setDaemon(True)
    worker.start()


print("Sending one task")
task_queue.put({"name": "Task 1"})
sleep(1)

print("Sending a bunch of tasks")
for i in range(1, 15):
    task_queue.put({"name": f"Task {i}"})
print("Sleeping for a bit")
sleep(2)

print("Shutting down")

# wrap this in your exit code
task_queue.join()  # wait for everything to be done
for worker in worker_threads:
    worker.close()

还有其他方法,但我认为在这里明确地写出来更清楚。请注意,我假设您的代码不受 cpu 约束,因此使用线程而不是进程更有意义。

顺便说一句,这看起来 非常 像是 celery 之类的最小实现.

顺便说一句,我不懂俄语,但这看起来是个有趣的项目。