运行 asyncio 在两个不同的线程中即时处理动作
Running asyncio in two different threads to process actions on-the-fly
我正在尝试异步处理许多操作:我想将操作发送到我的循环并同时 运行 它们在 ProcessPoolExecutor
中。我想我不知道一开始我会 运行ning 的所有工作,所以我无法定义所有工作然后 然后 启动事件循环。
我找到的唯一解决方案是运行一个可以处理动作的主线程,另一个线程loop.run_forever
,这似乎可行。但是,我没有看到任何以这种方式在同一循环上 运行ning 两个单独线程的示例。有没有其他方法可以解决这个问题,如果我的解决方案有什么问题?
import asyncio
from concurrent.futures import ProcessPoolExecutor
import functools
import time
import threading
executor = ProcessPoolExecutor(max_workers=3)
def do_work(eventloop, value):
future = eventloop.run_in_executor(executor, functools.partial(process_action, value))
future.add_done_callback(run_job_success)
def process_action(value):
print("Processing %i" % value)
time.sleep(1)
return value
def run_job_success(f):
print("Success : %s" % f.result())
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop_thread = threading.Thread(target=loop.run_forever)
loop_thread.start()
while True:
msg = recv()
if msg is not None:
do_work(loop, msg)
编辑:
我使用 recv
方法获得要接收的作业。
您尝试做的事情有点模棱两可 - 您说在程序开始时您不知道想要 运行 的所有工作。没关系 - 但如何做才能找到自己想要的工作运行?在任何情况下,您上面的测试程序都可以(并且应该)被重写为单线程的,方法是使用 BaseEventLoop.call_soon
来安排您在开始事件循环之前要进行的所有 do_work
调用:
import asyncio
from concurrent.futures import ProcessPoolExecutor
import functools
import time
def do_work(eventloop, value):
future = eventloop.run_in_executor(executor, functools.partial(process_action, value))
future.add_done_callback(run_job_success)
def process_action(value):
print("Processing %i" % value)
time.sleep(1)
return value
def run_job_success(f):
print("Success : %s" % f.result())
if __name__ == "__main__":
executor = ProcessPoolExecutor(max_workers=3)
loop = asyncio.get_event_loop()
for i in range(5):
loop.call_soon(do_work, loop, i)
loop.run_forever()
或者可以进一步重构为使用协程而不是回调,这通常是使用 asyncio
:
时的首选样式
import time
import asyncio
import functools
from concurrent.futures import ProcessPoolExecutor
def do_work(loop, value):
return loop.run_in_executor(executor, functools.partial(process_action, value))
def process_action(value):
print("Processing %i" % value)
time.sleep(1)
return value
@asyncio.coroutine
def main(loop):
tasks = [do_work(loop, i) for i in range(5)]
for fut in asyncio.as_completed(tasks):
result = yield from fut
print("Success : %s" % result)
if __name__ == "__main__":
executor = ProcessPoolExecutor(max_workers=3)
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
这也使得在完成所有工作后更容易退出程序,而不必使用 Ctrl+C 和 loop.run_forever
。
您当前的方法是安全的(loop.run_in_executor
在幕后使用 loop.call_soon_threadsafe
,这是您可以 safely/correctly 从单独的线程在事件循环中安排工作的唯一方法),它只是过于复杂和不必要; asyncio
的设计使得使用它的程序是单线程的(除非阻塞操作需要 运行,这就是 run_in_executor
的用途)。
我正在尝试异步处理许多操作:我想将操作发送到我的循环并同时 运行 它们在 ProcessPoolExecutor
中。我想我不知道一开始我会 运行ning 的所有工作,所以我无法定义所有工作然后 然后 启动事件循环。
我找到的唯一解决方案是运行一个可以处理动作的主线程,另一个线程loop.run_forever
,这似乎可行。但是,我没有看到任何以这种方式在同一循环上 运行ning 两个单独线程的示例。有没有其他方法可以解决这个问题,如果我的解决方案有什么问题?
import asyncio
from concurrent.futures import ProcessPoolExecutor
import functools
import time
import threading
executor = ProcessPoolExecutor(max_workers=3)
def do_work(eventloop, value):
future = eventloop.run_in_executor(executor, functools.partial(process_action, value))
future.add_done_callback(run_job_success)
def process_action(value):
print("Processing %i" % value)
time.sleep(1)
return value
def run_job_success(f):
print("Success : %s" % f.result())
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop_thread = threading.Thread(target=loop.run_forever)
loop_thread.start()
while True:
msg = recv()
if msg is not None:
do_work(loop, msg)
编辑:
我使用 recv
方法获得要接收的作业。
您尝试做的事情有点模棱两可 - 您说在程序开始时您不知道想要 运行 的所有工作。没关系 - 但如何做才能找到自己想要的工作运行?在任何情况下,您上面的测试程序都可以(并且应该)被重写为单线程的,方法是使用 BaseEventLoop.call_soon
来安排您在开始事件循环之前要进行的所有 do_work
调用:
import asyncio
from concurrent.futures import ProcessPoolExecutor
import functools
import time
def do_work(eventloop, value):
future = eventloop.run_in_executor(executor, functools.partial(process_action, value))
future.add_done_callback(run_job_success)
def process_action(value):
print("Processing %i" % value)
time.sleep(1)
return value
def run_job_success(f):
print("Success : %s" % f.result())
if __name__ == "__main__":
executor = ProcessPoolExecutor(max_workers=3)
loop = asyncio.get_event_loop()
for i in range(5):
loop.call_soon(do_work, loop, i)
loop.run_forever()
或者可以进一步重构为使用协程而不是回调,这通常是使用 asyncio
:
import time
import asyncio
import functools
from concurrent.futures import ProcessPoolExecutor
def do_work(loop, value):
return loop.run_in_executor(executor, functools.partial(process_action, value))
def process_action(value):
print("Processing %i" % value)
time.sleep(1)
return value
@asyncio.coroutine
def main(loop):
tasks = [do_work(loop, i) for i in range(5)]
for fut in asyncio.as_completed(tasks):
result = yield from fut
print("Success : %s" % result)
if __name__ == "__main__":
executor = ProcessPoolExecutor(max_workers=3)
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
这也使得在完成所有工作后更容易退出程序,而不必使用 Ctrl+C 和 loop.run_forever
。
您当前的方法是安全的(loop.run_in_executor
在幕后使用 loop.call_soon_threadsafe
,这是您可以 safely/correctly 从单独的线程在事件循环中安排工作的唯一方法),它只是过于复杂和不必要; asyncio
的设计使得使用它的程序是单线程的(除非阻塞操作需要 运行,这就是 run_in_executor
的用途)。