Join timeout in multiprocessing

I have a toy example to which I want to apply multiprocessing. Consider a scenario where you have a stream of numbers (which I call frames) coming in one after another. I want to assign each one to whichever single process is currently available. So I am creating 4 processes that run a while loop, checking whether there is any element in a queue and, if so, applying a function to it.

The problem is that when I join them, they get stuck in the while loop, even though I shut the loop down beforehand. Somehow they remain stuck inside it.

Code:

# step 1, 4 processes
import multiprocessing as mp
import os
import time

class MpListOperations:
    def __init__(self):
        self.results_queue = mp.Manager().Queue()
        self.frames_queue = mp.Manager().Queue()
        self.flag = mp.Manager().Value(typecode='b',value=True)
        self.list_nums = list(range(0,5000))


    def process_list(self):
        print(f"Process id {os.getpid()} started")
        while self.flag.value:
#             print(self.flag.value)
            if self.frames_queue.qsize():
                self.results_queue.put(self.frames_queue.get()**2)


    def create_processes(self, no_of_processes = mp.cpu_count()):
        print("Creating Processes")
        self.processes = [mp.Process(target=self.process_list) for _ in range(no_of_processes)]

    def start_processes(self):
        print(f"starting processes")
        for process in self.processes:
            process.start()

    def join_process(self):
        print("Joining Processes")
        while True:
            if not self.frames_queue.qsize():
                self.flag.value=False
                print("JOININNG HERE")
                for process in self.processes:
                    exit_code = process.join()
                    print(exit_code)
                print("BREAKING DONE")
                break

    def stream_frames(self):
        print("Streaming Frames")
        for frame in self.list_nums:
            self.frames_queue.put(frame)


if __name__=="__main__":
    start = time.time()
    mp_ops = MpListOperations()
    mp_ops.create_processes()
    mp_ops.start_processes()
    mp_ops.stream_frames()
    mp_ops.join_process()
    print(time.time()-start)

Now, if I add a timeout argument to join, even 0, i.e. exit_code = process.join(0), it works. I want to understand: if this code is correct, what should the timeout value be in this scenario? Why does it work with a timeout but not without one? And what is the proper way to implement multiprocessing here?

The target loops are stuck in the queue's get() method. This happens because several processes can see that the queue is not empty, but only one of them gets to fetch the last item; the rest end up waiting for the next item to become available on the queue.

You might need to add a Lock so that reading the Queue object's size and getting an object from that queue happen as one atomic step.
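
For illustration, a minimal sketch of that approach, assuming a shared lock such as self.lock = mp.Manager().Lock() created in __init__ (a hypothetical attribute, not in the original code):

    def process_list(self):
        print(f"Process id {os.getpid()} started")
        while self.flag.value:
            frame = None
            # Hold the lock across the size check and the get so that no
            # other process can drain the queue in between:
            with self.lock:
                if self.frames_queue.qsize():
                    frame = self.frames_queue.get()
            if frame is not None:
                self.results_queue.put(frame ** 2)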

Alternatively, you can avoid reading the queue's size altogether by simply calling queue.get() with a timeout, so that the flag can be checked periodically:

import queue

TIMEOUT = 1 # seconds

class MpListOperations:
    #[...]
    def process_list(self):
        print(f"Process id {os.getpid()} started")
        previous = self.flag.value
        while self.flag.value:
            try:
                # Block for at most TIMEOUT seconds so that the flag is
                # re-checked periodically even when the queue stays empty:
                got = self.frames_queue.get(timeout=TIMEOUT)
            except queue.Empty:
                pass
            else:
                print(f"Gotten {got}")
                self.results_queue.put(got**2)
            _next = self.flag.value
            if previous != _next:
                print(f"Flag change: {_next}")
                previous = _next
Which gives:

$ python ./test_mp.py
Creating Processes
starting processes
Process id 36566 started
Streaming Frames
Process id 36565 started
Process id 36564 started
Process id 36570 started
Process id 36567 started
Gotten 0
Process id 36572 started
Gotten 1
Gotten 2
Gotten 3
Process id 36579 started
Gotten 4
Gotten 5
Gotten 6
Process id 36583 started
Gotten 7

# [...]

Gotten 4997
Joining Processes
Gotten 4998
Gotten 4999
JOINING HERE
Flag change: False
Flag change: False
Flag change: False
Flag change: False
Flag change: False
Flag change: False
Flag change: False
Flag change: False
Exit code : None
Exit code : None
Exit code : None
Exit code : None
Exit code : None
Exit code : None
Exit code : None
Exit code : None
BREAKING DONE
1.4375360012054443
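
Note that Process.join() always returns None, which is why the exit codes above all print as None; the actual exit status is available afterwards on the Process.exitcode attribute. For example, join_process could print it like this:

    for process in self.processes:
        process.join()
        print(f"Exit code : {process.exitcode}")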

Alternatively, use a multiprocessing.Pool object:

import multiprocessing as mp
import time

def my_func(arg):
    time.sleep(0.002)
    return arg**2

def get_input():
    for i in range(5000):
        yield i
        time.sleep(0.001)

if __name__=="__main__":
    start = time.time()
    mp_pool = mp.Pool()
    result = mp_pool.map(my_func, get_input())
    mp_pool.close()
    mp_pool.join()
    print(len(result))
    print(f"Duration: {time.time()-start}")

Which gives:

$ python ./test_mp.py
5000
Duration: 6.847279787063599
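
If the frames really do arrive as a stream, a lazy variant of the same idea avoids building the whole input list up front. This is a sketch using Pool.imap, reusing the my_func and get_input helpers defined above; the chunksize value is an arbitrary choice:

if __name__ == "__main__":
    start = time.time()
    with mp.Pool() as mp_pool:
        # imap consumes get_input() lazily and yields results in order;
        # chunksize batches tasks to cut per-item dispatch overhead:
        result = list(mp_pool.imap(my_func, get_input(), chunksize=16))
    print(len(result))
    print(f"Duration: {time.time() - start}")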

If you look at the documentation for managed queues, you will see that the qsize method only returns an approximate size. I therefore would not use it for testing when all the items have been taken off the frames queue. Presumably you want the processes to run until all frames have been processed. The simplest way I know to do that is to put N sentinel items on the frames queue after the actual frames have been put, where N is the number of processes getting from the queue. A sentinel item is a special value that cannot be mistaken for an actual frame and signals to a process that there are no more items for it to get from the queue (i.e. a quasi end-of-file item). In this case we can use None as the sentinel item. Each process then continues doing gets on the queue until it sees a sentinel item, and then terminates. There is consequently no need for the self.flag attribute.

Here is the updated and simplified code. I have made a few other minor changes, which have been commented:

import multiprocessing as mp
import os
import time

class MpListOperations:
    def __init__(self):
        # Only create one manager process:
        manager = mp.Manager()
        self.results_queue = manager.Queue()
        self.frames_queue = manager.Queue()
        # No need to convert range to a list:
        self.list_nums = range(0, 5000)


    def process_list(self):
        print(f"Process id {os.getpid()} started")
        while True:
            frame = self.frames_queue.get()
            if frame is None: # Sentinel?
                # Yes, we are done:
                break
            self.results_queue.put(frame ** 2)


    def create_processes(self, no_of_processes = mp.cpu_count()):
        print("Creating Processes")
        self.no_of_processes = no_of_processes
        self.processes = [mp.Process(target=self.process_list) for _ in range(no_of_processes)]

    def start_processes(self):
        print("Starting Processes")
        for process in self.processes:
            process.start()

    def join_processes(self):
        print("Joining Processes")
        for process in self.processes:
            # join returns None:
            process.join()

    def stream_frames(self):
        print("Streaming Frames")
        for frame in self.list_nums:
            self.frames_queue.put(frame)
        # Put sentinels:
        for _ in range(self.no_of_processes):
            self.frames_queue.put(None)


if __name__== "__main__":
    start = time.time()
    mp_ops = MpListOperations()
    mp_ops.create_processes()
    mp_ops.start_processes()
    mp_ops.stream_frames()
    mp_ops.join_processes()
    print(time.time()-start)

Prints:

Creating Processes
Starting Processes
Process id 28 started
Process id 29 started
Streaming Frames
Process id 33 started
Process id 31 started
Process id 38 started
Process id 44 started
Process id 42 started
Process id 45 started
Joining Processes
2.3660173416137695
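
One thing the example above leaves out is draining results_queue; the squared values are produced but never read back. A sketch of a method that could be added to the class (collect_results is a hypothetical name), safe to call once join_processes has returned:

    def collect_results(self):
        # Safe once all workers have been joined: every frame produced
        # exactly one result, so a fixed number of gets cannot block.
        return [self.results_queue.get() for _ in range(len(self.list_nums))]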

A note for Windows

I have modified method start_processes to temporarily set attribute self.processes to None:

    def start_processes(self):
        print("Starting Processes")
        processes = self.processes
        # Don't try to pickle list of processes:
        self.processes = None
        for process in processes:
            process.start()
        # Restore attribute:
        self.processes = processes

Otherwise, under Windows, we would get a pickle error trying to serialize/deserialize the list of processes when it contains two or more multiprocessing.Process instances. The error is "TypeError: cannot pickle 'weakref' object." This can be demonstrated with the following code, where we first attempt to pickle a list of 1 process and then a list of 2 processes:

import multiprocessing as mp
import os

class Foo:
    def __init__(self, number_of_processes):
        self.processes = [mp.Process(target=self.worker) for _ in range(number_of_processes)]
        self.start_processes()
        self.join_processes()

    def start_processes(self):
        for process in self.processes:
            process.start()

    def join_processes(self):
        for process in self.processes:
            process.join()

    def worker(self):
        print(f"Process id {os.getpid()} started")
        print(f"Process id {os.getpid()} ended")


if __name__== "__main__":
    foo = Foo(1)
    foo = Foo(2)

Prints:

Process id 7540 started
Process id 7540 ended
Traceback (most recent call last):
  File "C:\Booboo\test\test.py", line 26, in <module>
    foo = Foo(2)
  File "C:\Booboo\test\test.py", line 7, in __init__
    self.start_processes()
  File "C:\Booboo\test\test.py", line 13, in start_processes
    process.start()
  File "C:\Program Files\Python38\lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
  File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 327, in _Popen
    return Popen(process_obj)
  File "C:\Program Files\Python38\lib\multiprocessing\popen_spawn_win32.py", line 93, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Program Files\Python38\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle 'weakref' object
Process id 18152 started
Process id 18152 ended
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Program Files\Python38\lib\multiprocessing\spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "C:\Program Files\Python38\lib\multiprocessing\spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
EOFError: Ran out of input
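
As an aside, the pickling issue can be sidestepped entirely by making the worker a module-level function and passing the queues as arguments, so that self (and its list of Process objects) never needs to be pickled when a child is spawned. A sketch of that alternative:

import multiprocessing as mp
import os

def process_list(frames_queue, results_queue):
    print(f"Process id {os.getpid()} started")
    while True:
        frame = frames_queue.get()
        if frame is None:  # sentinel: no more frames
            break
        results_queue.put(frame ** 2)

# create_processes would then build the workers as:
#     mp.Process(target=process_list,
#                args=(self.frames_queue, self.results_queue))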