多处理嵌套 for 循环与计数器

Multiprocessing nested for loop with counter

我正在寻找可以帮助我充分利用 PC 的全部功能来处理数据的简单解决方案。我认为,将任务划分到不同的核心将有助于减少处理时间,但我不知道该怎么做,我在 Whosebug 上搜索了类似的问题,但没有任何解决方案可以解决我的问题。我正在处理长度约为:3000 的数据,并且由于我使用嵌套 for 循环来查找列表中相似(在 +- 0.5 范围内)元素的数量,它将 运行 3000x3000 次,这大约需要 2 分钟,并且我想减少花费的时间。

repeat= []
values = []
for i in completeList:
    count = 0
    for j in completeList:
        if isfloat(i) and isfloat(j):
            if float(i)-0.5 <= float(j) <= float(i)+0.5:
                count = count + 1
    repeat.append(count)
    values.append(i)

如有任何帮助,我们将不胜感激。

关于, 马尼什

由于您仍然没有 post isfloat 的实际代码或显示 completeList 的元素是什么样的,我能做的最好的就是猜测它们可能是什么.它有所不同,因为正如我提到的,执行 isfloatfloat 转换 completeList 的元素所需的 CPU 越多,使用获得的收益就越大多处理。

对于CASE 1我假设completeList是由字符串组成,isfloat需要使用正则表达式来判断字符串是否匹配我们预期的浮点格式和 float 因此需要从字符串转换。这将是我想象中最 CPU 密集的案例。因为CASE 2completeList是由float组成的,isfloat只是returnsTruefloat不用做任何真正的转换。

我的桌面有 8 个核心处理器:

案例 1

import multiprocessing as mp
import time
import random
import re
from functools import partial

def isfloat(s):
    return not re.fullmatch(r'\d*\.\d+', s) is None

def single_process(complete_list):
    #repeat = []
    values = []
    for idx_i, v_i in enumerate(complete_list):
        count = 0
        for idx_j, v_j in enumerate(complete_list):
            if idx_i == idx_j:
                continue # don't compare an element with itself
            if isfloat(v_i) and isfloat(v_j):
                f_i = float(v_i)
                if f_i-0.5 <= float(v_j) <= f_i+0.5:
                    count = count + 1
        # repeat will end up being a copy of complete_list
        # why are we doing this?
        #repeat.append(v_i)
        values.append(count) # these are actually counts
    return values


def multi_worker(complete_list, index_range):
    values = []
    for idx_i in index_range:
        v_i = complete_list[idx_i]
        count = 0
        for idx_j, v_j in enumerate(complete_list):
            if idx_i == idx_j:
                continue # don't compare an element with itself
            if isfloat(v_i) and isfloat(v_j):
                f_i = float(v_i)
                if f_i-0.5 <= float(v_j) <= f_i+0.5:
                    count = count + 1
        values.append(count) # these are actually counts
    return values


def multi_process(complete_list):

    def split(a, n):
        k, m = divmod(len(a), n)
        return (a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))

    n = len(complete_list)
    POOL_SIZE = mp.cpu_count()
    range_splits = split(range(0, n), POOL_SIZE)
    pool = mp.Pool(POOL_SIZE)
    value_lists = pool.map(partial(multi_worker, complete_list), range_splits)
    values = []
    # join results together:
    for value_list in value_lists:
        values.extend(value_list)
    return values

def main():
    # generate 3000 random numbers:
    random.seed(0)
    complete_list = [str(random.uniform(1.0, 3.0)) for _ in range(3000)]
    t = time.time()
    values = single_process(complete_list)
    print(time.time() - t, values[0:10], values[-10:-1])

    t = time.time()
    values = multi_process(complete_list)
    print(time.time() - t, values[0:10], values[-10:-1])


# required for Windows:
if __name__ == '__main__':
    main()

打印:

27.7540442943573 [1236, 1491, 1464, 1477, 1494, 1472, 1410, 1450, 1502, 1537] [1485, 1513, 1513, 1501, 1283, 1538, 804, 1459, 1457]
7.187546253204346 [1236, 1491, 1464, 1477, 1494, 1472, 1410, 1450, 1502, 1537] [1485, 1513, 1513, 1501, 1283, 1538, 804, 1459, 1457]

案例 2

import multiprocessing as mp
import time
import random
from functools import partial

def isfloat(s):
    return True

def single_process(complete_list):
    values = []
    for idx_i, v_i in enumerate(complete_list):
        count = 0
        for idx_j, v_j in enumerate(complete_list):
            if idx_i == idx_j:
                continue # don't compare an element with itself
            if isfloat(v_i) and isfloat(v_j):
                f_i = float(v_i)
                if f_i-0.5 <= float(v_j) <= f_i+0.5:
                    count = count + 1
        values.append(count) # these are actually counts
    return values


def multi_worker(complete_list, index_range):
    values = []
    for idx_i in index_range:
        v_i = complete_list[idx_i]
        count = 0
        for idx_j, v_j in enumerate(complete_list):
            if idx_i == idx_j:
                continue # don't compare an element with itself
            if isfloat(v_i) and isfloat(v_j):
                f_i = float(v_i)
                if f_i-0.5 <= float(v_j) <= f_i+0.5:
                    count = count + 1
        values.append(count) # these are actually counts
    return values


def multi_process(complete_list):

    def split(a, n):
        k, m = divmod(len(a), n)
        return (a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))

    n = len(complete_list)
    POOL_SIZE = mp.cpu_count()
    range_splits = split(range(0, n), POOL_SIZE)
    pool = mp.Pool(POOL_SIZE)
    value_lists = pool.map(partial(multi_worker, complete_list), range_splits)
    values = []
    # join results together:
    for value_list in value_lists:
        values.extend(value_list)
    return values

def main():
    # generate 3000 random numbers:
    random.seed(0)
    complete_list = [random.uniform(1.0, 3.0) for _ in range(3000)]
    t = time.time()
    values = single_process(complete_list)
    print(time.time() - t, values[0:10], values[-10:-1])

    t = time.time()
    values = multi_process(complete_list)
    print(time.time() - t, values[0:10], values[-10:-1])


# required for Windows:
if __name__ == '__main__':
    main()

打印:

4.181002378463745 [1236, 1491, 1464, 1477, 1494, 1472, 1410, 1450, 1502, 1537] [1485, 1513, 1513, 1501, 1283, 1538, 804, 1459, 1457]
1.325998067855835 [1236, 1491, 1464, 1477, 1494, 1472, 1410, 1450, 1502, 1537] [1485, 1513, 1513, 1501, 1283, 1538, 804, 1459, 1457]

结果

案例 1 的加速比为 3.86,案例 2 的加速比仅为 3.14。