字典使用 numba njit 并行化加速代码的问题

Problem by dictionaries to use numba njit parallelization to accelerate the code

我写了一段代码并尝试使用numba来加速代码。代码的主要目标是根据条件对一些值进行分组。在这方面,iter_用于收敛代码以满足条件。下面我准备了一个小案例来复现示例代码:

import numpy as np
import numba as nb

rng = np.random.default_rng(85)

# --------------------------------------- small data volume ---------------------------------------
# values_ = {'R0': np.array([0.01090976, 0.01069902, 0.00724112, 0.0068463 , 0.01135723, 0.00990762,
#                                        0.01090976, 0.01069902, 0.00724112, 0.0068463 , 0.01135723]),
#            'R1': np.array([0.01836379, 0.01900166, 0.01864162, 0.0182823 , 0.01840322, 0.01653088,
#                                        0.01900166, 0.01864162, 0.0182823 , 0.01840322, 0.01653088]),
#            'R2': np.array([0.02430913, 0.02239156, 0.02225379, 0.02093393, 0.02408692, 0.02110411,
#                                        0.02239156, 0.02225379, 0.02093393, 0.02408692, 0.02110411])}
#
# params = {'R0': [3, 0.9490579204466154, 1825, 7.070272000000002e-05],
#           'R1': [0, 0.9729203826820172, 167 , 7.070272000000002e-05],
#           'R2': [1, 0.6031363088057902, 1316, 8.007296000000003e-05]}
#
# Sno, dec_, upd_ = 2, 100, 200
# -------------------------------------------------------------------------------------------------

# ----------------------------- UPDATED (medium and large data volumes) ---------------------------
# values_ = np.load("values_med.npy", allow_pickle=True)[()]
# params = np.load("params_med.npy", allow_pickle=True)[()]
values_ = np.load("values_large.npy", allow_pickle=True)[()]
params = np.load("params_large.npy", allow_pickle=True)[()]

Sno, dec_, upd_ = 2000, 1000, 200
# -------------------------------------------------------------------------------------------------

# values_ = [*values_.values()]
# params = [*params.values()]


# @nb.jit(forceobj=True)
# def test(values_, params, Sno, dec_, upd_):

final_dict = {}
for i, j in enumerate(values_.keys()):
    Rand_vals = []
    goal_sum = params[j][1] * params[j][3]
    tel = goal_sum / dec_ * 10
    if params[j][0] != 0:
        for k in range(Sno):
            final_sum = 0.0
            iter_ = 0
            t = 1
            while not np.allclose(goal_sum, final_sum, atol=tel):
                iter_ += 1
                vals_group = rng.choice(values_[j], size=params[j][0], replace=False)
                # final_sum = 0.0016 * np.sum(vals_group)  # -----> For small data volume
                final_sum = np.sum(vals_group ** 3)        # -----> UPDATED For med or large data volume
                if iter_ == upd_:
                    t += 1
                    tel = t * tel
            values_[j] = np.delete(values_[j], np.where(np.in1d(values_[j], vals_group)))
            Rand_vals.append(vals_group)
    else:
        Rand_vals = [np.array([])] * Sno
    final_dict["R" + str(i)] = Rand_vals

#    return final_dict


# test(values_, params, Sno, dec_, upd_)

起初,为了在此代码上应用 numba,使用了 @nb.jitforceobj=True 用于避免警告和...),这会对性能产生不利影响。 nopython 也用 @nb.njit 检查,由于 not supporting (as mentioned in 1, ) dictionary type 输入:

cannot determine Numba type of <class 'dict'>

我不知道 Dictnumba.typed 是否(如何)处理它(通过将创建的 python 字典转换为 numba Dict)或者如果转换 字典数组列表有任何优势。我认为,如果某些代码行例如Rand_vals.append(vals_group) or else section or … 从函数中取出或修改以获得与以前相同的结果,但我不知道该怎么做。

如果能帮助在此代码上使用 numba,我将不胜感激。 numba parallelization 将是最理想的(可能是性能方面最适用的方法)解决方案


数据:

此代码可以转换为 Numba,但并不简单。

首先,必须定义字典和列表类型,因为 Numba njit 函数不能直接对反射列表进行操作(又名 pure-python 列表).在 Numba 中这样做有点乏味,生成的代码也有点冗长:

String = nb.types.unicode_type
ValueArray = nb.float64[::1]
ValueDict = nb.types.DictType(String, ValueArray)
ParamDictValue = nb.types.Tuple([nb.int_, nb.float64, nb.int_, nb.float64])
ParamDict = nb.types.DictType(String, ParamDictValue)
FinalDictValue = nb.types.ListType(ValueArray)
FinalDict = nb.types.DictType(String, FinalDictValue)

那么你需要转换输入字典:

nbValues = nb.typed.typeddict.Dict.empty(String, ValueArray)
for key,value in values_.items():
    nbValues[key] = value.copy()

nbParams = nb.typed.typeddict.Dict.empty(String, ParamDictValue)
for key,value in params.items():
    nbParams[key] = (nb.int_(value[0]), nb.float64(value[1]), nb.int_(value[2]), nb.float64(value[3]))

然后,你需要编写核心功能。 np.allclosenp.isin 未在 Numba 中实现,因此应手动重新实现它们。但要点是 Numba 不支持 rng Numpy 对象。我认为它肯定不会很快支持它。请注意,Numba 有一个随机数实现,它试图模仿 Numpy 的行为,但种子的管理有点不同。另请注意,如果种子设置为相同的值(Numpy 和 Numba 具有不同的未同步的种子变量),np.random.xxx Numpy 函数的结果应该相同。

@nb.njit(FinalDict(ValueDict, ParamDict, nb.int_, nb.int_, nb.int_))
def nbTest(values_, params, Sno, dec_, upd_):
    final_dict = nb.typed.Dict.empty(String, FinalDictValue)
    for i, j in enumerate(values_.keys()):
        Rand_vals = nb.typed.List.empty_list(ValueArray)
        goal_sum = params[j][1] * params[j][3]
        tel = goal_sum / dec_ * 10
        if params[j][0] != 0:
            for k in range(Sno):
                final_sum = 0.0
                iter_ = 0
                t = 1

                vals_group = np.empty(0, dtype=nb.float64)

                while np.abs(goal_sum - final_sum) > (1e-05 * np.abs(final_sum) + tel):
                    iter_ += 1
                    vals_group = np.random.choice(values_[j], size=params[j][0], replace=False)
                    final_sum = 0.0016 * np.sum(vals_group)
                    # final_sum = 0.0016 * np.sum(vals_group)  # (for small data volume)
                    final_sum = np.sum(vals_group ** 3)        # (for med or large data volume)
                    if iter_ == upd_:
                        t += 1
                        tel = t * tel

                # Perform an in-place deletion
                vals, gr = values_[j], vals_group
                cur = 0
                for l in range(vals.size):
                    found = False
                    for m in range(gr.size):
                        found |= vals[l] == gr[m]
                    if not found:
                        # Keep the value (delete it otherwise)
                        vals[cur] = vals[l]
                        cur += 1
                values_[j] = vals[:cur]

                Rand_vals.append(vals_group)
        else:
            for k in range(Sno):
                Rand_vals.append(np.empty(0, dtype=nb.float64))
        final_dict["R" + str(i)] = Rand_vals
    return final_dict

请注意,np.isin 的替换实现非常幼稚,但在您的输入示例中实际运行良好。

函数可以通过以下方式调用:

nbFinalDict = nbTest(nbValues, nbParams, Sno, dec_, upd_)

最后,应该将字典转换回基本 Python 对象:

finalDict = dict()
for key,value in nbFinalDict.items():
    finalDict[key] = list(value)

此实现对于小输入速度很快,但对于大输入则不快,因为 np.random.choice 几乎占用了所有时间 (>96%)。问题是这个函数 显然不是最优的 当请求的项目数量很少时(这是你的情况)。事实上,它令人惊讶地以输入数组的线性时间运行,而不是以请求项数量的线性时间运行。


进一步优化

可以完全重写该算法以仅提取 12 个随机项并以更有效的方式从主 currant 数组中丢弃它们。思路是将数组末尾的n项(小目标样本)与随机位置的其他项互换,然后校验和,重复这个过程直到满足一个条件,最后将视图提取到最后 n 个项目,然后再调整视图大小以丢弃最后一个项目。所有这些都可以在 O(n) 时间内完成,而不是 O(m) 时间,其中 m 是具有 n << m 的主电流阵列的大小(例如 12 VS 20_000).它也可以在没有任何昂贵分配的情况下进行计算。这是结果代码:

@nb.njit(nb.void(ValueArray, nb.int_, nb.int_))
def swap(arr, i, j):
    arr[i], arr[j] = arr[j], arr[i]

@nb.njit(FinalDict(ValueDict, ParamDict, nb.int_, nb.int_, nb.int_))
def nbTest(values_, params, Sno, dec_, upd_):
    final_dict = nb.typed.Dict.empty(String, FinalDictValue)
    for i, j in enumerate(values_.keys()):
        Rand_vals = nb.typed.List.empty_list(ValueArray)
        goal_sum = params[j][1] * params[j][3]
        tel = goal_sum / dec_ * 10
        values = values_[j]
        n = params[j][0]

        if n != 0:
            for k in range(Sno):
                final_sum = 0.0
                iter_ = 0
                t = 1

                m = values.size
                assert n <= m
                group = values[-n:]

                while np.abs(goal_sum - final_sum) > (1e-05 * np.abs(final_sum) + tel):
                    iter_ += 1

                    # Swap the group view with other random items
                    for pos in range(m - n, m):
                        swap(values, pos, np.random.randint(0, m))

                    # For small data volume:
                    # final_sum = 0.0016 * np.sum(group)

                    # For med/large data volume
                    final_sum = 0.0
                    for v in group:
                        final_sum += v ** 3

                    if iter_ == upd_:
                        t += 1
                        tel *= t

                assert iter_ > 0
                values = values[:m-n]
                Rand_vals.append(group)
        else:
            for k in range(Sno):
                Rand_vals.append(np.empty(0, dtype=nb.float64))
        final_dict["R" + str(i)] = Rand_vals
    return final_dict

除了速度更快之外,此实现的好处还在于更简单。结果看起来与之前的实现非常相似,尽管随机性使得结果检查变得棘手(特别是因为此函数不使用相同的方法来选择随机样本)。请注意,此实现不会删除 values 中的项目,而 group 中的项目与前一个项目相反(尽管这可能不是想要的)。


基准

这是我机器上最后一次执行的结果(不包括编译和转换时间):

Provided small input (embedded in the question):
 - Initial code:   42.71 ms
 - Numba code:      0.11 ms

Medium input:
 - Initial code:   3481 ms
 - Numba code:       11 ms

Large input:
 - Initial code:   6728 ms
 - Numba code:       20 ms

请注意,转换时间与计算时间大致相同。

最后一个实现比小输入的初始代码快 316~388 倍


注释

请注意,由于字典和列表类型,编译时间需要几秒钟。

请注意,虽然可以并行化实施,但只能并行化最全面的循环。问题是只有很少的项目需要计算,而且时间已经很短了(不是 multi-threading 的最佳情况)。 <-- 此外,创建许多临时数组(由 rng.choice 创建)肯定会导致并行循环无论如何都无法很好地扩展。 --> 此外,list/dict 不能从多个线程安全地写入,因此需要在整个函数中使用 Numpy 数组才能做到这一点(或添加已经很昂贵的额外转换)。此外,Numba 并行性往往会显着增加本已很长的编译时间。最后,结果将不太确定,因为每个 Numba 线程都有自己的随机数生成器种子,并且无法使用 prange 预测线程计算的项目(取决于在目标平台上选择的并行运行时)。请注意,在 Numpy 中,通常的随机函数默认使用一个全局种子(不推荐使用的方式),而 RNG 对象有自己的种子(新的首选方式)。