并发期货以非阻塞方式将任务提交到进程池

Concurrent futures submit task to process pool non-blockingly

这个问题是 Python 2-specific,使用社区维护的 concurrent.futures backport。

我正在尝试使用 ProcessPoolExecutor(将 maxWorkers 简单地设置为 2)来并行 运行 两个任务。这些任务都是 Python 功能,我希望它们每个都在自己的进程中 运行。他们不需要相互协调(我什至不需要知道退出状态)。我只想能够同时启动进程并限制在任何给定时刻并行 运行 的进程数。

import concurrent.futures as futures
import time


def do_stuff(name):
    for x in range(10):
        print name, x
        time.sleep(1)


pool = futures.ProcessPoolExecutor(max_workers=2)
pool.submit(do_stuff("a"))
print "a submitted!"
pool.submit(do_stuff("b"))

然而,这会打印

a 0 a 1 ... a 9 a submitted! b 0 b 1 ... b 9

为什么submit是阻塞操作?是否有非阻塞等效项?

这是一个使用 multiprocessing 库的示例,它具有我想要的行为。它以非阻塞方式启动每个进程,然后调用 join(大概只是 waitpid(2) 的薄包装)。但是,此技术无法让我限制在任何给定时刻并行 运行 的进程数。

import multiprocessing
import time


def do_stuff(name):
    for x in range(10):
        print name, x
        time.sleep(1)


proc_a = multiprocessing.Process(target=do_stuff, args="a")
proc_b = multiprocessing.Process(target=do_stuff, args="b")
proc_a.start()
proc_b.start()
proc_a.join()
proc_b.join()

您的代码中的串行打印输出(而不是并发打印输出)似乎是由您用于提交函数参数的语法错误引起的。应使用 Executor.submit() 的逗号将参数与函数分开。试试这个版本。

import concurrent.futures as futures
import time

def do_stuff(name):
    for x in range(10):
        print name, x
        time.sleep(1)

pool = futures.ProcessPoolExecutor(max_workers=2)
pool.submit(do_stuff, "a")
print "a submitted!"
pool.submit(do_stuff, "b")
print "b submitted!"

此外,我建议尽可能使用 "with" 语句来管理您的提交,因为这将确保 concurrent.futures.Executor 的正确 closure/shutdown。文档中提到了这一点。

with futures.ProcessPoolExecutor(max_workers=2) as executor:
    executor.submit(do_stuff, "a") 
    print "a submitted!"
    executor.submit(do_stuff, "b")
    print "b submitted!"