矢量化 numpy:检查点是否在球体内?
Vectorized numpy: check if point is inside sphere?
我有一个 numpy 数组 centers
,它是 N 行 3 列,包含球体中心的 3D 坐标。然后是另一个 Nx1 数组 radii
,其中包含与球体对应的半径。最后,我有第三个数组 points
,这是我要检查的点,看它们是否在球体内。要进行检查,我知道我需要为所有点和所有球体找到 d = np.sqrt((points[0]-sphere[0])**2 + (points[1]-sphere[1])**2 + (points[2]-sphere[2])**2)
,但我不确定如何以矢量化方式执行此操作。这可能吗?谢谢。
附加信息:
- 我想 运行 使用 CuPy 在 GPU 上执行此操作,否则性能会非常快。
- 有 2-200 个球体和 300k 个点。
由于 CuPy
将我们锁定在 scipy.spatial
函数之外,我们可以回退到 numpy
-only 函数。在这种情况下,基于答案的第一部分 并(不熟练地)转换为 CuPy
方法:
d_sq = cupy.einsum('ij, ij ->i', centers , centers)[:, None] +\
cupy.einsum('ij, ij ->i', points , points) -\
2 * cupy.dot(centers, points.T)
然后可以通过 d_sq < radii[:, None] ** 2
对照 radii
检查此平方距离矩阵。这节省了 numerically-costly 平方根步骤。
当球体的数量很大时,使用像quadtree, k-D tree and especially ball tree这样的数据结构通常是个好主意。这就是 Scipy 内部使用的算法,这导致算法 运行 在 O(M log(N))
时间内,其中 N
是球体的数量,M
是点的数量.
在 GPU 上高效地实现这样的数据结构非常疯狂(因为它们 SIMD 不友好)。在您的情况下,球体的数量足够小,因此在 O(N M)
中使用朴素算法 运行 应该相对较快。
@DanielF 的解决方案是一个好的开始,但效率不高,因为它创建了几个临时数组并使用了 SIMD 不友好的内存访问模式(这是对 GPU 至关重要,以便获得合理的性能)。临时数组会导致分配和内存访问缓慢,而 GPU 则适合繁重的计算操作(通常在大型数组上)。
要解决这个问题,您可以编写自己的内核。问题是输入数组没有有效存储。它们应该 转置 以便计算可以进行更多 SIMD-friendly 内存访问并且转置数组在 GPU 上相对较慢(以及创建新数组)。请考虑使用更合适的输入数据布局。此外,不需要计算平方根,因为您可以将结果与平方半径进行比较。这是一个计算“碰撞”矩阵的示例(其中行是球体,列是点):
import cupy as cp
points = cp.random.rand(300_000, 3)
spheres = cp.random.rand(200, 3)
radii = cp.random.rand(200)
checkInSphere = cp.RawKernel(r'''
extern "C" __global__
void checkInSphere(const double* points, int pointCount, const double* spheres, const double* radii, int sphereCount, bool* found) {
int tid = blockDim.x * blockIdx.x + threadIdx.x;
if(tid < pointCount)
{
const double px = points[tid];
const double py = points[tid+pointCount];
const double pz = points[tid+pointCount*2];
for(int i=0 ; i<sphereCount ; ++i)
{
const double r = radii[i];
const double sx = spheres[i];
const double sy = spheres[i+sphereCount];
const double sz = spheres[i+sphereCount*2];
const double dx = px - sx;
const double dy = py - sy;
const double dz = pz - sz;
const double sqDist = dx * dx + dy * dy + dz * dz;
found[i*pointCount+tid] = sqDist <= r * r;
}
}
}
''', 'checkInSphere')
threadPerBlock = 64
blockCount = (points.size + (threadPerBlock - 1)) // threadPerBlock
found = cp.empty((spheres.shape[0], points.shape[0]), dtype=cp.bool_)
checkInSphere((blockCount,), (threadPerBlock,), (points.T.copy(), points.shape[0], spheres.T.copy(), radii, spheres.shape[0], found))
此代码在双精度方面比@DanielF 的代码快 4 倍(即没有平方根和检查的最后一个版本)。问题是我的 GPU 和大多数 GPU 一样 在 double-precision (DP) floating-point 数字 上运行非常慢。事实上,主流 client-side GPU 并不是为这种工作负载而设计的,而且通常比 CPUs 慢。但是,(昂贵的)server-side GPU 可以有效地处理 double-precision 个数字。因此,如果您使用的是基本主流 GPU,请考虑使用 simple-precision floating-point (SP) 编号。如果你真的想使用 DP,那么使用像 Numba 这样的工具在 CPU 上这个计算肯定会更快。请注意,在 server-side GPU 和 CPU 上使用 SP 操作通常也更快。在我的机器上,此代码的 SP 版本大约需要 0.7 毫秒,比 @DanielF(更新版本)快 15 倍。
为了更好的性能,请考虑更改输入布局,pre-allocate所有缓冲区,pre-computeradii
的方块,并使用共享内存 所以只加载一次球体位置。循环展开和寄存器平铺也应该有助于进一步提高性能,尽管它会使代码几乎不可读。也可以只存储每个点最近球体的索引,或者甚至只存储一个布尔值来表示该点是否在任何球体中,以减少输出的大小(内存访问很昂贵)。
我有一个 numpy 数组 centers
,它是 N 行 3 列,包含球体中心的 3D 坐标。然后是另一个 Nx1 数组 radii
,其中包含与球体对应的半径。最后,我有第三个数组 points
,这是我要检查的点,看它们是否在球体内。要进行检查,我知道我需要为所有点和所有球体找到 d = np.sqrt((points[0]-sphere[0])**2 + (points[1]-sphere[1])**2 + (points[2]-sphere[2])**2)
,但我不确定如何以矢量化方式执行此操作。这可能吗?谢谢。
附加信息:
- 我想 运行 使用 CuPy 在 GPU 上执行此操作,否则性能会非常快。
- 有 2-200 个球体和 300k 个点。
由于 CuPy
将我们锁定在 scipy.spatial
函数之外,我们可以回退到 numpy
-only 函数。在这种情况下,基于答案的第一部分 CuPy
方法:
d_sq = cupy.einsum('ij, ij ->i', centers , centers)[:, None] +\
cupy.einsum('ij, ij ->i', points , points) -\
2 * cupy.dot(centers, points.T)
然后可以通过 d_sq < radii[:, None] ** 2
对照 radii
检查此平方距离矩阵。这节省了 numerically-costly 平方根步骤。
当球体的数量很大时,使用像quadtree, k-D tree and especially ball tree这样的数据结构通常是个好主意。这就是 Scipy 内部使用的算法,这导致算法 运行 在 O(M log(N))
时间内,其中 N
是球体的数量,M
是点的数量.
在 GPU 上高效地实现这样的数据结构非常疯狂(因为它们 SIMD 不友好)。在您的情况下,球体的数量足够小,因此在 O(N M)
中使用朴素算法 运行 应该相对较快。
@DanielF 的解决方案是一个好的开始,但效率不高,因为它创建了几个临时数组并使用了 SIMD 不友好的内存访问模式(这是对 GPU 至关重要,以便获得合理的性能)。临时数组会导致分配和内存访问缓慢,而 GPU 则适合繁重的计算操作(通常在大型数组上)。
要解决这个问题,您可以编写自己的内核。问题是输入数组没有有效存储。它们应该 转置 以便计算可以进行更多 SIMD-friendly 内存访问并且转置数组在 GPU 上相对较慢(以及创建新数组)。请考虑使用更合适的输入数据布局。此外,不需要计算平方根,因为您可以将结果与平方半径进行比较。这是一个计算“碰撞”矩阵的示例(其中行是球体,列是点):
import cupy as cp
points = cp.random.rand(300_000, 3)
spheres = cp.random.rand(200, 3)
radii = cp.random.rand(200)
checkInSphere = cp.RawKernel(r'''
extern "C" __global__
void checkInSphere(const double* points, int pointCount, const double* spheres, const double* radii, int sphereCount, bool* found) {
int tid = blockDim.x * blockIdx.x + threadIdx.x;
if(tid < pointCount)
{
const double px = points[tid];
const double py = points[tid+pointCount];
const double pz = points[tid+pointCount*2];
for(int i=0 ; i<sphereCount ; ++i)
{
const double r = radii[i];
const double sx = spheres[i];
const double sy = spheres[i+sphereCount];
const double sz = spheres[i+sphereCount*2];
const double dx = px - sx;
const double dy = py - sy;
const double dz = pz - sz;
const double sqDist = dx * dx + dy * dy + dz * dz;
found[i*pointCount+tid] = sqDist <= r * r;
}
}
}
''', 'checkInSphere')
threadPerBlock = 64
blockCount = (points.size + (threadPerBlock - 1)) // threadPerBlock
found = cp.empty((spheres.shape[0], points.shape[0]), dtype=cp.bool_)
checkInSphere((blockCount,), (threadPerBlock,), (points.T.copy(), points.shape[0], spheres.T.copy(), radii, spheres.shape[0], found))
此代码在双精度方面比@DanielF 的代码快 4 倍(即没有平方根和检查的最后一个版本)。问题是我的 GPU 和大多数 GPU 一样 在 double-precision (DP) floating-point 数字 上运行非常慢。事实上,主流 client-side GPU 并不是为这种工作负载而设计的,而且通常比 CPUs 慢。但是,(昂贵的)server-side GPU 可以有效地处理 double-precision 个数字。因此,如果您使用的是基本主流 GPU,请考虑使用 simple-precision floating-point (SP) 编号。如果你真的想使用 DP,那么使用像 Numba 这样的工具在 CPU 上这个计算肯定会更快。请注意,在 server-side GPU 和 CPU 上使用 SP 操作通常也更快。在我的机器上,此代码的 SP 版本大约需要 0.7 毫秒,比 @DanielF(更新版本)快 15 倍。
为了更好的性能,请考虑更改输入布局,pre-allocate所有缓冲区,pre-computeradii
的方块,并使用共享内存 所以只加载一次球体位置。循环展开和寄存器平铺也应该有助于进一步提高性能,尽管它会使代码几乎不可读。也可以只存储每个点最近球体的索引,或者甚至只存储一个布尔值来表示该点是否在任何球体中,以减少输出的大小(内存访问很昂贵)。