如何使用 asyncio 在 set/list 理解中使用并行化?

How to use parallelization in set/list comprehension using asyncio?

我想在 Python 3.7.

中创建多进程理解

这是我的代码:

async def _url_exists(url):
  """Check whether a url is reachable"""
  request = requests.get(url)
  return request.status_code == 200:

async def _remove_unexisting_urls(rows):
  return {row for row in rows if await _url_exists(row[0])}

rows = [
  'http://example.com/',
  'http://example.org/',
  'http://foo.org/',
]
rows = asyncio.run(_remove_unexisting_urls(rows))

在此代码示例中,我想从列表中删除不存在的 URL。 (请注意,我使用的是集合而不是列表,因为我还想删除重复项)。

我的问题是我仍然看到执行是顺序的。 HTTP 请求使执行等待。 与串行执行相比,执行时间相同。

asyncio 本身不会 运行 不同的 async 同时运行。但是,使用 multiprocessing 模块的 Pool.map,您可以在另一个进程中将函数调度到 运行:

from multiprocessing.pool import Pool

pool = Pool()

def fetch(url):
    request = requests.get(url)
    return request.status_code == 200

rows = [
  'http://example.com/',
  'http://example.org/',
  'http://foo.org/',
]
rows = [r for r in pool.map(fetch, rows) if r]

requests不支持asyncio。如果你想要真正的异步执行,你将不得不查看像 aiohttp or asks

这样的库

你的集合应该在卸载到任务之前构建,所以你甚至不会重复执行,而不是简化结果。

使用 requests 本身,您可以回退到 run_in_executor,它将在 ThreadPoolExecutor 内执行您的请求,因此不是真正的异步 I/O:

import asyncio
import time
from requests import exceptions, get

def _url_exists(url):
    try:
        r = get(url, timeout=10)
    except (exceptions.ConnectionError, exceptions.ConnectTimeout):
        return False
    else:
        return r.status_code is 200

async def _remove_unexisting_urls(l, r):
    # making a set from the list before passing it to the futures
    # so we just have three tasks instead of nine
    futures = [l.run_in_executor(None, _url_exists, url) for url in set(r)]
    return [await f for f in futures]

rows = [ # added some dupes
    'http://example.com/',
    'http://example.com/',
    'http://example.com/',
    'http://example.org/',
    'http://example.org/',
    'http://example.org/',
    'http://foo.org/',
    'http://foo.org/',
    'http://foo.org/',
]

loop = asyncio.get_event_loop()
print(time.time())
result = loop.run_until_complete(_remove_unexisting_urls(loop, rows))
print(time.time())
print(result)

输出

1537266974.403686
1537266986.6789136
[False, False, False]

如您所见,初始化线程池有一个惩罚,在这种情况下约为 2.3 秒。然而,考虑到这三个任务中的每一个都运行十秒直到我的盒子超时(我的 IDE 不允许通过代理),总共十二秒的执行时间看起来相当并发。