Python: asyncio 线程循环

Python: asyncio loops with threads

你能告诉我这是否是在自己的线程中构建多个独立异步循环的正确方法吗?

def init():
    print("Initializing Async...")
    global loop_heavy
    loop_heavy = asyncio.new_event_loop()
    start_loop(loop_heavy)

def start_loop(loop):
    thread = threading.Thread(target=loop.run_forever)
    thread.start()

def submit_heavy(task):
    future = asyncio.run_coroutine_threadsafe(task, loop_heavy)
    try:
        future.result()
    except Exception as e:
        print(e)

def stop():
    loop_heavy.call_soon_threadsafe(loop_heavy.stop)

async def heavy():
    print("3. heavy start %s" % threading.current_thread().name)
    await asyncio.sleep(3) # or await asyncio.sleep(3, loop=loop_heavy)
    print("4. heavy done")

然后我正在测试它:

if __name__ == "__main__":
    init()
    print("1. submit heavy: %s" % threading.current_thread().name)
    submit_heavy(heavy())
    print("2. submit is done")
    stop()

我期待看到 1->3->2->4 但实际上是 1->3->4->2:

Initializing Async...
1. submit heavy: MainThread
3. heavy start Thread-1
4. heavy done
2. submit is done

我认为我在理解异步和线程方面遗漏了一些东西。
线程是不同的。为什么我要在 MainThread 中等待,直到 Thread-1 中的作业完成?

Why am I waiting inside MainThread until the job inside Thread-1 is finished?

问得好,你为什么?

一个可能的答案是,因为您实际上想要阻塞当前线程直到作业完成。这是将事件循环放在另一个线程中并使用 run_coroutine_threadsafe.

的原因之一

另一个可能的答案是,如果您不想,则不必。您可以从 submit_heavy()concurrent.futures.Future object returned by run_coroutine_threadsafe, and leave it to the caller to wait for the result (or check if one is ready) 随意 return)。

最后,如果您的目标只是 运行 一个常规函数 "in the background"(不阻塞当前线程),也许您根本不需要 asyncio。看一下 concurrent.futures module, whose ThreadPoolExecutor 允许您轻松地将函数提交到线程池并使其独立执行。

我将添加我从 asyncio 文档中找到的可能解决方案之一。
我不确定这是不是正确的方法,但它按预期工作(MainThread 没有被子线程的执行阻塞)

Running Blocking Code
Blocking (CPU-bound) code should not be called directly. For example, if a function performs a CPU-intensive calculation for 1 second, all concurrent asyncio Tasks and IO operations would be delayed by 1 second.
An executor can be used to run a task in a different thread or even in a different process to avoid blocking block the OS thread with the event loop. See the loop.run_in_executor() method for more details.

应用到我的代码:

import asyncio
import threading
import concurrent.futures
import multiprocessing
import time

def init():
    print("Initializing Async...")

    global loop, thread_executor_pool

    thread_executor_pool = concurrent.futures.ThreadPoolExecutor(max_workers=multiprocessing.cpu_count())
    loop = asyncio.get_event_loop()

    thread = threading.Thread(target=loop.run_forever)
    thread.start()

def submit_task(task, *args):
    loop.run_in_executor(thread_executor_pool, task, *args)

def stop():
    loop.call_soon_threadsafe(loop.stop)
    thread_executor_pool.shutdown()

def blocked_task(msg1, msg2):
    print("3. task start msg: %s, %s, thread: %s" % (msg1, msg2, threading.current_thread().name))
    time.sleep(3)
    print("4. task is done -->")

if __name__ == "__main__":
    init()
    print("1. --> submit task: %s" % threading.current_thread().name)
    submit_task(blocked_task, "a", "b")

    print("2. --> submit is done")
    stop()

输出:

Initializing Async...

1. --> submit task: MainThread
3. task start msg: a, b, thread: ThreadPoolExecutor-0_0
2. --> submit is done
4. task is done  -->

如有错误请指正或换个方法