semaphore/multiple 1 个代理的 asyncio 中的池锁 - aiohttp

semaphore/multiple pool locks in asyncio for 1 proxy - aiohttp

我有 5,00,000 个网址。并希望异步获得每个响应。

import aiohttp
import asyncio    

@asyncio.coroutine
def worker(url):
    response = yield from aiohttp.request('GET', url, connector=aiohttp.TCPConnector(share_cookies=True, verify_ssl=False))
    body = yield from response.read_and_close()

    print(url)

def main():
    url_list = [] # lacs of urls, extracting from a file

    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait([worker(u) for u in url_list]))

main()

我一次要200个连接(并发200个),不能超过这个因为

当我 运行 这个程序用于 50 个 url 时它工作正常,即 url_list[:50] 但是如果我通过整个列表,我会得到这个错误

aiohttp.errors.ClientOSError: Cannot connect to host www.example.com:443 ssl:True Future/Task exception was never retrieved future: Task()

可能是频率太高了,服务器在限制后拒绝响应?

是的,可以预料到服务器在对其造成过多流量(无论 "too much traffic" 的定义如何)后停止响应。

在这种情况下限制并发请求数量(限制它们)的一种方法是使用 asyncio.Semaphore,类似于多线程中使用的方法:就像那里一样,您创建一个信号量并确保您想要限制的操作是在进行实际工作之前获取该信号量并在之后释放它。

为了您的方便,asyncio.Semaphore 实施了上下文管理器以使其更容易。

最基本的方法:

CONCURRENT_REQUESTS = 200


@asyncio.coroutine
def worker(url, semaphore):
    # Aquiring/releasing semaphore using context manager.
    with (yield from semaphore):
        response = yield from aiohttp.request(
            'GET',
            url,
            connector=aiohttp.TCPConnector(share_cookies=True,
                                           verify_ssl=False))
        body = yield from response.read_and_close()

        print(url)


def main():
    url_list = [] # lacs of urls, extracting from a file

    semaphore = asyncio.Semaphore(CONCURRENT_REQUESTS)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait([worker(u, semaphore) for u in url_list]))