运行 两个 asyncio 协程,每个都在自己的 Python 进程中
Running Two asyncio Coroutines, Each in its own Python Process
如果我们有 2 个 asyncio
协程,是否可以使用 Python multiproessing
让它们每个 运行 在自己的进程中,and 允许在用户点击 Ctrl+C[ 时停止两个进程中的协程(通过调用它们的 stop
方法) =49=]?
这将类似于下面的代码,除了 foo.start()
和 bar.start()
协程应该有自己的进程。
from builtins import KeyboardInterrupt
import asyncio
import multiprocessing
import signal
class App:
def __init__(self, text):
self.text = text
async def start(self):
self.loop_task = asyncio.create_task(self.hello())
await asyncio.wait([self.loop_task])
async def stop(self):
self.loop_task.cancel()
async def hello(self):
while True:
print(self.text)
await asyncio.sleep(2)
if __name__ == '__main__':
foo = App('foo')
bar = App('bar')
# Running in a single process works fine
try:
asyncio.run(asyncio.wait([foo.start(), bar.start()]))
except KeyboardInterrupt:
asyncio.run(asyncio.wait([foo.stop(), bar.stop()]))
尝试使用 multiprocessing
和 signals
,但我也不确定如何在 2 个进程终止之前调用 foo.stop()
和 bar.stop()
。
if __name__ == '__main__':
def init_worker():
signal.signal(signal.SIGINT, signal.SIG_IGN)
def start_foo():
asyncio.run(foo.start())
def start_bar():
asyncio.run(bar.start())
foo = App('foo')
bar = App('bar')
pool = multiprocessing.Pool(10, init_worker)
try:
print('Starting 2 jobs')
pool.apply_async(start_foo)
pool.apply_async(start_bar)
while True:
time.sleep(1) # is sleeping like this a bad thing?
except KeyboardInterrupt:
print('Caught KeyboardInterrupt, terminating workers')
pool.terminate()
pool.join()
print('Shut down complete')
# Based on
在 Ubuntu 20.04
上使用 Python 3.9.5
基于@Will Da Silva 的解决方案,我做了微小的修改以检查 asyncio.run(app.stop())
是否在按下 Ctrl+C
时被调用
class App:
def __init__(self, text):
self.text = text
async def start(self):
self.loop_task = asyncio.create_task(self.hello())
await asyncio.wait([self.loop_task])
async def stop(self):
self.loop_task.cancel()
print(f'Stopping {self.text}')
async def hello(self):
while True:
print(self.text)
await asyncio.sleep(2)
def f(app):
try:
asyncio.run(app.start())
except KeyboardInterrupt:
asyncio.run(app.stop())
if __name__ == '__main__':
jobs = (App('foo'), App('bar'))
with multiprocessing.Pool(min(len(jobs), os.cpu_count())) as pool:
try:
print(f'Starting {len(jobs)} jobs')
pool.map(f, jobs)
except KeyboardInterrupt:
print('Caught KeyboardInterrupt, terminating workers')
print('Shut down complete')
但是,如果我多次重复启动和停止 Python 脚本,app.stop()
中的 print(f'Stopping {self.text}')
似乎有一半时间不会打印到标准输出。
输出:
$ python test.py
Starting 2 jobs
bar
foo
^CCaught KeyboardInterrupt, terminating workers
Shut down complete
$ python test.py
Starting 2 jobs
bar
foo
^CCaught KeyboardInterrupt, terminating workers
Stopping bar
Shut down complete
$ python test.py
Starting 2 jobs
foo
bar
^CCaught KeyboardInterrupt, terminating workers
Stopping bar
Stopping foo
Shut down complete
这里有一种方法,既不会干扰信号,也不会改变 App
class:
import asyncio
import multiprocessing
import os
class App:
def __init__(self, text):
self.text = text
async def start(self):
self.loop_task = asyncio.create_task(self.hello())
await asyncio.wait([self.loop_task])
async def stop(self):
self.loop_task.cancel()
async def hello(self):
while True:
print(self.text)
await asyncio.sleep(2)
def f(text):
app = App(text)
try:
asyncio.run(app.start())
except KeyboardInterrupt:
asyncio.run(app.stop())
if __name__ == '__main__':
jobs = ('foo', 'bar')
with multiprocessing.Pool(min(len(jobs), os.cpu_count())) as pool:
try:
pool.map(f, jobs)
except KeyboardInterrupt:
pool.close()
pool.join()
重要的是我们将池中的进程数限制为 min(len(jobs), os.cpu_count())
,因为任何未分配的工作程序都不会在您输入 ctrl- 时捕获 KeyboardInterrupt
的 try-except 块中c,所以他们会引发异常。
为了完全避免这个问题,您可以为池提供一个忽略 SIGINT
的初始化程序,但这也会阻止我们用 KeyboardInterrupt
捕获它。我不确定如何仅在未初始化的工作进程中忽略它。
您也可以在父进程中创建 App
个实例,只要它可以被 pickle 以跨越进程边界传递到子进程中。
def f(app):
try:
asyncio.run(app.start())
except KeyboardInterrupt:
asyncio.run(app.stop())
if __name__ == '__main__':
jobs = (App('foo'), App('bar'))
with multiprocessing.Pool(min(len(jobs), os.cpu_count())) as pool:
try:
pool.map(f, jobs)
except KeyboardInterrupt:
pool.close()
pool.join()
如果我们有 2 个 asyncio
协程,是否可以使用 Python multiproessing
让它们每个 运行 在自己的进程中,and 允许在用户点击 Ctrl+C[ 时停止两个进程中的协程(通过调用它们的 stop
方法) =49=]?
这将类似于下面的代码,除了 foo.start()
和 bar.start()
协程应该有自己的进程。
from builtins import KeyboardInterrupt
import asyncio
import multiprocessing
import signal
class App:
def __init__(self, text):
self.text = text
async def start(self):
self.loop_task = asyncio.create_task(self.hello())
await asyncio.wait([self.loop_task])
async def stop(self):
self.loop_task.cancel()
async def hello(self):
while True:
print(self.text)
await asyncio.sleep(2)
if __name__ == '__main__':
foo = App('foo')
bar = App('bar')
# Running in a single process works fine
try:
asyncio.run(asyncio.wait([foo.start(), bar.start()]))
except KeyboardInterrupt:
asyncio.run(asyncio.wait([foo.stop(), bar.stop()]))
尝试使用 multiprocessing
和 signals
,但我也不确定如何在 2 个进程终止之前调用 foo.stop()
和 bar.stop()
。
if __name__ == '__main__':
def init_worker():
signal.signal(signal.SIGINT, signal.SIG_IGN)
def start_foo():
asyncio.run(foo.start())
def start_bar():
asyncio.run(bar.start())
foo = App('foo')
bar = App('bar')
pool = multiprocessing.Pool(10, init_worker)
try:
print('Starting 2 jobs')
pool.apply_async(start_foo)
pool.apply_async(start_bar)
while True:
time.sleep(1) # is sleeping like this a bad thing?
except KeyboardInterrupt:
print('Caught KeyboardInterrupt, terminating workers')
pool.terminate()
pool.join()
print('Shut down complete')
# Based on
在 Ubuntu 20.04
上使用 Python 3.9.5基于@Will Da Silva 的解决方案,我做了微小的修改以检查 asyncio.run(app.stop())
是否在按下 Ctrl+C
class App:
def __init__(self, text):
self.text = text
async def start(self):
self.loop_task = asyncio.create_task(self.hello())
await asyncio.wait([self.loop_task])
async def stop(self):
self.loop_task.cancel()
print(f'Stopping {self.text}')
async def hello(self):
while True:
print(self.text)
await asyncio.sleep(2)
def f(app):
try:
asyncio.run(app.start())
except KeyboardInterrupt:
asyncio.run(app.stop())
if __name__ == '__main__':
jobs = (App('foo'), App('bar'))
with multiprocessing.Pool(min(len(jobs), os.cpu_count())) as pool:
try:
print(f'Starting {len(jobs)} jobs')
pool.map(f, jobs)
except KeyboardInterrupt:
print('Caught KeyboardInterrupt, terminating workers')
print('Shut down complete')
但是,如果我多次重复启动和停止 Python 脚本,app.stop()
中的 print(f'Stopping {self.text}')
似乎有一半时间不会打印到标准输出。
输出:
$ python test.py
Starting 2 jobs
bar
foo
^CCaught KeyboardInterrupt, terminating workers
Shut down complete
$ python test.py
Starting 2 jobs
bar
foo
^CCaught KeyboardInterrupt, terminating workers
Stopping bar
Shut down complete
$ python test.py
Starting 2 jobs
foo
bar
^CCaught KeyboardInterrupt, terminating workers
Stopping bar
Stopping foo
Shut down complete
这里有一种方法,既不会干扰信号,也不会改变 App
class:
import asyncio
import multiprocessing
import os
class App:
def __init__(self, text):
self.text = text
async def start(self):
self.loop_task = asyncio.create_task(self.hello())
await asyncio.wait([self.loop_task])
async def stop(self):
self.loop_task.cancel()
async def hello(self):
while True:
print(self.text)
await asyncio.sleep(2)
def f(text):
app = App(text)
try:
asyncio.run(app.start())
except KeyboardInterrupt:
asyncio.run(app.stop())
if __name__ == '__main__':
jobs = ('foo', 'bar')
with multiprocessing.Pool(min(len(jobs), os.cpu_count())) as pool:
try:
pool.map(f, jobs)
except KeyboardInterrupt:
pool.close()
pool.join()
重要的是我们将池中的进程数限制为 min(len(jobs), os.cpu_count())
,因为任何未分配的工作程序都不会在您输入 ctrl- 时捕获 KeyboardInterrupt
的 try-except 块中c,所以他们会引发异常。
为了完全避免这个问题,您可以为池提供一个忽略 SIGINT
的初始化程序,但这也会阻止我们用 KeyboardInterrupt
捕获它。我不确定如何仅在未初始化的工作进程中忽略它。
您也可以在父进程中创建 App
个实例,只要它可以被 pickle 以跨越进程边界传递到子进程中。
def f(app):
try:
asyncio.run(app.start())
except KeyboardInterrupt:
asyncio.run(app.stop())
if __name__ == '__main__':
jobs = (App('foo'), App('bar'))
with multiprocessing.Pool(min(len(jobs), os.cpu_count())) as pool:
try:
pool.map(f, jobs)
except KeyboardInterrupt:
pool.close()
pool.join()