(Python) How can I apply asyncio in a while loop with an accumulator?
I have a piece of code that works fine for fetching data from a particular site's API. The problem is that the site limits each call to 50 objects, so I have to make many calls, and as a result the scraping takes far too long (sometimes I wait nearly 20 minutes). Here is my code:
import concurrent.futures
import requests

supply = 3000
offset = 0
token_ids = []

while offset < supply:
    url = "url_1" + str(offset)
    response = requests.request("GET", url)
    a = response.json()
    assets = a["assets"]

    def get_token_ids(an):
        if str(an['sell_orders']) == 'None' and str(an['last_sale']) == 'None' and str(an['num_sales']) == '0':
            token_ids.append(str(an['token_id']))

    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = [executor.submit(get_token_ids, asset) for asset in assets]

    offset += 50

print(token_ids)
The problem is that the code runs through and waits for everything to finish before making the next request. I'm thinking of an improvement where, as soon as a request is sent, the offset is incremented and the loop moves on to the next request, so I don't have to wait. I don't know how to do that; I've looked into asyncio, but it's still a challenge for me. Can anyone help me with this?
The problem is that Requests is not asynchronous code, so each of its network calls blocks the loop until it completes.
https://docs.python-requests.org/en/latest/user/advanced/#blocking-or-non-blocking
So it's better to use an asynchronous library such as aiohttp:
https://github.com/aio-libs/aiohttp
Example

Create one session for all connections:
async with aiohttp.ClientSession() as session:
and run all the needed requests:
results = await asyncio.gather(
    *[get_data(session, offset) for offset in range(0, supply, step)]
)
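The key property of asyncio.gather is that it runs all the coroutines concurrently and returns their results in the same order as the inputs. A minimal self-contained sketch of that behavior, using asyncio.sleep as a stand-in for a real network call:

import asyncio

async def fake_fetch(offset):
    # Stand-in for one paginated API request; yields control while "waiting".
    await asyncio.sleep(0.01)
    return [offset, offset + 1]  # pretend these are the token ids of this page

async def demo():
    # All five "requests" overlap, so the total time is ~0.01 s, not ~0.05 s,
    # and results come back in the same order as the offsets.
    return await asyncio.gather(*[fake_fetch(o) for o in range(0, 250, 50)])

pages = asyncio.run(demo())
print(pages)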
Here the requests are executed asynchronously: session.get(url) only fetches the response headers, and the content is obtained with await response.json():
async with session.get(url) as response:
    a = await response.json()
And in the main block the event loop is started:
loop = asyncio.get_event_loop()
token_ids = loop.run_until_complete(main())
loop.close()
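On Python 3.7+ the explicit loop management above can also be replaced with asyncio.run, which creates the event loop, runs the coroutine, and closes the loop in one call (get_event_loop for this purpose is deprecated in recent Python versions). A sketch with a placeholder main() standing in for the real one from the full code:

import asyncio

async def main():
    # Placeholder for the real main(); just returns a fixed result here.
    return ["token_a", "token_b"]

token_ids = asyncio.run(main())  # creates, runs, and closes the loop
print(token_ids)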
Full code
import aiohttp
import asyncio


async def get_data(session, offset):
    token_ids = []
    url = "url_1" + str(offset)
    async with session.get(url) as response:
        # For tests:
        # print("Status:", response.status)
        # print("Content-type:", response.headers['content-type'])
        a = await response.json()
        assets = a["assets"]
        for asset in assets:
            if str(asset['sell_orders']) == 'None' and str(asset['last_sale']) == 'None' and str(asset['num_sales']) == '0':
                token_ids.append(str(asset['token_id']))
    return token_ids


async def main():
    supply = 3000
    step = 50
    token_ids = []
    # Create one session for all connections and pass it to get_data
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *[get_data(session, offset) for offset in range(0, supply, step)]
        )
        for ids in results:
            token_ids.extend(ids)
    return token_ids


if __name__ == "__main__":
    # asynchronous code starts here
    loop = asyncio.get_event_loop()
    token_ids = loop.run_until_complete(main())
    loop.close()
    # asynchronous code ends here
    print(token_ids)
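One practical caveat: this fires all supply/step requests (60 here) at once, which can trip a site's rate limits. A sketch of capping concurrency with asyncio.Semaphore, using asyncio.sleep as a stand-in for the aiohttp call; the limit of 10 is an arbitrary example value:

import asyncio

CONCURRENCY = 10  # arbitrary example cap; tune to the API's rate limits

async def get_data(sem, offset):
    async with sem:  # at most CONCURRENCY coroutines pass this point at once
        await asyncio.sleep(0.01)  # stand-in for the real aiohttp request
        return [str(offset)]  # pretend this is the page's list of token ids

async def main():
    sem = asyncio.Semaphore(CONCURRENCY)
    results = await asyncio.gather(
        *[get_data(sem, offset) for offset in range(0, 3000, 50)]
    )
    token_ids = []
    for ids in results:
        token_ids.extend(ids)
    return token_ids

token_ids = asyncio.run(main())
print(len(token_ids))  # one entry per page: 60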