在 Python 中了解多处理/管理器队列

Undestanding multiprocessing / Manager Queue in Python

我很难理解 Python 多处理中的管理器队列。

我的老师给了我这个代码:

def check_prime(n): test if n is a prime or not and returns a boolean.

def chunks(lst, n):
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

def find_prime_worker(input, output):
    for chunk in iter(input.get,None):
        print('(Pid: {})'.format(getpid()))
        primes_found = list(filter(check_prime,chunk))
        output.put(primes_found)

from multiprocessing import Pool,Manager
from itertools import chain

def calculate_primes(ncore,N,chunksize):
    with Manager() as manager:
        input = manager.Queue()
        output = manager.Queue()

        with Pool(ncore) as p:
            it = p.starmap_async(find_prime_worker,[(input,output)]*ncore)
            for r in chunks(range(1,N),chunksize):
                input.put(r)
            for i in range(ncore): input.put(None)
            it.wait()
            output.put(None)

        res = list(chain(*list(iter(output.get,None))))
    return res

calculate_primes(8,100,5)
  1. 如果在find_prime_worker函数中,我添加

    print('(Pid: {})'.format(getpid()))

calculate_primes() 函数 returns 空结果 []。 如果我删除这一行,功能就可以正常工作。我想知道在不破坏正确功能的情况下正在执行什么进程 ID...请问该怎么做?

  1. 我不明白 calculate_primes() 函数:我们定义了 2 个管理器队列输入和输出。

我不明白为什么我们用 starmap_async 传递给 find_prime_worker 函数,参数大小为 [(input,output)]*ncore.

非常感谢。

如果要打印进程id,需要:

from os import getpid

如果没有该语句,您的工作函数 find_prime_worker 会引发异常,但由于您只是在等待调用工作函数的结果通过调用 it.wait() 准备就绪,而不是实际检索it.get() 的结果,您不会将异常反映回主进程。将您的代码更改为 return_values = it.get() ,您将看到异常。添加 import 语句,异常将消失,并且 return_values 将是 find_prime_worker 中所有 return 值的列表,从那以后它将是 [None] * 8函数隐含地是 returning None 并且它被调用 ncore 或 8 次。顺便说一句,it 这个名字选得不好,因为它表明(至少对我而言)从对 starmap_async 方法的调用中 return 编辑的是一个 iterator 而实际上 returned 是一个 multiprocessing.pool.AsycResult 实例。在我看来,result 是这个 return 值的更好名称。

这让我想到你的第二个问题。该程序试图找到从 1 到 99 的所有素数,并通过创建一个具有 ncore 个进程的进程池来实现,其中 ncore 设置为 8。[(input,output)]*ncore 创建一个包含 8 个元组的列表其中每个元组是 (input,ouput)。因此,语句 it = p.starmap_async(find_prime_worker,[(input,output)]*ncore) 将创建 8 个任务,其中每个任务使用参数 inputoutput 调用 find_prime_worker。因此,这些任务是相同的,因为它们通过从输入队列 input 获取消息来读取输入,并将它们的结果写入输出队列 output。输入队列中的消息只是数字 1 到 99,被分成大小为 5 的范围:range(1, 6)range(6, 11)range(11, 16) ... range(96, 100).


仅在您感兴趣时提供额外信息

该代码的不寻常之处在于,多处理池在内部使用队列来传递参数,并消除了为此目的显式创建队列的必要性。然而,代码正在使用池的内部队列来传递托管队列参数,这些参数保存的值本可以更容易地通过池的参数传递机制透明地处理:

def check_prime(n):
    # Not a real implemntation and not all the values between 1 and 100:
    if n in (3, 5, 7, 11, 13, 17, 19, 23, 97):
        return True
    return False

def chunks(lst, n):
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

def find_prime_worker(chunk):
    from os import getpid

    print('(Pid: {})'.format(getpid()))
    return list(filter(check_prime, chunk))


def calculate_primes(ncore, N, chunksize):
    from multiprocessing import Pool

    with Pool(ncore) as p:
        results = p.map(find_prime_worker, chunks(range(1, N), chunksize))
    res = []
    for result in results:
        res.extend(result)
    return res

# Required for Windows:
if __name__ == '__main__':
    print(calculate_primes(8, 100, 5))

打印:

(Pid: 9440)
(Pid: 9440)
(Pid: 9440)
(Pid: 9440)
(Pid: 9440)
(Pid: 9112)
(Pid: 9440)
(Pid: 9112)
(Pid: 9440)
(Pid: 9440)
(Pid: 9112)
(Pid: 14644)
(Pid: 14644)
(Pid: 9112)
(Pid: 9440)
(Pid: 14644)
(Pid: 9112)
(Pid: 9112)
(Pid: 14644)
(Pid: 9440)
[3, 5, 7, 11, 13, 17, 19, 23, 97]

但让我们坚持使用代码的原始方法。如果您有兴趣,可以使用性能更高的 multiprocessing.Queue 而不是当前使用的 托管队列 来重写代码。但是,它不能作为参数传递给多处理池工作函数。相反,您必须使用 multiprocessing.pool.Pool 构造函数的 initializerinitargs 参数来初始化池中每个进程的全局变量队列参考。由于 worker 函数现在将引用队列作为全局变量,它不再需要任何参数,并且 starmap_async 或任何 map 函数在不返回函数 [=20= 的情况下似乎不再合适] 一个从未使用过的虚拟参数。所以方法 apply_async 似乎是更合乎逻辑的选择:

def init_processes(inq, outq):
    global input, output
    input = inq
    output = outq

def check_prime(n):
    # Not a real implementation and not all the values between 1 and 100:
    if n in (3, 5, 7, 11, 13, 17, 19, 23, 97):
        return True
    return False

def chunks(lst, n):
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

def find_prime_worker():
    from os import getpid

    for chunk in iter(input.get, None):
        print('(Pid: {})'.format(getpid()))
        primes_found = list(filter(check_prime, chunk))
        output.put(primes_found)

def calculate_primes(ncore, N, chunksize):
    from multiprocessing import Pool, Queue
    from itertools import chain

    input = Queue()
    output = Queue()

    with Pool(ncore, initializer=init_processes, initargs=(input, output)) as p:
        results = [p.apply_async(find_prime_worker) for _ in range(ncore)]
        for r in chunks(range(1, N), chunksize):
            input.put(r)
        for i in range(ncore):
            input.put(None)
        # Wait for all tasks to complete.
        # The actual return values are not of interest;
        # they are just None -- this is just for demo purposes but
        # calling result.get() will raise an exception if find_prime_worker
        # raised an exception:
        return_values = [result.get() for result in results]
        print(return_values)

        output.put(None)
        res = list(chain(*list(iter(output.get,None))))
    return res

# Required for Windows:
if __name__ == '__main__':
    print(calculate_primes(8, 100, 5))

打印:

(Pid: 10516)
(Pid: 10516)
(Pid: 10516)
(Pid: 10516)
(Pid: 10516)
(Pid: 10516)
(Pid: 10516)
(Pid: 10516)
(Pid: 10516)
(Pid: 10516)
(Pid: 10516)
(Pid: 10516)
(Pid: 10516)
(Pid: 10516)
(Pid: 10516)
(Pid: 10516)
(Pid: 10516)
(Pid: 10516)
(Pid: 10516)
(Pid: 10516)
[None, None, None, None, None, None, None, None]
[3, 5, 7, 11, 13, 17, 19, 23, 97]

这里有几点需要注意。首先,虽然我们使用的池大小为 8,但我们可以看到单个进程 10516 处理了所有块。这可能是由于 check_prime 太简单了,以至于 calculate_primes 任务能够从输入队列中读取一个块,为队列中的 5 个数字中的每一个调用一次 check_prime块并返回读取所有其他块并在池中的其他进程甚至有机会 运行.

之前重复调用

其次,创建多处理池以及通过队列发送参数和获取结果会产生开销,否则您将无法获得这些开销。 find_prime_worker 必须足够 CPU 密集,以便并行处理补偿额外的开销。我怀疑在这种情况下它确实如此,至少在 find_prime_worker.

的这种实现中没有

第三,这是非常微妙的,使用 multiprocessing.Queue 需要一点小心。在上面的代码中,我们知道 find_prime_worker 除了 None 之外不会 return 任何东西,所以语句 return_values = [result.get() for result in results] 主要用于演示目的,并确保 return_values = [result.get() for result in results] 引发的任何异常=20=] 不会被忽视。但是,一般来说,如果您只想等待所有提交的任务完成而不关心 return 值,而不是保存所有 AsyncResult 个实例 returned 从 apply_async 调用,然后在这些实例上调用 get,可以只调用 p.close(),然后调用 p.join(),在执行这些调用后,您可以确定所有提交的任务都已完成(可能有例外)。这是因为在池实例上调用 join 将等待所有池进程退出,这只会在完成它们的工作或某些异常情况时发生。但是我没有在此代码中使用该方法,因为在读取已写入该队列的所有消息之前,您绝不能加入已写入多处理队列的进程。

但是我们可以改变逻辑,这样我们就知道我们已经从队列中读取了所有可能的消息,方法是让每个工作任务在处理完所有块后写入一条 None 记录:

def init_processes(inq, outq):
    global input, output
    input = inq
    output = outq

def check_prime(n):
    # Not a real implemntation and not all the values between 1 and 100:
    if n in (3, 5, 7, 11, 13, 17, 19, 23, 97):
        return True
    return False

def chunks(lst, n):
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

def find_prime_worker():
    from os import getpid

    for chunk in iter(input.get, None):
        print('(Pid: {})'.format(getpid()))
        primes_found = list(filter(check_prime, chunk))
        if primes_found:
            output.put(primes_found)
    output.put(None)

def calculate_primes(ncore, N, chunksize):
    from multiprocessing import Pool, Queue

    input = Queue()
    output = Queue()

    with Pool(ncore, initializer=init_processes, initargs=(input, output)) as p:
        results = [p.apply_async(find_prime_worker) for _ in range(ncore)]
        for r in chunks(range(1, N), chunksize):
            input.put(r)
        for i in range(ncore):
            input.put(None)
        # We assume find_prime_worker does not raise and exception and
        # there will be ncore None records on the output queue:
        none_seen = 0
        res = []
        while none_seen < ncore:
            result = output.get()
            if result is None:
                none_seen += 1
            else:
                res.extend(result)
        # Now we can, to be "tidy":
        p.close()
        p.join()
    return res

# Required for Windows:
if __name__ == '__main__':
    print(calculate_primes(8, 100, 5))

打印:

(Pid: 15796)
(Pid: 14644)
(Pid: 19100)
(Pid: 15796)
(Pid: 15796)
(Pid: 14644)
(Pid: 15796)
(Pid: 19100)
(Pid: 14644)
(Pid: 14644)
(Pid: 19100)
(Pid: 15796)
(Pid: 14644)
(Pid: 8924)
(Pid: 15796)
(Pid: 19100)
(Pid: 14644)
(Pid: 8924)
(Pid: 15796)
(Pid: 19100)
[3, 5, 17, 19, 7, 23, 11, 13, 97]

请注意,我向 find_prime_worker 添加了额外的检查,因为将空列表写入输出队列没有意义:

        if primes_found:
            output.put(primes_found)

最后,使用输入和输出队列的正常用例是当您不是使用多处理池而是创建自己的multirpocessing.Process实例时,实际上使用这些 Process 个实例和队列来实现您自己的池:

def check_prime(n):
    # Not a real implementation and not all the values between 1 and 100:
    if n in (3, 5, 7, 11, 13, 17, 19, 23, 97):
        return True
    return False

def chunks(lst, n):
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

def find_prime_worker(input, output):
    from os import getpid

    for chunk in iter(input.get, None):
        print('(Pid: {})'.format(getpid()))
        primes_found = list(filter(check_prime, chunk))
        if primes_found:
            output.put(primes_found)
    output.put(None)

def calculate_primes(ncore, N, chunksize):
    from multiprocessing import Process, Queue

    input = Queue()
    output = Queue()

    processes = [Process(target=find_prime_worker, args=(input, output))
                 for _ in range(ncore)
                 ]
    for process in processes:
        process.start()
    for r in chunks(range(1, N), chunksize):
        input.put(r)
    for i in range(ncore):
        input.put(None)

    none_seen = 0
    res = []
    while none_seen < ncore:
        result = output.get()
        if result is None:
            none_seen += 1
        else:
            res.extend(result)

    for process in processes:
        process.join()

    return res

# Required for Windows:
if __name__ == '__main__':
    print(calculate_primes(8, 100, 5))

打印:

(Pid: 18112)
(Pid: 18112)
(Pid: 18112)
(Pid: 18112)
(Pid: 18112)
(Pid: 18112)
(Pid: 18112)
(Pid: 18112)
(Pid: 18112)
(Pid: 18112)
(Pid: 18112)
(Pid: 18112)
(Pid: 18112)
(Pid: 18112)
(Pid: 18112)
(Pid: 18112)
(Pid: 18112)
(Pid: 18112)
(Pid: 18112)
(Pid: 18112)
[3, 5, 7, 11, 13, 17, 19, 23, 97]