如何避免每次都写 `await`
How to avoid writing `await` every time
我用aiologger的时候,要写await logger
很多次
对于example,
import asyncio
from aiologger import Logger
async def main():
logger = Logger.with_default_handlers(name='my-logger')
await logger.debug("debug at stdout")
await logger.info("info at stdout")
await logger.warning("warning at stderr")
await logger.error("error at stderr")
await logger.critical("critical at stderr")
await logger.shutdown()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
如果我能写出类似 al
的东西而不是 await logger
就好了。
async def main():
logger = Logger.with_default_handlers(name='my-logger')
async def al(s1: str, s2: str):
await getattr(logger, s1)(s2)
al("debug", "debug at stdout")
al("info", "info at stdout")
# ...etc
免责声明:我并不是说这是个好主意,或者我会喜欢维护以这种风格编写的程序。
此外,我的文本编辑器具有良好的宏功能...
免责声明:我已经写过这个 -- https://coxley.org/logging/#logging-over-the-network
请不要接受这样的日志界面。
您无法避免使用 await
来生成事件循环。你就是做不到。但是您可以利用现有功能在主线程之外执行 I/O 并且仍然使用 asyncio。您只需在该线程中启动第二个事件循环。
例子
我不喜欢在答案中推荐第三方库,但janus.Queue
在这里很重要。使非异步编写器(例如:日志处理程序)和异步读取器(刷新器)之间的桥接更容易。
注意 1:如果您实际上不需要冲洗器中的异步兼容 I/O,请使用 stdlib queue.Queue
,移除异步闭包, 并摆脱第二个循环。
注2:这个例子既有无界队列,又为每条消息做I/O。添加一个时间间隔 and/or 消息阈值,用于刷新生产就绪。根据您的系统,决定您是接受日志突发的内存增长、丢弃日志还是阻塞主代码路径。
import asyncio
import logging
import time
import threading
import typing as t
# pip install --user janus
import janus
LOG = logging.getLogger(__name__)
# Queue must be created within the event loop it will be used from. Start as
# None since this will not be the main thread.
_QUEUE: t.Optional[janus.Queue] = None
class IOHandler(logging.Handler):
def __init__(self, *args, **kwargs):
# This is set from the flusher thread
global _QUEUE
while _QUEUE is None:
time.sleep(0.01)
self.q = _QUEUE.sync_q
super().__init__(*args, **kwargs)
def emit(self, record: logging.LogRecord):
self.q.put(record)
def flusher():
async def run():
global _QUEUE
if _QUEUE is None:
_QUEUE = janus.Queue()
# Upload record instead of print
# Perhaps flush every n-seconds w/ buffer for upper-bound on inserts.
q = _QUEUE.async_q
while True:
record = await q.get()
print("woohoo, doing i/o:", record.msg)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(run())
def foo():
print("foo")
def bar():
print("bar")
async def baz():
await asyncio.sleep(1)
print("baz")
async def main():
threading.Thread(target=flusher, daemon=True).start()
LOG.setLevel(logging.INFO)
LOG.addHandler(IOHandler())
foo()
LOG.info("starting program")
LOG.info("doing some stuff")
LOG.info("mighty cool")
bar()
await baz()
if __name__ == "__main__":
asyncio.run(main())
我用aiologger的时候,要写await logger
很多次
对于example,
import asyncio
from aiologger import Logger
async def main():
logger = Logger.with_default_handlers(name='my-logger')
await logger.debug("debug at stdout")
await logger.info("info at stdout")
await logger.warning("warning at stderr")
await logger.error("error at stderr")
await logger.critical("critical at stderr")
await logger.shutdown()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
如果我能写出类似 al
的东西而不是 await logger
就好了。
async def main():
logger = Logger.with_default_handlers(name='my-logger')
async def al(s1: str, s2: str):
await getattr(logger, s1)(s2)
al("debug", "debug at stdout")
al("info", "info at stdout")
# ...etc
免责声明:我并不是说这是个好主意,或者我会喜欢维护以这种风格编写的程序。
此外,我的文本编辑器具有良好的宏功能...
免责声明:我已经写过这个 -- https://coxley.org/logging/#logging-over-the-network
请不要接受这样的日志界面。
您无法避免使用 await
来生成事件循环。你就是做不到。但是您可以利用现有功能在主线程之外执行 I/O 并且仍然使用 asyncio。您只需在该线程中启动第二个事件循环。
例子
我不喜欢在答案中推荐第三方库,但janus.Queue
在这里很重要。使非异步编写器(例如:日志处理程序)和异步读取器(刷新器)之间的桥接更容易。
注意 1:如果您实际上不需要冲洗器中的异步兼容 I/O,请使用 stdlib queue.Queue
,移除异步闭包, 并摆脱第二个循环。
注2:这个例子既有无界队列,又为每条消息做I/O。添加一个时间间隔 and/or 消息阈值,用于刷新生产就绪。根据您的系统,决定您是接受日志突发的内存增长、丢弃日志还是阻塞主代码路径。
import asyncio
import logging
import time
import threading
import typing as t
# pip install --user janus
import janus
LOG = logging.getLogger(__name__)
# Queue must be created within the event loop it will be used from. Start as
# None since this will not be the main thread.
_QUEUE: t.Optional[janus.Queue] = None
class IOHandler(logging.Handler):
def __init__(self, *args, **kwargs):
# This is set from the flusher thread
global _QUEUE
while _QUEUE is None:
time.sleep(0.01)
self.q = _QUEUE.sync_q
super().__init__(*args, **kwargs)
def emit(self, record: logging.LogRecord):
self.q.put(record)
def flusher():
async def run():
global _QUEUE
if _QUEUE is None:
_QUEUE = janus.Queue()
# Upload record instead of print
# Perhaps flush every n-seconds w/ buffer for upper-bound on inserts.
q = _QUEUE.async_q
while True:
record = await q.get()
print("woohoo, doing i/o:", record.msg)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(run())
def foo():
print("foo")
def bar():
print("bar")
async def baz():
await asyncio.sleep(1)
print("baz")
async def main():
threading.Thread(target=flusher, daemon=True).start()
LOG.setLevel(logging.INFO)
LOG.addHandler(IOHandler())
foo()
LOG.info("starting program")
LOG.info("doing some stuff")
LOG.info("mighty cool")
bar()
await baz()
if __name__ == "__main__":
asyncio.run(main())