字典使用 numba njit 并行化加速代码的问题
Problem by dictionaries to use numba njit parallelization to accelerate the code
我写了一段代码并尝试使用numba来加速代码。代码的主要目标是根据条件对一些值进行分组。在这方面,iter_
用于收敛代码以满足条件。下面我准备了一个小案例来复现示例代码:
import numpy as np
import numba as nb
rng = np.random.default_rng(85)
# --------------------------------------- small data volume ---------------------------------------
# values_ = {'R0': np.array([0.01090976, 0.01069902, 0.00724112, 0.0068463 , 0.01135723, 0.00990762,
# 0.01090976, 0.01069902, 0.00724112, 0.0068463 , 0.01135723]),
# 'R1': np.array([0.01836379, 0.01900166, 0.01864162, 0.0182823 , 0.01840322, 0.01653088,
# 0.01900166, 0.01864162, 0.0182823 , 0.01840322, 0.01653088]),
# 'R2': np.array([0.02430913, 0.02239156, 0.02225379, 0.02093393, 0.02408692, 0.02110411,
# 0.02239156, 0.02225379, 0.02093393, 0.02408692, 0.02110411])}
#
# params = {'R0': [3, 0.9490579204466154, 1825, 7.070272000000002e-05],
# 'R1': [0, 0.9729203826820172, 167 , 7.070272000000002e-05],
# 'R2': [1, 0.6031363088057902, 1316, 8.007296000000003e-05]}
#
# Sno, dec_, upd_ = 2, 100, 200
# -------------------------------------------------------------------------------------------------
# ----------------------------- UPDATED (medium and large data volumes) ---------------------------
# values_ = np.load("values_med.npy", allow_pickle=True)[()]
# params = np.load("params_med.npy", allow_pickle=True)[()]
values_ = np.load("values_large.npy", allow_pickle=True)[()]
params = np.load("params_large.npy", allow_pickle=True)[()]
Sno, dec_, upd_ = 2000, 1000, 200
# -------------------------------------------------------------------------------------------------
# values_ = [*values_.values()]
# params = [*params.values()]
# @nb.jit(forceobj=True)
# def test(values_, params, Sno, dec_, upd_):
final_dict = {}
for i, j in enumerate(values_.keys()):
Rand_vals = []
goal_sum = params[j][1] * params[j][3]
tel = goal_sum / dec_ * 10
if params[j][0] != 0:
for k in range(Sno):
final_sum = 0.0
iter_ = 0
t = 1
while not np.allclose(goal_sum, final_sum, atol=tel):
iter_ += 1
vals_group = rng.choice(values_[j], size=params[j][0], replace=False)
# final_sum = 0.0016 * np.sum(vals_group) # -----> For small data volume
final_sum = np.sum(vals_group ** 3) # -----> UPDATED For med or large data volume
if iter_ == upd_:
t += 1
tel = t * tel
values_[j] = np.delete(values_[j], np.where(np.in1d(values_[j], vals_group)))
Rand_vals.append(vals_group)
else:
Rand_vals = [np.array([])] * Sno
final_dict["R" + str(i)] = Rand_vals
# return final_dict
# test(values_, params, Sno, dec_, upd_)
起初,为了在此代码上应用 numba,使用了 @nb.jit
(forceobj=True
用于避免警告和...),这会对性能产生不利影响。 nopython
也用 @nb.njit
检查,由于 not supporting (as mentioned in 1, ) dictionary type 输入:
cannot determine Numba type of <class 'dict'>
我不知道 Dict
从 numba.typed
是否(如何)处理它(通过将创建的 python 字典转换为 numba Dict)或者如果转换 字典到数组列表有任何优势。我认为,如果某些代码行例如Rand_vals.append(vals_group)
or else section or … 从函数中取出或修改以获得与以前相同的结果,但我不知道该怎么做。
如果能帮助在此代码上使用 numba,我将不胜感激。 numba parallelization
将是最理想的(可能是性能方面最适用的方法)解决方案。
数据:
- 中等数据量:values_med, params_med
- 大数据量:values_large, params_large
此代码可以转换为 Numba,但并不简单。
首先,必须定义字典和列表类型,因为 Numba njit
函数不能直接对反射列表进行操作(又名 pure-python 列表).在 Numba 中这样做有点乏味,生成的代码也有点冗长:
String = nb.types.unicode_type
ValueArray = nb.float64[::1]
ValueDict = nb.types.DictType(String, ValueArray)
ParamDictValue = nb.types.Tuple([nb.int_, nb.float64, nb.int_, nb.float64])
ParamDict = nb.types.DictType(String, ParamDictValue)
FinalDictValue = nb.types.ListType(ValueArray)
FinalDict = nb.types.DictType(String, FinalDictValue)
那么你需要转换输入字典:
nbValues = nb.typed.typeddict.Dict.empty(String, ValueArray)
for key,value in values_.items():
nbValues[key] = value.copy()
nbParams = nb.typed.typeddict.Dict.empty(String, ParamDictValue)
for key,value in params.items():
nbParams[key] = (nb.int_(value[0]), nb.float64(value[1]), nb.int_(value[2]), nb.float64(value[3]))
然后,你需要编写核心功能。 np.allclose
和 np.isin
未在 Numba 中实现,因此应手动重新实现它们。但要点是 Numba 不支持 rng
Numpy 对象。我认为它肯定不会很快支持它。请注意,Numba 有一个随机数实现,它试图模仿 Numpy 的行为,但种子的管理有点不同。另请注意,如果种子设置为相同的值(Numpy 和 Numba 具有不同的未同步的种子变量),np.random.xxx
Numpy 函数的结果应该相同。
@nb.njit(FinalDict(ValueDict, ParamDict, nb.int_, nb.int_, nb.int_))
def nbTest(values_, params, Sno, dec_, upd_):
final_dict = nb.typed.Dict.empty(String, FinalDictValue)
for i, j in enumerate(values_.keys()):
Rand_vals = nb.typed.List.empty_list(ValueArray)
goal_sum = params[j][1] * params[j][3]
tel = goal_sum / dec_ * 10
if params[j][0] != 0:
for k in range(Sno):
final_sum = 0.0
iter_ = 0
t = 1
vals_group = np.empty(0, dtype=nb.float64)
while np.abs(goal_sum - final_sum) > (1e-05 * np.abs(final_sum) + tel):
iter_ += 1
vals_group = np.random.choice(values_[j], size=params[j][0], replace=False)
final_sum = 0.0016 * np.sum(vals_group)
# final_sum = 0.0016 * np.sum(vals_group) # (for small data volume)
final_sum = np.sum(vals_group ** 3) # (for med or large data volume)
if iter_ == upd_:
t += 1
tel = t * tel
# Perform an in-place deletion
vals, gr = values_[j], vals_group
cur = 0
for l in range(vals.size):
found = False
for m in range(gr.size):
found |= vals[l] == gr[m]
if not found:
# Keep the value (delete it otherwise)
vals[cur] = vals[l]
cur += 1
values_[j] = vals[:cur]
Rand_vals.append(vals_group)
else:
for k in range(Sno):
Rand_vals.append(np.empty(0, dtype=nb.float64))
final_dict["R" + str(i)] = Rand_vals
return final_dict
请注意,np.isin
的替换实现非常幼稚,但在您的输入示例中实际运行良好。
函数可以通过以下方式调用:
nbFinalDict = nbTest(nbValues, nbParams, Sno, dec_, upd_)
最后,应该将字典转换回基本 Python 对象:
finalDict = dict()
for key,value in nbFinalDict.items():
finalDict[key] = list(value)
此实现对于小输入速度很快,但对于大输入则不快,因为 np.random.choice
几乎占用了所有时间 (>96%)。问题是这个函数 显然不是最优的 当请求的项目数量很少时(这是你的情况)。事实上,它令人惊讶地以输入数组的线性时间运行,而不是以请求项数量的线性时间运行。
进一步优化
可以完全重写该算法以仅提取 12 个随机项并以更有效的方式从主 currant 数组中丢弃它们。思路是将数组末尾的n
项(小目标样本)与随机位置的其他项互换,然后校验和,重复这个过程直到满足一个条件,最后将视图提取到最后 n
个项目,然后再调整视图大小以丢弃最后一个项目。所有这些都可以在 O(n)
时间内完成,而不是 O(m)
时间,其中 m
是具有 n << m
的主电流阵列的大小(例如 12 VS 20_000).它也可以在没有任何昂贵分配的情况下进行计算。这是结果代码:
@nb.njit(nb.void(ValueArray, nb.int_, nb.int_))
def swap(arr, i, j):
arr[i], arr[j] = arr[j], arr[i]
@nb.njit(FinalDict(ValueDict, ParamDict, nb.int_, nb.int_, nb.int_))
def nbTest(values_, params, Sno, dec_, upd_):
final_dict = nb.typed.Dict.empty(String, FinalDictValue)
for i, j in enumerate(values_.keys()):
Rand_vals = nb.typed.List.empty_list(ValueArray)
goal_sum = params[j][1] * params[j][3]
tel = goal_sum / dec_ * 10
values = values_[j]
n = params[j][0]
if n != 0:
for k in range(Sno):
final_sum = 0.0
iter_ = 0
t = 1
m = values.size
assert n <= m
group = values[-n:]
while np.abs(goal_sum - final_sum) > (1e-05 * np.abs(final_sum) + tel):
iter_ += 1
# Swap the group view with other random items
for pos in range(m - n, m):
swap(values, pos, np.random.randint(0, m))
# For small data volume:
# final_sum = 0.0016 * np.sum(group)
# For med/large data volume
final_sum = 0.0
for v in group:
final_sum += v ** 3
if iter_ == upd_:
t += 1
tel *= t
assert iter_ > 0
values = values[:m-n]
Rand_vals.append(group)
else:
for k in range(Sno):
Rand_vals.append(np.empty(0, dtype=nb.float64))
final_dict["R" + str(i)] = Rand_vals
return final_dict
除了速度更快之外,此实现的好处还在于更简单。结果看起来与之前的实现非常相似,尽管随机性使得结果检查变得棘手(特别是因为此函数不使用相同的方法来选择随机样本)。请注意,此实现不会删除 values
中的项目,而 group
中的项目与前一个项目相反(尽管这可能不是想要的)。
基准
这是我机器上最后一次执行的结果(不包括编译和转换时间):
Provided small input (embedded in the question):
- Initial code: 42.71 ms
- Numba code: 0.11 ms
Medium input:
- Initial code: 3481 ms
- Numba code: 11 ms
Large input:
- Initial code: 6728 ms
- Numba code: 20 ms
请注意,转换时间与计算时间大致相同。
最后一个实现比小输入的初始代码快 316~388 倍。
注释
请注意,由于字典和列表类型,编译时间需要几秒钟。
请注意,虽然可以并行化实施,但只能并行化最全面的循环。问题是只有很少的项目需要计算,而且时间已经很短了(不是 multi-threading 的最佳情况)。 <-- 此外,创建许多临时数组(由 rng.choice
创建)肯定会导致并行循环无论如何都无法很好地扩展。 --> 此外,list/dict 不能从多个线程安全地写入,因此需要在整个函数中使用 Numpy 数组才能做到这一点(或添加已经很昂贵的额外转换)。此外,Numba 并行性往往会显着增加本已很长的编译时间。最后,结果将不太确定,因为每个 Numba 线程都有自己的随机数生成器种子,并且无法使用 prange
预测线程计算的项目(取决于在目标平台上选择的并行运行时)。请注意,在 Numpy 中,通常的随机函数默认使用一个全局种子(不推荐使用的方式),而 RNG 对象有自己的种子(新的首选方式)。
我写了一段代码并尝试使用numba来加速代码。代码的主要目标是根据条件对一些值进行分组。在这方面,iter_
用于收敛代码以满足条件。下面我准备了一个小案例来复现示例代码:
import numpy as np
import numba as nb
rng = np.random.default_rng(85)
# --------------------------------------- small data volume ---------------------------------------
# values_ = {'R0': np.array([0.01090976, 0.01069902, 0.00724112, 0.0068463 , 0.01135723, 0.00990762,
# 0.01090976, 0.01069902, 0.00724112, 0.0068463 , 0.01135723]),
# 'R1': np.array([0.01836379, 0.01900166, 0.01864162, 0.0182823 , 0.01840322, 0.01653088,
# 0.01900166, 0.01864162, 0.0182823 , 0.01840322, 0.01653088]),
# 'R2': np.array([0.02430913, 0.02239156, 0.02225379, 0.02093393, 0.02408692, 0.02110411,
# 0.02239156, 0.02225379, 0.02093393, 0.02408692, 0.02110411])}
#
# params = {'R0': [3, 0.9490579204466154, 1825, 7.070272000000002e-05],
# 'R1': [0, 0.9729203826820172, 167 , 7.070272000000002e-05],
# 'R2': [1, 0.6031363088057902, 1316, 8.007296000000003e-05]}
#
# Sno, dec_, upd_ = 2, 100, 200
# -------------------------------------------------------------------------------------------------
# ----------------------------- UPDATED (medium and large data volumes) ---------------------------
# values_ = np.load("values_med.npy", allow_pickle=True)[()]
# params = np.load("params_med.npy", allow_pickle=True)[()]
values_ = np.load("values_large.npy", allow_pickle=True)[()]
params = np.load("params_large.npy", allow_pickle=True)[()]
Sno, dec_, upd_ = 2000, 1000, 200
# -------------------------------------------------------------------------------------------------
# values_ = [*values_.values()]
# params = [*params.values()]
# @nb.jit(forceobj=True)
# def test(values_, params, Sno, dec_, upd_):
final_dict = {}
for i, j in enumerate(values_.keys()):
Rand_vals = []
goal_sum = params[j][1] * params[j][3]
tel = goal_sum / dec_ * 10
if params[j][0] != 0:
for k in range(Sno):
final_sum = 0.0
iter_ = 0
t = 1
while not np.allclose(goal_sum, final_sum, atol=tel):
iter_ += 1
vals_group = rng.choice(values_[j], size=params[j][0], replace=False)
# final_sum = 0.0016 * np.sum(vals_group) # -----> For small data volume
final_sum = np.sum(vals_group ** 3) # -----> UPDATED For med or large data volume
if iter_ == upd_:
t += 1
tel = t * tel
values_[j] = np.delete(values_[j], np.where(np.in1d(values_[j], vals_group)))
Rand_vals.append(vals_group)
else:
Rand_vals = [np.array([])] * Sno
final_dict["R" + str(i)] = Rand_vals
# return final_dict
# test(values_, params, Sno, dec_, upd_)
起初,为了在此代码上应用 numba,使用了 @nb.jit
(forceobj=True
用于避免警告和...),这会对性能产生不利影响。 nopython
也用 @nb.njit
检查,由于 not supporting (as mentioned in 1,
cannot determine Numba type of <class 'dict'>
我不知道 Dict
从 numba.typed
是否(如何)处理它(通过将创建的 python 字典转换为 numba Dict)或者如果转换 字典到数组列表有任何优势。我认为,如果某些代码行例如Rand_vals.append(vals_group)
or else section or … 从函数中取出或修改以获得与以前相同的结果,但我不知道该怎么做。
如果能帮助在此代码上使用 numba,我将不胜感激。 numba parallelization
将是最理想的(可能是性能方面最适用的方法)解决方案。
数据:
- 中等数据量:values_med, params_med
- 大数据量:values_large, params_large
此代码可以转换为 Numba,但并不简单。
首先,必须定义字典和列表类型,因为 Numba njit
函数不能直接对反射列表进行操作(又名 pure-python 列表).在 Numba 中这样做有点乏味,生成的代码也有点冗长:
String = nb.types.unicode_type
ValueArray = nb.float64[::1]
ValueDict = nb.types.DictType(String, ValueArray)
ParamDictValue = nb.types.Tuple([nb.int_, nb.float64, nb.int_, nb.float64])
ParamDict = nb.types.DictType(String, ParamDictValue)
FinalDictValue = nb.types.ListType(ValueArray)
FinalDict = nb.types.DictType(String, FinalDictValue)
那么你需要转换输入字典:
nbValues = nb.typed.typeddict.Dict.empty(String, ValueArray)
for key,value in values_.items():
nbValues[key] = value.copy()
nbParams = nb.typed.typeddict.Dict.empty(String, ParamDictValue)
for key,value in params.items():
nbParams[key] = (nb.int_(value[0]), nb.float64(value[1]), nb.int_(value[2]), nb.float64(value[3]))
然后,你需要编写核心功能。 np.allclose
和 np.isin
未在 Numba 中实现,因此应手动重新实现它们。但要点是 Numba 不支持 rng
Numpy 对象。我认为它肯定不会很快支持它。请注意,Numba 有一个随机数实现,它试图模仿 Numpy 的行为,但种子的管理有点不同。另请注意,如果种子设置为相同的值(Numpy 和 Numba 具有不同的未同步的种子变量),np.random.xxx
Numpy 函数的结果应该相同。
@nb.njit(FinalDict(ValueDict, ParamDict, nb.int_, nb.int_, nb.int_))
def nbTest(values_, params, Sno, dec_, upd_):
final_dict = nb.typed.Dict.empty(String, FinalDictValue)
for i, j in enumerate(values_.keys()):
Rand_vals = nb.typed.List.empty_list(ValueArray)
goal_sum = params[j][1] * params[j][3]
tel = goal_sum / dec_ * 10
if params[j][0] != 0:
for k in range(Sno):
final_sum = 0.0
iter_ = 0
t = 1
vals_group = np.empty(0, dtype=nb.float64)
while np.abs(goal_sum - final_sum) > (1e-05 * np.abs(final_sum) + tel):
iter_ += 1
vals_group = np.random.choice(values_[j], size=params[j][0], replace=False)
final_sum = 0.0016 * np.sum(vals_group)
# final_sum = 0.0016 * np.sum(vals_group) # (for small data volume)
final_sum = np.sum(vals_group ** 3) # (for med or large data volume)
if iter_ == upd_:
t += 1
tel = t * tel
# Perform an in-place deletion
vals, gr = values_[j], vals_group
cur = 0
for l in range(vals.size):
found = False
for m in range(gr.size):
found |= vals[l] == gr[m]
if not found:
# Keep the value (delete it otherwise)
vals[cur] = vals[l]
cur += 1
values_[j] = vals[:cur]
Rand_vals.append(vals_group)
else:
for k in range(Sno):
Rand_vals.append(np.empty(0, dtype=nb.float64))
final_dict["R" + str(i)] = Rand_vals
return final_dict
请注意,np.isin
的替换实现非常幼稚,但在您的输入示例中实际运行良好。
函数可以通过以下方式调用:
nbFinalDict = nbTest(nbValues, nbParams, Sno, dec_, upd_)
最后,应该将字典转换回基本 Python 对象:
finalDict = dict()
for key,value in nbFinalDict.items():
finalDict[key] = list(value)
此实现对于小输入速度很快,但对于大输入则不快,因为 np.random.choice
几乎占用了所有时间 (>96%)。问题是这个函数 显然不是最优的 当请求的项目数量很少时(这是你的情况)。事实上,它令人惊讶地以输入数组的线性时间运行,而不是以请求项数量的线性时间运行。
进一步优化
可以完全重写该算法以仅提取 12 个随机项并以更有效的方式从主 currant 数组中丢弃它们。思路是将数组末尾的n
项(小目标样本)与随机位置的其他项互换,然后校验和,重复这个过程直到满足一个条件,最后将视图提取到最后 n
个项目,然后再调整视图大小以丢弃最后一个项目。所有这些都可以在 O(n)
时间内完成,而不是 O(m)
时间,其中 m
是具有 n << m
的主电流阵列的大小(例如 12 VS 20_000).它也可以在没有任何昂贵分配的情况下进行计算。这是结果代码:
@nb.njit(nb.void(ValueArray, nb.int_, nb.int_))
def swap(arr, i, j):
arr[i], arr[j] = arr[j], arr[i]
@nb.njit(FinalDict(ValueDict, ParamDict, nb.int_, nb.int_, nb.int_))
def nbTest(values_, params, Sno, dec_, upd_):
final_dict = nb.typed.Dict.empty(String, FinalDictValue)
for i, j in enumerate(values_.keys()):
Rand_vals = nb.typed.List.empty_list(ValueArray)
goal_sum = params[j][1] * params[j][3]
tel = goal_sum / dec_ * 10
values = values_[j]
n = params[j][0]
if n != 0:
for k in range(Sno):
final_sum = 0.0
iter_ = 0
t = 1
m = values.size
assert n <= m
group = values[-n:]
while np.abs(goal_sum - final_sum) > (1e-05 * np.abs(final_sum) + tel):
iter_ += 1
# Swap the group view with other random items
for pos in range(m - n, m):
swap(values, pos, np.random.randint(0, m))
# For small data volume:
# final_sum = 0.0016 * np.sum(group)
# For med/large data volume
final_sum = 0.0
for v in group:
final_sum += v ** 3
if iter_ == upd_:
t += 1
tel *= t
assert iter_ > 0
values = values[:m-n]
Rand_vals.append(group)
else:
for k in range(Sno):
Rand_vals.append(np.empty(0, dtype=nb.float64))
final_dict["R" + str(i)] = Rand_vals
return final_dict
除了速度更快之外,此实现的好处还在于更简单。结果看起来与之前的实现非常相似,尽管随机性使得结果检查变得棘手(特别是因为此函数不使用相同的方法来选择随机样本)。请注意,此实现不会删除 values
中的项目,而 group
中的项目与前一个项目相反(尽管这可能不是想要的)。
基准
这是我机器上最后一次执行的结果(不包括编译和转换时间):
Provided small input (embedded in the question):
- Initial code: 42.71 ms
- Numba code: 0.11 ms
Medium input:
- Initial code: 3481 ms
- Numba code: 11 ms
Large input:
- Initial code: 6728 ms
- Numba code: 20 ms
请注意,转换时间与计算时间大致相同。
最后一个实现比小输入的初始代码快 316~388 倍。
注释
请注意,由于字典和列表类型,编译时间需要几秒钟。
请注意,虽然可以并行化实施,但只能并行化最全面的循环。问题是只有很少的项目需要计算,而且时间已经很短了(不是 multi-threading 的最佳情况)。 <-- 此外,创建许多临时数组(由 rng.choice
创建)肯定会导致并行循环无论如何都无法很好地扩展。 --> 此外,list/dict 不能从多个线程安全地写入,因此需要在整个函数中使用 Numpy 数组才能做到这一点(或添加已经很昂贵的额外转换)。此外,Numba 并行性往往会显着增加本已很长的编译时间。最后,结果将不太确定,因为每个 Numba 线程都有自己的随机数生成器种子,并且无法使用 prange
预测线程计算的项目(取决于在目标平台上选择的并行运行时)。请注意,在 Numpy 中,通常的随机函数默认使用一个全局种子(不推荐使用的方式),而 RNG 对象有自己的种子(新的首选方式)。