如何在 "big data" 列表字典上并行计算?

How to parallelize computation on "big data" dictionary of lists?

我有一个关于在 python 字典上进行计算的问题——在这种情况下,字典有数百万个键,列表同样很长。是否可以在这里使用并行化似乎存在分歧,所以我会在这里更明确地提出这个问题。这是原始问题:

这是一个玩具(小)python字典:

example_dict1 = {'key1':[367, 30, 847, 482, 887, 654, 347, 504, 413, 821],
    'key2':[754, 915, 622, 149, 279, 192, 312, 203, 742, 846], 
    'key3':[586, 521, 470, 476, 693, 426, 746, 733, 528, 565]}

假设我需要解析列表的值,我已将其实现到以下简单(玩具)函数中:

def manipulate_values(input_list):
    return_values = []
    for i in input_list:
        new_value = i ** 2 - 13
        return_values.append(new_value)
    return return_values

现在,我可以很容易地解析这个字典的值如下:

for key, value in example_dict1.items():
    example_dict1[key] = manipulate_values(value)

结果如下:

example_dict1 = {'key1': [134676, 887, 717396, 232311, 786756, 427703, 120396, 254003, 170556, 674028], 
     'key2': [568503, 837212, 386871, 22188, 77828, 36851, 97331, 41196, 550551, 715703], 
     'key3': [343383, 271428, 220887, 226563, 480236, 181463, 556503, 537276, 278771, 319212]}

问题:为什么我不能使用多线程来做这个计算,例如三个线程,一个用于 key1key2key3concurrent.futures.ProcessPoolExecutor() 会在这里工作吗?

原题:有没有更好的方法来优化这个要快?

Q : "Why couldn't I use multiple threads to do this calculation, e.g. three threads, one for key1, key2, and key3?"

你可以,但对性能没有合理影响 - 了解有关 python 如何处理基于线程的执行流的所有细节在这里是最重要的。 Learn about the GIL-lock trick, used right for it avoiding any concurrent processing and its effects on performance 你得到了 WHY 部分。

Q : "Would concurrent.futures.ProcessPoolExecutor() work here?"

会。

然而其净效应(如果 "faster" 与纯 [SERIAL] 处理流程相比)将取决于给定的大小"large"-列表(警告为 (cit.)"millions of keys, and the lists are similarly long." 上面)应该被复制(RAM-I/O)并传递(SER/DES-processed + IPC-transferred)到生成的(基于进程的)远程执行器池。

多次重复 RAM-I/O + SER/DES 附加管理费用很快就会占主导地位。

一个RAM-I/O复制步骤:

>>> from zmq import Stopwatch; aClk = Stopwatch()

>>> aClk.start(); aList = [ i for i in range( int( 1E4 ) ) ]; aClk.stop()
   1345 [us] to copy a List of 1E4 elements
>>> aClk.start(); aList = [ i for i in range( int( 1E5 ) ) ]; aClk.stop()
  12776 [us] to copy a List of 1E5 elements
>>> aClk.start(); aList = [ i for i in range( int( 1E6 ) ) ]; aClk.stop()
 149197 [us] to copy a List of 1E6 elements
>>> aClk.start(); aList = [ i for i in range( int( 1E7 ) ) ]; aClk.stop()
1253792 [us] to copy a List of 1E7 elements
|  |::: [us]
|  +--- [ms]
+------ [ s]

SER/DES 步骤:

>>> import pickle
>>> aClk.start(); _ = pickle.dumps( aList ); aClk.stop()
 608323 
 615851
 638821 [us] to copy pickle.dumps() a List of 1E7 elements
|  |::: [us]
|  +--- [ms]
+------ [ s]

因此,预期的每批次附加开销是 ~ 2 x ( 1253 + 608 ) [ms] + IPC-transfer 一次拍摄的成本共 1E7 件商品

manipulate_values()的实际有用工作负载太小,所有附加成本的总和几乎无法覆盖增加的费用,相关通过在远程工作人员池中分配工作单元。矢量化形式的计算有望带来更智能的结果。这里的附加成本远远大于少量有用的工作。

架构将更多地取决于传递 "there" 的 SER/DES 参数的开销成本加上 SER/DES 的附加成本] 返回的结果 "back" - 所有这些都将决定净效应( anti-加速 << 1.0 x 经常在用例中观察到,但引入时设计方面的工程实践很差,没有后期的基准可以挽救已经被烧毁的人*天,浪费在如此糟糕的设计决策中)

python 线程不会真正帮助您并行处理,因为它们是在同一个线程上执行的 "real CPU thread",python 线程在您处理异步 HTTP 调用时很有帮助

关于ProcessPoolExecutor 来自 docs:

concurrent.futures.ProcessPoolExecutor()

The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.

如果你需要高CPU处理可以帮助你,你可以使用:

import concurrent


def manipulate_values(k_v):
    k, v = k_v
    return_values = []
    for i in v :
        new_value = i ** 2 - 13
        return_values.append(new_value)
    return k, return_values


with concurrent.futures.ProcessPoolExecutor() as executor:
        example_dict = dict(executor.map(manipulate_values, example_dict1.items()))

这是一个简单的基准测试,使用简单的 for 循环来处理您的数据与使用 ProcessPoolExecutor 相比,我的场景假设对于要处理的每个项目,您需要大约 50 毫秒 CPU 时间:

如果要处理的每个项目的 CPU 时间较长

,您可以看到 ProcessPoolExecutor 的真正好处
from simple_benchmark import BenchmarkBuilder
import time
import concurrent

b = BenchmarkBuilder()

def manipulate_values1(k_v):
    k, v = k_v
    time.sleep(0.05)
    return k, v

def manipulate_values2(v):
    time.sleep(0.05)
    return v

@b.add_function()
def test_with_process_pool_executor(d):
    with concurrent.futures.ProcessPoolExecutor() as executor:
            return dict(executor.map(manipulate_values1, d.items()))

@b.add_function()       
def test_simple_for_loop(d):
    for key, value in d.items():
        d[key] = manipulate_values2((key, value))


@b.add_arguments('Number of keys in dict')
def argument_provider():
    for exp in range(2, 10):
        size = 2**exp
        yield size, {i: [i] * 10_000 for i in range(size)}

r = b.run()
r.plot()

如果您没有为 ProcessPoolExecutor 设置工作人员数量,则默认工作人员数量将等于您机器上的处理器数量(对于基准测试,我使用了 8 CPU).


但在您的情况下,根据您问题中提供的数据,处理 1 个项目将花费 ~3 微秒:

%timeit manipulate_values([367, 30, 847, 482, 887, 654, 347, 504, 413, 821])
2.32 µs ± 25.8 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

在这种情况下,基准将如下所示:

因此,如果 CPU 处理一项的时间很短,那么最好使用简单的 for 循环。


@user3666197 提出的一个好观点是当你有巨大的 items/lists 时,我使用列表中的 1_000_000_000 个随机数对这两种方法进行了基准测试:

你可以看到在这种情况下更适合使用ProcessPoolExecutor

from simple_benchmark import BenchmarkBuilder
import time
import concurrent
from random import choice

b = BenchmarkBuilder()

def manipulate_values1(k_v):
    k, v = k_v
    return_values = []
    for i in v:
        new_value = i ** 2 - 13
        return_values.append(new_value)

    return k, return_values

def manipulate_values2(v):
    return_values = []
    for i in v:
        new_value = i ** 2 - 13
        return_values.append(new_value)
    return return_values

@b.add_function()
def test_with_process_pool_executor(d):
    with concurrent.futures.ProcessPoolExecutor() as executor:
            return dict(executor.map(manipulate_values1, d.items()))

@b.add_function()       
def test_simple_for_loop(d):
    for key, value in d.items():
        d[key] = manipulate_values2(value)


@b.add_arguments('Number of keys in dict')
def argument_provider():
    for exp in range(2, 5):
        size = 2**exp
        yield size, {i: [choice(range(1000)) for _ in range(1_000_000)] for i in range(size)}

r = b.run()
r.plot()

预计因为处理一项需要 ~209 毫秒:

l = [367] * 1_000_000
%timeit manipulate_values2(l)
# 209 ms ± 1.45 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

仍然,最快的选择是使用 numpy.arraysfor 循环解决方案:

from simple_benchmark import BenchmarkBuilder
import time
import concurrent
import numpy as np

b = BenchmarkBuilder()

def manipulate_values1(k_v):
    k, v = k_v
    return k,  v ** 2 - 13

def manipulate_values2(v):
    return v ** 2 - 13

@b.add_function()
def test_with_process_pool_executor(d):
    with concurrent.futures.ProcessPoolExecutor() as executor:
            return dict(executor.map(manipulate_values1, d.items()))

@b.add_function()       
def test_simple_for_loop(d):
    for key, value in d.items():
        d[key] = manipulate_values2(value)


@b.add_arguments('Number of keys in dict')
def argument_provider():
    for exp in range(2, 7):
        size = 2**exp
        yield size, {i: np.random.randint(0, 1000, size=1_000_000) for i in range(size)}

r = b.run()
r.plot()

预计简单的 for 循环会更快,因为处理一个 numpy.array 需要 < 1 毫秒:

def manipulate_value2( input_list ):
    return input_list ** 2 - 13

l = np.random.randint(0, 1000, size=1_000_000)
%timeit manipulate_values2(l)
# 951 µs ± 5.7 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)