关于aiohttp和asyncio的抓狂现象

Maddening phenomenon about aiohttp and asyncio

下面我用两个例子来说明这个现象:

示例 1:

import asyncio
import time
import sys
import aiohttp
import threading

async def request():
    print("enter function")
    timeout = aiohttp.ClientTimeout(total = 5)
    async with aiohttp.ClientSession(timeout= timeout) as session:
        async with session.get("A url that is unreachable on the network") as rsp:
            if rsp.status != 200:
                print("error")
            print("ok")
asyncio.run(request())

示例 2:

import asyncio
import time
import sys
import aiohttp
import threading

def _thread_running_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()
    print("loop exit.")

async def request():
    print("enter function")
    timeout = aiohttp.ClientTimeout(total = 5)
    async with aiohttp.ClientSession(timeout= timeout) as session:
        async with session.get("A url that is unreachable on the network") as rsp:
            if rsp.status != 200:
                print("error")
            print("ok")

loop = asyncio.new_event_loop()
thread = threading.Thread(target=_thread_running_loop,args=(loop,))
thread.setDaemon(True)
thread.start()
asyncio.run_coroutine_threadsafe(request(), loop)
time.sleep(1000000)

在 'example 1',它 return 'raise asyncio.TimeoutError' 符合我的预期。但是在'example 2',就像睡着了一样,没有任何反应或异常,只打印“enter function”。我不知道问题出在哪里,谁能帮忙。

"更新示例 2"

import asyncio
import time
import sys
import aiohttp
import threading
import logging

def _thread_running_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()
    print("loop exit.")

async def request():
    print("enter function")
    timeout = aiohttp.ClientTimeout(total = 5)
    async with aiohttp.ClientSession(timeout= timeout) as session:
        async with session.get("A url that is unreachable on the network") as rsp:
            if rsp.status != 200:
                print("error")
            print("ok")

async def call_request():
    try:
        return await request()
    except Exception as e:
        logging.exception(e)

def main():
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=_thread_running_loop,args=(loop,))
    thread.setDaemon(True)
    thread.start()
    while True:
        print("call_request")
        asyncio.run_coroutine_threadsafe(call_request(), loop)
        asyncio.run_coroutine_threadsafe(call_request(), loop)
        print("wait 100s")
        time.sleep(100)
main()
time.sleep(1000000)

TL;DR 您永远不会收集协程的 result,因此异步线程中引发的异常无法传播。添加对 .result() 的调用,代码段将按预期工作。

But at 'example 2', it's like falling asleep, no response or exception, only printing "enter function". I can't figure out where the problem is, can anyone help.

它睡着了,因为你真的告诉它要睡着了:

time.sleep(1000000)

run_coroutine_threadsafe(a, b)表示“在事件循环b中将协程对象a提交给运行”。默认情况下,它不会等待协程完成,因为它的全部目的是让您的线程继续做其他事情。然而,它 returns 一个 concurrent.futures.Future (不要与 asyncio.Future 混淆),你可以用它来检查计算是否完成,等待它完成,并检索它的结果或异常.如果你从不做任何这些,你永远不会注意到异常,因为它发生在一个完全不同的线程中。但是如果你这样做,例如通过将调用更改为:

asyncio.run_coroutine_threadsafe(request(), loop).result()

...你实现了两件事:你等待协程完成,并且引发的异常(如果有的话)传播到你的主线程,使其表现得像示例 1。

请注意,示例 1 也没有真正打印任何内容,它只是引发了一个 InvalidURL 异常,与上述修复的示例 2 一样。