如何避免每次都写 `await`

How to avoid writing `await` every time

我用aiologger的时候,要写await logger很多次

对于example,

import asyncio
from aiologger import Logger


async def main():
    logger = Logger.with_default_handlers(name='my-logger')

    await logger.debug("debug at stdout")
    await logger.info("info at stdout")

    await logger.warning("warning at stderr")
    await logger.error("error at stderr")
    await logger.critical("critical at stderr")

    await logger.shutdown()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

如果我能写出类似 al 的东西而不是 await logger 就好了。

async def main():
    logger = Logger.with_default_handlers(name='my-logger')

    async def al(s1: str, s2: str):
        await getattr(logger, s1)(s2)

    al("debug", "debug at stdout")
    al("info", "info at stdout")
    # ...etc

免责声明:我并不是说这是个好主意,或者我会喜欢维护以这种风格编写的程序。

此外,我的文本编辑器具有良好的宏功能...

免责声明:我已经写过这个 -- https://coxley.org/logging/#logging-over-the-network

请不要接受这样的日志界面。

您无法避免使用 await 来生成事件循环。你就是做不到。但是您可以利用现有功能在主线程之外执行 I/O 并且仍然使用 asyncio。您只需在该线程中启动第二个事件循环。

例子

我不喜欢在答案中推荐第三方库,但janus.Queue在这里很重要。使非异步编写器(例如:日志处理程序)和异步读取器(刷新器)之间的桥接更容易。

注意 1:如果您实际上不需要冲洗器中的异步兼容 I/O,请使用 stdlib queue.Queue,移除异步闭包, 并摆脱第二个循环。

注2:这个例子既有无界队列,又为每条消息做I/O。添加一个时间间隔 and/or 消息阈值,用于刷新生产就绪。根据您的系统,决定您是接受日志突发的内存增长、丢弃日志还是阻塞主代码路径。

import asyncio
import logging
import time
import threading
import typing as t

# pip install --user janus
import janus

LOG = logging.getLogger(__name__)

# Queue must be created within the event loop it will be used from. Start as
# None since this will not be the main thread.
_QUEUE: t.Optional[janus.Queue] = None

class IOHandler(logging.Handler):
    def __init__(self, *args, **kwargs):
        # This is set from the flusher thread
        global _QUEUE
        while _QUEUE is None:
            time.sleep(0.01)
        self.q = _QUEUE.sync_q
        super().__init__(*args, **kwargs)

    def emit(self, record: logging.LogRecord):
        self.q.put(record)

def flusher():
    async def run():
        global _QUEUE
        if _QUEUE is None:
            _QUEUE = janus.Queue()
        # Upload record instead of print
        # Perhaps flush every n-seconds w/ buffer for upper-bound on inserts.
        q = _QUEUE.async_q
        while True:
            record = await q.get()
            print("woohoo, doing i/o:", record.msg)

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(run())

def foo():
    print("foo")

def bar():
    print("bar")

async def baz():
    await asyncio.sleep(1)
    print("baz")

async def main():
    threading.Thread(target=flusher, daemon=True).start()
    LOG.setLevel(logging.INFO)
    LOG.addHandler(IOHandler())

    foo()
    LOG.info("starting program")
    LOG.info("doing some stuff")
    LOG.info("mighty cool")
    bar()
    await baz()


if __name__ == "__main__":
    asyncio.run(main())