我应该使用带队列的池还是进程?
Should I use Pool or Process with a Queue?
我有一个函数 (A
) 以恒定速率创建数据,假设每秒 100 个。我想在 A
生成的数据上 运行 另一个函数 (B
)。函数 B
可能需要比 0.01s
到 运行 更长的时间,但我不希望它备份数据流。我是否应该创建一个 B
的 Pool
并将一个通用的 Queue
传递给 A
和 B
以供使用(如下面的代码)?我还看到您应该使用 Pool
s 来处理数据列表。这是它们应该如何使用(关于我描述的方法)?我应该只使用两个 Process
并交替向它们发送数据吗?
def A(queue):
while True:
data = data_getter()
queue.put(data)
def B(queue):
while True:
data = queue.get(True):
do_something(data)
# main.py
q = Queue()
pool = Pool(initializer=B, initargs=[q])
A(q)
这是我的简短回答:
进程池存在的目的是允许您以并行方式最大程度地处理 N 个“作业”,前提是您已为该任务分配了 M 个物理处理器。
创建一个 Process
实例正在写入 N 次的队列(相当于提交 N 个“作业”)并让 M Process
个实例读取和处理这些消息,即“作业",并处理它们,实际上是一种进程池的实现。使用单独的进程池只是为了创建队列 reader 进程所需的进程似乎是一个不必要的复杂层。因此,我将创建 M Process
个实例,这些实例从编写器进程向其添加消息的公共队列中读取。
TL;DR(或长答案)
正如您正确推测的那样,您可以通过 (1) 创建单独的 Process
实例或 (2) 使用进程池来实现。方法 1 直觉上似乎是最合乎逻辑的做法,但它不一定是最直接的代码。我在下面使用模拟展示了一些方法,其中队列编写器进程每 .01 秒创建一个队列条目,但队列 reader 进程需要 .06 秒来处理一个队列条目,因此至少有 6 个这样的进程 (从公共队列读取) 需要跟上:
方法 1 -- 显式处理
import multiprocessing as mp
import time
class Sentinel():
pass
def a(queue, n_readers):
for i in range(1000):
time.sleep(.01)
queue.put(i)
print('queue size is now approximately: ', queue.qsize()) # print queue size
# signal readers to terminate:
end_of_queue = Sentinel()
for _ in range(n_readers):
queue.put(end_of_queue)
def b(queue):
while True:
value = queue.get(True)
# signal to terminate?
if isinstance(value, Sentinel):
break
print(value, flush=True)
time.sleep(.06)
def main():
n_readers = mp.cpu_count() - 1
queue = mp.Queue()
# create queue readers:
readers = [mp.Process(target=b, args=(queue,)) for _ in range(n_readers)]
for p in readers:
p.start()
# now start queue writer:
writer = mp.Process(target=a, args=(queue, n_readers))
writer.start()
# wait for writer to terminate:
writer.join()
for p in readers:
p.join()
print('Done')
if __name__ == '__main__':
main()
方法 2 - 使用进程池
import multiprocessing as mp
import time
class Sentinel():
pass
def init_pool(q):
global queue
queue = q
def a(n_readers):
for i in range(1000):
time.sleep(.01)
queue.put(i)
print('queue size is now approximately: ', queue.qsize()) # print queue size
end_of_queue = Sentinel()
for _ in range(n_readers):
queue.put(end_of_queue)
def b():
while True:
value = queue.get(True)
# signal to terminate?
if isinstance(value, Sentinel):
break
print(value, flush=True)
time.sleep(.06)
def main():
n_readers = mp.cpu_count() - 1
queue = mp.Queue()
pool = mp.Pool(n_readers + 1, initializer=init_pool, initargs=(queue,))
readers_results = [pool.apply_async(b) for _ in range(n_readers)]
# now submit writer:
pool.apply(a, args=(n_readers,))
# wait for readers to finish:
for r in readers_results:
r.get()
print('Done')
if __name__ == '__main__':
main()
第二种方法的唯一优点是,如果有必要将工人a
and/or b
的值返回到主进程,它就变成了使用进程池时很简单。
备注
通过使用 Pool
构造函数的 initializer
参数来实现队列 reader 进程,函数 B
也是可行的(请参阅下面的方法池 2A),但是函数 A
必须在主进程下 运行 。但是这些 Pool 进程是守护进程,一旦所有非守护进程终止,它们就会终止。这就是为什么我在方法 2 中安排将特殊的哨兵消息写入队列作为“作业”的信号(但不是 运行 正在执行作业的进程)在读取哨兵消息时终止.因此,我知道当作业完成时,队列中不再有消息,并且队列中再也不会有任何消息。类似的逻辑适用于方法 1,除了整个过程也终止,我可以使用 join
知道何时发生。但是在您使用隐式守护线程执行队列读取的情况下,即使您添加了额外的代码以在读取所有输入队列值和初始化函数 B
后将哨兵值添加到队列中,终止,主进程如何知道?同样,您可以调用池上的方法 Pool.join()
,这会阻止任何未来的工作被提交到池中(我们从未真正明确地提交工作;所有工作都在池初始化函数中完成)。然后调用 Pool.join()
,等待每个工作进程退出。一旦每个流程实例的池初始化函数完成,这将立即发生,因为之前对 Pool.close
的调用告诉池永远不会向池中添加任何额外的工作。
方法 2A - 使用带有池初始化程序的进程池
import multiprocessing as mp
import time
class Sentinel():
pass
def a(queue, n_readers):
for i in range(1000):
time.sleep(.01)
queue.put(i)
end_of_queue = Sentinel()
for _ in range(n_readers):
queue.put(end_of_queue)
def b(the_queue):
global queue
queue = the_queue
while True:
value = queue.get(True)
# signal to terminate?
if isinstance(value, Sentinel):
break
print(value, flush=True)
time.sleep(.06)
def main():
n_readers = mp.cpu_count() - 1
queue = mp.Queue()
pool = mp.Pool(n_readers, initializer=b, initargs=(queue,))
a(queue, n_readers)
# wait for readers to finish:
pool.close() # must be called before pool.join()
pool.join()
print('Done')
if __name__ == '__main__':
main()
备注
所有这三种方法都可以工作,并且所有这三种方法都预先假设 reader 进程不会 运行 无限期地进行,因此我们对有序终止感兴趣(因此需要标记值向 reader 进程发出终止信号)。但是如果writer进程被设计为运行无限期直到进程被用户中断,那么例如方法2a可以修改为使用用户输入ctrl-C产生的键盘中断,来终止执行:
修改后的方法 2A 仅通过键盘中断终止
import multiprocessing as mp
import time
import itertools
def a(queue, n_readers):
try:
for i in itertools.count(0):
time.sleep(.01)
queue.put(i)
except KeyboardInterrupt:
pass
def b(the_queue):
global queue
queue = the_queue
try:
while True:
value = queue.get(True)
print(value, end=' ', flush=True)
time.sleep(.06)
except KeyboardInterrupt:
pass
def main():
n_readers = mp.cpu_count() - 1
queue = mp.Queue()
pool = mp.Pool(n_readers, initializer=b, initargs=(queue,))
a(queue, n_readers)
# wait for readers to finish:
pool.close() # must be called before pool.join()
try:
pool.join()
except KeyboardInterrupt:
pool.terminate()
print('Done')
if __name__ == '__main__':
main()
修改后的方法 1 仅通过键盘输入终止
import multiprocessing as mp
import time
import itertools
def a(queue, n_readers):
for i in itertools.count(0):
time.sleep(.01)
queue.put(i)
def b(queue):
while True:
value = queue.get(True)
if value % 100 == 0:
print(value, end=' ', flush=True)
time.sleep(.06)
def main():
n_readers = mp.cpu_count() - 1
queue = mp.Queue()
# create queue readers:
readers = [mp.Process(target=b, args=(queue,), daemon=True) for _ in range(n_readers)]
for p in readers:
p.start()
# now start queue writer:
writer = mp.Process(target=a, args=(queue, n_readers), daemon=True)
writer.start()
input('Enter return to terminate...')
print()
print('Done')
if __name__ == '__main__':
main()
结论
你显然有选择。如果程序不会无限期地 运行 并且您希望有序关闭以确保所有已排队的消息都已处理,我的首选方法是方法 1。方法 2 和 2a 似乎只是懒惰的获取方式N 个进程以相同的参数为您执行相同的相同工作。
另一方面,如果您的编写器无休止地处理任务 运行,您需要终止它并且不介意队列中可能还有一两个未处理的消息(毕竟您在一个相当任意的时间点终止程序,所以这应该没什么大不了的),那么如果一个简单的 input
语句足以输入终止命令,修改后的方法 1 似乎是需要的方法最少的修改。但是如果运行ning程序一直在输出消息,input
语句显示的文本就会丢失,你需要依赖于为每个进程使用键盘中断处理程序,这比较复杂。如果有任何修改过的示例,您可以使用此技术;我在修改后的方法 2a 中使用它作为示例,因为该代码不适合使用 input
语句技术,因为终端输出太多。毫无疑问,当有any终端输出时,最可靠的方法是使用键盘处理程序中断处理程序方法。只要不需要从任何进程返回 return 值,我仍然倾向于使用方法 1 及其变体而不是进程池:
我有一个函数 (A
) 以恒定速率创建数据,假设每秒 100 个。我想在 A
生成的数据上 运行 另一个函数 (B
)。函数 B
可能需要比 0.01s
到 运行 更长的时间,但我不希望它备份数据流。我是否应该创建一个 B
的 Pool
并将一个通用的 Queue
传递给 A
和 B
以供使用(如下面的代码)?我还看到您应该使用 Pool
s 来处理数据列表。这是它们应该如何使用(关于我描述的方法)?我应该只使用两个 Process
并交替向它们发送数据吗?
def A(queue):
while True:
data = data_getter()
queue.put(data)
def B(queue):
while True:
data = queue.get(True):
do_something(data)
# main.py
q = Queue()
pool = Pool(initializer=B, initargs=[q])
A(q)
这是我的简短回答:
进程池存在的目的是允许您以并行方式最大程度地处理 N 个“作业”,前提是您已为该任务分配了 M 个物理处理器。
创建一个 Process
实例正在写入 N 次的队列(相当于提交 N 个“作业”)并让 M Process
个实例读取和处理这些消息,即“作业",并处理它们,实际上是一种进程池的实现。使用单独的进程池只是为了创建队列 reader 进程所需的进程似乎是一个不必要的复杂层。因此,我将创建 M Process
个实例,这些实例从编写器进程向其添加消息的公共队列中读取。
TL;DR(或长答案)
正如您正确推测的那样,您可以通过 (1) 创建单独的 Process
实例或 (2) 使用进程池来实现。方法 1 直觉上似乎是最合乎逻辑的做法,但它不一定是最直接的代码。我在下面使用模拟展示了一些方法,其中队列编写器进程每 .01 秒创建一个队列条目,但队列 reader 进程需要 .06 秒来处理一个队列条目,因此至少有 6 个这样的进程 (从公共队列读取) 需要跟上:
方法 1 -- 显式处理
import multiprocessing as mp
import time
class Sentinel():
pass
def a(queue, n_readers):
for i in range(1000):
time.sleep(.01)
queue.put(i)
print('queue size is now approximately: ', queue.qsize()) # print queue size
# signal readers to terminate:
end_of_queue = Sentinel()
for _ in range(n_readers):
queue.put(end_of_queue)
def b(queue):
while True:
value = queue.get(True)
# signal to terminate?
if isinstance(value, Sentinel):
break
print(value, flush=True)
time.sleep(.06)
def main():
n_readers = mp.cpu_count() - 1
queue = mp.Queue()
# create queue readers:
readers = [mp.Process(target=b, args=(queue,)) for _ in range(n_readers)]
for p in readers:
p.start()
# now start queue writer:
writer = mp.Process(target=a, args=(queue, n_readers))
writer.start()
# wait for writer to terminate:
writer.join()
for p in readers:
p.join()
print('Done')
if __name__ == '__main__':
main()
方法 2 - 使用进程池
import multiprocessing as mp
import time
class Sentinel():
pass
def init_pool(q):
global queue
queue = q
def a(n_readers):
for i in range(1000):
time.sleep(.01)
queue.put(i)
print('queue size is now approximately: ', queue.qsize()) # print queue size
end_of_queue = Sentinel()
for _ in range(n_readers):
queue.put(end_of_queue)
def b():
while True:
value = queue.get(True)
# signal to terminate?
if isinstance(value, Sentinel):
break
print(value, flush=True)
time.sleep(.06)
def main():
n_readers = mp.cpu_count() - 1
queue = mp.Queue()
pool = mp.Pool(n_readers + 1, initializer=init_pool, initargs=(queue,))
readers_results = [pool.apply_async(b) for _ in range(n_readers)]
# now submit writer:
pool.apply(a, args=(n_readers,))
# wait for readers to finish:
for r in readers_results:
r.get()
print('Done')
if __name__ == '__main__':
main()
第二种方法的唯一优点是,如果有必要将工人a
and/or b
的值返回到主进程,它就变成了使用进程池时很简单。
备注
通过使用 Pool
构造函数的 initializer
参数来实现队列 reader 进程,函数 B
也是可行的(请参阅下面的方法池 2A),但是函数 A
必须在主进程下 运行 。但是这些 Pool 进程是守护进程,一旦所有非守护进程终止,它们就会终止。这就是为什么我在方法 2 中安排将特殊的哨兵消息写入队列作为“作业”的信号(但不是 运行 正在执行作业的进程)在读取哨兵消息时终止.因此,我知道当作业完成时,队列中不再有消息,并且队列中再也不会有任何消息。类似的逻辑适用于方法 1,除了整个过程也终止,我可以使用 join
知道何时发生。但是在您使用隐式守护线程执行队列读取的情况下,即使您添加了额外的代码以在读取所有输入队列值和初始化函数 B
后将哨兵值添加到队列中,终止,主进程如何知道?同样,您可以调用池上的方法 Pool.join()
,这会阻止任何未来的工作被提交到池中(我们从未真正明确地提交工作;所有工作都在池初始化函数中完成)。然后调用 Pool.join()
,等待每个工作进程退出。一旦每个流程实例的池初始化函数完成,这将立即发生,因为之前对 Pool.close
的调用告诉池永远不会向池中添加任何额外的工作。
方法 2A - 使用带有池初始化程序的进程池
import multiprocessing as mp
import time
class Sentinel():
pass
def a(queue, n_readers):
for i in range(1000):
time.sleep(.01)
queue.put(i)
end_of_queue = Sentinel()
for _ in range(n_readers):
queue.put(end_of_queue)
def b(the_queue):
global queue
queue = the_queue
while True:
value = queue.get(True)
# signal to terminate?
if isinstance(value, Sentinel):
break
print(value, flush=True)
time.sleep(.06)
def main():
n_readers = mp.cpu_count() - 1
queue = mp.Queue()
pool = mp.Pool(n_readers, initializer=b, initargs=(queue,))
a(queue, n_readers)
# wait for readers to finish:
pool.close() # must be called before pool.join()
pool.join()
print('Done')
if __name__ == '__main__':
main()
备注
所有这三种方法都可以工作,并且所有这三种方法都预先假设 reader 进程不会 运行 无限期地进行,因此我们对有序终止感兴趣(因此需要标记值向 reader 进程发出终止信号)。但是如果writer进程被设计为运行无限期直到进程被用户中断,那么例如方法2a可以修改为使用用户输入ctrl-C产生的键盘中断,来终止执行:
修改后的方法 2A 仅通过键盘中断终止
import multiprocessing as mp
import time
import itertools
def a(queue, n_readers):
try:
for i in itertools.count(0):
time.sleep(.01)
queue.put(i)
except KeyboardInterrupt:
pass
def b(the_queue):
global queue
queue = the_queue
try:
while True:
value = queue.get(True)
print(value, end=' ', flush=True)
time.sleep(.06)
except KeyboardInterrupt:
pass
def main():
n_readers = mp.cpu_count() - 1
queue = mp.Queue()
pool = mp.Pool(n_readers, initializer=b, initargs=(queue,))
a(queue, n_readers)
# wait for readers to finish:
pool.close() # must be called before pool.join()
try:
pool.join()
except KeyboardInterrupt:
pool.terminate()
print('Done')
if __name__ == '__main__':
main()
修改后的方法 1 仅通过键盘输入终止
import multiprocessing as mp
import time
import itertools
def a(queue, n_readers):
for i in itertools.count(0):
time.sleep(.01)
queue.put(i)
def b(queue):
while True:
value = queue.get(True)
if value % 100 == 0:
print(value, end=' ', flush=True)
time.sleep(.06)
def main():
n_readers = mp.cpu_count() - 1
queue = mp.Queue()
# create queue readers:
readers = [mp.Process(target=b, args=(queue,), daemon=True) for _ in range(n_readers)]
for p in readers:
p.start()
# now start queue writer:
writer = mp.Process(target=a, args=(queue, n_readers), daemon=True)
writer.start()
input('Enter return to terminate...')
print()
print('Done')
if __name__ == '__main__':
main()
结论
你显然有选择。如果程序不会无限期地 运行 并且您希望有序关闭以确保所有已排队的消息都已处理,我的首选方法是方法 1。方法 2 和 2a 似乎只是懒惰的获取方式N 个进程以相同的参数为您执行相同的相同工作。
另一方面,如果您的编写器无休止地处理任务 运行,您需要终止它并且不介意队列中可能还有一两个未处理的消息(毕竟您在一个相当任意的时间点终止程序,所以这应该没什么大不了的),那么如果一个简单的 input
语句足以输入终止命令,修改后的方法 1 似乎是需要的方法最少的修改。但是如果运行ning程序一直在输出消息,input
语句显示的文本就会丢失,你需要依赖于为每个进程使用键盘中断处理程序,这比较复杂。如果有任何修改过的示例,您可以使用此技术;我在修改后的方法 2a 中使用它作为示例,因为该代码不适合使用 input
语句技术,因为终端输出太多。毫无疑问,当有any终端输出时,最可靠的方法是使用键盘处理程序中断处理程序方法。只要不需要从任何进程返回 return 值,我仍然倾向于使用方法 1 及其变体而不是进程池: