Python 异步上下文

Python asyncio context

在线程中,我们有一个叫做"Thread Context"的东西,我们可以在其中保存一些数据(状态)以供在特殊线程中访问。在 asyncio 中,我需要在当前执行路径中保存一些状态,以便所有后续协程都可以访问它。解决办法是什么? 注意:我知道每个协程函数都是为 asyncio 中的执行路径实例化的,但由于某种原因我无法在函数属性中保存状态。 (虽然这个方法os反正不是很好)

还有 https://github.com/azazel75/metapensiero.asyncio.tasklocal,但您必须知道任务通常由库在内部创建,也由使用 ensure_future(a_coroutine) 的 asyncio 创建,并且没有实际的方法来跟踪这些新任务并初始化它们的局部变量(也许与创建它们的任务有关)。 (一个 "hack" 应该设置一个 loop.set_task_factory() 函数来完成这项工作,希望所有代码都使用 loop.create_task() 来创建任务,这并不总是正确的...)

另一个问题是,如果您的某些代码是在 Future 回调 Task.current_task() 函数中执行的,该函数由两个库使用 select,要服务的本地人的正确副本将始终 return None...

从 Python 3.7 开始,您可以使用 contextvars.ContextVar

在下面的示例中,我声明了 request_id 并在 some_outer_coroutine 中设置了值,然后访问了它在 some_inner_coroutine.

import asyncio
import contextvars

# declare context var
request_id = contextvars.ContextVar('Id of request.')


async def some_inner_coroutine():
    # get value
    print('Processed inner coroutine of request: {}'.format(request_id.get()))


async def some_outer_coroutine(req_id):
    # set value
    request_id.set(req_id)

    await some_inner_coroutine()

    # get value
    print('Processed outer coroutine of request: {}'.format(request_id.get()))


async def main():
    tasks = []
    for req_id in range(1, 5):
        tasks.append(asyncio.create_task(some_outer_coroutine(req_id)))

    await asyncio.gather(*tasks)


if __name__ == '__main__':
    asyncio.run(main())

输出:

Processed inner coroutine of request: 1
Processed outer coroutine of request: 1
Processed inner coroutine of request: 2
Processed outer coroutine of request: 2
Processed inner coroutine of request: 3
Processed outer coroutine of request: 3
Processed inner coroutine of request: 4
Processed outer coroutine of request: 4