RuntimeError: There is no current event loop in thread in async + apscheduler

RuntimeError: There is no current event loop in thread in async + apscheduler

我有一个异步功能,需要每 N 分钟 运行 使用 apscheduller。 下面有一个python代码

URL_LIST = ['<url1>',
            '<url2>',
            '<url2>',
            ]

def demo_async(urls):
    """Fetch list of web pages asynchronously."""
    loop = asyncio.get_event_loop() # event loop
    future = asyncio.ensure_future(fetch_all(urls)) # tasks to do
    loop.run_until_complete(future) # loop until done

async def fetch_all(urls):
    tasks = [] # dictionary of start times for each url
    async with ClientSession() as session:
        for url in urls:
            task = asyncio.ensure_future(fetch(url, session))
            tasks.append(task) # create list of tasks
        _ = await asyncio.gather(*tasks) # gather task responses

async def fetch(url, session):
    """Fetch a url, using specified ClientSession."""
    async with session.get(url) as response:
        resp = await response.read()
        print(resp)

if __name__ == '__main__':
    scheduler = AsyncIOScheduler()
    scheduler.add_job(demo_async, args=[URL_LIST], trigger='interval', seconds=15)
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    # Execution will block here until Ctrl+C (Ctrl+Break on Windows) is pressed.
    try:
        asyncio.get_event_loop().run_forever()
    except (KeyboardInterrupt, SystemExit):
        pass

但是当我尝试 运行 时,我得到了下一个错误信息

Job "demo_async (trigger: interval[0:00:15], next run at: 2017-10-12 18:21:12 +04)" raised an exception.....
..........\lib\asyncio\events.py", line 584, in get_event_loop
    % threading.current_thread().name)
RuntimeError: There is no current event loop in thread '<concurrent.futures.thread.ThreadPoolExecutor object at 0x0356B150>_0'.

你能帮我解决这个问题吗? Python 3.6,APScheduler 3.3.1,

直接把fetch_all传给scheduler.add_job()就可以了。 asyncio 调度器支持协程函数作为作业目标。

如果目标可调用对象不是协程函数,它将运行在工作线程中(由于历史原因),因此异常。

在你的 def demo_async(urls) 中,尝试替换:

loop = asyncio.get_event_loop()

与:

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

没有提到的重要事情是错误发生的原因。对我个人而言,知道错误发生的原因与解决实际问题一样重要。

我们来看看BaseDefaultEventLoopPolicyget_event_loop的实现:

class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    ...

    def get_event_loop(self):
        """Get the event loop.

        This may be None or an instance of EventLoop.
        """
        if (self._local._loop is None and
            not self._local._set_called and
            isinstance(threading.current_thread(), threading._MainThread)):
            self.set_event_loop(self.new_event_loop())
        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)
        return self._local._loop

你可以看到 self.set_event_loop(self.new_event_loop()) 仅在满足以下所有条件时才会执行:

  • self._local._loop is None - _local._loop 未设置
  • not self._local._set_called - set_event_loop 尚未调用
  • isinstance(threading.current_thread(), threading._MainThread) - 当前线程是主要线程(在您的情况下这不是 True)

因此抛出异常,因为当前线程没有设置循环:

if self._local._loop is None:
    raise RuntimeError('There is no current event loop in thread %r.'
                       % threading.current_thread().name)

使用asyncio.run()而不是直接使用事件循环。 它创建一个新循环并在完成后将其关闭。

这是 'run' 的样子:

if events._get_running_loop() is not None:
    raise RuntimeError(
        "asyncio.run() cannot be called from a running event loop")

if not coroutines.iscoroutine(main):
    raise ValueError("a coroutine was expected, got {!r}".format(main))

loop = events.new_event_loop()
try:
    events.set_event_loop(loop)
    loop.set_debug(debug)
    return loop.run_until_complete(main)
finally:
    try:
        _cancel_all_tasks(loop)
        loop.run_until_complete(loop.shutdown_asyncgens())
    finally:
        events.set_event_loop(None)
        loop.close()

由于这个问题一直出现在第一页,所以我会把我的问题和我的答案写在这里。

我在使用 flask-socketio and Bleak 时有一个 RuntimeError: There is no current event loop in thread 'Thread-X'.


编辑: 好吧,我重构了我的文件并制作了一个 class.

我在构造函数中初始化了循环,现在一切正常:

class BLE:
    def __init__(self):
        self.loop = asyncio.get_event_loop()

    # function example, improvement of
    # https://github.com/hbldh/bleak/blob/master/examples/discover.py :
    def list_bluetooth_low_energy(self) -> list:
        async def run() -> list:
            BLElist = []
            devices = await bleak.discover()
            for d in devices:
                BLElist.append(d.name)
            return 'success', BLElist
        return self.loop.run_until_complete(run())

用法:

ble = path.to.lib.BLE()
list = ble.list_bluetooth_low_energy()

原回答:

解决方案很愚蠢。我没有注意我做了什么,但是我把一些import移出了一个函数,像这样:

import asyncio, platform
from bleak import discover

def listBLE() -> dict:
    async def run() -> dict:
        # my code that keep throwing exceptions.

    loop = asyncio.get_event_loop()
    ble_list = loop.run_until_complete(run())
    return ble_list

所以我认为我需要更改我的代码中的某些内容,并且我在 get_event_loop():

行之前使用这段代码创建了一个新的事件循环
loop = asyncio.new_event_loop()
loop = asyncio.set_event_loop()

此刻我很高兴,因为我有一个循环运行。

但没有回应。我的代码依赖于 return 一些值的超时,所以这对我的应用程序来说非常糟糕。

我花了将近两个小时才弄清楚问题出在 import,这是我的(工作)代码:

def list() -> dict:
    import asyncio, platform
    from bleak import discover

    async def run() -> dict:
        # my code running perfectly

    loop = asyncio.get_event_loop()
    ble_list  = loop.run_until_complete(run())
    return ble_list

阅读给出的答案后,我只能使用本页 中的提示(尝试替换)来修复我的 websocket 线程。

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

BaseDefaultEventLoopPolicy 的文档解释

Default policy implementation for accessing the event loop. In this policy, each thread has its own event loop. However, we only automatically create an event loop by default for the main thread; other threads by default have no event loop.

所以当使用 线程时 必须创建循环。

我不得不重新排序我的代码,所以我的最终代码

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # !!! Place code after setting the loop !!!
    server = Server()
    start_server = websockets.serve(server.ws_handler, 'localhost', port)

我有一个类似的问题,我希望我的 asyncio 模块可以从 non-asyncio 脚本调用(在 gevent 下是 运行...不要问...)。下面的代码解决了我的问题,因为它试图获取当前事件循环,但如果当前线程中没有事件循环,则会创建一个。在 python 3.9.11.

中测试
try:
    loop = asyncio.get_event_loop()
except RuntimeError as e:
    if str(e).startswith('There is no current event loop in thread'):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    else:
        raise