Python 异步循环 concurrent.futures.ThreadPoolExecutor
Python async loop concurrent.futures.ThreadPoolExecutor
我正在尝试以异步方式从一组 URL 中提取数据。我想每 10 秒(或多或少)按一组 URL 执行请求。
import aiohttp
import asyncio
from aiohttp import ClientSession
def create_list_urls():
    """Return the URL groups to crawl, one inner list per batch."""
    endpoint_patterns = ("http://apiexample.com/param{}", "http://apiexample2.com/param{}")
    return [[pattern.format(batch) for pattern in endpoint_patterns] for batch in (1, 2)]
async def retrieve_datas(url, session):
    """GET *url* with the given aiohttp session and return the parsed JSON body."""
    async with session.get(url) as resp:
        payload = await resp.json()
    return payload
async def main():
    """Crawl every URL group forever, pausing ~10 seconds between groups.

    For each group a fresh ClientSession is opened, all URLs in the group
    are fetched concurrently, and the decoded JSON results are printed.
    """
    while True:
        urls_to_crawl = create_list_urls()
        for urls in urls_to_crawl:
            tasks = []
            # Session must stay open while the requests run, so gather inside it.
            async with ClientSession() as session:
                for url in urls:
                    tasks.append(asyncio.ensure_future(
                        retrieve_datas(url, session)))
                datas_extracted = await asyncio.gather(*tasks, return_exceptions=False)
            print(datas_extracted)
            # BUG FIX: asyncio.sleep() only returns a coroutine; without
            # `await` it is never executed and the loop never pauses.
            await asyncio.sleep(10)
if __name__ == '__main__':
    # Drive the crawler on the default event loop until main() finishes.
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(asyncio.ensure_future(main()))
但是我收到这个错误:
Traceback (most recent call last):
File "test.py", line 34, in <module>
loop.run_until_complete(future)
File "/usr/lib/python3.5/asyncio/base_events.py", line 466, in run_until_complete
return future.result()
File "/usr/lib/python3.5/asyncio/futures.py", line 293, in result
raise self._exception
File "/usr/lib/python3.5/asyncio/tasks.py", line 241, in _step
result = coro.throw(exc)
File "test.py", line 27, in main
datas_extracted = await asyncio.gather(*tasks, return_exceptions=False)
File "/usr/lib/python3.5/asyncio/futures.py", line 380, in __iter__
yield self # This tells Task to wait for completion.
File "/usr/lib/python3.5/asyncio/tasks.py", line 304, in _wakeup
future.result()
File "/usr/lib/python3.5/asyncio/futures.py", line 293, in result
raise self._exception
File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
File "test.py", line 14, in retrieve_datas
async with session.get(url) as response:
File "/usr/local/lib/python3.5/dist-packages/aiohttp/client.py", line 603, in __aenter__
self._resp = yield from self._coro
File "/usr/local/lib/python3.5/dist-packages/aiohttp/client.py", line 231, in _request
conn = yield from self._connector.connect(req)
File "/usr/local/lib/python3.5/dist-packages/aiohttp/connector.py", line 378, in connect
proto = yield from self._create_connection(req)
File "/usr/local/lib/python3.5/dist-packages/aiohttp/connector.py", line 687, in _create_connection
_, proto = yield from self._create_direct_connection(req)
File "/usr/local/lib/python3.5/dist-packages/aiohttp/connector.py", line 698, in _create_direct_connection
hosts = yield from self._resolve_host(req.url.raw_host, req.port)
File "/usr/local/lib/python3.5/dist-packages/aiohttp/connector.py", line 669, in _resolve_host
self._resolver.resolve(host, port, family=self._family)
File "/usr/local/lib/python3.5/dist-packages/aiohttp/resolver.py", line 31, in resolve
host, port, type=socket.SOCK_STREAM, family=family)
File "/usr/lib/python3.5/asyncio/base_events.py", line 673, in getaddrinfo
host, port, family, type, proto, flags)
File "/usr/lib/python3.5/asyncio/base_events.py", line 634, in run_in_executor
executor = concurrent.futures.ThreadPoolExecutor()
TypeError: __init__() missing 1 required positional argument: 'max_workers'
所以我的问题是如何修复它,但更重要的是,我觉得我没有以正确的方式使用异步。奇怪的是,如果我用 IDE 逐步调试,我可以在错误抛出之前完成一次迭代(拿到第一组 URL 的数据);但如果直接运行这段代码,异常会立即抛出。
编辑:
如果我使用的是 python 3.6,这个异常就消失了……只是 asyncio.sleep(10) 没有被执行(???),我的代码从不休眠。如果我把 asyncio.sleep(10) 换成 time.sleep(10) 就能正常休眠。我想我遗漏了什么。我的问题算是解决了,但我想确认我的代码执行异步请求的方式是否正确,以及是否有人能解释 sleep 为什么会出现这种行为。
错误不是由 aiohttp
引起的,而是由 asyncio
本身引发的,这很奇怪,因为代码已被测试覆盖。
您使用什么 python 版本?是自定义构建吗?
关于 asyncio.sleep()
-- 在调用之前放置 await
。
我正在尝试以异步方式从一组 URL 中提取数据。我想每 10 秒(或多或少)按一组 URL 执行请求。
import aiohttp
import asyncio
from aiohttp import ClientSession
def create_list_urls():
    """Return the URL groups to crawl, one inner list per batch."""
    endpoint_patterns = ("http://apiexample.com/param{}", "http://apiexample2.com/param{}")
    return [[pattern.format(batch) for pattern in endpoint_patterns] for batch in (1, 2)]
async def retrieve_datas(url, session):
    """GET *url* with the given aiohttp session and return the parsed JSON body."""
    async with session.get(url) as resp:
        payload = await resp.json()
    return payload
async def main():
    """Crawl every URL group forever, pausing ~10 seconds between groups.

    For each group a fresh ClientSession is opened, all URLs in the group
    are fetched concurrently, and the decoded JSON results are printed.
    """
    while True:
        urls_to_crawl = create_list_urls()
        for urls in urls_to_crawl:
            tasks = []
            # Session must stay open while the requests run, so gather inside it.
            async with ClientSession() as session:
                for url in urls:
                    tasks.append(asyncio.ensure_future(
                        retrieve_datas(url, session)))
                datas_extracted = await asyncio.gather(*tasks, return_exceptions=False)
            print(datas_extracted)
            # BUG FIX: asyncio.sleep() only returns a coroutine; without
            # `await` it is never executed and the loop never pauses.
            await asyncio.sleep(10)
if __name__ == '__main__':
    # Drive the crawler on the default event loop until main() finishes.
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(asyncio.ensure_future(main()))
但是我收到这个错误:
Traceback (most recent call last):
File "test.py", line 34, in <module>
loop.run_until_complete(future)
File "/usr/lib/python3.5/asyncio/base_events.py", line 466, in run_until_complete
return future.result()
File "/usr/lib/python3.5/asyncio/futures.py", line 293, in result
raise self._exception
File "/usr/lib/python3.5/asyncio/tasks.py", line 241, in _step
result = coro.throw(exc)
File "test.py", line 27, in main
datas_extracted = await asyncio.gather(*tasks, return_exceptions=False)
File "/usr/lib/python3.5/asyncio/futures.py", line 380, in __iter__
yield self # This tells Task to wait for completion.
File "/usr/lib/python3.5/asyncio/tasks.py", line 304, in _wakeup
future.result()
File "/usr/lib/python3.5/asyncio/futures.py", line 293, in result
raise self._exception
File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
File "test.py", line 14, in retrieve_datas
async with session.get(url) as response:
File "/usr/local/lib/python3.5/dist-packages/aiohttp/client.py", line 603, in __aenter__
self._resp = yield from self._coro
File "/usr/local/lib/python3.5/dist-packages/aiohttp/client.py", line 231, in _request
conn = yield from self._connector.connect(req)
File "/usr/local/lib/python3.5/dist-packages/aiohttp/connector.py", line 378, in connect
proto = yield from self._create_connection(req)
File "/usr/local/lib/python3.5/dist-packages/aiohttp/connector.py", line 687, in _create_connection
_, proto = yield from self._create_direct_connection(req)
File "/usr/local/lib/python3.5/dist-packages/aiohttp/connector.py", line 698, in _create_direct_connection
hosts = yield from self._resolve_host(req.url.raw_host, req.port)
File "/usr/local/lib/python3.5/dist-packages/aiohttp/connector.py", line 669, in _resolve_host
self._resolver.resolve(host, port, family=self._family)
File "/usr/local/lib/python3.5/dist-packages/aiohttp/resolver.py", line 31, in resolve
host, port, type=socket.SOCK_STREAM, family=family)
File "/usr/lib/python3.5/asyncio/base_events.py", line 673, in getaddrinfo
host, port, family, type, proto, flags)
File "/usr/lib/python3.5/asyncio/base_events.py", line 634, in run_in_executor
executor = concurrent.futures.ThreadPoolExecutor()
TypeError: __init__() missing 1 required positional argument: 'max_workers'
所以我的问题是如何修复它,但更重要的是,我觉得我没有以正确的方式使用异步。奇怪的是,如果我用 IDE 逐步调试,我可以在错误抛出之前完成一次迭代(拿到第一组 URL 的数据);但如果直接运行这段代码,异常会立即抛出。
编辑:
如果我使用的是 python 3.6,这个异常就消失了……只是 asyncio.sleep(10) 没有被执行(???),我的代码从不休眠。如果我把 asyncio.sleep(10) 换成 time.sleep(10) 就能正常休眠。我想我遗漏了什么。我的问题算是解决了,但我想确认我的代码执行异步请求的方式是否正确,以及是否有人能解释 sleep 为什么会出现这种行为。
错误不是由 aiohttp
引起的,而是由 asyncio
本身引发的,这很奇怪,因为代码已被测试覆盖。
您使用什么 python 版本?是自定义构建吗?
关于 asyncio.sleep()
-- 在调用之前放置 await
。