我怎样才能让一些 asyncio 脚本继续抓取当前卡在执行过程中的某个地方

How can I let some asyncio script go on scraping which currently gets stuck somewhere in it's execution

我使用 asyncio 库创建了一个脚本来解析网页中不同 post 所有者的名称。这个想法是在脚本中提供此 link,该脚本解析每个页面中不同 post 的所有链接并遍历下一页以执行相同的操作。但是,脚本随后使用此函数 fetch_again() 中的所有链接到达内页,以便获得所有 post 的所有者。

虽然我可以从它的着陆页上删除所有者的名字,但我使用以下方法只是为了了解如何使用我正在尝试的设计实现相同的目的。我在脚本中使用 semaphore 来限制请求的数量。

当我使用以下脚本时,我发现它工作了 100 或更多 post 秒,然后卡住了。它不会抛出任何错误。

我试过:

import aiohttp
import asyncio
from lxml.html import fromstring
from urllib.parse import urljoin

link = "https://whosebug.com/questions/tagged/web-scraping"

semaphore = asyncio.Semaphore(10)

async def fetch(url):
    async with semaphore:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                text = await response.text()
                result = await processing_docs(session, text)
            return result

async def processing_docs(session, html):
    coros = []
    tree = fromstring(html)
    titles = [urljoin(link,title.attrib['href']) for title in tree.cssselect(".summary .question-hyperlink")]
    for title in titles:
        coros.append(fetch_again(session,title))

    next_page = tree.cssselect("div.pager a[rel='next']")
    if next_page:
        page_link = urljoin(link,next_page[0].attrib['href'])
        coros.append(fetch(page_link))
    await asyncio.gather(*coros)

async def fetch_again(session,url):
    async with semaphore:
        async with session.get(url) as response:
            text = await response.text()
            tree = fromstring(text)
            title = tree.cssselect("h1[itemprop='name'] a")[0].text
            print(title)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(asyncio.gather(*(fetch(url) for url in [link])))
    loop.run_until_complete(future)
    loop.close()

我怎样才能让脚本继续解析当前卡在执行过程中的某个地方?

脚本可能由于 死锁 而阻塞:fetch 获取信号量并调用 processing_docs,后者递归调用 [=11= 的更多实例] 和 fetch_again 信号量仍然保持 。如果 fetch 的递归深度达到 10,最里面的 fetch 将永远不会获取信号量,因为它已被其 调用者 获取。我建议您用 asyncio.Queue 替换递归,并用固定数量的工作任务排出(并填充)队列。这样你甚至不需要信号量并且保证不会死锁。

一个不需要重构的更简单的修复方法是将递归调用移到 async with semaphore 块之外的 processing_docs() ,即用信号量调用 processing_docs()释放。毕竟,信号量的目的是限制对远程服务器的并发访问,而不是本地处理,因为 asyncio 是 single-threaded。那应该消除死锁,同时仍然限制并发连接数:

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with semaphore:
            async with session.get(url) as response:
                text = await response.text()
        result = await processing_docs(session, text)
        return result

另请注意,您可能应该在 top-level 协程中创建一个会话并将其传播到整个代码。您已经在 fetchprocessing_docsfetch_again 之间执行此操作,但您也可以在 top-level 调用 fetch.

时执行此操作