Python和Trio,其中生产者是消费者,如何在工作完成后优雅退出?

Python and Trio, where producers are consumers, how to exit gracefully when the job is done?

我正在尝试使用 trioasks 制作一个简单的网络爬虫。我使用 Nursery 一次启动几个爬虫,并使用内存通道来维护要访问的 url 列表。

每个爬虫都会收到该通道两端的克隆,因此它们可以抓取 url(通过 receive_channel),读取它,查找并添加新的 url访问过(通过send_channel)。

async def main():
    send_channel, receive_channel = trio.open_memory_channel(math.inf)
    async with trio.open_nursery() as nursery:
        async with send_channel, receive_channel:
            nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())
            nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())
            nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())


async def crawler(send_channel, receive_channel):
    async for url in receive_channel:  # I'm a consumer!
        content = await ...
        urls_found = ...
        for u in urls_found:
            await send_channel.send(u)  # I'm a producer too!

在这种情况下,消费者生产者。如何优雅地停止一切?

关闭一切的条件是:

我尝试在 crawler() 中使用 async with send_channel,但找不到好的方法。我也试图找到一些不同的方法(一些内存通道绑定工作池等),这里也没有运气。

这里至少有两个问题。

首先是您关于在频道为空时停止的假设。由于您分配的内存通道大小为 0,因此它将始终为空。如果爬虫准备好接收它,您只能传递 url。

这产生了第二个问题。如果您发现的 url 多于您分配的爬虫,您的应用程序就会死锁。

原因是,由于您无法将找到的所有 url 交给爬虫,爬虫永远不会准备好接收新的 url 进行爬网,因为它一直在等待另一个爬虫获取它的 urls.

这变得更糟,因为假设其他爬虫之一找到新的 urls,它们也会被困在已经在等待移交其 urls 的爬虫后面,并且它们将永远无法获取正在等待处理的 url 之一。

文档的相关部分:

https://trio.readthedocs.io/en/stable/reference-core.html#buffering-in-channels

假设我们解决了这个问题,下一步该去哪里?

您可能需要保留所有访问过的 url 的列表(集合?),以确保您不会再次访问它们。

要真正弄清楚什么时候停止,而不是关闭通道,简单地取消 nursery 可能要容易得多。

假设我们像这样修改主循环:

async def main():
    send_channel, receive_channel = trio.open_memory_channel(math.inf)
    active_workers = trio.CapacityLimiter(3) # Number of workers
    async with trio.open_nursery() as nursery:
        async with send_channel, receive_channel:
            nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
            nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
            nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
            while True:
                await trio.sleep(1) # Give the workers a chance to start up.
                if active_workers.borrowed_tokens == 0 and send_channel.statistics().current_buffer_used == 0:
                    nursery.cancel_scope.cancel() # All done!

现在我们需要稍微修改爬虫,以便在活动时获取令牌。

async def crawler(active_workers, send_channel, receive_channel):
    async for url in receive_channel:  # I'm a consumer!
        with active_workers:
            content = await ...
            urls_found = ...
            for u in urls_found:
                await send_channel.send(u)  # I'm a producer too!

其他需要考虑的事情-

您可能想在抓取工具中使用 send_channel.send_noblock(u)。由于您有一个无界缓冲区,因此不可能出现 WouldBlock 异常,并且可能需要在每次发送时不触发检查点的行为。这样你就可以肯定地知道,在其他任务有机会获取新的 url 或父任务之前,特定的 url 已被完全处理并且所有新的 url 已被添加任务有机会检查工作是否完成。

这是我在尝试重组问题时想到的解决方案:

async def main():
    send_channel, receive_channel = trio.open_memory_channel(math.inf)
 
    limit = trio.CapacityLimiter(3)

    async with send_channel:
        await send_channel.send(('https://start-url', send_channel.clone()))
    #HERE1

    async with trio.open_nursery() as nursery:
        async for url, send_channel in receive_channel:  #HERE3
            nursery.start(consumer, url, send_channel, limit)

async def crawler(url, send_channel, limit, task_status):
    async with limit, send_channel:
        content = await ...
        links = ...
        for link in links:
            await send_channel.send((link, send_channel.clone()))
    #HERE2

(我跳过了跳过访问过的网址)

这里没有3个long lived consumer,但是只要有足够的工作给他们,最多有3个consumer。

在#HERE1 处,send_channel 已关闭(因为它被用作上下文管理器),唯一使通道保持活动状态的是它在该通道内的克隆。

在#HERE2,克隆也被关闭(因为上下文管理器)。如果通道为空,则该克隆是保持通道活动的最后一件事。通道消失,循环结束 (#HERE3)。

除非找到 URL,在这种情况下,它们会与更多 send_channel 的克隆一起添加到频道中,这将使频道保持足够长的时间来处理这些 URL。

这个和 Anders E. Andersen 的解决方案对我来说都感觉很老套:一个是使用 sleepstatistics(),另一个创建 send_channel 的克隆并将它们放入频道中。 .. 对我来说感觉就像是克莱因瓶的软件实现。我可能会寻找其他一些方法。