如何使用 asyncio 在 set/list 理解中使用并行化?
How to use parallelization in set/list comprehension using asyncio?
我想在 Python 3.7.
中创建多进程理解
这是我的代码:
async def _url_exists(url):
"""Check whether a url is reachable"""
request = requests.get(url)
return request.status_code == 200:
async def _remove_unexisting_urls(rows):
return {row for row in rows if await _url_exists(row[0])}
rows = [
'http://example.com/',
'http://example.org/',
'http://foo.org/',
]
rows = asyncio.run(_remove_unexisting_urls(rows))
在此代码示例中,我想从列表中删除不存在的 URL。 (请注意,我使用的是集合而不是列表,因为我还想删除重复项)。
我的问题是我仍然看到执行是顺序的。 HTTP 请求使执行等待。
与串行执行相比,执行时间相同。
- 我是不是做错了什么?
- 这些 await/async 关键字应该如何与 python 理解一起使用?
asyncio
本身不会 运行 不同的 async
同时运行。但是,使用 multiprocessing
模块的 Pool.map
,您可以在另一个进程中将函数调度到 运行:
from multiprocessing.pool import Pool
pool = Pool()
def fetch(url):
request = requests.get(url)
return request.status_code == 200
rows = [
'http://example.com/',
'http://example.org/',
'http://foo.org/',
]
rows = [r for r in pool.map(fetch, rows) if r]
requests
不支持asyncio
。如果你想要真正的异步执行,你将不得不查看像 aiohttp or asks
这样的库
你的集合应该在卸载到任务之前构建,所以你甚至不会重复执行,而不是简化结果。
使用 requests
本身,您可以回退到 run_in_executor
,它将在 ThreadPoolExecutor 内执行您的请求,因此不是真正的异步 I/O:
import asyncio
import time
from requests import exceptions, get
def _url_exists(url):
try:
r = get(url, timeout=10)
except (exceptions.ConnectionError, exceptions.ConnectTimeout):
return False
else:
return r.status_code is 200
async def _remove_unexisting_urls(l, r):
# making a set from the list before passing it to the futures
# so we just have three tasks instead of nine
futures = [l.run_in_executor(None, _url_exists, url) for url in set(r)]
return [await f for f in futures]
rows = [ # added some dupes
'http://example.com/',
'http://example.com/',
'http://example.com/',
'http://example.org/',
'http://example.org/',
'http://example.org/',
'http://foo.org/',
'http://foo.org/',
'http://foo.org/',
]
loop = asyncio.get_event_loop()
print(time.time())
result = loop.run_until_complete(_remove_unexisting_urls(loop, rows))
print(time.time())
print(result)
输出
1537266974.403686
1537266986.6789136
[False, False, False]
如您所见,初始化线程池有一个惩罚,在这种情况下约为 2.3 秒。然而,考虑到这三个任务中的每一个都运行十秒直到我的盒子超时(我的 IDE 不允许通过代理),总共十二秒的执行时间看起来相当并发。
我想在 Python 3.7.
中创建多进程理解这是我的代码:
async def _url_exists(url):
"""Check whether a url is reachable"""
request = requests.get(url)
return request.status_code == 200:
async def _remove_unexisting_urls(rows):
return {row for row in rows if await _url_exists(row[0])}
rows = [
'http://example.com/',
'http://example.org/',
'http://foo.org/',
]
rows = asyncio.run(_remove_unexisting_urls(rows))
在此代码示例中,我想从列表中删除不存在的 URL。 (请注意,我使用的是集合而不是列表,因为我还想删除重复项)。
我的问题是我仍然看到执行是顺序的。 HTTP 请求使执行等待。 与串行执行相比,执行时间相同。
- 我是不是做错了什么?
- 这些 await/async 关键字应该如何与 python 理解一起使用?
asyncio
本身不会 运行 不同的 async
同时运行。但是,使用 multiprocessing
模块的 Pool.map
,您可以在另一个进程中将函数调度到 运行:
from multiprocessing.pool import Pool
pool = Pool()
def fetch(url):
request = requests.get(url)
return request.status_code == 200
rows = [
'http://example.com/',
'http://example.org/',
'http://foo.org/',
]
rows = [r for r in pool.map(fetch, rows) if r]
requests
不支持asyncio
。如果你想要真正的异步执行,你将不得不查看像 aiohttp or asks
你的集合应该在卸载到任务之前构建,所以你甚至不会重复执行,而不是简化结果。
使用 requests
本身,您可以回退到 run_in_executor
,它将在 ThreadPoolExecutor 内执行您的请求,因此不是真正的异步 I/O:
import asyncio
import time
from requests import exceptions, get
def _url_exists(url):
try:
r = get(url, timeout=10)
except (exceptions.ConnectionError, exceptions.ConnectTimeout):
return False
else:
return r.status_code is 200
async def _remove_unexisting_urls(l, r):
# making a set from the list before passing it to the futures
# so we just have three tasks instead of nine
futures = [l.run_in_executor(None, _url_exists, url) for url in set(r)]
return [await f for f in futures]
rows = [ # added some dupes
'http://example.com/',
'http://example.com/',
'http://example.com/',
'http://example.org/',
'http://example.org/',
'http://example.org/',
'http://foo.org/',
'http://foo.org/',
'http://foo.org/',
]
loop = asyncio.get_event_loop()
print(time.time())
result = loop.run_until_complete(_remove_unexisting_urls(loop, rows))
print(time.time())
print(result)
输出
1537266974.403686
1537266986.6789136
[False, False, False]
如您所见,初始化线程池有一个惩罚,在这种情况下约为 2.3 秒。然而,考虑到这三个任务中的每一个都运行十秒直到我的盒子超时(我的 IDE 不允许通过代理),总共十二秒的执行时间看起来相当并发。