如何异步处理来自键盘热键的回调?
How to async handle callback from keyboard hotkeys?
需要注册全局热键。例如,f4
和 f8
。使用 keyboard 库,而第一个回调没有 return,下一个不会调用。
换句话说,日志是这样的
pressed f4
end for f4
pressed f8
end for f8
但我想点赞
pressed f4
pressed f8
end for f4
end for f8
演示代码
# pip install keyboard
from keyboard import add_hotkey, wait
from time import sleep
def on_callback(key):
print('pressed', key)
sleep(5) # emulate long run task
print('end for', key)
add_hotkey("f4", lambda: on_callback("f4"))
add_hotkey("f8", lambda: on_callback("f8"))
wait('esc')
我尝试使用 asyncio
,但没有任何改变
pressed f4
end for f4
pressed f8
end for f8
from keyboard import add_hotkey, wait
import asyncio
async def on_callback(key):
print('pressed', key)
await asyncio.sleep(5) # emulate long run task
print('end for', key)
add_hotkey("f4", lambda: asyncio.run(on_callback("f4")))
add_hotkey("f8", lambda: asyncio.run(on_callback("f8")))
wait('esc')
更新 1
键盘库的开发人员 gave advise 使用 call_later
函数为每个 callback
创建新线程,它的工作方式如我所愿。
但是有没有办法在同一个线程中完成这样的任务(使用asyncio
)?我没有成功。
# example with 'call_later' function
from keyboard import add_hotkey, wait, call_later
from time import sleep
def on_callback(key):
print('pressed', key)
sleep(5) # emulate long run task
print('end for', key)
add_hotkey("f4", lambda: call_later(on_callback, args=("f4",)))
add_hotkey("f8", lambda: call_later(on_callback, args=("f8",)))
wait('esc')
更新 2
现在看起来像下面(github 上的完整代码)。我似乎为了等待 http 请求而创建新线程的操作过于繁重。因此我想在当前线程中使用 asyncio,同时继续处理其他热键。
from googleapiclient.discovery import build
from os import getenv
from settings import get_settings
from loguru import logger
import keyboard
class ScriptService():
def __init__(self):
# ...
self._script = AppsScript(id)
self._hotkeys = values["hotkeys"]
def _register_hotkeys(self):
self._add_hotkey(self._hotkeys["reload"], self._on_reload)
for item in self._hotkeys["goofy"]:
k, f = item["keys"], item["function"]
self._add_hotkey(k, self._on_callback, args=(f, k))
def _add_hotkey(self, keys, callback, args=()):
# lambda bug: https://github.com/boppreh/keyboard/issues/493
keyboard.add_hotkey(keys, lambda: keyboard.call_later(callback, args))
def _on_callback(self, function, keys):
response = self._script.run(function)
class AppsScript():
def __init__(self, id: str):
self._name = getenv("API_SERVICE_NAME")
self._version = getenv("API_VERSION")
self._id = id
def run(self, function: str):
body = {"function": function}
with build(self._name, self._version, credentials=get_credentials()) as service:
# http request
return service.scripts().run(scriptId=self._id, body=body).execute()
不幸的是,none 您正在使用的库实际上是可等待的,因此将它们与 asyncio 一起使用将是一个挑战。您可以从 google 库中提取实际的 http 调用,然后稍后使用与 asyncio 兼容的库实现您的 own 客户端,但这需要做很多工作才能避免开销启动一个新线程。
幸运的是,已经有一种方法可以避免昂贵的旋转线程:使用工作线程池。在这种方法中,我们不是为每个回调立即启动一个新线程,而是将 task 添加到由我们提前启动的线程池服务的任务队列中。这样我们只为启动一个线程付费,之后我们只为将请求序列化到线程付费——这不是什么都没有,但它比启动一个线程要少。
虽然可以让 asyncio 管理线程池,但在这种情况下它根本没有任何优势,因为您的代码中没有任何 else 是可等待的。 (如果你确实想这样做,你会使用 loop.run_in_exeuctor()
, taking care not to re-create the pool as noted in this question。)
这是一些虚拟代码,需要根据您的 类:
from threading import Thread
from queue import Queue
from time import sleep
from random import randint
def process(task):
print(task["name"])
sleep(3 + randint(0, 100) / 100)
print(f"Task {task['name']} done")
class WorkerThread(Thread):
def __init__(self, queue):
super().__init__()
self.queue = queue
print("Started worker thread")
self._open = True
def run(self):
while self._open:
task = self.queue.get()
process(task)
self.queue.task_done()
def close(self):
print("Closing", self)
self._open = False
task_queue = Queue()
THREADS = 6
worker_threads = [WorkerThread(task_queue) for _ in range(THREADS)]
for worker in worker_threads:
worker.setDaemon(True)
worker.start()
print("Sending one task")
task_queue.put({"name": "Task 1"})
sleep(1)
print("Sending a bunch of tasks")
for i in range(1, 15):
task_queue.put({"name": f"Task {i}"})
print("Sleeping for a bit")
sleep(2)
print("Shutting down")
# wrap this in your exit code
task_queue.join() # wait for everything to be done
for worker in worker_threads:
worker.close()
还有其他方法,但我认为在这里明确地写出来更清楚。请注意,我假设您的代码不受 cpu 约束,因此使用线程而不是进程更有意义。
顺便说一句,这看起来 非常 像是 celery
之类的最小实现.
顺便说一句,我不懂俄语,但这看起来是个有趣的项目。
需要注册全局热键。例如,f4
和 f8
。使用 keyboard 库,而第一个回调没有 return,下一个不会调用。
换句话说,日志是这样的
pressed f4
end for f4
pressed f8
end for f8
但我想点赞
pressed f4
pressed f8
end for f4
end for f8
演示代码
# pip install keyboard
from keyboard import add_hotkey, wait
from time import sleep
def on_callback(key):
print('pressed', key)
sleep(5) # emulate long run task
print('end for', key)
add_hotkey("f4", lambda: on_callback("f4"))
add_hotkey("f8", lambda: on_callback("f8"))
wait('esc')
我尝试使用 asyncio
,但没有任何改变
pressed f4
end for f4
pressed f8
end for f8
from keyboard import add_hotkey, wait
import asyncio
async def on_callback(key):
print('pressed', key)
await asyncio.sleep(5) # emulate long run task
print('end for', key)
add_hotkey("f4", lambda: asyncio.run(on_callback("f4")))
add_hotkey("f8", lambda: asyncio.run(on_callback("f8")))
wait('esc')
更新 1
键盘库的开发人员 gave advise 使用 call_later
函数为每个 callback
创建新线程,它的工作方式如我所愿。
但是有没有办法在同一个线程中完成这样的任务(使用asyncio
)?我没有成功。
# example with 'call_later' function
from keyboard import add_hotkey, wait, call_later
from time import sleep
def on_callback(key):
print('pressed', key)
sleep(5) # emulate long run task
print('end for', key)
add_hotkey("f4", lambda: call_later(on_callback, args=("f4",)))
add_hotkey("f8", lambda: call_later(on_callback, args=("f8",)))
wait('esc')
更新 2
现在看起来像下面(github 上的完整代码)。我似乎为了等待 http 请求而创建新线程的操作过于繁重。因此我想在当前线程中使用 asyncio,同时继续处理其他热键。
from googleapiclient.discovery import build
from os import getenv
from settings import get_settings
from loguru import logger
import keyboard
class ScriptService():
def __init__(self):
# ...
self._script = AppsScript(id)
self._hotkeys = values["hotkeys"]
def _register_hotkeys(self):
self._add_hotkey(self._hotkeys["reload"], self._on_reload)
for item in self._hotkeys["goofy"]:
k, f = item["keys"], item["function"]
self._add_hotkey(k, self._on_callback, args=(f, k))
def _add_hotkey(self, keys, callback, args=()):
# lambda bug: https://github.com/boppreh/keyboard/issues/493
keyboard.add_hotkey(keys, lambda: keyboard.call_later(callback, args))
def _on_callback(self, function, keys):
response = self._script.run(function)
class AppsScript():
def __init__(self, id: str):
self._name = getenv("API_SERVICE_NAME")
self._version = getenv("API_VERSION")
self._id = id
def run(self, function: str):
body = {"function": function}
with build(self._name, self._version, credentials=get_credentials()) as service:
# http request
return service.scripts().run(scriptId=self._id, body=body).execute()
不幸的是,none 您正在使用的库实际上是可等待的,因此将它们与 asyncio 一起使用将是一个挑战。您可以从 google 库中提取实际的 http 调用,然后稍后使用与 asyncio 兼容的库实现您的 own 客户端,但这需要做很多工作才能避免开销启动一个新线程。
幸运的是,已经有一种方法可以避免昂贵的旋转线程:使用工作线程池。在这种方法中,我们不是为每个回调立即启动一个新线程,而是将 task 添加到由我们提前启动的线程池服务的任务队列中。这样我们只为启动一个线程付费,之后我们只为将请求序列化到线程付费——这不是什么都没有,但它比启动一个线程要少。
虽然可以让 asyncio 管理线程池,但在这种情况下它根本没有任何优势,因为您的代码中没有任何 else 是可等待的。 (如果你确实想这样做,你会使用 loop.run_in_exeuctor()
, taking care not to re-create the pool as noted in this question。)
这是一些虚拟代码,需要根据您的 类:
from threading import Thread
from queue import Queue
from time import sleep
from random import randint
def process(task):
print(task["name"])
sleep(3 + randint(0, 100) / 100)
print(f"Task {task['name']} done")
class WorkerThread(Thread):
def __init__(self, queue):
super().__init__()
self.queue = queue
print("Started worker thread")
self._open = True
def run(self):
while self._open:
task = self.queue.get()
process(task)
self.queue.task_done()
def close(self):
print("Closing", self)
self._open = False
task_queue = Queue()
THREADS = 6
worker_threads = [WorkerThread(task_queue) for _ in range(THREADS)]
for worker in worker_threads:
worker.setDaemon(True)
worker.start()
print("Sending one task")
task_queue.put({"name": "Task 1"})
sleep(1)
print("Sending a bunch of tasks")
for i in range(1, 15):
task_queue.put({"name": f"Task {i}"})
print("Sleeping for a bit")
sleep(2)
print("Shutting down")
# wrap this in your exit code
task_queue.join() # wait for everything to be done
for worker in worker_threads:
worker.close()
还有其他方法,但我认为在这里明确地写出来更清楚。请注意,我假设您的代码不受 cpu 约束,因此使用线程而不是进程更有意义。
顺便说一句,这看起来 非常 像是 celery
之类的最小实现.
顺便说一句,我不懂俄语,但这看起来是个有趣的项目。