Python 异步上下文
Python asyncio context
在线程中,我们有一个叫做"Thread Context"的东西,我们可以在其中保存一些数据(状态)以供在特殊线程中访问。在 asyncio 中,我需要在当前执行路径中保存一些状态,以便所有后续协程都可以访问它。解决办法是什么?
注意:我知道每个协程函数都是为 asyncio 中的执行路径实例化的,但由于某种原因我无法在函数属性中保存状态。 (虽然这个方法os反正不是很好)
还有 https://github.com/azazel75/metapensiero.asyncio.tasklocal,但您必须知道任务通常由库在内部创建,也由使用 ensure_future(a_coroutine)
的 asyncio 创建,并且没有实际的方法来跟踪这些新任务并初始化它们的局部变量(也许与创建它们的任务有关)。 (一个 "hack" 应该设置一个 loop.set_task_factory()
函数来完成这项工作,希望所有代码都使用 loop.create_task()
来创建任务,这并不总是正确的...)
另一个问题是,如果您的某些代码是在 Future 回调 Task.current_task()
函数中执行的,该函数由两个库使用 select,要服务的本地人的正确副本将始终 return None
...
从 Python 3.7 开始,您可以使用 contextvars.ContextVar。
在下面的示例中,我声明了 request_id 并在 some_outer_coroutine 中设置了值,然后访问了它在 some_inner_coroutine.
import asyncio
import contextvars
# declare context var
request_id = contextvars.ContextVar('Id of request.')
async def some_inner_coroutine():
# get value
print('Processed inner coroutine of request: {}'.format(request_id.get()))
async def some_outer_coroutine(req_id):
# set value
request_id.set(req_id)
await some_inner_coroutine()
# get value
print('Processed outer coroutine of request: {}'.format(request_id.get()))
async def main():
tasks = []
for req_id in range(1, 5):
tasks.append(asyncio.create_task(some_outer_coroutine(req_id)))
await asyncio.gather(*tasks)
if __name__ == '__main__':
asyncio.run(main())
输出:
Processed inner coroutine of request: 1
Processed outer coroutine of request: 1
Processed inner coroutine of request: 2
Processed outer coroutine of request: 2
Processed inner coroutine of request: 3
Processed outer coroutine of request: 3
Processed inner coroutine of request: 4
Processed outer coroutine of request: 4
在线程中,我们有一个叫做"Thread Context"的东西,我们可以在其中保存一些数据(状态)以供在特殊线程中访问。在 asyncio 中,我需要在当前执行路径中保存一些状态,以便所有后续协程都可以访问它。解决办法是什么? 注意:我知道每个协程函数都是为 asyncio 中的执行路径实例化的,但由于某种原因我无法在函数属性中保存状态。 (虽然这个方法os反正不是很好)
还有 https://github.com/azazel75/metapensiero.asyncio.tasklocal,但您必须知道任务通常由库在内部创建,也由使用 ensure_future(a_coroutine)
的 asyncio 创建,并且没有实际的方法来跟踪这些新任务并初始化它们的局部变量(也许与创建它们的任务有关)。 (一个 "hack" 应该设置一个 loop.set_task_factory()
函数来完成这项工作,希望所有代码都使用 loop.create_task()
来创建任务,这并不总是正确的...)
另一个问题是,如果您的某些代码是在 Future 回调 Task.current_task()
函数中执行的,该函数由两个库使用 select,要服务的本地人的正确副本将始终 return None
...
从 Python 3.7 开始,您可以使用 contextvars.ContextVar。
在下面的示例中,我声明了 request_id 并在 some_outer_coroutine 中设置了值,然后访问了它在 some_inner_coroutine.
import asyncio
import contextvars
# declare context var
request_id = contextvars.ContextVar('Id of request.')
async def some_inner_coroutine():
# get value
print('Processed inner coroutine of request: {}'.format(request_id.get()))
async def some_outer_coroutine(req_id):
# set value
request_id.set(req_id)
await some_inner_coroutine()
# get value
print('Processed outer coroutine of request: {}'.format(request_id.get()))
async def main():
tasks = []
for req_id in range(1, 5):
tasks.append(asyncio.create_task(some_outer_coroutine(req_id)))
await asyncio.gather(*tasks)
if __name__ == '__main__':
asyncio.run(main())
输出:
Processed inner coroutine of request: 1
Processed outer coroutine of request: 1
Processed inner coroutine of request: 2
Processed outer coroutine of request: 2
Processed inner coroutine of request: 3
Processed outer coroutine of request: 3
Processed inner coroutine of request: 4
Processed outer coroutine of request: 4