我应该使用带队列的池还是进程?

Should I use Pool or Process with a Queue?

我有一个函数 (A) 以恒定速率创建数据,假设每秒 100 个。我想在 A 生成的数据上 运行 另一个函数 (B)。函数 B 可能需要比 0.01s 到 运行 更长的时间,但我不希望它备份数据流。我是否应该创建一个 BPool 并将一个通用的 Queue 传递给 AB 以供使用(如下面的代码)?我还看到您应该使用 Pools 来处理数据列表。这是它们应该如何使用(关于我描述的方法)?我应该只使用两个 Process 并交替向它们发送数据吗?

def A(queue):
  while True:
    data = data_getter()
    queue.put(data)
def B(queue):
  while True:
    data = queue.get(True):
    do_something(data)
# main.py
q = Queue()

pool = Pool(initializer=B, initargs=[q])

A(q)

这是我的简短回答:

进程池存在的目的是允许您以并行方式最大程度地处理 N 个“作业”,前提是您已为该任务分配了 M 个物理处理器。

创建一个 Process 实例正在写入 N 次的队列(相当于提交 N 个“作业”)并让 M Process 个实例读取和处理这些消息,即“作业",并处理它们,实际上是一种进程池的实现。使用单独的进程池只是为了创建队列 reader 进程所需的进程似乎是一个不必要的复杂层。因此,我将创建 M Process 个实例,这些实例从编写器进程向其添加消息的公共队列中读取。

TL;DR(或长答案)

正如您正确推测的那样,您可以通过 (1) 创建单独的 Process 实例或 (2) 使用进程池来实现。方法 1 直觉上似乎是最合乎逻辑的做法,但它不一定是最直接的代码。我在下面使用模拟展示了一些方法,其中队列编写器进程每 .01 秒创建一个队列条目,但队列 reader 进程需要 .06 秒来处理一个队列条目,因此至少有 6 个这样的进程 (从公共队列读取) 需要跟上:

方法 1 -- 显式处理

import multiprocessing as mp
import time


class Sentinel():
    pass

def a(queue, n_readers):
    for i in range(1000):
        time.sleep(.01)
        queue.put(i)
    print('queue size is now approximately: ', queue.qsize()) # print queue size
    # signal readers to terminate:
    end_of_queue = Sentinel()
    for _ in range(n_readers):
        queue.put(end_of_queue)


def b(queue):
    while True:
        value = queue.get(True)
        # signal to terminate?
        if isinstance(value, Sentinel):
            break
        print(value, flush=True)
        time.sleep(.06)



def main():
    n_readers = mp.cpu_count() - 1
    queue = mp.Queue()
    # create queue readers:
    readers = [mp.Process(target=b, args=(queue,)) for _ in range(n_readers)]
    for p in readers:
        p.start()
    # now start queue writer:
    writer = mp.Process(target=a, args=(queue, n_readers))
    writer.start()
    # wait for writer to terminate:
    writer.join()
    for p in readers:
        p.join()
    print('Done')

if __name__ == '__main__':
    main()

方法 2 - 使用进程池

import multiprocessing as mp
import time


class Sentinel():
    pass

def init_pool(q):
    global queue
    queue = q

def a(n_readers):
    for i in range(1000):
        time.sleep(.01)
        queue.put(i)
    print('queue size is now approximately: ', queue.qsize()) # print queue size
    end_of_queue = Sentinel()
    for _ in range(n_readers):
        queue.put(end_of_queue)


def b():
    while True:
        value = queue.get(True)
        # signal to terminate?
        if isinstance(value, Sentinel):
            break
        print(value, flush=True)
        time.sleep(.06)



def main():
    n_readers = mp.cpu_count() - 1
    queue = mp.Queue()
    pool = mp.Pool(n_readers + 1, initializer=init_pool, initargs=(queue,))
    readers_results = [pool.apply_async(b) for _ in range(n_readers)]
    # now submit writer:
    pool.apply(a, args=(n_readers,))
    # wait for readers to finish:
    for r in readers_results:
        r.get()
    print('Done')

if __name__ == '__main__':
    main()

第二种方法的唯一优点是,如果有必要将工人a and/or b 的值返回到主进程,它就变成了使用进程池时很简单。

备注

通过使用 Pool 构造函数的 initializer 参数来实现队列 reader 进程,函数 B 也是可行的(请参阅下面的方法池 2A),但是函数 A 必须在主进程下 运行 。但是这些 Pool 进程是守护进程,一旦所有非守护进程终止,它们就会终止。这就是为什么我在方法 2 中安排将特殊的哨兵消息写入队列作为“作业”的信号(但不是 运行 正在执行作业的进程)在读取哨兵消息时终止.因此,我知道当作业完成时,队列中不再有消息,并且队列中再也不会有任何消息。类似的逻辑适用于方法 1,除了整个过程也终止,我可以使用 join 知道何时发生。但是在您使用隐式守护线程执行队列读取的情况下,即使您添加了额外的代码以在读取所有输入队列值和初始化函数 B 后将哨兵值添加到队列中,终止,主进程如何知道?同样,您可以调用池上的方法 Pool.join(),这会阻止任何未来的工作被提交到池中(我们从未真正明确地提交工作;所有工作都在池初始化函数中完成)。然后调用 Pool.join(),等待每个工作进程退出。一旦每个流程实例的池初始化函数完成,这将立即发生,因为之前对 Pool.close 的调用告诉池永远不会向池中添加任何额外的工作。

方法 2A - 使用带有池初始化程序的进程池

import multiprocessing as mp
import time


class Sentinel():
    pass

def a(queue, n_readers):
    for i in range(1000):
        time.sleep(.01)
        queue.put(i)
    end_of_queue = Sentinel()
    for _ in range(n_readers):
        queue.put(end_of_queue)


def b(the_queue):
    global queue
    queue = the_queue
    while True:
        value = queue.get(True)
        # signal to terminate?
        if isinstance(value, Sentinel):
            break
        print(value, flush=True)
        time.sleep(.06)



def main():
    n_readers = mp.cpu_count() - 1
    queue = mp.Queue()
    pool = mp.Pool(n_readers, initializer=b, initargs=(queue,))
    a(queue, n_readers)
    # wait for readers to finish:
    pool.close() # must be called before pool.join()
    pool.join()
    print('Done')

if __name__ == '__main__':
    main()

备注

所有这三种方法都可以工作,并且所有这三种方法都预先假设 reader 进程不会 运行 无限期地进行,因此我们对有序终止感兴趣(因此需要标记值向 reader 进程发出终止信号)。但是如果writer进程被设计为运行无限期直到进程被用户中断,那么例如方法2a可以修改为使用用户输入ctrl-C产生的键盘中断,来终止执行:

修改后的方法 2A 仅通过键盘中断终止

import multiprocessing as mp
import time
import itertools


def a(queue, n_readers):
    try:
        for i in itertools.count(0):
            time.sleep(.01)
            queue.put(i)
    except KeyboardInterrupt:
        pass



def b(the_queue):
    global queue
    queue = the_queue
    try:
        while True:
            value = queue.get(True)
            print(value, end=' ', flush=True)
            time.sleep(.06)
    except KeyboardInterrupt:
        pass



def main():
    n_readers = mp.cpu_count() - 1
    queue = mp.Queue()
    pool = mp.Pool(n_readers, initializer=b, initargs=(queue,))
    a(queue, n_readers)
    # wait for readers to finish:
    pool.close() # must be called before pool.join()
    try:
        pool.join()
    except KeyboardInterrupt:
        pool.terminate()
    print('Done')

if __name__ == '__main__':
    main()

修改后的方法 1 仅通过键盘输入终止

import multiprocessing as mp
import time
import itertools


def a(queue, n_readers):
    for i in itertools.count(0):
        time.sleep(.01)
        queue.put(i)

def b(queue):
    while True:
        value = queue.get(True)
        if value % 100 == 0:
            print(value, end=' ', flush=True)
        time.sleep(.06)



def main():
    n_readers = mp.cpu_count() - 1
    queue = mp.Queue()
    # create queue readers:
    readers = [mp.Process(target=b, args=(queue,), daemon=True) for _ in range(n_readers)]
    for p in readers:
        p.start()
    # now start queue writer:
    writer = mp.Process(target=a, args=(queue, n_readers), daemon=True)
    writer.start()
    input('Enter return to terminate...')
    print()
    print('Done')

if __name__ == '__main__':
    main()

结论

你显然有选择。如果程序不会无限期地 运行 并且您希望有序关闭以确保所有已排队的消息都已处理,我的首选方法是方法 1。方法 2 和 2a 似乎只是懒惰的获取方式N 个进程以相同的参数为您执行相同的相同工作。

另一方面,如果您的编写器无休止地处理任务 运行,您需要终止它并且不介意队列中可能还有一两个未处理的消息(毕竟您在一个相当任意的时间点终止程序,所以这应该没什么大不了的),那么如果一个简单的 input 语句足以输入终止命令,修改后的方法 1 似乎是需要的方法最少的修改。但是如果运行ning程序一直在输出消息,input语句显示的文本就会丢失,你需要依赖于为每个进程使用键盘中断处理程序,这比较复杂。如果有任何修改过的示例,您可以使用此技术;我在修改后的方法 2a 中使用它作为示例,因为该代码不适合使用 input 语句技术,因为终端输出太多。毫无疑问,当有any终端输出时,最可靠的方法是使用键盘处理程序中断处理程序方法。只要不需要从任何进程返回 return 值,我仍然倾向于使用方法 1 及其变体而不是进程池: