使用多处理使处理列表的速度加倍

Using multiprocessing to double the speed of working on a list

假设我有一个这样的列表:

list_base = ['a','b','c','d']

如果我使用 for xxx in list_base:,循环将一次解析列表的一个值。如果我想将这项工作的速度提高一倍,我将创建一个包含两个值的列表以一次迭代并调用 multiprocessing.

基本示例

代码 1 (main_code.py):

import api_values

if __name__ == '__main__':
    list_base = ['a','b','c','d']
    api_values.main(list_base)

代码 2 (api_values.py):

import multiprocessing
import datetime

def add_hour(x):
    return str(x) + ' - ' + datetime.datetime.now().strftime('%d/%m/%Y %H:%M')

def main(list_base):
    a = list_base
    a_pairs = [a[i:i+2] for i in range(0, len(a)-1, 2)]
    if (len(a) % 2) != 0:
        a_pairs.append([a[-1]])  

    final_list = []

    for a, b in a_pairs:
        mp_1 = multiprocessing.Process(target=add_hour, args=(a,))
        mp_2 = multiprocessing.Process(target=add_hour, args=(b,))
        mp_1.start()
        mp_2.start()
        mp_1.join()
        mp_2.join()
        final_list.append(mp_1)
        final_list.append(mp_2)

    print(final_list)

当我分析 final_list 打印时,它提供如下值:

[
<Process name='Process-1' pid=9564 parent=19136 stopped exitcode=0>, 
<Process name='Process-2' pid=5400 parent=19136 stopped exitcode=0>, 
<Process name='Process-3' pid=13396 parent=19136 stopped exitcode=0>, 
<Process name='Process-4' pid=5132 parent=19136 stopped exitcode=0>
]

我无法通过调用 add_hour(x) 函数获得我想要征服的 return 值。

我在这个问题中找到了一些答案:
How can I recover the return value of a function passed to multiprocessing.Process?

但是我无法将我正在使用的场景带到需要函数内部而不是 if __name__ == '__main__':

的地方 multiprocessing

尝试使用它时,它总是会生成与创建的代码结构的位置相关的错误,我需要一些帮助,以便能够根据我的需要可视化使用。

注:
这些代码是一个基本的示例,我的实际用途是从允许最多同时调用两个的 API 中提取数据。

附加代码:

根据@Timus 的评论 (You might want to look into a **Pool** and **.apply_async**),我看到这段代码,在我看来它可以工作,但我不知道它是否可靠,是否需要对其进行任何改进使用并且这个选项是最好的,随时在答案中更新:

import multiprocessing
import datetime

final_list = []

def foo_pool(x):
    return str(x) + ' - ' + datetime.datetime.now().strftime('%d/%m/%Y %H:%M:%S')

def log_result(result):
    final_list.append(result)

def main(list_base):
    pool = multiprocessing.Pool()
    a = list_base
    a_pairs = [a[i:i+2] for i in range(0, len(a)-1, 2)]
    if (len(a) % 2) != 0:
        a_pairs.append([a[-1]])

    for a, b in a_pairs:
        pool.apply_async(foo_pool, args = (a, ), callback = log_result)
        pool.apply_async(foo_pool, args = (b, ), callback = log_result)
    pool.close()
    pool.join()

    print(final_list)

我认为您需要在进程之间共享字符串。它们可以从 multiprocessing.Manager().

获得

您的 api_values.py 应如下所示:

import multiprocessing
import datetime
from ctypes import c_wchar_p

def add_hour(x, ret_str):
    ret_str.value = str(x) + ' - ' + datetime.datetime.now().strftime('%d/%m/%Y %H:%M')

def main(list_base):
    a = list_base
    a_pairs = [a[i:i+2] for i in range(0, len(a)-1, 2)]
    if (len(a) % 2) != 0:
        a_pairs.append([a[-1]])  

    final_list = []
    manager = multiprocessing.Manager()

    for a, b in a_pairs:
        
        ret_str_a = manager.Value(c_wchar_p, "")
        ret_str_b = manager.Value(c_wchar_p, "")
        mp_1 = multiprocessing.Process(target=add_hour, args=(a, ret_str_a))
        mp_2 = multiprocessing.Process(target=add_hour, args=(b, ret_str_b))
        mp_1.start()
        mp_2.start()
        mp_1.join()
        mp_2.join()
        final_list.append(ret_str_a.value)
        final_list.append(ret_str_b.value)

    print(final_list)

来源:How to share a string amongst multiple processes using Managers() in Python?

你不必使用回调:Pool.apply_async() 给你一个 return(一个 AsyncResult 对象),它有一个 .get() 方法来检索结果的提交。延长您的尝试时间:

import time
import multiprocessing
import datetime
from os import getpid

def foo_pool(x):
    print(getpid())
    time.sleep(2)
    return str(x) + ' - ' + datetime.datetime.now().strftime('%d/%m/%Y %H:%M:%S')

def main(list_base):
    a = list_base
    a_pairs = [a[i:i+2] for i in range(0, len(a)-1, 2)]
    if (len(a) % 2) != 0:
        a_pairs.append([a[-1]])  

    final_list = []
    with multiprocessing.Pool(processes=2) as pool:
        for a, b in a_pairs:
            res_1 = pool.apply_async(foo_pool, args=(a,))
            res_2 = pool.apply_async(foo_pool, args=(b,))
            final_list.extend([res_1.get(), res_2.get()])

    print(final_list)

if __name__ == '__main__':
    list_base = ['a','b','c','d']
    start = time.perf_counter()
    main(list_base)
    end = time.perf_counter()
    print(end - start)

我已将 print(getpid()) 添加到 foo_pool 以表明您实际上使用的是不同的进程。我用 time 来说明,尽管 foo_pool 中有 time.sleep(2),但 main 的总持续时间不超过 2 秒。