Multiprocessing hanging at join

Before anyone marks this as a duplicate: I have been looking at Stack Overflow posts about this for days now, and I haven't really found a good or satisfying answer.

I have a program that at some point takes single strings (along with many other arguments and objects), does some complicated processing on them, and spits out 1 or more strings. Because each string is processed separately, using multiprocessing seemed natural here, especially since I am working on a machine with over 100 cores.

Below is a minimal example. It works with up to 12 to 15 cores; if I try to give it more, it hangs at p.join(). I know it hangs at the join because I added some debug prints before and after it, and it stops somewhere between the two print commands.

The minimal example:

import os, random, sys, time, string
import multiprocessing as mp

letters = string.ascii_uppercase
align_len = 1300

def return_string(queue):
    n_strings = [1,2,3,4]
    alignments = []

    # generating 1 to 4 sequences randomly, each sequence of length 1300
    # the original code might even produce more than 4, but 1 to 4 is an average case
    # instead of the random string there will be some complicated function called
    # in the original code
    for _ in range(random.choice(n_strings)):
        alignment = ""
        for _ in range(align_len):
            alignment += random.choice(letters)
        alignments.append(alignment)

    for a in alignments:
        queue.put(a)


def run_string_gen(cores):
    processes = []
    queue = mp.Queue()
    # running the target function 1000 times
    for i in range(1000):
        # print(i)
        process = mp.Process(target=return_string, args = (queue,))
        processes.append(process)
        if len(processes) == cores:
            for p in processes:
                p.start()

            for p in processes:
                p.join()

            while queue.qsize() != 0:
                a = queue.get()
                # the original idea is that instead of print
                # I will be writing to a file that is already open
                print(a)

            processes = []
            queue = mp.Queue()

    # any leftover processes
    if processes:
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        while queue.qsize() != 0:
            a = queue.get()
            print(a)

if __name__ == "__main__":
    cores = int(sys.argv[1])
    if cores > os.cpu_count():
        cores = os.cpu_count()
    start = time.perf_counter()
    run_string_gen(cores)
    print(f"it took {time.perf_counter() - start}")

I suspected the queue getting full, but it isn't that many strings: when I gave it 20 cores it hung, but that is about 20*4 = 80 strings (if the choice is always 4). Would the queue really get full with that many strings in it?

Assuming the queue is full, I am not sure at which point I should check and empty it. Doing it inside return_string seems like a bad idea, because some of the other processes will also have the queue and might be emptying/filling it at the same time. Should I use lock.acquire() and lock.release() then? The strings end up appended to a file, so I could avoid the queue and write the strings to a file directly. However, because starting a process means the objects get copied, I cannot pass an _io.TextIOWrapper object (an open file to append to); I would instead need to open and close the file inside return_string, synchronized with lock.acquire() and lock.release(), but it seems wasteful to keep opening and closing the output file just to write to it.
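
Something like the following is what I have in mind for that lock-based approach (just a rough sketch with made-up names, not my real code):

import multiprocessing as mp

def return_string(lock, out_path):
    alignments = ["..."]  # generated as in the example above
    # open and close the output file on every call, synchronized via the lock
    with lock:  # equivalent to lock.acquire() / lock.release()
        with open(out_path, "a") as out_file:
            for a in alignments:
                out_file.write(a + "\n")

if __name__ == "__main__":
    lock = mp.Lock()
    processes = [mp.Process(target=return_string, args=(lock, "out.txt"))
                 for _ in range(4)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()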

Some suggested solutions I found:

1- Emptying the queue before joining is one of the answers I found. However, I cannot anticipate how long each process will take, and adding a sleep command after the p.start() loop and before p.join() is bad (at least for my code): if the processes finish quickly and I end up waiting, that is just a lot of wasted time, and the whole point here is speed.

2- Adding some kind of sentinel value, e.g. None, to know whether a worker has finished. But I didn't get this part: if I run the target function 10 times with 10 cores, I will have 10 sentinels, but the problem is that it hangs before it can get to emptying the queue and checking for the sentinels.

Any suggestions or ideas about what to do here?

Note: an answer for Linux systems is fine, but I assume it would be similar on Windows.

A Queue is implemented on top of a pipe, and it looks like you are hitting the pipe's capacity limit:

man pipe(7):

If a process attempts to read from an empty pipe, then read(2) will block until data is available. If a process attempts to write to a full pipe (see below), then write(2) blocks until sufficient data has been read from the pipe to allow the write to complete.
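
You can observe the limit with a bare os.pipe (a minimal sketch; on Linux the default pipe capacity is typically 64 KiB):

import os

r, w = os.pipe()
os.write(w, b"x" * 65536)  # exactly fills a default 64 KiB pipe buffer
# os.write(w, b"x")        # one more byte would block until the pipe is read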

A Python Queue, however, just enqueues the data into an underlying buffer, and it is the queue's feeder thread that blocks on the write to the pipe.

The Process.join method also blocks, so you have to start consuming the data from the queue before that. You can try to create a consumer process, or just simplify your code by using Pool.
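
For example, a dedicated consumer process can drain the queue while the producers are still running, so the pipe never fills up. A minimal sketch (not your original code; one sentinel per producer tells the consumer when to stop):

import multiprocessing as mp

SENTINEL = None

def producer(queue):
    for i in range(100):
        queue.put(f"record-{i}")
    queue.put(SENTINEL)  # signal that this producer is done

def consumer(queue, n_producers):
    seen_sentinels = 0
    while seen_sentinels < n_producers:
        item = queue.get()
        if item is SENTINEL:
            seen_sentinels += 1
        else:
            print(item)

if __name__ == "__main__":
    queue = mp.Queue()
    producers = [mp.Process(target=producer, args=(queue,)) for _ in range(4)]
    c = mp.Process(target=consumer, args=(queue, len(producers)))
    c.start()
    for p in producers:
        p.start()
    for p in producers:
        p.join()  # safe: the consumer is draining the queue concurrently
    c.join()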

A simple test case that reproduces the problem with a single process:

test.py:

import logging
import multiprocessing as mp
import os


logger = mp.log_to_stderr()
logger.setLevel(logging.DEBUG)

def worker(q, n):
    q.put(os.urandom(2 ** n))


def main():
    q = mp.Queue()

    p = mp.Process(target=worker, args=(q, 17)) # > 65k bytes
    p.start()
    # p.join()


if __name__ == "__main__":
    main()

The test:

$ python test.py
[DEBUG/MainProcess] created semlock with handle 140292518252544
[DEBUG/MainProcess] created semlock with handle 140292517982208
[DEBUG/MainProcess] created semlock with handle 140292517978112
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[INFO/MainProcess] calling join() for process Process-1
[DEBUG/Process-1] Queue._after_fork()
[INFO/Process-1] child process calling self.run()
[DEBUG/Process-1] Queue._start_thread()
[DEBUG/Process-1] doing self._thread.start()
[DEBUG/Process-1] starting thread to feed data to pipe
[DEBUG/Process-1] ... done self._thread.start()
[INFO/Process-1] process shutting down
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] telling queue thread to quit
[DEBUG/Process-1] running the remaining "atexit" finalizers
[DEBUG/Process-1] joining queue thread

As you can see above, it blocks on joining the queue feeder thread, because it cannot finish writing to the pipe:

$ sudo strace -ttT -f -p 218650
strace: Process 218650 attached with 2 threads
[pid 218650] 07:51:44.659503 write(4, "7.2)4p6e2485167`X63m@:5g-D34$"..., 4096 <unfinished ...>
[pid 218649] 07:51:44.659563 futex(0x7fe3f8000b60, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 0, NULL, FUTEX_BITSET_MATCH_ANY

Once we read from the pipe in another terminal, the process terminates:

$ cat /proc/218650/fd/4 1> /dev/null

...
[DEBUG/Process-1] feeder thread got sentinel -- exiting
[DEBUG/Process-1] ... queue thread joined
[INFO/Process-1] process exiting with exitcode 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers

Read the documentation for multiprocessing.Queue carefully, in particular the second warning, which reads in part:

Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.

This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

Simply put, your program violates this by joining the processes before it has read the items off the queue. You have to reverse the order of operations. The question then becomes how the main process knows when to stop reading while the child processes are still running and writing to the queue. The simplest solution is for each child process to write a special sentinel record as its final item, signaling that it will not be writing any more items. The main process can then simply do blocking reads until it has seen N sentinel records, where N is the number of processes it started that write to the queue. The sentinel record must be unique enough that it cannot be mistaken for a normal item to be processed; None suffices for that:

import os, random, sys, time, string
import multiprocessing as mp

letters = string.ascii_uppercase
align_len = 1300

SENTINEL = None # no more records sentinel

def return_string(queue):
    n_strings = [1,2,3,4]
    alignments = []

    # generating 1 to 4 sequences randomly, each sequence of length 1300
    # the original code might even produce more than 4, but 1 to 4 is an average case
    # instead of the random string there will be some complicated function called
    # in the original code
    for _ in range(random.choice(n_strings)):
        alignment = ""
        for _ in range(align_len):
            alignment += random.choice(letters)
        alignments.append(alignment)

    for a in alignments:
        queue.put(a)
    # show this process is through writing records:
    queue.put(SENTINEL)


def run_string_gen(cores):
    processes = []
    queue = mp.Queue()
    # running the target function 1000 times
    for i in range(1000):
        # print(i)
        process = mp.Process(target=return_string, args = (queue,))
        processes.append(process)
        if len(processes) == cores:
            for p in processes:
                p.start()

            seen_sentinel_count = 0
            while seen_sentinel_count < len(processes):
                a = queue.get()
                if a is SENTINEL:
                    seen_sentinel_count += 1
                # the original idea is that instead of print
                # I will be writing to a file that is already open
                else:
                    print(a)

            for p in processes:
                p.join()

            processes = []
            # The same queue can be reused:
            #queue = mp.Queue()

    # any leftover processes
    if processes:
        for p in processes:
            p.start()

        seen_sentinel_count = 0
        while seen_sentinel_count < len(processes):
            a = queue.get()
            if a is SENTINEL:
                seen_sentinel_count += 1
            else:
                print(a)

        for p in processes:
            p.join()

if __name__ == "__main__":
    cores = int(sys.argv[1])
    if cores > os.cpu_count():
        cores = os.cpu_count()
    start = time.perf_counter()
    run_string_gen(cores)
    print(f"it took {time.perf_counter() - start}")

Prints:

...
NEUNBZVXNHCHVIGNDCEUXJSINEJQNCOWBMUJRTIASUEJHDJUWZIYHHZTJJSJXALZHOEVGMHSVVMMIFZGLGLJDECEWSVZCDRHZWVOMHCDLJVQLQIQCVKBEVOVDWTMFPWIWIQFOGWAOPTJUWKAFBXPWYDIENZTTJNFAEXDVZHXHJPNFDKACCTRTOKMVDGBQYJQMPSQZKDNDYFVBCFMWCSCHTVKURPJDBMRWFQAYIIALHDJTTMSIAJAPLHUAJNMHOKLZNUTRWWYURBTVQHWECAFHQPOZZLVOQJWVLFXUEQYKWEFXQPHKRRHBBCSYZOHUDIFOMBSRNDJNBHDUYMXSMKUOJZUAPPLOFAESZXIETOARQMBRYWNWTSXKBBKWYYKDNLZOCPHDVNLONEGMALL
it took 32.7125509

Update

The same code done with a multiprocessing pool, which avoids re-creating the processes:

import os, random, sys, time, string
import multiprocessing as mp

letters = string.ascii_uppercase
align_len = 1300

SENTINEL = None # no more records sentinel

def return_string():
    n_strings = [1,2,3,4]
    alignments = []

    # generating 1 to 4 sequences randomly, each sequence of length 1300
    # the original code might even produce more than 4, but 1 to 4 is an average case
    # instead of the random string there will be some complicated function called
    # in the original code
    for _ in range(random.choice(n_strings)):
        alignment = ""
        for _ in range(align_len):
            alignment += random.choice(letters)
        alignments.append(alignment)

    return alignments


def run_string_gen(cores):
    def my_callback(result):
        alignments = result
        for alignment in alignments:
            print(alignment)

    pool = mp.Pool(cores)
    for i in range(1000):
        pool.apply_async(return_string, callback=my_callback)
    # wait for completion of all tasks:
    pool.close()
    pool.join()

if __name__ == "__main__":
    cores = int(sys.argv[1])
    if cores > os.cpu_count():
        cores = os.cpu_count()
    start = time.perf_counter()
    run_string_gen(cores)
    print(f"it took {time.perf_counter() - start}")

Prints:

...
OMCRIHWCNDKYBZBTXUUYAGCMRBMOVTDOCDYFGRODBWLIFZZBDGEDVAJAJFXWJRFGQXTSCCJLDFKMOENGAGXAKKFSYXEQOICKWFPSKOHIMCRATLVLVLMGFAWBDIJMZMVMHCXMTVJBSWXTLDHEWYHUMSQZGGFWRMOHKKKGMTFEOTTJDOQMOWWLKTOWHKCIUNINHTGUZHTBGHROPVKQBNEHQWIDCZUOJGHUXLLDGHCNWIGFUCAQAZULAEZPIP
it took 2.1607988999999996
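
Note that the callback runs in a thread of the main process, so your original goal of writing to an already open file needs no locking at all. A minimal sketch of that variant (hypothetical file name, return_string shortened to a stand-in):

import multiprocessing as mp

def return_string(i):
    # stand-in for the real string generation
    return [f"alignment-{i}"]

def run_string_gen(cores):
    with open("alignments.txt", "w") as out_file, mp.Pool(cores) as pool:
        def write_alignments(alignments):
            # executed in the main process: the open file handle is safe to use
            for a in alignments:
                out_file.write(a + "\n")
        for i in range(1000):
            pool.apply_async(return_string, (i,), callback=write_alignments)
        pool.close()
        pool.join()

if __name__ == "__main__":
    run_string_gen(4)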