Python和Trio,其中生产者是消费者,如何在工作完成后优雅退出?
Python and Trio, where producers are consumers, how to exit gracefully when the job is done?
我正在尝试使用 trio
和 asks
制作一个简单的网络爬虫。我使用 Nursery 一次启动几个爬虫,并使用内存通道来维护要访问的 url 列表。
每个爬虫都会收到该通道两端的克隆,因此它们可以抓取 url(通过 receive_channel),读取它,查找并添加新的 url访问过(通过send_channel)。
async def main():
send_channel, receive_channel = trio.open_memory_channel(math.inf)
async with trio.open_nursery() as nursery:
async with send_channel, receive_channel:
nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())
nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())
nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())
async def crawler(send_channel, receive_channel):
async for url in receive_channel: # I'm a consumer!
content = await ...
urls_found = ...
for u in urls_found:
await send_channel.send(u) # I'm a producer too!
在这种情况下,消费者是生产者。如何优雅地停止一切?
关闭一切的条件是:
- 频道为空
- 和
- 所有爬虫都卡在第一个 for 循环,等待 url 出现在 receive_channel(这...不会再发生了)
我尝试在 crawler()
中使用 async with send_channel
,但找不到好的方法。我也试图找到一些不同的方法(一些内存通道绑定工作池等),这里也没有运气。
这里至少有两个问题。
首先是您关于在频道为空时停止的假设。由于您分配的内存通道大小为 0,因此它将始终为空。如果爬虫准备好接收它,您只能传递 url。
这产生了第二个问题。如果您发现的 url 多于您分配的爬虫,您的应用程序就会死锁。
原因是,由于您无法将找到的所有 url 交给爬虫,爬虫永远不会准备好接收新的 url 进行爬网,因为它一直在等待另一个爬虫获取它的 urls.
这变得更糟,因为假设其他爬虫之一找到新的 urls,它们也会被困在已经在等待移交其 urls 的爬虫后面,并且它们将永远无法获取正在等待处理的 url 之一。
文档的相关部分:
https://trio.readthedocs.io/en/stable/reference-core.html#buffering-in-channels
假设我们解决了这个问题,下一步该去哪里?
您可能需要保留所有访问过的 url 的列表(集合?),以确保您不会再次访问它们。
要真正弄清楚什么时候停止,而不是关闭通道,简单地取消 nursery 可能要容易得多。
假设我们像这样修改主循环:
async def main():
send_channel, receive_channel = trio.open_memory_channel(math.inf)
active_workers = trio.CapacityLimiter(3) # Number of workers
async with trio.open_nursery() as nursery:
async with send_channel, receive_channel:
nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
while True:
await trio.sleep(1) # Give the workers a chance to start up.
if active_workers.borrowed_tokens == 0 and send_channel.statistics().current_buffer_used == 0:
nursery.cancel_scope.cancel() # All done!
现在我们需要稍微修改爬虫,以便在活动时获取令牌。
async def crawler(active_workers, send_channel, receive_channel):
async for url in receive_channel: # I'm a consumer!
with active_workers:
content = await ...
urls_found = ...
for u in urls_found:
await send_channel.send(u) # I'm a producer too!
其他需要考虑的事情-
您可能想在抓取工具中使用 send_channel.send_noblock(u)
。由于您有一个无界缓冲区,因此不可能出现 WouldBlock 异常,并且可能需要在每次发送时不触发检查点的行为。这样你就可以肯定地知道,在其他任务有机会获取新的 url 或父任务之前,特定的 url 已被完全处理并且所有新的 url 已被添加任务有机会检查工作是否完成。
这是我在尝试重组问题时想到的解决方案:
async def main():
send_channel, receive_channel = trio.open_memory_channel(math.inf)
limit = trio.CapacityLimiter(3)
async with send_channel:
await send_channel.send(('https://start-url', send_channel.clone()))
#HERE1
async with trio.open_nursery() as nursery:
async for url, send_channel in receive_channel: #HERE3
nursery.start(consumer, url, send_channel, limit)
async def crawler(url, send_channel, limit, task_status):
async with limit, send_channel:
content = await ...
links = ...
for link in links:
await send_channel.send((link, send_channel.clone()))
#HERE2
(我跳过了跳过访问过的网址)
这里没有3个long lived consumer,但是只要有足够的工作给他们,最多有3个consumer。
在#HERE1 处,send_channel 已关闭(因为它被用作上下文管理器),唯一使通道保持活动状态的是它在该通道内的克隆。
在#HERE2,克隆也被关闭(因为上下文管理器)。如果通道为空,则该克隆是保持通道活动的最后一件事。通道消失,循环结束 (#HERE3)。
除非找到 URL,在这种情况下,它们会与更多 send_channel 的克隆一起添加到频道中,这将使频道保持足够长的时间来处理这些 URL。
这个和 Anders E. Andersen 的解决方案对我来说都感觉很老套:一个是使用 sleep
和 statistics()
,另一个创建 send_channel 的克隆并将它们放入频道中。 .. 对我来说感觉就像是克莱因瓶的软件实现。我可能会寻找其他一些方法。
我正在尝试使用 trio
和 asks
制作一个简单的网络爬虫。我使用 Nursery 一次启动几个爬虫,并使用内存通道来维护要访问的 url 列表。
每个爬虫都会收到该通道两端的克隆,因此它们可以抓取 url(通过 receive_channel),读取它,查找并添加新的 url访问过(通过send_channel)。
async def main():
send_channel, receive_channel = trio.open_memory_channel(math.inf)
async with trio.open_nursery() as nursery:
async with send_channel, receive_channel:
nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())
nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())
nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())
async def crawler(send_channel, receive_channel):
async for url in receive_channel: # I'm a consumer!
content = await ...
urls_found = ...
for u in urls_found:
await send_channel.send(u) # I'm a producer too!
在这种情况下,消费者是生产者。如何优雅地停止一切?
关闭一切的条件是:
- 频道为空
- 和
- 所有爬虫都卡在第一个 for 循环,等待 url 出现在 receive_channel(这...不会再发生了)
我尝试在 crawler()
中使用 async with send_channel
,但找不到好的方法。我也试图找到一些不同的方法(一些内存通道绑定工作池等),这里也没有运气。
这里至少有两个问题。
首先是您关于在频道为空时停止的假设。由于您分配的内存通道大小为 0,因此它将始终为空。如果爬虫准备好接收它,您只能传递 url。
这产生了第二个问题。如果您发现的 url 多于您分配的爬虫,您的应用程序就会死锁。
原因是,由于您无法将找到的所有 url 交给爬虫,爬虫永远不会准备好接收新的 url 进行爬网,因为它一直在等待另一个爬虫获取它的 urls.
这变得更糟,因为假设其他爬虫之一找到新的 urls,它们也会被困在已经在等待移交其 urls 的爬虫后面,并且它们将永远无法获取正在等待处理的 url 之一。
文档的相关部分:
https://trio.readthedocs.io/en/stable/reference-core.html#buffering-in-channels
假设我们解决了这个问题,下一步该去哪里?
您可能需要保留所有访问过的 url 的列表(集合?),以确保您不会再次访问它们。
要真正弄清楚什么时候停止,而不是关闭通道,简单地取消 nursery 可能要容易得多。
假设我们像这样修改主循环:
async def main():
send_channel, receive_channel = trio.open_memory_channel(math.inf)
active_workers = trio.CapacityLimiter(3) # Number of workers
async with trio.open_nursery() as nursery:
async with send_channel, receive_channel:
nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
while True:
await trio.sleep(1) # Give the workers a chance to start up.
if active_workers.borrowed_tokens == 0 and send_channel.statistics().current_buffer_used == 0:
nursery.cancel_scope.cancel() # All done!
现在我们需要稍微修改爬虫,以便在活动时获取令牌。
async def crawler(active_workers, send_channel, receive_channel):
async for url in receive_channel: # I'm a consumer!
with active_workers:
content = await ...
urls_found = ...
for u in urls_found:
await send_channel.send(u) # I'm a producer too!
其他需要考虑的事情-
您可能想在抓取工具中使用 send_channel.send_noblock(u)
。由于您有一个无界缓冲区,因此不可能出现 WouldBlock 异常,并且可能需要在每次发送时不触发检查点的行为。这样你就可以肯定地知道,在其他任务有机会获取新的 url 或父任务之前,特定的 url 已被完全处理并且所有新的 url 已被添加任务有机会检查工作是否完成。
这是我在尝试重组问题时想到的解决方案:
async def main():
send_channel, receive_channel = trio.open_memory_channel(math.inf)
limit = trio.CapacityLimiter(3)
async with send_channel:
await send_channel.send(('https://start-url', send_channel.clone()))
#HERE1
async with trio.open_nursery() as nursery:
async for url, send_channel in receive_channel: #HERE3
nursery.start(consumer, url, send_channel, limit)
async def crawler(url, send_channel, limit, task_status):
async with limit, send_channel:
content = await ...
links = ...
for link in links:
await send_channel.send((link, send_channel.clone()))
#HERE2
(我跳过了跳过访问过的网址)
这里没有3个long lived consumer,但是只要有足够的工作给他们,最多有3个consumer。
在#HERE1 处,send_channel 已关闭(因为它被用作上下文管理器),唯一使通道保持活动状态的是它在该通道内的克隆。
在#HERE2,克隆也被关闭(因为上下文管理器)。如果通道为空,则该克隆是保持通道活动的最后一件事。通道消失,循环结束 (#HERE3)。
除非找到 URL,在这种情况下,它们会与更多 send_channel 的克隆一起添加到频道中,这将使频道保持足够长的时间来处理这些 URL。
这个和 Anders E. Andersen 的解决方案对我来说都感觉很老套:一个是使用 sleep
和 statistics()
,另一个创建 send_channel 的克隆并将它们放入频道中。 .. 对我来说感觉就像是克莱因瓶的软件实现。我可能会寻找其他一些方法。