Asynchronous HTTP client without waiting for the whole queue
I have a task that makes several long HTTP requests (20-30 seconds each). As soon as a response arrives or a timeout occurs, the request must be issued again, without waiting for the rest.
My code:
async def get(key):
    url = f"https://mysite?key={key}"
    async with aiohttp.ClientSession() as session:
        async with session.get(url, timeout=20) as response:
            return await response.json()

coroutines = [get(key1), get(key1), get(key3)]

async def main():
    for task in asyncio.as_completed(coroutines):
        resp = await task
        print(f'resp: {resp}')
Can I somehow update the contents of the asyncio.as_completed(coroutines) generator when one of the tasks completes, without stopping the other tasks in it?
So what I want is an infinite loop that re-issues each request as soon as it completes. The list of coroutines is not constant: new keys may appear at any time, and existing keys may become stale.
I may have chosen the wrong approach.
Yes, you probably chose the wrong approach - it is easier to design this with the producer-consumer pattern.
What you do at resp = await task can essentially be replaced by a single separate task that listens for incoming data.
The producers would be multiple tasks, each listening to an individual URL, with a while loop keeping every task running non-stop.
To connect these two kinds of tasks, a Queue is commonly used.
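A minimal sketch of that wiring, with illustrative names - producer and consumer here stand in for the URL-fetching tasks and the response processor shown further below:

```python
import asyncio

async def producer(queue: asyncio.Queue, n: int):
    # Stand-in for one URL-fetching task: push each result into the shared queue.
    for i in range(n):
        await queue.put(i)

async def consumer(queue: asyncio.Queue, n: int) -> list:
    # Stand-in for the processor: handle items as soon as any producer delivers one.
    received = []
    for _ in range(n):
        received.append(await queue.get())
    return received

async def main():
    queue = asyncio.Queue()
    # Run both sides concurrently; the queue decouples them completely.
    _, received = await asyncio.gather(producer(queue, 3), consumer(queue, 3))
    return received

print(asyncio.run(main()))  # → [0, 1, 2]
```

In the real code below there are many producers (one per URL) and a single consumer, but the queue handoff works exactly the same way.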
I strongly recommend reading the official asyncio documentation - I know it's a genuinely boring task, but it will help you in the long run.
client.py
"""
Demo codes for
"""
import asyncio
from typing import Dict
import aiohttp
async def process_response_queue(queue: asyncio.Queue):
"""
Get json response data from queue.
Effectively consumer.
Args:
queue: Queue for receiving url & json response pair
"""
print("Processor started")
while True:
url_from, data = await queue.get()
# do what you want here
print(f"Received {data} from {url_from}")
class TaskManager:
"""
Manage data fetching tasks
"""
def __init__(self):
self.queue = asyncio.Queue()
self.tasks: Dict[str, asyncio.Task] = {}
async def get_repeat(self, url, timeout=20):
"""
Repeatedly fetch json response from given url and put into queue.
Effectively producer.
Args:
url: URL to fetch from
timeout: Time until timeout
"""
print(f"Task for {url} started")
try:
async with aiohttp.ClientSession() as session:
while True:
async with session.get(url, timeout=timeout) as resp:
await self.queue.put((url, await resp.json()))
finally:
del self.tasks[url]
print(f"Task for {url} canceled")
def start_processor(self):
"""
Starts the processor.
"""
self.tasks["_processor"] = asyncio.create_task(process_response_queue(self.queue))
def start_new_task(self, url):
"""
Create new task from url.
Args:
url: URL to fetch from.
"""
self.tasks[url] = asyncio.create_task(self.get_repeat(url))
def stop_task(self, url):
"""
Stop existing task associated with url.
Args:
url: URL associated with task.
Raises:
KeyError: If no task associated with given url exists.
"""
self.tasks[url].cancel()
def close(self):
"""
Cancels all tasks
"""
for task in self.tasks.values():
task.cancel()
async def main():
"""
Starter code
"""
task_manager = TaskManager()
task_manager.start_processor()
for n in range(5):
task_manager.start_new_task(f"http://127.0.0.1:5000/json?key={n}")
# wait 10 sec
await asyncio.sleep(10)
# cancel 1 task
task_manager.stop_task("http://127.0.0.1:5000/json?key=3")
# wait 20 sec
await asyncio.sleep(20)
# stop all
task_manager.close()
if __name__ == '__main__':
asyncio.run(main())
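One caveat: session.get(..., timeout=...) raises asyncio.TimeoutError when the timeout elapses, which would end that producer task through its finally block. Since the requirement is to re-issue a request after a timeout, the retry should wrap the request itself. A minimal stdlib-only sketch of that retry loop - fetch_with_retry and flaky are hypothetical names, with flaky standing in for the aiohttp call:

```python
import asyncio

async def fetch_with_retry(fetch, timeout: float, attempts: int):
    """Re-issue fetch() after each timeout, up to `attempts` tries.

    A timeout just triggers another attempt instead of killing
    the surrounding task.
    """
    for attempt in range(attempts):
        try:
            return await asyncio.wait_for(fetch(), timeout)
        except asyncio.TimeoutError:
            print(f"Attempt {attempt} timed out, retrying")
    raise asyncio.TimeoutError(f"No response after {attempts} attempts")

async def main():
    calls = []

    async def flaky():
        # First call hangs past the timeout, second call answers promptly.
        calls.append(1)
        if len(calls) == 1:
            await asyncio.sleep(10)
        return {"key": 0}

    return await fetch_with_retry(flaky, timeout=0.05, attempts=3)

print(asyncio.run(main()))  # → {'key': 0}
```

Applied to get_repeat, the same pattern means wrapping the session.get block in try/except asyncio.TimeoutError inside the while loop, so the task loops back and fires the request again instead of falling through to the finally clause.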
server.py
"""
Demo codes for
"""
import trio
from quart import request, jsonify
from quart_trio import QuartTrio
app = QuartTrio("Very named Much app")
@app.get("/json")
async def send_json():
"""
Sleeps 5 + n seconds before returning response.
Returns:
json response
"""
key = int(request.args["key"])
await trio.sleep(5 + key)
return jsonify({"key": key})
trio.run(app.run_task)
I only used trio for the server part because I like Trio - the server is just an example, since you seem to have a server of your own.
Sample output:
Task for http://127.0.0.1:5000/json?key=0 started
Task for http://127.0.0.1:5000/json?key=1 started
Task for http://127.0.0.1:5000/json?key=2 started
Task for http://127.0.0.1:5000/json?key=3 started
Task for http://127.0.0.1:5000/json?key=4 started
Received {'key': 0} from http://127.0.0.1:5000/json?key=0
Received {'key': 1} from http://127.0.0.1:5000/json?key=1
Received {'key': 2} from http://127.0.0.1:5000/json?key=2
Received {'key': 3} from http://127.0.0.1:5000/json?key=3
Received {'key': 4} from http://127.0.0.1:5000/json?key=4
Task for http://127.0.0.1:5000/json?key=3 canceled
Received {'key': 0} from http://127.0.0.1:5000/json?key=0
Received {'key': 1} from http://127.0.0.1:5000/json?key=1
Received {'key': 2} from http://127.0.0.1:5000/json?key=2
Received {'key': 0} from http://127.0.0.1:5000/json?key=0
Received {'key': 4} from http://127.0.0.1:5000/json?key=4
Received {'key': 1} from http://127.0.0.1:5000/json?key=1
Received {'key': 0} from http://127.0.0.1:5000/json?key=0
Received {'key': 2} from http://127.0.0.1:5000/json?key=2
Received {'key': 1} from http://127.0.0.1:5000/json?key=1
Received {'key': 0} from http://127.0.0.1:5000/json?key=0
Received {'key': 4} from http://127.0.0.1:5000/json?key=4
Received {'key': 2} from http://127.0.0.1:5000/json?key=2
Task for http://127.0.0.1:5000/json?key=0 canceled
Task for http://127.0.0.1:5000/json?key=1 canceled
Task for http://127.0.0.1:5000/json?key=2 canceled
Task for http://127.0.0.1:5000/json?key=4 canceled
Process finished with exit code 0