使用 contextvar 跟踪 Python 中的异步循环

Using contextvar to keep track of async loop in Python

我正在尝试使用 Python 中的一个简单异步示例,主要遵循 .

我的目标是设置一个上下文变量并通过不断附加到它来跟踪一系列调用。我知道可以使用 .get() 方法访问上下文变量,并使用 .set() 方法更改它们的值。然而,在下面的例子中,尽管从控制台可以明显看出对函数 sum() 的一系列调用,但变量没有被修改。

编辑: 根据下面 Michael Butscher 的评论,我将原始上下文变量(它是一个字符串)替换为列表:output_list 并使用 .append() 迭代修改列表。这现在确实使我能够查看最终输出,但不能查看单个 sum() 方法中的中间输出。

完整代码:

import asyncio
import contextvars
import time

output_list = contextvars.ContextVar('output_list', default=list())

async def sleep():
    print(f'Time: {time.time() - start:.2f}')
    await asyncio.sleep(1)

async def sum(name, numbers):
    total = 0
    for number in numbers:
        print(f'Task {name}: Computing {total}+{number}')
        await sleep()
        total += number
    output_list.set(output_list.get().append(f"{name}"))
    print(f'Task {name}: Sum = {total}\n')
    print(f'Partial output from task {name}:', output_list.get())

start = time.time()

loop = asyncio.get_event_loop()
tasks = [
    loop.create_task(sum("A", [1, 2])),
    loop.create_task(sum("B", [1, 2, 3])),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

end = time.time()
print(f'Time: {end-start:.2f} sec')
print("Final output_str =", output_list.get())

如何迭代地跟随上下文变量列表的扩展output_list

我想要的控制台输出是:

Task A: Computing 0+1
Time: 0.00
Task B: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.02
Task B: Computing 1+2
Time: 1.02
Task A: Sum = 3

Partial output from task A: ['A']
Task B: Computing 3+3
Time: 2.02
Task B: Sum = 6

Partial output from task B: ['A', 'B']
Time: 3.03 sec
Final output_str = ['A', 'B']

相反,我得到:

Task A: Computing 0+1
Time: 0.00
Task B: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.02
Task B: Computing 1+2
Time: 1.02
Task A: Sum = 3

Partial output from task A: None
Task B: Computing 3+3
Time: 2.02
Task B: Sum = 6

Partial output from task B: None
Time: 3.03 sec
Final output_str = ['A', 'B']

根据 asyncio 文档:

Tasks support the contextvars module. When a Task is created it copies the current context and later runs its coroutine in the copied context.

所以,如果你在程序的顶部声明 cvar = contextvars.ContextVar('cvar', default='x'),当你创建一个任务时,这将复制当前的上下文,如果你修改 cvar 它只会影响复制但没有原始上下文。这就是你在最终输出中得到 ''(空字符串) 的主要原因。

要实现您想要的“轨道”,您必须使用全局变量以便在任何地方修改它。但是如果你想尝试使用 asynciocontextvars 看看它是如何工作的,请看下面的例子:

import asyncio
import contextvars
import time

output = contextvars.ContextVar('output', default='No changes at all') 

async def sleep():
    print(f'Time: {time.time() - start:.2f}')
    await asyncio.sleep(1)

async def sum(name, numbers):
    total = 0
    for number in numbers:
        print(f'Task {name}: Computing {total}+{number}')
        await sleep()
        total += number
        output.set(output.get()+name) #Here we modify the respective context
    print(f'Task {name}: Sum = {total}\n')
    print(f'Partial output from task {name}:', output.get())
    return output.get() #Here we return the variable modified
start = time.time()

# main() will have its own copy of the context
async def main():
    output.set('Changed - ') # Change output var in this function context
    # task1 and task2 will copy this context (In this contect output=='Changed - ')
    task1 = asyncio.create_task(sum("A", [1, 2])) #This task has its own copy of the context of main()
    task2 = asyncio.create_task(sum("B", [1, 2, 3])) #This task also has its own copy of the context of main()
    done, pending = await asyncio.wait({task1,task2})
    resultTask1 = task1.result() # get the value of return of task1
    resultTask2 = task2.result() # get the value of return of task1
    print('Result1: ', resultTask1)
    print('Result2: ', resultTask2)
    print('Variable output in main(): ',output.get()) # However, output in main() is sitill 'Changed - '
    output.set(output.get()+'/'+resultTask1+'/'+resultTask2) #Modify the var in this context
    print('Variable modified in main(): ', output.get())
    return output.get() #Return modified value

x = asyncio.run(main()) # Assign the return value to x

end = time.time()
print(f'Time: {end-start:.2f} sec')
print("Final output (without changes) =", output.get())
output.set(x)
print("Final output (changed) =", output.get())

##### OUTPUT #####
# Time: 0.00
# Task B: Computing 0+1
# Time: 0.00
# Task A: Computing 1+2
# Time: 1.01
# Task B: Computing 1+2
# Time: 1.01
# Task A: Sum = 3

# Partial output from task A: Changed - AA
# Task B: Computing 3+3
# Time: 2.02
# Task B: Sum = 6

# Partial output from task B: Changed - BBB
# Result1:  Changed - AA
# Result2:  Changed - BBB
# Variable output in main():  Changed -
# Variable modified in main():  Changed - /Changed - AA/Changed - BBB
# Time: 3.03 sec
# Final output (without changes) = No changes at all
# Final output (changed) = Changed - /Changed - AA/Changed - BBB

如你所见,不可能同时修改同一个变量。当 task1 正在修改其副本时,task2 也在修改其副本。