什么时候使用多处理?

When to use multiprocessing?

所以,我最近一直在探索多处理和多线程主题。而且我发现,在某些情况下,使用多进程和多线程不会提高我的代码速度。举个例子:

import multiprocessing as mp
import time

dummyList = [1,2,3,4,5,6]
dummyList2 = ['a','b','c','d','e','f']

q_list = mp.Queue()
for i, j  in zip(dummyList, dummyList2):
    q_list.put(i)
    q_list.put(j)

def f(queue):
    q = queue.get()
    print(q)

# if __name__ == "__main__":
#     start = time.perf_counter()
#     while not q_list.empty():
#         p1 = mp.Process(target=f, args=[q_list])
#         p2 = mp.Process(target=f, args=[q_list])
#         p1.start()
#         p2.start()
#         p1.join()
#         p2.join()
#     finish = time.perf_counter()
#     print(f'elaspse time = {finish - start} second(s)')

start = time.perf_counter()
while not q_list.empty():
    f(q_list)
finish = time.perf_counter()
print(f'elaspse time = {finish - start} second(s)')

在上面的代码中,我尝试从 multiprocessing.Queue class 中取出一项并打印它直到队列为空。我认为在这种情况下使用多处理会提高速度。令人惊讶的是,它没有提高速度,反而变慢了!所增巨异也。不使用多进程只需要2ms,而使用多进程需要690ms。

谁能给我解释一下,为什么会这样?什么时候实际上是使用 multiprocessing/multithreading 的最佳时间。泰

首先在创建新进程时会产生开销,在读取和写入多处理队列时会产生开销,而将参数传递给同一进程中的函数 运行 则没有这些开销。这意味着您的“工人”功能,f 在这种情况下,必须足够“CPU 密集”以证明我刚才提到的额外开销是合理的。

正如文章 What are the differences between the threading and multiprocessing modules? 指出的那样,多线程不适合 CPU 密集型函数,因为它会争用全局解释器锁。但是因为创建线程的开销远小于进程,所以它最适合大多数等待 I/O 完成的功能,比如从网站上获取一个 URL,那里很少CPU 涉及处理。

查看以下两个基准,其中函数全部为 CPU 而没有 I/O,因此可能是多处理的候选者。它比较了单处理与多处理,在第一种情况下,我们有一个非 CPU 密集型函数,其中多处理会损害性能,而在第二种情况下,我们有一个 CPU 密集型函数,其中多处理提高性能:

import multiprocessing as mp
import time

QUARTER_SECOND_ITERATIONS = 5_000_000

def quarter_second():
    sum = 0
    for _ in range(QUARTER_SECOND_ITERATIONS):
        sum += 1
    return sum

# non-multiprocessing version:
def compute_square(x, cpu_intensive):
    """ Compute x ** 2 """
    if cpu_intensive:
        quarter_second()
    return x ** 2

# multiprocessing version
def m_compute_square(input_q, output_q, cpu_intensive):
    """ Compute x ** 2: """
    while True:
        x = input_q.get()
        if x is None: # our signal to terminate
            break
        if cpu_intensive:
            quarter_second()
        output_q.put(x, x ** 2)

def main():
    numbers = range(1, 101)

    for intensive in (False, True):
        t0 = time.perf_counter()
        results = [compute_square(x, cpu_intensive=intensive) for x in numbers]
        t1 = time.perf_counter()
        print(f'Non-multiprocessing time = {t1 - t0}, intensive = {intensive}')
        t0 = time.perf_counter()
        input_queue = mp.Queue()
        output_queue = mp.Queue()
        for x in numbers:
            input_queue.put(x)
        # Put two "no more input" indicators:
        input_queue.put(None)
        input_queue.put(None)
        p1 = mp.Process(target=m_compute_square, args=(input_queue, output_queue, intensive))
        p2 = mp.Process(target=m_compute_square, args=(input_queue, output_queue, intensive))
        p1.start()
        p2.start()
        results = [output_queue.get() for _ in range(100)]
        p1.join()
        p2.join()
        t1 = time.perf_counter()
        print(f'Mutiprocessing time = {t1 - t0}, intensive = {intensive}')

# Required for Windows:
if __name__=='__main__':
    main()

打印:

Non-multiprocessing time = 3.600000000000825e-05, intensive = False
Mutiprocessing time = 0.1501859, intensive = False
Non-multiprocessing time = 25.417471099999997, intensive = True
Mutiprocessing time = 14.596532500000002, intensive = True

使用多处理池

import multiprocessing as mp
from functools import partial
import time


QUARTER_SECOND_ITERATIONS = 5_000_000

def quarter_second():
    sum = 0
    for _ in range(QUARTER_SECOND_ITERATIONS):
        sum += 1
    return sum

# non-multiprocessing version:
def compute_square(x, cpu_intensive):
    """ Compute x ** 2 """
    if cpu_intensive:
        quarter_second()
    return x ** 2

def main():
    numbers = range(1, 101)

    for intensive in (False, True):
        t0 = time.perf_counter()
        results = [compute_square(x, cpu_intensive=intensive) for x in numbers]
        t1 = time.perf_counter()
        print(f'Non-multiprocessing time = {t1 - t0}, intensive = {intensive}')
        t0 = time.perf_counter()
        # create processing pool using all 8 processors:
        with mp.Pool(8) as pool:
            worker = partial(compute_square, cpu_intensive=intensive)
            results = pool.map(worker, numbers)
        t1 = time.perf_counter()
        print(f'Mutiprocessing time = {t1 - t0}, intensive = {intensive}')

# Required for Windows:
if __name__=='__main__':
    main()

打印:

Non-multiprocessing time = 3.9300000000006e-05, intensive = False
Mutiprocessing time = 0.22172129999999995, intensive = False
Non-multiprocessing time = 26.1021124, intensive = True
Mutiprocessing time = 7.3056439, intensive = True

使用多线程池

from multiprocessing.pool import ThreadPool
from functools import partial
import time


QUARTER_SECOND_ITERATIONS = 5_000_000

def quarter_second():
    sum = 0
    for _ in range(QUARTER_SECOND_ITERATIONS):
        sum += 1
    return sum

# non-multithreading version:
def compute_square(x, cpu_intensive):
    """ Compute x ** 2 """
    if cpu_intensive:
        quarter_second()
    return x ** 2

def main():
    numbers = range(1, 101)

    for intensive in (False, True):
        t0 = time.perf_counter()
        results = [compute_square(x, cpu_intensive=intensive) for x in numbers]
        t1 = time.perf_counter()
        print(f'Non-multithreading time = {t1 - t0}, intensive = {intensive}')
        t0 = time.perf_counter()
        # create processing pool using all processors:
        with ThreadPool(8) as pool:
            worker = partial(compute_square, cpu_intensive=intensive)
            results = pool.map(worker, numbers)
        t1 = time.perf_counter()
        print(f'Mutithreading time = {t1 - t0}, intensive = {intensive}')

# Required for Windows:
if __name__=='__main__':
    main()

打印:

Non-multithreading time = 3.0000000000002247e-05, intensive = False
Mutithreading time = 0.03963000000000001, intensive = False
Non-multithreading time = 26.428487699999998, intensive = True
Mutithreading time = 29.0095318, intensive = True

因为“worker”函数是纯粹的CPU,多线程不能提高性能,实际上只会增加额外的开销。

Worker 函数主要是“I/O”的多线程池

在以下基准测试中,compute_square 通过休眠模拟等待 I/O 完成。在这种情况下,它是多线程的候选者,因为它花费大部分时间不执行实际的 Python 字节码,因此几乎没有争用全局解释器锁。

from multiprocessing.pool import ThreadPool
from functools import partial
import time

def compute_square(x):
    """ Compute x ** 2 """
    time.sleep(.25)
    return x ** 2

def main():
    numbers = range(1, 101)

    t0 = time.perf_counter()
    results = [compute_square(x) for x in numbers]
    t1 = time.perf_counter()
    print(f'Non-multithreading time = {t1 - t0}')
    t0 = time.perf_counter()
    # create pool using all processors:
    with ThreadPool(8) as pool:
        results = pool.map(compute_square, numbers)
    t1 = time.perf_counter()
    print(f'Mutithreading time = {t1 - t0}')

if __name__=='__main__':
    main()

打印:

Non-multithreading time = 25.1188871
Mutithreading time = 4.039328099999999