gevent / requests 在发出大量 head 请求时挂起

gevent / requests hangs while making lots of head requests

我需要发出 100k head 请求,并且我在请求之上使用 gevent。我的代码运行了一段时间,但最终挂起。我不确定它为什么挂起,或者它是否挂在请求或 gevent 中。我在请求和 gevent 中都使用了超时参数。

请查看下面的代码片段,让我知道我应该更改哪些内容。

import gevent
from gevent import monkey, pool
monkey.patch_all()
import requests

def get_head(url, timeout=3):
    try:
        return requests.head(url, allow_redirects=True, timeout=timeout)
    except:
        return None

def expand_short_urls(short_urls, chunk_size=100, timeout=60*5):
    chunk_list = lambda l, n: ( l[i:i+n] for i in range(0, len(l), n) )
    p = pool.Pool(chunk_size)
    print 'Expanding %d short_urls' % len(short_urls)
    results = {}
    for i, _short_urls_chunked in enumerate(chunk_list(short_urls, chunk_size)):
        print '\t%d. processing %d urls @ %s' % (i, chunk_size, str(datetime.datetime.now()))
        jobs = [p.spawn(get_head, _short_url) for _short_url in _short_urls_chunked]
        gevent.joinall(jobs, timeout=timeout)
        results.update({_short_url:job.get().url for _short_url, job in zip(_short_urls_chunked, jobs) if job.get() is not None and job.get().status_code==200})
    return results 

我试过 grequests,但它被放弃了,我已经完成了 github 拉取请求,但它们也都有问题。

我不确定这是否能解决您的问题,但您没有正确使用 pool.Pool()。

试试这个:

def expand_short_urls(short_urls, chunk_size=100):
    # Pool() automatically limits your process to chunk_size greenlets running concurrently
    # thus you don't need to do all that chunking business you were doing in your for loop
    p = pool.Pool(chunk_size)
    print 'Expanding %d short_urls' % len(short_urls)

    # spawn() (both gevent.spawn() and Pool.spawn()) returns a gevent.Greenlet object
    # NOT the value your function, get_head, will return
    threads = [p.spawn(get_head, short_url) for short_url in short_urls]
    p.join()

    # to access the returned value of your function, access the Greenlet.value property
    results = {short_url: thread.value.url for short_url, thread in zip(short_urls, threads) 

如果 thread.value 不是 None 并且 thread.value.status_code == 200} return 个结果

您观察到的 RAM 使用情况主要源于存储 100.000 个响应对象时堆积的所有数据,以及所有底层开销。我已经复制了您的应用案例,并针对 Alexa 排名靠前的 15000 个 URL 发出了 HEAD 请求。没关系

  • 我是使用 gevent 池(即每个连接一个 greenlet)还是一组固定的 greenlet,所有这些都请求多个 URL
  • 我将池大小设置为多大

最后,RAM 使用量随着时间的推移增长到相当大的数量。但是,我注意到从 requests 更改为 urllib2 已经导致 RAM 使用量减少了大约两倍。也就是我换成了

result = requests.head(url)

request = urllib2.Request(url)
request.get_method = lambda : 'HEAD'
result = urllib2.urlopen(request)

其他一些建议:不要使用两种超时机制。 Gevent 的超时方法非常扎实,你可以像这样轻松使用它:

def gethead(url):
    result = None
    try:
        with Timeout(5, False):
            result = requests.head(url)
    except Exception as e:
        result = e
    return result

可能看起来很棘手,但是 returns None(恰好 5 秒后,并指示超时)、表示通信错误的任何异常对象或响应。效果很好!

虽然这可能不是问题的一部分,但在这种情况下,我建议让工人 活着 并让他们每个人处理多个项目!生成小绿叶的开销确实很小。不过,这将是一个非常简单的解决方案,其中包含一组长寿命的 greenlets:

def qworker(qin, qout):
    while True:
        try:
            qout.put(gethead(qin.get(block=False)))
        except Empty:
            break

qin = Queue()
qout = Queue()

for url in urls:
    qin.put(url)

workers = [spawn(qworker, qin, qout) for i in xrange(POOLSIZE)]
joinall(workers)
returnvalues = [qout.get() for _ in xrange(len(urls))]

此外,您确实需要意识到这是您在那里解决的一个大规模问题,会产生非标准问题。当我用 20 秒的超时时间和 100 个工人和 15000 个要请求的 URL 重现你的场景时,我很容易得到大量的套接字:

# netstat -tpn | wc -l
10074

也就是说,OS 有超过 10000 个套接字需要管理,其中大部分处于 TIME_WAIT 状态。我还观察到 "Too many open files" 错误,并通过 sysctl 调高了限制。当您请求 100.000 个 URL 时,您也可能会达到这样的限制,您需要采取措施防止系统耗尽。

还要注意您使用请求的方式,它会自动遵循从 HTTP 到 HTTPS 的重定向,并自动验证证书,所有这些肯定会消耗 RAM。

在我的测量中,当我将请求的 URL 数除以程序的运行时间时,我几乎没有超过 100 responses/s,这是与国外服务器的高延迟连接的结果世界。我猜你也受到这样的限制的影响。将体系结构的其余部分调整到此限制,您可能能够生成从 Internet 到磁盘(或数据库)的数据流,而中间的 RAM 使用量不会那么大。

我应该回答你的两个主要问题,具体来说:

我认为gevent/the您使用它的方式不是您的问题。我认为您只是低估了任务的复杂性。它伴随着严重的问题,并将您的系统推向极限。

  • 您的 RAM 使用问题:如果可以,请先使用 urllib2。那么,如果事情仍然积累太多,你就需要去积累。尝试产生一个稳定状态:您可能想要开始将数据写入磁盘,并且通常会努力解决对象可以被垃圾收集的情况。

  • 您的代码 "eventually hangs":这可能是您的 RAM 问题。如果不是,则不要生成那么多小绿叶,而是按照说明 重新使用 它们。此外,进一步降低并发性,监控打开的套接字数量,必要时增加系统限制,并尝试找出确切你的软件挂起的地方。