Asynchronous HTTP client without waiting for the whole queue

I have a task that handles multiple long HTTP requests (20-30 seconds each). As soon as a response is received or a timeout occurs, the request must be re-issued immediately, without waiting for the rest.

My code:

import asyncio

import aiohttp


async def get(key):
    url = f"https://mysite?key={key}"
    async with aiohttp.ClientSession() as session:
        async with session.get(url, timeout=20) as response:
            return await response.json()

coroutines = [get(key1), get(key2), get(key3)]

async def main():
    for task in asyncio.as_completed(coroutines):
        resp = await task
        print(f'resp: {resp}')

Can I somehow update the contents of the asyncio.as_completed(coroutines) generator when one of its tasks completes, without stopping the other tasks in it?

In other words, I want an infinite loop that re-issues each request as soon as it completes. The contents of the coroutine list are not constant: a new key may appear at any moment, and an existing key may become stale. Perhaps I have chosen the wrong approach.

Yes, you have probably chosen the wrong approach - it is easier to design this with the Producer-Consumer pattern.

What you do at resp = await task can essentially be replaced by a single separate task that listens for incoming data.

The producers would then be multiple tasks, one per URL, each kept running non-stop by a while loop.

To connect these two kinds of tasks, an asyncio.Queue is commonly used.
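
In outline, the pattern looks like this - a minimal, self-contained sketch where asyncio.sleep stands in for the long HTTP request (the producer/consumer functions and the p0/p1/p2 names are placeholders, not part of the demo further down):

import asyncio


async def producer(name: str, queue: asyncio.Queue):
    # Runs non-stop, pushing one result per simulated "request".
    n = 0
    while True:
        await asyncio.sleep(1)  # stand-in for the long HTTP request
        await queue.put((name, n))
        n += 1


async def consumer(queue: asyncio.Queue):
    # Handles each result as soon as any producer finishes one.
    while True:
        name, result = await queue.get()
        print(f"{name} -> {result}")


async def main():
    queue = asyncio.Queue()
    tasks = [asyncio.create_task(producer(f"p{i}", queue)) for i in range(3)]
    tasks.append(asyncio.create_task(consumer(queue)))

    await asyncio.sleep(5)  # let it run for a while, then shut down
    for task in tasks:
        task.cancel()


asyncio.run(main())

Because producers are independent tasks, adding or removing a URL at runtime is just creating or cancelling a task - which is exactly what the TaskManager below does.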

I strongly recommend reading the official asyncio documentation - I know it really is a boring task, but it will help you in the long run.

client.py

"""
Client-side demo code for this answer.
"""


import asyncio
from typing import Dict

import aiohttp


async def process_response_queue(queue: asyncio.Queue):
    """
    Get json response data from queue.
    Effectively consumer.

    Args:
        queue: Queue for receiving url & json response pair
    """
    print("Processor started")

    while True:
        url_from, data = await queue.get()

        # do what you want here
        print(f"Received {data} from {url_from}")


class TaskManager:
    """
    Manage data fetching tasks
    """

    def __init__(self):
        self.queue = asyncio.Queue()
        self.tasks: Dict[str, asyncio.Task] = {}

    async def get_repeat(self, url, timeout=20):
        """
        Repeatedly fetch json response from given url and put into queue.
        Effectively producer.

        Args:
            url: URL to fetch from
            timeout: Time until timeout
        """
        print(f"Task for {url} started")

        try:
            async with aiohttp.ClientSession() as session:
                while True:
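                    # Note: if the request times out, session.get raises
                    # asyncio.TimeoutError and this task stops; see the
                    # retry variation after this listing.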
                    async with session.get(url, timeout=timeout) as resp:
                        await self.queue.put((url, await resp.json()))
        finally:
            del self.tasks[url]
            print(f"Task for {url} canceled")

    def start_processor(self):
        """
        Starts the processor.
        """
        self.tasks["_processor"] = asyncio.create_task(process_response_queue(self.queue))

    def start_new_task(self, url):
        """
        Create new task from url.

        Args:
            url: URL to fetch from.
        """
        self.tasks[url] = asyncio.create_task(self.get_repeat(url))

    def stop_task(self, url):
        """
        Stop existing task associated with url.

        Args:
            url: URL associated with task.

        Raises:
            KeyError: If no task associated with given url exists.
        """
        self.tasks[url].cancel()

    def close(self):
        """
        Cancels all tasks
        """

        for task in self.tasks.values():
            task.cancel()


async def main():
    """
    Starter code
    """

    task_manager = TaskManager()

    task_manager.start_processor()

    for n in range(5):
        task_manager.start_new_task(f"http://127.0.0.1:5000/json?key={n}")

    # wait 10 sec
    await asyncio.sleep(10)

    # cancel 1 task
    task_manager.stop_task("http://127.0.0.1:5000/json?key=3")

    # wait 20 sec
    await asyncio.sleep(20)

    # stop all
    task_manager.close()


if __name__ == '__main__':
    asyncio.run(main())
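
One caveat: the question wants a timed-out request to simply be re-issued, but get_repeat above lets asyncio.TimeoutError (and any network error) propagate, which stops that task. If the task should survive timeouts and retry instead, a minimal variation could look like this (a sketch; the choice of which exceptions to swallow is an assumption):

    async def get_repeat(self, url, timeout=20):
        """
        Repeatedly fetch json response from given url and put into queue,
        retrying on timeout or connection error instead of stopping.
        """
        print(f"Task for {url} started")

        try:
            async with aiohttp.ClientSession() as session:
                while True:
                    try:
                        async with session.get(url, timeout=timeout) as resp:
                            await self.queue.put((url, await resp.json()))
                    except (asyncio.TimeoutError, aiohttp.ClientError):
                        # Timed out or connection failed - re-issue the request.
                        continue
        finally:
            del self.tasks[url]
            print(f"Task for {url} canceled")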

server.py

"""
Server-side demo code for this answer.
"""

import trio
from quart import request, jsonify
from quart_trio import QuartTrio


app = QuartTrio("Very named Much app")


@app.get("/json")
async def send_json():
    """
    Sleeps 5 + n seconds before returning response.

    Returns:
        json response
    """
    key = int(request.args["key"])

    await trio.sleep(5 + key)
    return jsonify({"key": key})


trio.run(app.run_task)

I only used trio for the server part because I like Trio - take the server purely as an example, since you seem to have your own server.


Example output:

Task for http://127.0.0.1:5000/json?key=0 started
Task for http://127.0.0.1:5000/json?key=1 started
Task for http://127.0.0.1:5000/json?key=2 started
Task for http://127.0.0.1:5000/json?key=3 started
Task for http://127.0.0.1:5000/json?key=4 started
Received {'key': 0} from http://127.0.0.1:5000/json?key=0
Received {'key': 1} from http://127.0.0.1:5000/json?key=1
Received {'key': 2} from http://127.0.0.1:5000/json?key=2
Received {'key': 3} from http://127.0.0.1:5000/json?key=3
Received {'key': 4} from http://127.0.0.1:5000/json?key=4
Task for http://127.0.0.1:5000/json?key=3 canceled
Received {'key': 0} from http://127.0.0.1:5000/json?key=0
Received {'key': 1} from http://127.0.0.1:5000/json?key=1
Received {'key': 2} from http://127.0.0.1:5000/json?key=2
Received {'key': 0} from http://127.0.0.1:5000/json?key=0
Received {'key': 4} from http://127.0.0.1:5000/json?key=4
Received {'key': 1} from http://127.0.0.1:5000/json?key=1
Received {'key': 0} from http://127.0.0.1:5000/json?key=0
Received {'key': 2} from http://127.0.0.1:5000/json?key=2
Received {'key': 1} from http://127.0.0.1:5000/json?key=1
Received {'key': 0} from http://127.0.0.1:5000/json?key=0
Received {'key': 4} from http://127.0.0.1:5000/json?key=4
Received {'key': 2} from http://127.0.0.1:5000/json?key=2
Task for http://127.0.0.1:5000/json?key=0 canceled
Task for http://127.0.0.1:5000/json?key=1 canceled
Task for http://127.0.0.1:5000/json?key=2 canceled
Task for http://127.0.0.1:5000/json?key=4 canceled

Process finished with exit code 0

If you haven't already, feel free to take the Stack Overflow Tour.