Python,使用字典的高效并行操作

Python, efficient paralell operation using a dict

首先抱歉我的英语不完美

我想我的问题很容易解释。

result={}
list_tuple=[(float,float,float),(float,float,float),(float,float,float)...]#200k tuples
threshold=[float,float,float...] #max 1k values
for tuple in list_tuple:
    for value in threeshold:
    if max(tuple)>value and min(tuple)<value:
        if value in result:
            result[value].append(tuple)
        else:
            result[value]=[]
            result[value].append(tuple) 

list_tuple 包含大约 20 万个元组,我必须非常快地执行此操作(在普通电脑上最多 2/3 秒)。

我的第一次尝试是在 cython 中使用 prange() 执行此操作(因此我可以从 cython 优化和并行执行中受益),但问题是(一如既往),GIL:在 prange()我可以使用 cython memviews 管理列表和元组,但我无法将结果插入字典。

在 cython 中,我也尝试使用 c++ std 的 unordered_map,但现在的问题是我无法在 c++ 中创建数组向量(这将是我的字典的值)。

第二题类似:

list_tuple=[((float,float),(float,float)),((float,float),(float,float))...]#200k tuples of tuples

result={list_tuple[0][0]:[]}

for tuple in list_tuple:
    if tuple[0] in result:
        result[tuple[0]].append(tuple)
    else:
        result[tuple[0]]=[]

这里我还有另一个问题,如果想使用 prange() 我必须使用自定义哈希函数来使用数组作为 c++ 的键 unordered_map

如您所见,我的代码片段非常简单 运行 并行。

我想尝试使用 numba,但由于 GIL,可能会是一样的,我更喜欢使用 cython,因为我需要二进制代码(这个库可能是商业软件的一部分,所以只有二进制库是允许)。

总的来说,我想避免使用 c/c++ 函数,我希望找到一种方法来并行管理类似 dicts/lists 的东西,并尽可能保持 cython 的性能在 Python 域中;但我愿意接受每一个建议。

谢谢

编辑

由于这种方法基本上执行数据样本和阈值之间的外积,因此它显着增加了所需的内存,这可能是不希望的。 I keep this answer nevertheless for future reference since it was referred to in .

我发现与 OP 的代码相比,性能提升是 ~ 20 的一个因素。


这是一个使用 numpy 的示例。数据被矢量化,操作也是如此。请注意,与 OP 的示例相反,生成的字典包含空列表,因此可能需要额外的清理步骤(如果适用)。

import numpy as np

# Data setup
data = np.random.uniform(size=(200000, 3))
thresh = np.random.uniform(size=1000)

# Compute tuples for thresholds.
condition = (
    (data.min(axis=1)[:, None] < thresh)
    & (data.max(axis=1)[:, None] > thresh)
)
result = {v: data[c].tolist() for c, v in zip(condition.T, thresh)}

@a_guest的代码:

def foo1(data, thresh):
    data = np.asarray(data)
    thresh = np.asarray(thresh)
    condition = (
       (data.min(axis=1)[:, None] < thresh)
       & (data.max(axis=1)[:, None] > thresh)
       )
    result = {v: data[c].tolist() for c, v in zip(condition.T, thresh)}
    return result

此代码为 thresh 中的每个项目创建一次字典条目。

OP 代码,使用 default_dict(来自 collections)进行了一些简化:

def foo3(list_tuple, threeshold):
    result = defaultdict(list)
    for tuple in list_tuple:
        for value in threeshold:
            if max(tuple)>value and min(tuple)<value:
                result[value].append(tuple)
    return result

这个为每个符合条件的项目更新一次字典条目。

以及他的示例数据:

In [27]: foo1(data,thresh)
Out[27]: {0: [], 1: [[0, 1, 2]], 2: [], 3: [], 4: [[3, 4, 5]]}
In [28]: foo3(data.tolist(), thresh.tolist())
Out[28]: defaultdict(list, {1: [[0, 1, 2]], 4: [[3, 4, 5]]})

时间测试:

In [29]: timeit foo1(data,thresh)
66.1 µs ± 197 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
# In [30]: timeit foo3(data,thresh)
# 161 µs ± 242 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
In [31]: timeit foo3(data.tolist(),thresh.tolist())
30.8 µs ± 56.4 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

数组迭代比列表慢。 tolist() 的时间很少; np.asarray 列表更长。

使用更大的数据样本,array 版本更快:

In [42]: data = np.random.randint(0,50,(3000,3))
    ...: thresh = np.arange(50)
In [43]: 
In [43]: timeit foo1(data,thresh)
16 ms ± 391 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
In [44]: %%timeit x,y = data.tolist(), thresh.tolist() 
    ...: foo3(x,y)
    ...: 
83.6 ms ± 68.6 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

还可以通过使用 numpy 的矢量化功能实现多项性能改进:

  1. minmax 值当前针对每个阈值重新计算。相反,它们可以预先计算,然后重新用于每个阈值。
  2. 数据样本 (list_tuple) 的循环是在纯 Python 中执行的。这个循环可以使用 numpy.
  3. 向量化

在以下测试中,我按照 OP 中的说明使用了 data.shape == (200000, 3); thresh.shape == (1000,)。我还省略了对 result dict 的修改,因为根据数据,这可能会很快溢出内存。

申请 1.

v_min = [min(t) for t in data]
v_max = [max(t) for t in data]
for mi, ma in zip(v_min, v_max):
    for value in thresh:
        if ma > value and mi < value:
            pass

与 OP 的代码相比,这会产生 ~ 5 的性能提升。

应用 1. & 2.

v_min = data.min(axis=1)
v_max = data.max(axis=1)
mask = np.empty(shape=(data.shape[0],), dtype=bool)
for t in thresh:
    mask[:] = (v_min < t) & (v_max > t)
    samples = data[mask]
    if samples.size > 0:
        pass

与 OP 的代码相比,这会产生 ~ 30 的性能提升。这种方法还有一个额外的好处,它不包含列表的增量 appends,因为可能需要重新分配内存,这会减慢程序的速度。相反,它会在一次尝试中创建每个列表(每个阈值)。