Mac Web 请求在数千次请求后挂起

Mac web requests hanging after thousands of requests

我遇到了一个非常奇怪的问题,我正在寻求有关如何调试它的建议,而不是简单的修复,因为我一直无法创建一个简单的可重现案例。

在几个小时内,我使用 httpx 打开了 10,000-100,000 个对远程 Web 域的异步请求。具体来说,我正在使用上下文管理器的共享池来跨请求共享 TCP 套接字/其他资源。我在任何时候都只有几千个请求待处理。我的代码的核心是执行以下操作:

from random import choice
from httpx import AsyncClient

clients = []

for _ in range(200):
   client = AsyncClient()
   clients.append(await client.__aopen__())

async def run_request(url):
    try:
        client = choice(clients)
        response = await client.get(url, timeout=15)
    except Exception as e:
        continue
with ProcessPoolExecutor() as executor:
    await gather(
        *[
            asyncio.get_event_loop().run_in_executor(
                executor,
                partial(run_request, url=url)
            )
            for url in urls
          ]
      )

有时会在超时或无法访问主机的情况下抛出异常循环。

有时我的整个机器在尝试创建新连接时会挂起。 Chrome 冻结,本地托管的 postgres 实例冻结,甚至 lsof -i -a 冻结。然而 none 实际上超时,它们只是永远旋转。似乎 OS 无法分配新的套接字以便与远程主机通信,但我不确定这是否解释了 postgres 或 lsof 的行为。

尽管有上下文管理器,套接字打开是否有可能被泄露而不被释放?有没有人见过类似的东西?要探索哪些分析方法来确定根本原因?

在 OSX 中调试此类问题的“正确”方法是获取 DTrace 并查看系统调用。如果您不想进入那个兔子洞,那么启动 tcpdump 并监控 HTTP(s) 流量以检查该进程是否确实在工作或实际上处于循环中。

就是说,问题是共享可变状态导致代码不是下面的 thread-safe 我推测为什么 OS 可能会冻结,提出一些解决方案,然后是我的代码旨在减少资源使用并使代码在可能受到攻击的服务器方面表现良好。


你所做的根本不是thread-safe,所以可能是因为这个。 AsyncClients 包含可变状态,可以被多个执行者改变。由于没有同步,执行者可能会更改其他执行者正在处理的某些客户端的状态。例如,它可以将客户端的内部状态机从“已收到响应并等待 python 读取”更改为“等待来自服务器的响应”,该响应永远不会到达,从而将执行程序锁定到位;有了足够多的无限期等待,OS 的事件循环 (kqueue iirc) 不堪重负。

上下文不够,无法多说。调试代码是否不是 thread-safe 是很困难的,尤其是当发生的事情比您分享的片段多时。这就是 Rust 如此受欢迎的全部原因,因为它甚至可以防止这些类型的问题。

有几种方法可以解决这个问题。一种是将 AsyncClient 交给每个执行程序(并为每个 url 创建一个客户端):

with ProcessPoolExecutor() as executor:
    await gather(
        *[
            asyncio.get_event_loop().run_in_executor(
                executor,
                partial(run_request, url=url, client=client)
            )
            for url,client in zip(urls,clients)
          ]
      )

或者,在生成每个执行程序后在每个执行程序中创建客户端:

async def run_request(url):
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url, timeout=15)
        except Exception as e:
            continue

您也可以在客户周围放置一个 asyncio.Lock,然后执行者可以共享它们,但最好不要这样做。

我的首选方法是将每个域的 url 分组(例如 {"reddit.com":{<set of reddit urls>}, ...}),然后将 domain-specific url 的集合传递给函数运行 并行,并让该函数创建自己的客户端。

from httpx import AsyncClient
from concurrent.futures import ProcessPoolExecutor
import asyncio
from functools import partial
from urllib.parse import urlparse

clients = []

async def run_request(urls):
    async with httpx.AsyncClient(timeout=15) as client:
        resps = []
        deadletter = []
        for url in urls:
            try:
                resp = await client.get(url)
                resps.append(resp)
            except Exception as e:
                deadletter.append((url,e,resp))
        return resps,deadletter

def get_domain_from_url(url):
    c = urlparse(url)
    return c.netloc


async def entrypoint(urls):
    domain_url_map = {}
    for url in urls:
        domain = get_domain_from_url(url)
        
        # using a set deduplicates the urls
        domain_url_map[domain] = domain_url_map.get(domain,set())

        # set.add == list.append
        domain_url_map[domain].add(url) 


    with ProcessPoolExecutor() as executor:
        responses, errors = await asyncio.gather(
            *[
                asyncio.get_event_loop().run_in_executor(
                    executor,
                    partial(run_request, urls=urls)
                )
                for urls in domain_url_map.values()
            ]
        )

这样你就大大减少了客户端(可能还有套接字)的数量,这很好。域不会相互影响,因此 rate-limiting 不太可能影响多个客户端。您还可以删除重复数据 urls,并且可以轻松添加每个域的速率限制。

查看您发布的代码,您似乎没有处理 httpx async documentation 中提到的 AsyncClient 的 release/closing:

Use async with httpx.AsyncClient() if you want a context-managed client

async with httpx.AsyncClient() as client:
   <your code here>

Alternatively, use await client.aclose() if you want to close a client explicitly:

client = httpx.AsyncClient() 
<your code here> 
await client.aclose()

如文档中所述,我认为最好的方法是在 run_request 函数中打开上下文管理器。

您还可以在此 github issue on httpx repo 中找到报告的类似问题,他们的方法是使用上下文管理器。

除了他们自己的回购协议和文档,只是为了提供一个示例,例如:

async def run_request(url):
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url, timeout=15)
        except Exception as e:
            continue