使用 asyncio 进行多次调用并将结果添加到字典中
Making multiple calls with asyncio and adding result to a dictionary
我一直没能弄明白 Python 3 的 asyncio 库。我有一个邮政编码列表,想对某个 API 发起异步调用,以获取每个邮政编码对应的城市和州。用 for 循环按顺序请求是可以成功的,但我希望在邮政编码列表很大时能更快一些。
下面是我原来的(同步)代码示例:
import urllib.request, json
zips = ['90210', '60647']


def get_cities(zipcodes):
    """Look up city and state for each zip code, one request at a time.

    Args:
        zipcodes: Iterable of zip-code strings.

    Returns:
        Dict mapping each zip code's position in ``zipcodes`` to
        ``[zipcode, city, state]``. Empty input yields an empty dict and
        performs no requests.
    """
    zip_cities = {}
    for idx, zipcode in enumerate(zipcodes):
        url = ('http://maps.googleapis.com/maps/api/geocode/json?address='
               + zipcode + '&sensor=true')
        # Close the HTTP response as soon as its body has been read.
        with urllib.request.urlopen(url) as response:
            data = json.loads(response.read().decode('utf-8'))
        # Entries 1 and 3 of the first result's address components are
        # taken as the city and the state.
        components = data['results'][0]['address_components']
        city = components[1]['long_name']
        state = components[3]['long_name']
        zip_cities[idx] = [zipcode, city, state]
    return zip_cities
# Run the sequential lookup on the sample zip codes and show the mapping.
results = get_cities(zipcodes=zips)
print(results)
# Expected output:
# {0: ['90210', 'Beverly Hills', 'California'],
#  1: ['60647', 'Chicago', 'Illinois']}
下面是我试图把它改成异步的糟糕尝试,它无法正常运行:
import asyncio
import urllib.request, json
# Asker's non-working async attempt, kept exactly as posted; the comments
# below only mark what goes wrong.
zips = ['90210', '60647']
# Module-level dict that the coroutine is meant to fill in.
zip_cities = dict()
# The parameter is named `zipcodes`, yet the body reads `zipcode` and `idx`,
# neither of which is defined anywhere in this snippet.
@asyncio.coroutine
def get_cities(zipcodes):
url = 'http://maps.googleapis.com/maps/api/geocode/json?address='+zipcode+'&sensor=true'
# urlopen() is a synchronous call: nothing here yields control to the event loop.
response = urllib.request.urlopen(url)
string = response.read().decode('utf-8')
data = json.loads(string)
city = data['results'][0]['address_components'][1]['long_name']
state = data['results'][0]['address_components'][3]['long_name']
zip_cities.update({idx: [zipcode, city, state]})
loop = asyncio.get_event_loop()
# A plain list of coroutine objects is handed to run_until_complete() here;
# the comprehension variable `zip` also shadows the builtin of the same name.
loop.run_until_complete([get_cities(zip) for zip in zips])
loop.close()
print(zip_cities) # does not work
非常感谢任何帮助。我在网上看到的所有教程似乎都有点超出我的理解范围。
注意:我看到一些示例使用了 aiohttp。如果可能的话,我希望只使用 Python 3 自带的标准库。
我用 asyncio 不多,但 asyncio.get_event_loop() 应该就是你需要的。显然你还必须修改函数接收的参数,并按照文档使用 asyncio.wait(tasks):
import asyncio
import json
import urllib.request

zips = ['90210', '60647']
zip_cities = dict()

# asyncio.ensure_future exists from Python 3.4.4 on; earlier 3.4 releases only
# offer asyncio.async, which cannot be spelled as an attribute access since
# ``async`` became a keyword in 3.7. Looking both up by name works everywhere.
schedule = getattr(asyncio, 'ensure_future', None) or getattr(asyncio, 'async')


def _fetch_json(url):
    """Blocking helper: download ``url`` and return its decoded JSON body."""
    with urllib.request.urlopen(url) as response:
        return json.loads(response.read().decode('utf-8'))


@asyncio.coroutine
def get_cities(zipcode, idx):
    """Look up one zip code and store the result in ``zip_cities``.

    Args:
        zipcode: Zip code as a string.
        idx: Key under which ``[zipcode, city, state]`` is stored.
    """
    url = ('https://maps.googleapis.com/maps/api/geocode/json?key=abcdefg&address='
           + zipcode + '&sensor=true')
    # urllib is synchronous, so the whole request (connect, read, close) runs
    # in the default executor; reading the body here would stall the loop.
    data = yield from loop.run_in_executor(None, _fetch_json, url)
    components = data['results'][0]['address_components']
    city = components[1]['long_name']
    state = components[3]['long_name']
    zip_cities[idx] = [zipcode, city, state]


loop = asyncio.get_event_loop()
tasks = [schedule(get_cities(z, i)) for i, z in enumerate(zips)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
print(zip_cities)
# Expected output:
# {0: ['90210', 'Beverly Hills', 'California'], 1: ['60647', 'Chicago', 'Illinois']}
我的 Python 版本低于 3.4.4,所以不得不使用 asyncio.async 而不是 asyncio.ensure_future。(注意:自 Python 3.7 起 async 成为关键字,asyncio.async 这种写法已无法使用。)
或者改变逻辑,让协程返回结果,再通过各个任务的 task.result() 来创建字典:
# Assumes ``asyncio``, ``json`` and ``urllib.request`` are imported and that
# ``zips`` is defined, as in the earlier snippets.


def _fetch_json(url):
    """Blocking helper: download ``url`` and return its decoded JSON body."""
    with urllib.request.urlopen(url) as response:
        return json.loads(response.read().decode('utf-8'))


@asyncio.coroutine
def get_cities(zipcode):
    """Return ``[zipcode, city, state]`` for a single zip code."""
    url = ('https://maps.googleapis.com/maps/api/geocode/json?key=abcdefg&address='
           + zipcode + '&sensor=true')
    # Run the whole blocking request (connect, read, close) in the default
    # executor so that reading the body does not stall the event loop.
    data = yield from loop.run_in_executor(None, _fetch_json, url)
    components = data['results'][0]['address_components']
    city = components[1]['long_name']
    state = components[3]['long_name']
    return [zipcode, city, state]


# asyncio.ensure_future exists from Python 3.4.4 on; earlier 3.4 releases only
# offer asyncio.async, which cannot be spelled as an attribute access since
# ``async`` became a keyword in 3.7. Looking both up by name works everywhere.
schedule = getattr(asyncio, 'ensure_future', None) or getattr(asyncio, 'async')

loop = asyncio.get_event_loop()
tasks = [schedule(get_cities(z)) for z in zips]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
# ``tasks`` is in the same order as ``zips``, so enumerate() gives every
# result the index of its zip code; result() re-raises a failed lookup.
zip_cities = {i: tsk.result() for i, tsk in enumerate(tasks)}
print(zip_cities)
# Expected output:
# {0: ['90210', 'Beverly Hills', 'California'], 1: ['60647', 'Chicago', 'Illinois']}
如果您愿意考虑外部模块,还有一个可以与 asyncio 配合使用的 requests 移植版本。
如果您使用 urllib
执行 HTTP 请求,您将无法获得任何并发,因为它是一个同步库。将调用 urllib
的函数包装在 coroutine
中不会改变这一点。您必须使用集成到 asyncio
中的异步 HTTP 客户端,例如 aiohttp
:
import asyncio
import json
import aiohttp
zips = ['90210', '60647']
zip_cities = dict()


@asyncio.coroutine
def get_cities(zipcode, idx):
    """Fetch city and state for one zip code and record them under ``idx``.

    Args:
        zipcode: Zip code as a string.
        idx: Key under which ``[zipcode, city, state]`` is stored in the
            module-level ``zip_cities`` dict.
    """
    url = 'https://maps.googleapis.com/maps/api/geocode/json?key=abcdfg&address='+zipcode+'&sensor=true'
    # NOTE(review): ``yield from aiohttp.request(...)`` is the call style this
    # answer was written against; confirm it against the installed aiohttp.
    response = yield from aiohttp.request('get', url)
    string = (yield from response.read()).decode('utf-8')
    data = json.loads(string)
    print(data)
    components = data['results'][0]['address_components']
    city = components[1]['long_name']
    state = components[3]['long_name']
    zip_cities[idx] = [zipcode, city, state]


if __name__ == "__main__":
    # asyncio.ensure_future exists from Python 3.4.4 on; earlier 3.4 releases
    # only offer asyncio.async, which cannot be spelled as an attribute access
    # since ``async`` became a keyword in 3.7. Look both up by name.
    schedule = getattr(asyncio, 'ensure_future', None) or getattr(asyncio, 'async')
    loop = asyncio.get_event_loop()
    tasks = [schedule(get_cities(z, i)) for i, z in enumerate(zips)]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    print(zip_cities)
我知道您更希望只使用标准库,但 asyncio 库不包含 HTTP 客户端,因此您基本上必须重新实现 aiohttp 的一部分,才能重现它提供的功能。我想另一种选择是在后台线程中进行 urllib 调用,这样它们就不会阻塞事件循环;但在 aiohttp 可用的情况下这样做有点多此一举(也多少违背了使用 asyncio 的初衷):
import asyncio
import json
import urllib.request
from concurrent.futures import ThreadPoolExecutor
zips = ['90210', '60647']
zip_cities = dict()


def _fetch_json(url):
    """Blocking helper: download ``url`` and return its decoded JSON body."""
    with urllib.request.urlopen(url) as response:
        return json.loads(response.read().decode('utf-8'))


@asyncio.coroutine
def get_cities(zipcode, idx):
    """Fetch city and state for one zip code and record them under ``idx``.

    Relies on the module-level ``loop`` and ``executor`` created in the
    ``__main__`` section.

    Args:
        zipcode: Zip code as a string.
        idx: Key under which ``[zipcode, city, state]`` is stored in the
            module-level ``zip_cities`` dict.
    """
    url = 'https://maps.googleapis.com/maps/api/geocode/json?key=abcdfg&address='+zipcode+'&sensor=true'
    # The whole blocking request (connect, read, close) runs on a worker
    # thread, so reading the body no longer happens on the event-loop thread.
    data = yield from loop.run_in_executor(executor, _fetch_json, url)
    print(data)
    components = data['results'][0]['address_components']
    city = components[1]['long_name']
    state = components[3]['long_name']
    zip_cities[idx] = [zipcode, city, state]


if __name__ == "__main__":
    # asyncio.ensure_future exists from Python 3.4.4 on; earlier 3.4 releases
    # only offer asyncio.async, which cannot be spelled as an attribute access
    # since ``async`` became a keyword in 3.7. Look both up by name.
    schedule = getattr(asyncio, 'ensure_future', None) or getattr(asyncio, 'async')
    # The ``with`` block shuts the worker threads down once the loop is done.
    with ThreadPoolExecutor(10) as executor:
        loop = asyncio.get_event_loop()
        tasks = [schedule(get_cities(z, i)) for i, z in enumerate(zips)]
        loop.run_until_complete(asyncio.wait(tasks))
        loop.close()
    print(zip_cities)
我一直没能弄明白 Python 3 的 asyncio 库。我有一个邮政编码列表,想对某个 API 发起异步调用,以获取每个邮政编码对应的城市和州。用 for 循环按顺序请求是可以成功的,但我希望在邮政编码列表很大时能更快一些。
下面是我原来的(同步)代码示例:
import urllib.request, json
zips = ['90210', '60647']


def get_cities(zipcodes):
    """Look up city and state for each zip code, one request at a time.

    Args:
        zipcodes: Iterable of zip-code strings.

    Returns:
        Dict mapping each zip code's position in ``zipcodes`` to
        ``[zipcode, city, state]``. Empty input yields an empty dict and
        performs no requests.
    """
    zip_cities = {}
    for idx, zipcode in enumerate(zipcodes):
        url = ('http://maps.googleapis.com/maps/api/geocode/json?address='
               + zipcode + '&sensor=true')
        # Close the HTTP response as soon as its body has been read.
        with urllib.request.urlopen(url) as response:
            data = json.loads(response.read().decode('utf-8'))
        # Entries 1 and 3 of the first result's address components are
        # taken as the city and the state.
        components = data['results'][0]['address_components']
        city = components[1]['long_name']
        state = components[3]['long_name']
        zip_cities[idx] = [zipcode, city, state]
    return zip_cities
# Run the sequential lookup on the sample zip codes and show the mapping.
results = get_cities(zipcodes=zips)
print(results)
# Expected output:
# {0: ['90210', 'Beverly Hills', 'California'],
#  1: ['60647', 'Chicago', 'Illinois']}
下面是我试图把它改成异步的糟糕尝试,它无法正常运行:
import asyncio
import urllib.request, json
# Asker's non-working async attempt, kept exactly as posted; the comments
# below only mark what goes wrong.
zips = ['90210', '60647']
# Module-level dict that the coroutine is meant to fill in.
zip_cities = dict()
# The parameter is named `zipcodes`, yet the body reads `zipcode` and `idx`,
# neither of which is defined anywhere in this snippet.
@asyncio.coroutine
def get_cities(zipcodes):
url = 'http://maps.googleapis.com/maps/api/geocode/json?address='+zipcode+'&sensor=true'
# urlopen() is a synchronous call: nothing here yields control to the event loop.
response = urllib.request.urlopen(url)
string = response.read().decode('utf-8')
data = json.loads(string)
city = data['results'][0]['address_components'][1]['long_name']
state = data['results'][0]['address_components'][3]['long_name']
zip_cities.update({idx: [zipcode, city, state]})
loop = asyncio.get_event_loop()
# A plain list of coroutine objects is handed to run_until_complete() here;
# the comprehension variable `zip` also shadows the builtin of the same name.
loop.run_until_complete([get_cities(zip) for zip in zips])
loop.close()
print(zip_cities) # does not work
非常感谢任何帮助。我在网上看到的所有教程似乎都有点超出我的理解范围。
注意:我看到一些示例使用了 aiohttp。如果可能的话,我希望只使用 Python 3 自带的标准库。
我用 asyncio 不多,但 asyncio.get_event_loop() 应该就是你需要的。显然你还必须修改函数接收的参数,并按照文档使用 asyncio.wait(tasks):
import asyncio
import json
import urllib.request

zips = ['90210', '60647']
zip_cities = dict()

# asyncio.ensure_future exists from Python 3.4.4 on; earlier 3.4 releases only
# offer asyncio.async, which cannot be spelled as an attribute access since
# ``async`` became a keyword in 3.7. Looking both up by name works everywhere.
schedule = getattr(asyncio, 'ensure_future', None) or getattr(asyncio, 'async')


def _fetch_json(url):
    """Blocking helper: download ``url`` and return its decoded JSON body."""
    with urllib.request.urlopen(url) as response:
        return json.loads(response.read().decode('utf-8'))


@asyncio.coroutine
def get_cities(zipcode, idx):
    """Look up one zip code and store the result in ``zip_cities``.

    Args:
        zipcode: Zip code as a string.
        idx: Key under which ``[zipcode, city, state]`` is stored.
    """
    url = ('https://maps.googleapis.com/maps/api/geocode/json?key=abcdefg&address='
           + zipcode + '&sensor=true')
    # urllib is synchronous, so the whole request (connect, read, close) runs
    # in the default executor; reading the body here would stall the loop.
    data = yield from loop.run_in_executor(None, _fetch_json, url)
    components = data['results'][0]['address_components']
    city = components[1]['long_name']
    state = components[3]['long_name']
    zip_cities[idx] = [zipcode, city, state]


loop = asyncio.get_event_loop()
tasks = [schedule(get_cities(z, i)) for i, z in enumerate(zips)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
print(zip_cities)
# Expected output:
# {0: ['90210', 'Beverly Hills', 'California'], 1: ['60647', 'Chicago', 'Illinois']}
我的 Python 版本低于 3.4.4,所以不得不使用 asyncio.async 而不是 asyncio.ensure_future。(注意:自 Python 3.7 起 async 成为关键字,asyncio.async 这种写法已无法使用。)
或者改变逻辑,让协程返回结果,再通过各个任务的 task.result() 来创建字典:
# Assumes ``asyncio``, ``json`` and ``urllib.request`` are imported and that
# ``zips`` is defined, as in the earlier snippets.


def _fetch_json(url):
    """Blocking helper: download ``url`` and return its decoded JSON body."""
    with urllib.request.urlopen(url) as response:
        return json.loads(response.read().decode('utf-8'))


@asyncio.coroutine
def get_cities(zipcode):
    """Return ``[zipcode, city, state]`` for a single zip code."""
    url = ('https://maps.googleapis.com/maps/api/geocode/json?key=abcdefg&address='
           + zipcode + '&sensor=true')
    # Run the whole blocking request (connect, read, close) in the default
    # executor so that reading the body does not stall the event loop.
    data = yield from loop.run_in_executor(None, _fetch_json, url)
    components = data['results'][0]['address_components']
    city = components[1]['long_name']
    state = components[3]['long_name']
    return [zipcode, city, state]


# asyncio.ensure_future exists from Python 3.4.4 on; earlier 3.4 releases only
# offer asyncio.async, which cannot be spelled as an attribute access since
# ``async`` became a keyword in 3.7. Looking both up by name works everywhere.
schedule = getattr(asyncio, 'ensure_future', None) or getattr(asyncio, 'async')

loop = asyncio.get_event_loop()
tasks = [schedule(get_cities(z)) for z in zips]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
# ``tasks`` is in the same order as ``zips``, so enumerate() gives every
# result the index of its zip code; result() re-raises a failed lookup.
zip_cities = {i: tsk.result() for i, tsk in enumerate(tasks)}
print(zip_cities)
# Expected output:
# {0: ['90210', 'Beverly Hills', 'California'], 1: ['60647', 'Chicago', 'Illinois']}
如果您愿意考虑外部模块,还有一个可以与 asyncio 配合使用的 requests 移植版本。
如果您使用 urllib
执行 HTTP 请求,您将无法获得任何并发,因为它是一个同步库。将调用 urllib
的函数包装在 coroutine
中不会改变这一点。您必须使用集成到 asyncio
中的异步 HTTP 客户端,例如 aiohttp
:
import asyncio
import json
import aiohttp
zips = ['90210', '60647']
zip_cities = dict()


@asyncio.coroutine
def get_cities(zipcode, idx):
    """Fetch city and state for one zip code and record them under ``idx``.

    Args:
        zipcode: Zip code as a string.
        idx: Key under which ``[zipcode, city, state]`` is stored in the
            module-level ``zip_cities`` dict.
    """
    url = 'https://maps.googleapis.com/maps/api/geocode/json?key=abcdfg&address='+zipcode+'&sensor=true'
    # NOTE(review): ``yield from aiohttp.request(...)`` is the call style this
    # answer was written against; confirm it against the installed aiohttp.
    response = yield from aiohttp.request('get', url)
    string = (yield from response.read()).decode('utf-8')
    data = json.loads(string)
    print(data)
    components = data['results'][0]['address_components']
    city = components[1]['long_name']
    state = components[3]['long_name']
    zip_cities[idx] = [zipcode, city, state]


if __name__ == "__main__":
    # asyncio.ensure_future exists from Python 3.4.4 on; earlier 3.4 releases
    # only offer asyncio.async, which cannot be spelled as an attribute access
    # since ``async`` became a keyword in 3.7. Look both up by name.
    schedule = getattr(asyncio, 'ensure_future', None) or getattr(asyncio, 'async')
    loop = asyncio.get_event_loop()
    tasks = [schedule(get_cities(z, i)) for i, z in enumerate(zips)]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    print(zip_cities)
我知道您更希望只使用标准库,但 asyncio 库不包含 HTTP 客户端,因此您基本上必须重新实现 aiohttp 的一部分,才能重现它提供的功能。我想另一种选择是在后台线程中进行 urllib 调用,这样它们就不会阻塞事件循环;但在 aiohttp 可用的情况下这样做有点多此一举(也多少违背了使用 asyncio 的初衷):
import asyncio
import json
import urllib.request
from concurrent.futures import ThreadPoolExecutor
zips = ['90210', '60647']
zip_cities = dict()


def _fetch_json(url):
    """Blocking helper: download ``url`` and return its decoded JSON body."""
    with urllib.request.urlopen(url) as response:
        return json.loads(response.read().decode('utf-8'))


@asyncio.coroutine
def get_cities(zipcode, idx):
    """Fetch city and state for one zip code and record them under ``idx``.

    Relies on the module-level ``loop`` and ``executor`` created in the
    ``__main__`` section.

    Args:
        zipcode: Zip code as a string.
        idx: Key under which ``[zipcode, city, state]`` is stored in the
            module-level ``zip_cities`` dict.
    """
    url = 'https://maps.googleapis.com/maps/api/geocode/json?key=abcdfg&address='+zipcode+'&sensor=true'
    # The whole blocking request (connect, read, close) runs on a worker
    # thread, so reading the body no longer happens on the event-loop thread.
    data = yield from loop.run_in_executor(executor, _fetch_json, url)
    print(data)
    components = data['results'][0]['address_components']
    city = components[1]['long_name']
    state = components[3]['long_name']
    zip_cities[idx] = [zipcode, city, state]


if __name__ == "__main__":
    # asyncio.ensure_future exists from Python 3.4.4 on; earlier 3.4 releases
    # only offer asyncio.async, which cannot be spelled as an attribute access
    # since ``async`` became a keyword in 3.7. Look both up by name.
    schedule = getattr(asyncio, 'ensure_future', None) or getattr(asyncio, 'async')
    # The ``with`` block shuts the worker threads down once the loop is done.
    with ThreadPoolExecutor(10) as executor:
        loop = asyncio.get_event_loop()
        tasks = [schedule(get_cities(z, i)) for i, z in enumerate(zips)]
        loop.run_until_complete(asyncio.wait(tasks))
        loop.close()
    print(zip_cities)