How to accelerate my written python code: function containing nested functions for classification of points by polygons
I have written the following NumPy code in Python:
import numpy as np


def inbox_(points, polygon):
    """ Finding points in a region """
    ll = np.amin(polygon, axis=0)   # lower limit
    ur = np.amax(polygon, axis=0)   # upper limit
    in_idx = np.all(np.logical_and(ll <= points, points < ur), axis=1)   # points in the range [boolean]
    return in_idx


def operation_(r, gap, ends_ind):
    """ calculation formula which is applied on the points specified by the inbox_ function """
    r_active = np.take(r, ends_ind)                  # taking values from "r" based on the indices and shape (paired values) of "ends_ind"
    r_sub = np.subtract.reduce(r_active, axis=1)     # subtracting each "r" pair determined by "ends_ind" [used in the final return formula]
    r_add = np.add.reduce(r_active, axis=1)          # adding each "r" pair determined by "ends_ind" [used in the final return formula]
    paired_cent_dis = np.sum((r_add, gap), axis=0)   # distance between the two points of each pair
    return (np.power(gap, 2) * (np.power(paired_cent_dis, 2) +
            5 * paired_cent_dis * r_add - 7 * np.power(r_sub, 2))) / (3 * paired_cent_dis)   # formula


def elapses(r, pos, gap, ends_ind, elem_vert, contact_poss):
    if len(gap) > 0:
        elaps = np.empty([len(elem_vert), ], dtype=object)
        operate_ = operation_(r, gap, ends_ind)
        # elbav = np.empty([len(elem_vert), ], dtype=object)
        # con_num = 0
        for i, j in enumerate(elem_vert):                    # loop over each section (cell or region) of the mesh
            in_bool = inbox_(contact_poss, j)                # boolean array marking the points within that section
            elaps[i] = np.sum(operate_[in_bool])             # sum of the computed values for the points of that section
            operate_ = operate_[np.invert(in_bool)]          # drop the already-processed points to speed up the next iterations
            contact_poss = contact_poss[np.invert(in_bool)]  # as above
            # con_num += sum(inbox_(contact_poss, j))
            # inba_bool = inbox_(pos, j)
            # elbav[i] = 4 * np.pi * np.sum(np.power(r[inba_bool], 3)) / 3
            # pos = pos[np.invert(inba_bool)]
            # r = r[np.invert(inba_bool)]
        return elaps
r = np.load('a.npy')
pos = np.load('b.npy')
gap = np.load('c.npy')
ends_ind = np.load('d.npy')
elem_vert = np.load('e.npy')
contact_poss = np.load('f.npy')
elapses(r, pos, gap, ends_ind, elem_vert, contact_poss)
# a --------r-------> parameter corresponding to each coordinate (point); here radius (23605,) <class 'numpy.ndarray'> <class 'numpy.float64'>
# b -------pos------> coordinates of the points (23605, 3) <class 'numpy.ndarray'> <class 'numpy.ndarray'> <class 'numpy.float64'>
# c -------gap------> if we consider points as spheres by that radii [r], it is maximum length for spheres' over-lap (103832,) <class 'numpy.ndarray'> <class 'numpy.float64'>
# d ----ends_ind----> indices for each over-laped spheres (103832, 2) <class 'numpy.ndarray'> <class 'numpy.ndarray'> <class 'numpy.int64'>
# e ---elem_vert----> vertices of the mesh's sections or cells (2000, 8, 3) <class 'numpy.ndarray'> <class 'numpy.ndarray'> <class 'numpy.ndarray'> <class 'numpy.float64'>
# f --contact_poss--> a coordinate between the paired spheres (103832, 3) <class 'numpy.ndarray'> <class 'numpy.ndarray'> <class 'numpy.float64'>
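If the .npy files loaded above are unavailable, stand-in inputs with the documented shapes and dtypes can be generated with the following minimal sketch (the values are random and purely hypothetical, so absolute timings will differ):

import numpy as np

np.random.seed(0)
np.save('a.npy', np.random.uniform(0.1, 1.0, 23605))                          # r
np.save('b.npy', np.random.uniform(0.0, 100.0, (23605, 3)))                   # pos
np.save('c.npy', np.random.uniform(0.0, 0.5, 103832))                         # gap
np.save('d.npy', np.random.randint(0, 23605, (103832, 2)).astype(np.int64))   # ends_ind
np.save('e.npy', np.random.uniform(0.0, 100.0, (2000, 8, 3)))                 # elem_vert
np.save('f.npy', np.random.uniform(0.0, 100.0, (103832, 3)))                  # contact_poss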
This code will be called frequently from another program with big data inputs, so speeding it up is essential. I tried to use the jit decorator from the JAX and Numba libraries to accelerate the code, but I could not apply it correctly to make the code any better. I have timed the code on Colab (for three datasets with loop counts of 20, 250, and 2000 respectively), and the results were:
11 (ms), 47 (ms), 6.62 (s) (per loop) <-- without the commented code lines in the code
137 (ms), 1.66 (s) , 4 (m) (per loop) <-- with activating the commented code lines in the code
What this code does is find the coordinates that fall within a range and then perform some calculations on them. I would be grateful for any answer that speeds this code up significantly (I believe it can be). I would also appreciate any experienced advice on accelerating it by changing (substituting) the NumPy methods used, or by rewriting the mathematical operations.
Notes:
- The proposed answer must run under Python 2 (working under both versions 2 and 3 would be ideal).
- The commented-out lines in the code are unnecessary for the main purpose and were written only for further evaluation. Any recommendation for handling those lines with the proposed answer is appreciated (but not required).
Test datasets:
Small dataset: https://drive.google.com/file/d/1CswjyoqS8ogLmLQa_oNTOj221chDcbK8/view?usp=sharing
Medium dataset: https://drive.google.com/file/d/14RJ0Ackx88NzQWloops5FagzuNQYDSrh/view?usp=sharing
Large dataset: https://drive.google.com/file/d/1dJnXpb3HiAGcRC9PPTwui9joNcij4E_E/view?usp=sharing
Firstly, the algorithm can be improved to be much more efficient. Indeed, a polygon can be directly assigned to each point: this amounts to a classification of points by polygons. Once the classification is done, you can perform one or several reductions by key, where the key is the polygon ID.
The new algorithm consists of:
- computing the bounding boxes of all the polygons;
- classifying the points by polygons;
- performing the reduction by key, where the key is the polygon ID (a minimal NumPy-only sketch of this step is shown right after this list).
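To make the last step concrete, here is a minimal NumPy-only sketch of the reduction by key, assuming the classification step has already produced a pointPolygonId array (one polygon ID per point, -1 for points matching no polygon):

import numpy as np

def reduce_by_key(pointPolygonId, operate_, nPolygons):
    # Sum the operate_ value of every point into the bin of its polygon ID;
    # points with ID -1 (no matching polygon) are dropped beforehand.
    valid = pointPolygonId >= 0
    return np.bincount(pointPolygonId[valid], weights=operate_[valid],
                       minlength=nPolygons)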
This approach is much more efficient than iterating over all the points for each polygon and filtering the attribute arrays (e.g. operate_ and contact_poss). Indeed, filtering is an expensive operation: it requires the target array (which may not fit in the CPU cache) to be fully read and then written back. Not to mention that this operation requires a temporary array to be allocated/deleted if it is not performed in-place, and it can hardly benefit from an implementation using SIMD instructions on most mainstream x86 platforms (since it requires the new AVX-512 instruction set). Filtering is also harder to parallelize: each individual filtering step is too fast for threads to be useful, yet the steps need to be done sequentially.
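The cost of such filtering can be illustrated with a rough, machine-dependent micro-benchmark (a sketch, not part of the original code):

import timeit
import numpy as np

a = np.random.rand(10 ** 7)
mask = a > 0.5

# Each boolean-mask filter reads the full array and the mask,
# and allocates a brand-new output array.
t = timeit.timeit(lambda: a[mask], number=10) / 10
print('filtering 10M float64 values: %.1f ms' % (t * 1e3))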
Regarding the implementation of the algorithm, Numba can be used to greatly speed up the overall computation. The main benefit of using Numba here is to drastically reduce the number of expensive temporary arrays that NumPy creates in the current implementation. Note that you can specify the function types to Numba so that it compiles the functions eagerly, when they are defined. Assertions can be used to make the code more robust and to let the compiler know the size of a given dimension, which results in significantly faster code (Numba's JIT compiler can then unroll the loops). Ternary operators can help the JIT compiler to generate a faster, branch-less program.
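As a small illustration of these two points (pair_sum is a hypothetical kernel, not part of the solution below): the explicit signature triggers eager compilation at definition time, and the assertion tells the compiler the inner dimension is exactly 2 so the pair access can be unrolled:

import numba as nb
import numpy as np

@nb.njit('float64[::1](float64[:,::1])')
def pair_sum(pairs):
    # The assertion fixes the inner dimension to 2 for the compiler.
    assert pairs.shape[1] == 2
    out = np.empty(pairs.shape[0], dtype=np.float64)
    for i in range(pairs.shape[0]):
        out[i] = pairs[i, 0] + pairs[i, 1]
    return out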
Note that the classification can easily be parallelized using multiple threads. However, one needs to be very careful about constant propagation, since some critical constants (like the shapes of the working arrays and the assertions) tend not to be propagated into the code executed by the threads, while propagation is crucial for optimizing the hot loops (e.g. vectorization, unrolling). Note also that creating many threads can be expensive on machines with many cores (from 0.1 ms up to 10 ms). Thus, it is often better to use the parallel implementation only for large input data.
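One way to apply that last advice is a size-based dispatch (a sketch: findMatchingPolygonsPar_ denotes a hypothetical twin of the findMatchingPolygons_ function below, compiled with parallel=True and nb.prange, and the threshold is a machine-dependent tuning parameter):

PARALLEL_THRESHOLD = 50000  # machine-dependent; tune on your own hardware

def findMatchingPolygons(points, polygons):
    # Pay the thread-creation overhead only when there is enough work.
    if points.shape[0] >= PARALLEL_THRESHOLD:
        return findMatchingPolygonsPar_(points, polygons)   # parallel variant
    return findMatchingPolygons_(points, polygons)          # sequential variant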
Here is the final implementation (working with both Python 2 and Python 3):
import numpy as np
import numba as nb


@nb.njit('float64[::1](float64[::1], float64[::1], int64[:,::1])')
def operation_(r, gap, ends_ind):
    """ calculation formula which is applied on the points specified by the findMatchingPolygons_ function """
    nPoints = ends_ind.shape[0]
    assert ends_ind.shape[1] == 2
    assert gap.size == nPoints

    formula = np.empty(nPoints, dtype=np.float64)

    for i in range(nPoints):
        ind0, ind1 = ends_ind[i]
        r0, r1 = r[ind0], r[ind1]
        r_sub = r0 - r1
        r_add = r0 + r1
        cur_gap = gap[i]
        paired_cent_dis = r_add + cur_gap
        formula[i] = (cur_gap**2 * (paired_cent_dis**2 +
                      5 * paired_cent_dis * r_add -
                      7 * r_sub**2)) / (3 * paired_cent_dis)

    return formula


# Use `parallel=True` for a parallel implementation
@nb.njit('int32[::1](float64[:,::1], float64[:,:,::1])')
def findMatchingPolygons_(points, polygons):
    """ Attribute a region to each point """
    nPolygons = polygons.shape[0]
    nPolygonPoints = polygons.shape[1]
    nPoints = points.shape[0]
    assert points.shape[1] == 3
    assert polygons.shape[2] == 3

    # Compute the bounding boxes of all the polygons
    ll = np.empty((nPolygons, 3), dtype=np.float64)
    ur = np.empty((nPolygons, 3), dtype=np.float64)

    for i in range(nPolygons):
        ll_x, ll_y, ll_z = polygons[i, 0]
        ur_x, ur_y, ur_z = polygons[i, 0]

        for j in range(1, nPolygonPoints):
            x, y, z = polygons[i, j]
            ll_x = x if x < ll_x else ll_x
            ll_y = y if y < ll_y else ll_y
            ll_z = z if z < ll_z else ll_z
            ur_x = x if x > ur_x else ur_x
            ur_y = y if y > ur_y else ur_y
            ur_z = z if z > ur_z else ur_z

        ll[i] = ll_x, ll_y, ll_z
        ur[i] = ur_x, ur_y, ur_z

    # Find the corresponding polygon of each point
    pointPolygonId = np.empty(nPoints, dtype=np.int32)

    # Use `nb.prange(nPoints)` for a parallel implementation
    for i in range(nPoints):
        x, y, z = points[i, 0], points[i, 1], points[i, 2]
        pointPolygonId[i] = -1

        for j in range(nPolygons):
            if ll[j, 0] <= x < ur[j, 0] and ll[j, 1] <= y < ur[j, 1] and ll[j, 2] <= z < ur[j, 2]:
                pointPolygonId[i] = j
                break

    return pointPolygonId


@nb.njit('float64[::1](float64[:,:,::1], float64[:,::1], float64[::1])')
def computeSections_(elem_vert, contact_poss, operate_):
    nPolygons = elem_vert.shape[0]
    elaps = np.zeros(nPolygons, dtype=np.float64)
    pointPolygonId = findMatchingPolygons_(contact_poss, elem_vert)

    for i, polygonId in enumerate(pointPolygonId):
        if polygonId >= 0:
            elaps[polygonId] += operate_[i]

    return elaps


def elapses(r, pos, gap, ends_ind, elem_vert, contact_poss):
    if len(gap) > 0:
        operate_ = operation_(r, gap, ends_ind)
        return computeSections_(elem_vert, contact_poss, operate_)
r = np.load('a.npy')
pos = np.load('b.npy')
gap = np.load('c.npy')
ends_ind = np.load('d.npy')
elem_vert = np.load('e.npy')
contact_poss = np.load('f.npy')
elapses(r, pos, gap, ends_ind, elem_vert, contact_poss)
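As a quick sanity check (a sketch, assuming the original function is kept around under the hypothetical name elapses_original), the two implementations can be compared on the same inputs; both assign each point to the first polygon whose bounding box contains it, so the per-section sums should match up to floating-point rounding:

expected = elapses_original(r, pos, gap, ends_ind, elem_vert, contact_poss)
result = elapses(r, pos, gap, ends_ind, elem_vert, contact_poss)
assert np.allclose(expected.astype(np.float64), result)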
Here are the results on an old 2-core machine (i7-3520M):
Small dataset:
- Original version: 5.53 ms
- Proposed version (sequential): 0.22 ms (x25)
- Proposed version (parallel): 0.20 ms (x27)
Medium dataset:
- Original version: 53.40 ms
- Proposed version (sequential): 1.24 ms (x43)
- Proposed version (parallel): 0.62 ms (x86)
Big dataset:
- Original version: 5742 ms
- Proposed version (sequential): 144 ms (x40)
- Proposed version (parallel): 67 ms (x86)
Thus, the proposed implementation is up to 86 times faster than the original one.