如何使用 parent child 函数收集结果和使用限制
How to gather results and using limit with parent child functions
我想要实现的是生成多个 parents 并且每个 parent 做一些工作然后生成几个孩子来检查其他东西并抓住这些结果在 parent 中进行进一步的工作。
我还试图制作 2 个不同的生成限制,因为 parent 工作可以做的比孩子更多。
我该如何完成?
如果我不使用 limit2 就可以,但我想要两个限制器。
import trio
import asks
import time
import random
async def child(parent, i, sender, limit2):
async with limit2:
print('Parent {0}, Child {1}: started! Sleeping now...'.format(parent, i))
#await trio.sleep(random.randrange(0, 3))
print('Parent {0}, Child {1}: exiting!'.format(parent, i))
async with sender:
await sender.send('Parent {0}, Child {1}: exiting!'.format(parent, i))
async def parent(i, limit):
async with limit:
print('Parent {0}: started! Sleeping now...'.format(i))
#await trio.sleep(random.randrange(0, 3))
sender, receiver = trio.open_memory_channel(10)
limit2 = trio.CapacityLimiter(2)
async with trio.open_nursery() as nursery:
for j in range(10):
nursery.start_soon(child, i, j, sender, limit2)
async with receiver:
async for value in receiver:
print('Got value: {!r}'.format(value))
print('Parent {0}: exiting!'.format(i))
async def main():
limit = trio.CapacityLimiter(1)
async with trio.open_nursery() as nursery:
for i in range(1):
nursery.start_soon(parent, i, limit)
if __name__ == "__main__":
start_time = time.perf_counter()
trio.run(main)
duration = time.perf_counter() - start_time
print("Took {:.2f} seconds".format(duration))
当我运行你的代码时,我得到:
File "/tmp/zigb.py", line 12, in child
await sender.send('Parent {0}, Child {1}: exiting!'.format(parent, i))
File "/home/njs/.user-python3.7/lib/python3.7/site-packages/trio/_channel.py", line 157, in send
self.send_nowait(value)
File "/home/njs/.user-python3.7/lib/python3.7/site-packages/trio/_core/_ki.py", line 167, in wrapper
return fn(*args, **kwargs)
File "/home/njs/.user-python3.7/lib/python3.7/site-packages/trio/_channel.py", line 135, in send_nowait
raise trio.ClosedResourceError
trio.ClosedResourceError
这里发生的事情是,您将 sender
通道传递给所有 10 个 child 任务,然后每个 child 任务都在执行 async with sender: ...
,关闭 sender
通道。所以第一个任务使用它,然后关闭它,然后下一个任务尝试使用它......但是它已经关闭,所以它得到一个错误。
幸运的是,Trio 为这个问题提供了一个解决方案:您可以在内存通道 object 上使用 clone
方法来创建该内存通道的第二个副本,其工作方式完全相同,但会独立关闭。所以诀窍是向每个 children 传递一个 sender
的克隆,然后他们各自关闭他们的克隆,然后一旦所有克隆都关闭,接收者就会收到通知并停止循环。
您的代码的固定版本:
import trio
import asks
import time
import random
async def child(parent, i, sender, limit2):
async with limit2:
print('Parent {0}, Child {1}: started! Sleeping now...'.format(parent, i))
#await trio.sleep(random.randrange(0, 3))
print('Parent {0}, Child {1}: exiting!'.format(parent, i))
async with sender:
await sender.send('Parent {0}, Child {1}: exiting!'.format(parent, i))
async def parent(i, limit):
async with limit:
print('Parent {0}: started! Sleeping now...'.format(i))
#await trio.sleep(random.randrange(0, 3))
sender, receiver = trio.open_memory_channel(10)
limit2 = trio.CapacityLimiter(2)
async with trio.open_nursery() as nursery:
for j in range(10):
# CHANGED: Give each child its own clone of 'sender', which
# it will close when it's done
nursery.start_soon(child, i, j, sender.clone(), limit2)
# CHANGED: Close the original 'sender', once we're done making clones
await sender.aclose()
async with receiver:
async for value in receiver:
print('Got value: {!r}'.format(value))
print('Parent {0}: exiting!'.format(i))
async def main():
limit = trio.CapacityLimiter(1)
async with trio.open_nursery() as nursery:
for i in range(1):
nursery.start_soon(parent, i, limit)
if __name__ == "__main__":
start_time = time.perf_counter()
trio.run(main)
duration = time.perf_counter() - start_time
print("Took {:.2f} seconds".format(duration))
我想要实现的是生成多个 parents 并且每个 parent 做一些工作然后生成几个孩子来检查其他东西并抓住这些结果在 parent 中进行进一步的工作。 我还试图制作 2 个不同的生成限制,因为 parent 工作可以做的比孩子更多。
我该如何完成?
如果我不使用 limit2 就可以,但我想要两个限制器。
import trio
import asks
import time
import random
async def child(parent, i, sender, limit2):
async with limit2:
print('Parent {0}, Child {1}: started! Sleeping now...'.format(parent, i))
#await trio.sleep(random.randrange(0, 3))
print('Parent {0}, Child {1}: exiting!'.format(parent, i))
async with sender:
await sender.send('Parent {0}, Child {1}: exiting!'.format(parent, i))
async def parent(i, limit):
async with limit:
print('Parent {0}: started! Sleeping now...'.format(i))
#await trio.sleep(random.randrange(0, 3))
sender, receiver = trio.open_memory_channel(10)
limit2 = trio.CapacityLimiter(2)
async with trio.open_nursery() as nursery:
for j in range(10):
nursery.start_soon(child, i, j, sender, limit2)
async with receiver:
async for value in receiver:
print('Got value: {!r}'.format(value))
print('Parent {0}: exiting!'.format(i))
async def main():
limit = trio.CapacityLimiter(1)
async with trio.open_nursery() as nursery:
for i in range(1):
nursery.start_soon(parent, i, limit)
if __name__ == "__main__":
start_time = time.perf_counter()
trio.run(main)
duration = time.perf_counter() - start_time
print("Took {:.2f} seconds".format(duration))
当我运行你的代码时,我得到:
File "/tmp/zigb.py", line 12, in child
await sender.send('Parent {0}, Child {1}: exiting!'.format(parent, i))
File "/home/njs/.user-python3.7/lib/python3.7/site-packages/trio/_channel.py", line 157, in send
self.send_nowait(value)
File "/home/njs/.user-python3.7/lib/python3.7/site-packages/trio/_core/_ki.py", line 167, in wrapper
return fn(*args, **kwargs)
File "/home/njs/.user-python3.7/lib/python3.7/site-packages/trio/_channel.py", line 135, in send_nowait
raise trio.ClosedResourceError
trio.ClosedResourceError
这里发生的事情是,您将 sender
通道传递给所有 10 个 child 任务,然后每个 child 任务都在执行 async with sender: ...
,关闭 sender
通道。所以第一个任务使用它,然后关闭它,然后下一个任务尝试使用它......但是它已经关闭,所以它得到一个错误。
幸运的是,Trio 为这个问题提供了一个解决方案:您可以在内存通道 object 上使用 clone
方法来创建该内存通道的第二个副本,其工作方式完全相同,但会独立关闭。所以诀窍是向每个 children 传递一个 sender
的克隆,然后他们各自关闭他们的克隆,然后一旦所有克隆都关闭,接收者就会收到通知并停止循环。
您的代码的固定版本:
import trio
import asks
import time
import random
async def child(parent, i, sender, limit2):
async with limit2:
print('Parent {0}, Child {1}: started! Sleeping now...'.format(parent, i))
#await trio.sleep(random.randrange(0, 3))
print('Parent {0}, Child {1}: exiting!'.format(parent, i))
async with sender:
await sender.send('Parent {0}, Child {1}: exiting!'.format(parent, i))
async def parent(i, limit):
async with limit:
print('Parent {0}: started! Sleeping now...'.format(i))
#await trio.sleep(random.randrange(0, 3))
sender, receiver = trio.open_memory_channel(10)
limit2 = trio.CapacityLimiter(2)
async with trio.open_nursery() as nursery:
for j in range(10):
# CHANGED: Give each child its own clone of 'sender', which
# it will close when it's done
nursery.start_soon(child, i, j, sender.clone(), limit2)
# CHANGED: Close the original 'sender', once we're done making clones
await sender.aclose()
async with receiver:
async for value in receiver:
print('Got value: {!r}'.format(value))
print('Parent {0}: exiting!'.format(i))
async def main():
limit = trio.CapacityLimiter(1)
async with trio.open_nursery() as nursery:
for i in range(1):
nursery.start_soon(parent, i, limit)
if __name__ == "__main__":
start_time = time.perf_counter()
trio.run(main)
duration = time.perf_counter() - start_time
print("Took {:.2f} seconds".format(duration))