Running Two asyncio Coroutines, Each in its own Python Process

If we have 2 asyncio coroutines, is it possible to use Python multiprocessing to run each of them in its own process, and to stop the coroutines in both processes (by calling their stop method) when the user hits Ctrl+C?

This would be similar to the code below, except that the foo.start() and bar.start() coroutines should each run in their own process.

import asyncio
import multiprocessing
import signal
import time

class App:
    def __init__(self, text):
        self.text = text

    async def start(self):
        self.loop_task = asyncio.create_task(self.hello())
        await asyncio.wait([self.loop_task])
        
    async def stop(self):
        self.loop_task.cancel()
        
    async def hello(self):
        while True:
            print(self.text)
            await asyncio.sleep(2)

if __name__ == '__main__':
    foo = App('foo')
    bar = App('bar')
    
    # Running in a single process works fine
    try:
        asyncio.run(asyncio.wait([foo.start(), bar.start()]))
    except KeyboardInterrupt:
        asyncio.run(asyncio.wait([foo.stop(), bar.stop()]))

I tried using multiprocessing and signals, but I am not sure how to call foo.stop() and bar.stop() before the 2 processes terminate.

if __name__ == '__main__':
    
    def init_worker():
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        
    def start_foo():
        asyncio.run(foo.start())
        
    def start_bar():
        asyncio.run(bar.start())
        
    foo = App('foo')
    bar = App('bar')    
    pool = multiprocessing.Pool(10, init_worker)
        
    try:
        print('Starting 2 jobs')
        pool.apply_async(start_foo)
        pool.apply_async(start_bar)

        while True:        
            time.sleep(1)  # is sleeping like this a bad thing?
                
    except KeyboardInterrupt:
        print('Caught KeyboardInterrupt, terminating workers')
        pool.terminate()
        pool.join()
    
    print('Shut down complete')

# Based on 

Using Python 3.9.5 on Ubuntu 20.04.

Based on @Will Da Silva's solution, I made a small modification to check whether asyncio.run(app.stop()) is called when Ctrl+C is pressed:

class App:
    def __init__(self, text):
        self.text = text
    async def start(self):
        self.loop_task = asyncio.create_task(self.hello())
        await asyncio.wait([self.loop_task])
        
    async def stop(self):
        self.loop_task.cancel()
        print(f'Stopping {self.text}')
        
    async def hello(self):
        while True:
            print(self.text)
            await asyncio.sleep(2)

def f(app):
    try:
        asyncio.run(app.start())
    except KeyboardInterrupt:
        asyncio.run(app.stop())
        
if __name__ == '__main__':  
    
    jobs = (App('foo'), App('bar'))
    with multiprocessing.Pool(min(len(jobs), os.cpu_count())) as pool:        
        try:
            print(f'Starting {len(jobs)} jobs')
            pool.map(f, jobs)
                
        except KeyboardInterrupt:
            print('Caught KeyboardInterrupt, terminating workers')
                
    print('Shut down complete')

However, if I start and stop the Python script repeatedly, the print(f'Stopping {self.text}') in app.stop() does not seem to reach standard output about half of the time.

Output:

$ python test.py
Starting 2 jobs
bar
foo
^CCaught KeyboardInterrupt, terminating workers
Shut down complete

$ python test.py
Starting 2 jobs
bar
foo
^CCaught KeyboardInterrupt, terminating workers
Stopping bar
Shut down complete

$ python test.py
Starting 2 jobs
foo
bar
^CCaught KeyboardInterrupt, terminating workers
Stopping bar
Stopping foo
Shut down complete

Here is a way to do it that neither messes with signals nor changes the App class:

import asyncio
import multiprocessing
import os


class App:
    def __init__(self, text):
        self.text = text

    async def start(self):
        self.loop_task = asyncio.create_task(self.hello())
        await asyncio.wait([self.loop_task])
        
    async def stop(self):
        self.loop_task.cancel()
        
    async def hello(self):
        while True:
            print(self.text)
            await asyncio.sleep(2)


def f(text):
    app = App(text)
    try:
        asyncio.run(app.start())
    except KeyboardInterrupt:
        asyncio.run(app.stop())


if __name__ == '__main__':
    jobs = ('foo', 'bar')
    with multiprocessing.Pool(min(len(jobs), os.cpu_count())) as pool:
        try:
            pool.map(f, jobs)
        except KeyboardInterrupt:
            pool.close()
            pool.join()
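
The pool.close() / pool.join() pair in the except branch is what gives the workers time to run app.stop(). Leaving a `with multiprocessing.Pool(...)` block calls pool.terminate(), so a variant whose except branch only prints a message races against its workers being killed. That would be consistent with the "Stopping ..." lines showing up only some of the time in the output above. Here is a minimal sketch of the difference, without any Ctrl+C involved. The names slow_stop and worker_finished are made up for illustration, and the sketch assumes a Unix-like system where the 'fork' start method is available, as on the Ubuntu setup in the question.

```python
import multiprocessing
import os
import tempfile
import time


def slow_stop(marker_path):
    """Hypothetical worker: pretends to need one second to shut down cleanly."""
    time.sleep(1.0)
    with open(marker_path, 'w') as fh:
        fh.write('stopped')


def worker_finished(wait_for_workers):
    """Report whether the worker wrote its marker file before the pool was torn down."""
    # Assumption: the 'fork' start method exists (Unix-like systems).
    ctx = multiprocessing.get_context('fork')
    with tempfile.TemporaryDirectory() as tmp:
        marker = os.path.join(tmp, 'stopped')
        with ctx.Pool(1) as pool:
            pool.apply_async(slow_stop, (marker,))
            time.sleep(0.2)  # let the worker pick up the job
            if wait_for_workers:
                pool.close()  # no more jobs will be submitted
                pool.join()   # block until the worker has finished
        # By this point the with block has called pool.terminate().
        return os.path.exists(marker)
```

With wait_for_workers=True the worker gets to finish and the marker file exists. With wait_for_workers=False the worker is terminated in the middle of its sleep and the file is never written.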

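It is important that we limit the number of processes in the pool to min(len(jobs), os.cpu_count()), because any worker that has not been assigned a job is not inside the try-except block that catches KeyboardInterrupt when you hit Ctrl+C, so it will raise the exception.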
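
Two details of that expression are easy to miss. os.cpu_count() is documented as possibly returning None, in which case min() would raise a TypeError. And because these jobs never return, a machine with fewer CPUs than jobs would leave the extra jobs waiting forever. A small sketch of a helper that handles the first case; worker_count is a hypothetical name, not something from the code above:

```python
import os


def worker_count(n_jobs):
    """Number of pool processes for n_jobs long-running jobs.

    Never more than n_jobs, so no worker sits idle outside the try-except
    block, and never fewer than 1.
    """
    cpus = os.cpu_count() or 1  # cpu_count() may return None
    return max(1, min(n_jobs, cpus))
```

If every job has to run at the same time regardless of the CPU count, passing len(jobs) directly as the pool size may be the simpler choice.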

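To avoid this problem entirely, you could give the pool an initializer that ignores SIGINT, but that would also prevent us from catching it as a KeyboardInterrupt. I am not sure how to ignore it only in the worker processes that have not been handed a job.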
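
One possible way around this, offered as an untested-in-production sketch rather than a definitive answer: let the initializer ignore SIGINT, then have the job function put Python's default handler (signal.default_int_handler, which raises KeyboardInterrupt) back for as long as a job is running. The names ignore_sigint, say and run_job are made up for this example, and say stands in for App.hello with optional ticks and delay parameters so that it can also finish on its own.

```python
import asyncio
import signal


def ignore_sigint():
    """Pool initializer (hypothetical name): workers start out ignoring Ctrl+C."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


async def say(text, ticks=None, delay=2.0):
    """Print text every delay seconds; runs forever when ticks is None."""
    count = 0
    while ticks is None or count < ticks:
        print(text)
        await asyncio.sleep(delay)
        count += 1


def run_job(text, ticks=None, delay=2.0):
    """Job entry point: SIGINT raises KeyboardInterrupt only while a job runs."""
    # Restore Python's default handler, which raises KeyboardInterrupt.
    signal.signal(signal.SIGINT, signal.default_int_handler)
    try:
        asyncio.run(say(text, ticks, delay))
    except KeyboardInterrupt:
        print(f'Stopping {text}')
    finally:
        # The worker is idle again, so go back to ignoring SIGINT.
        signal.signal(signal.SIGINT, signal.SIG_IGN)
```

Under these assumptions the pool would be created with initializer=ignore_sigint and run_job mapped over the job names, while the parent still wraps pool.map in its own try-except KeyboardInterrupt.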

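You can also create the App instances in the parent process, as long as they can be pickled so they can be passed across the process boundary into the child processes.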

def f(app):
    try:
        asyncio.run(app.start())
    except KeyboardInterrupt:
        asyncio.run(app.stop())


if __name__ == '__main__':
    jobs = (App('foo'), App('bar'))
    with multiprocessing.Pool(min(len(jobs), os.cpu_count())) as pool:
        try:
            pool.map(f, jobs)
        except KeyboardInterrupt:
            pool.close()
            pool.join()
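
On the pickling requirement: an App that has not been started only holds its text, so it pickles fine. Once start() has stored a task in loop_task, the instance can no longer be sent to another process. A rough way to check this up front, assuming a trimmed-down App with the same attributes; is_picklable and started_app_is_picklable are hypothetical helpers:

```python
import asyncio
import pickle


class App:
    """Trimmed-down stand-in: like the App above, only text is set before start()."""
    def __init__(self, text):
        self.text = text


def is_picklable(obj):
    """Return True if obj survives pickle.dumps, which multiprocessing relies on."""
    try:
        pickle.dumps(obj)
    except Exception:
        return False
    return True


async def started_app_is_picklable():
    """Mimic start(): attach a task to the instance, then try to pickle it."""
    app = App('foo')
    app.loop_task = asyncio.create_task(asyncio.sleep(0))
    result = is_picklable(app)
    await app.loop_task
    return result
```

So the instances should be handed to pool.map before anything event-loop related has been attached to them.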