Using aiohttp, fire off an API request exactly every 0.1 s, and exit the function early if a condition is met in one of the returned results

I've been trying to get my head around async and aiohttp recently, with little success, and I need some help.

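I want to fire off API requests like clockwork, with a fixed interval between them, say 0.1 s (assume the API rate limit is 10 requests/second), using aiohttp. As each result comes back I want to run some checks on it and, if they succeed, terminate the function early.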

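Based on some examples I found online, I built the script below. It does almost what I expect, except that the async HTTP GET requests are not actually sent every 0.1 s. They go out more slowly, roughly every 0.25 s or 0.3 s, which is about how long a single request takes. That means it offers no benefit over running serially. Could someone point out where I can change the code to get the desired behaviour? I would also like to optimise it as much as possible: the GET requests always hit the same endpoint, so there may be room for optimisation there, such as sharing the session object.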

Thanks

import time
import datetime
import aiohttp
import asyncio

url = 'http://www.randomnumberapi.com/api/v1.0/random'

async def main():
    async with aiohttp.ClientSession() as session:

        for i in range(100):
            start = time.time()
            print(f'{i}: {datetime.datetime.now()}')
            await asyncio.sleep(0.1)
            print(f'{i}: {time.time() - start}')
            async with session.get(url) as response:

                answer_list = await response.json()
                print(f'{i}: {time.time() - start}')
                ans = answer_list[0]
                if i == 25:
                    print('early exit condition met')
                    break
        return(i)

loop = asyncio.get_event_loop()
ans = loop.run_until_complete(main())

print(ans)

Which returns...

0: 2021-06-04 14:55:38.917310
0: 0.10055017471313477
0: 1.1943635940551758
1: 2021-06-04 14:55:40.111763
1: 0.10061764717102051
1: 0.4190835952758789
2: 2021-06-04 14:55:40.530876
2: 0.10037660598754883
2: 0.39967823028564453
3: 2021-06-04 14:55:40.930675
3: 0.10052680969238281
3: 0.4090113639831543
4: 2021-06-04 14:55:41.339719
4: 0.10062289237976074
4: 0.4102184772491455
5: 2021-06-04 14:55:41.749971
5: 0.10028338432312012
5: 0.33177995681762695
6: 2021-06-04 14:55:42.081782
6: 0.10028529167175293
6: 0.32780933380126953
7: 2021-06-04 14:55:42.409627
7: 0.10028696060180664
7: 0.3641927242279053
8: 2021-06-04 14:55:42.773969
8: 0.10053634643554688
8: 0.4099152088165283
9: 2021-06-04 14:55:43.183998
9: 0.10070633888244629
9: 0.4089639186859131
10: 2021-06-04 14:55:43.593011
10: 0.10048651695251465
10: 0.3309924602508545
11: 2021-06-04 14:55:43.924210
11: 0.1008145809173584
11: 0.38551807403564453
12: 2021-06-04 14:55:44.309783
12: 0.10041999816894531
12: 0.4093167781829834
13: 2021-06-04 14:55:44.719141
13: 0.10042858123779297
13: 0.409212589263916
14: 2021-06-04 14:55:45.128383
14: 0.10032796859741211
14: 0.5117332935333252
15: 2021-06-04 14:55:45.640148
15: 0.10029864311218262
15: 0.4099447727203369
16: 2021-06-04 14:55:46.050127
16: 0.10030388832092285
16: 0.5113239288330078
17: 2021-06-04 14:55:46.561480
17: 0.10030794143676758
17: 0.5102083683013916
18: 2021-06-04 14:55:47.071729
18: 0.10080623626708984
18: 0.5144610404968262
19: 2021-06-04 14:55:47.586245
19: 0.10080385208129883
19: 0.4096643924713135
20: 2021-06-04 14:55:47.995943
20: 0.10080790519714355
20: 0.5115323066711426
21: 2021-06-04 14:55:48.507522
21: 0.10048961639404297
21: 0.47547459602355957
22: 2021-06-04 14:55:48.983046
22: 0.10050201416015625
22: 0.4206221103668213
23: 2021-06-04 14:55:49.403701
23: 0.10030198097229004
23: 0.5344793796539307
24: 2021-06-04 14:55:49.938217
24: 0.10035061836242676
24: 0.4128684997558594
25: 2021-06-04 14:55:50.351116
25: 0.10033893585205078
25: 0.4100759029388428
early exit condition met
25

You can use a BoundedSemaphore to limit the number of concurrent API requests.
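To see what the semaphore does in isolation, here is a minimal sketch with no network involved. The names `worker`, `run_workers` and `MAX_CONCURRENT` are made up for illustration, and `asyncio.sleep` stands in for the real request. Note that a semaphore caps how many requests are in flight at once; it does not by itself space them out in time.

```python
import asyncio

MAX_CONCURRENT = 3  # illustrative limit, not taken from the question


async def worker(sema: asyncio.BoundedSemaphore, state: dict) -> None:
    # Only MAX_CONCURRENT workers can be inside this block at once.
    async with sema:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.05)  # stand-in for the real API request
        state["active"] -= 1


async def run_workers(n: int) -> int:
    sema = asyncio.BoundedSemaphore(MAX_CONCURRENT)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(worker(sema, state) for _ in range(n)))
    # Highest number of workers that were ever running at the same time.
    return state["peak"]


if __name__ == "__main__":
    print(asyncio.run(run_workers(10)))
```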

The reason you don't see async behaviour in your code is that you are awaiting the response of the async call (session.get) inside the for loop.

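On each iteration, the loop waits for the async call to return before moving on to the next iteration, which is equivalent to calling the URLs sequentially.
The iterations do not all start at once as you might have expected.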
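The difference can be reproduced without any network access. The sketch below is only an illustration: `fake_request`, `sequential` and `concurrent` are hypothetical names, and the 0.3 s delay is an assumed stand-in for the request latency seen in the question.

```python
import asyncio
import time


async def fake_request(delay: float = 0.3) -> int:
    # Stand-in for session.get(...): just waits `delay` seconds.
    await asyncio.sleep(delay)
    return 1


async def sequential(n: int, interval: float) -> float:
    # Awaiting each request inside the loop: the next iteration cannot
    # start until the previous response has arrived.
    start = time.perf_counter()
    for _ in range(n):
        await asyncio.sleep(interval)
        await fake_request()
    return time.perf_counter() - start


async def concurrent(n: int, interval: float) -> float:
    # Scheduling each request as a task: the loop only waits `interval`
    # between launches, so the requests overlap.
    start = time.perf_counter()
    tasks = []
    for _ in range(n):
        await asyncio.sleep(interval)
        tasks.append(asyncio.create_task(fake_request()))
    await asyncio.gather(*tasks)
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"sequential: {asyncio.run(sequential(5, 0.1)):.2f}s")
    print(f"concurrent: {asyncio.run(concurrent(5, 0.1)):.2f}s")
```

With these assumed numbers the sequential version needs about n * (interval + delay) seconds, while the concurrent one needs about n * interval + delay.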

If you want to run a set of tasks concurrently and manage them as a unit, you can use asyncio.gather.

import datetime
import aiohttp
import asyncio

MAX_CONCURRENT_API_REQUESTS = 10

URL = 'http://www.randomnumberapi.com/api/v1.0/random'


async def main():
    tasks = []
    sema = asyncio.BoundedSemaphore(MAX_CONCURRENT_API_REQUESTS)
    first_required_random = None

    async with aiohttp.ClientSession() as session:

        for i in range(100):
            await asyncio.sleep(0.1)
            tasks.append(asyncio.create_task(async_call_api(sema, session, i)))

        try:
            await asyncio.gather(*tasks, return_exceptions=False)
        except MyRandomNumber as err:
            if first_required_random is None:
                first_required_random = err.args[0]

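            # Cancel the requests that are still in flight, then wait for the
            # cancellations to be processed before the session is closed.
            # (t.cancel() itself never raises CancelledError.)
            for t in tasks:
                t.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)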

            return first_required_random
        else:
            print('Required condition not satisfied')

async def async_call_api(sema, session, i):
    print(f'{i}: Start {datetime.datetime.now()}')
    async with sema:
        response = await session.get(URL)
        answer_list = await response.json()
        print(f'{i}: End {datetime.datetime.now()}')
        ans = answer_list[0]
        print(f'Ans: {ans}')
        if ans == 25:
            raise MyRandomNumber(ans)


class MyRandomNumber(Exception):
    pass

ans = asyncio.run(main())
print(ans)

Adding await asyncio.sleep(0.1) before appending to the task list ensures that the requests are issued at least 0.1 s apart.
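If "at least" is not good enough and you want the launches to stay close to a fixed 0.1 s grid, one option is to sleep until an absolute deadline on the event loop clock instead of sleeping for a fixed duration, so that per-iteration overhead does not accumulate. This is a sketch under that assumption; `fire_at_fixed_rate` is a hypothetical name and no real request is made.

```python
import asyncio
from typing import List


async def fire_at_fixed_rate(n: int, interval: float) -> List[float]:
    loop = asyncio.get_running_loop()
    start = loop.time()
    launch_times = []
    for i in range(n):
        # Sleep until the i-th absolute deadline rather than for a fixed
        # duration, so small delays in one iteration are not carried over.
        delay = start + i * interval - loop.time()
        if delay > 0:
            await asyncio.sleep(delay)
        launch_times.append(loop.time() - start)
        # A real version would call asyncio.create_task(...) here.
    return launch_times


if __name__ == "__main__":
    for t in asyncio.run(fire_at_fixed_rate(5, 0.1)):
        print(f"{t:.3f}")
```

The launch times are still subject to the event loop's scheduling, so they will be close to multiples of the interval rather than exact.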

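I updated the code to cancel the remaining tasks (URL requests) once the required number is found. Unfortunately this is not 100% reliable, because in the few milliseconds the code spends cancelling tasks, some requests may already have returned results. Nevertheless, thanks to first_required_random, you always get the first match.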
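One thing to be aware of: in the code above, asyncio.gather is only reached after the for loop has launched all 100 requests, so a match is not acted on until roughly 10 s have passed. If you want to stop launching as soon as a match arrives, one possible approach is to run the launching loop as its own task and wait on it together with a future that the first matching request completes. The sketch below is an illustration only: `fake_request`, `check`, `launcher`, `first_match` and the condition `result == 3` are all hypothetical, and `asyncio.sleep` replaces the real GET.

```python
import asyncio


async def fake_request(i: int) -> int:
    # Stand-in for the real GET: takes 0.05 s and returns its index.
    await asyncio.sleep(0.05)
    return i


async def check(i: int, found: asyncio.Future) -> None:
    result = await fake_request(i)
    # Hypothetical success condition; only the first match is recorded.
    if result == 3 and not found.done():
        found.set_result(result)


async def launcher(n: int, interval: float, found: asyncio.Future, tasks: list) -> None:
    for i in range(n):
        tasks.append(asyncio.create_task(check(i, found)))
        await asyncio.sleep(interval)
    # Everything has been launched: wait for the stragglers.
    await asyncio.gather(*tasks)


async def first_match(n: int = 20, interval: float = 0.01):
    found = asyncio.get_running_loop().create_future()
    tasks: list = []
    launch = asyncio.create_task(launcher(n, interval, found, tasks))
    # Wake up as soon as a match is found or all requests have finished.
    await asyncio.wait({launch, found}, return_when=asyncio.FIRST_COMPLETED)
    # Stop launching and cancel whatever is still in flight.
    launch.cancel()
    for t in tasks:
        t.cancel()
    await asyncio.gather(launch, *tasks, return_exceptions=True)
    return found.result() if found.done() else None


if __name__ == "__main__":
    print(asyncio.run(first_match()))
```

Here the function returns None when no request satisfies the condition, which mirrors the 'Required condition not satisfied' branch in the code above.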