Concurrency of one large kernel with many small kernels and memcopies (CUDA)
I am developing a multi-GPU accelerated flow solver. Currently I am trying to implement communication hiding. That means that while data is being exchanged, the GPU computes the part of the mesh that is not involved in the communication, and computes the rest of the mesh once the communication is finished.
I am trying to achieve this with one stream (computeStream) for the long-running fluxKernel and one stream (communicationStream) for the different phases of the communication. The computeStream has a very low priority in order to allow kernels on the communicationStream to interleave the fluxKernel, even though it uses all resources.
These are the streams I am using:
int priority_high, priority_low;
cudaDeviceGetStreamPriorityRange(&priority_low , &priority_high ) ;
cudaStreamCreateWithPriority (&communicationStream, cudaStreamNonBlocking, priority_high );
cudaStreamCreateWithPriority (&computeStream , cudaStreamNonBlocking, priority_low );
The desired concurrency pattern looks like this:
Before I send the data via MPI, I need to synchronize the communicationStream to make sure that the data is completely downloaded before I continue sending.
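As a minimal sketch of the same requirement (an illustration, not part of the original code): instead of synchronizing the whole communicationStream, one could record a CUDA event right after the download and let the host wait on that event before the MPI call:
cudaEvent_t downloadDone;
cudaEventCreateWithFlags( &downloadDone, cudaEventDisableTiming );
// ... sendKernel and cudaMemcpyAsync enqueued on communicationStream as in the listing below ...
cudaEventRecord      ( downloadDone, communicationStream ); // marks the end of the download
cudaEventSynchronize ( downloadDone );                      // host waits only for work recorded so far
MPI_Isend( ... );                                           // safe: the host buffer is now filled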
In the following listing I show the structure of what I am currently doing. First, I start the long-running fluxKernel for the main part of the mesh on the computeStream. Then I launch a sendKernel that collects the data that should be sent to the second GPU, and subsequently download it to the host (due to hardware limitations I cannot use CUDA-aware MPI). The data is then sent non-blocking via MPI_Isend, and a blocking receive (MPI_Recv) is used afterwards. When the data is received, the procedure is done in reverse: the data is uploaded to the device and then spread into the main data structure by the recvKernel. Finally, the fluxKernel is called for the remaining part of the mesh on the communicationStream.
Note that before and after the shown code, kernels on the default stream are running.
{ ... } // Preparations
// Start main part of computation on first stream
fluxKernel<<< ..., ..., 0, computeStream >>>( /* main Part */ );
// Prepare send data
sendKernel<<< ..., ..., 0, communicationStream >>>( ... );
cudaMemcpyAsync ( ..., ..., ..., cudaMemcpyDeviceToHost, communicationStream );
cudaStreamSynchronize( communicationStream );
// MPI Communication
MPI_Isend( ... );
MPI_Recv ( ... );
// Use received data
cudaMemcpyAsync ( ..., ..., ..., cudaMemcpyHostToDevice, communicationStream );
recvKernel<<< ..., ..., 0, communicationStream >>>( ... );
fluxKernel<<< ..., ..., 0, communicationStream >>>( /* remaining Part */ );
{ ... } // Rest of the Computations
I used nvprof and the Visual Profiler to see whether the streams actually execute concurrently. This is the result:
I observe that the sendKernel (purple), the upload, the MPI communication, and the download run concurrently with the fluxKernel. The recvKernel (red), however, only starts after the other stream is finished. Turning off the synchronization does not solve the problem:
For my real application I have not only one communication, but multiple. I tested this with two communications as well. The procedure is:
sendKernel<<< ..., ..., 0, communicationStream >>>( ... );
cudaMemcpyAsync ( ..., ..., ..., cudaMemcpyDeviceToHost, communicationStream );
cudaStreamSynchronize( communicationStream );
MPI_Isend( ... );
sendKernel<<< ..., ..., 0, communicationStream >>>( ... );
cudaMemcpyAsync ( ..., ..., ..., cudaMemcpyDeviceToHost, communicationStream );
cudaStreamSynchronize( communicationStream );
MPI_Isend( ... );
MPI_Recv ( ... );
cudaMemcpyAsync ( ..., ..., ..., cudaMemcpyHostToDevice, communicationStream );
recvKernel<<< ..., ..., 0, communicationStream >>>( ... );
MPI_Recv ( ... );
cudaMemcpyAsync ( ..., ..., ..., cudaMemcpyHostToDevice, communicationStream );
recvKernel<<< ..., ..., 0, communicationStream >>>( ... );
The result is analogous to the single-communication case above, in the sense that the second kernel call (this time it is a sendKernel) is delayed until the kernel on the computeStream is finished.
Hence the overall observation is that the second kernel call is delayed, regardless of which kernel it is.
Can you explain why the GPU synchronizes in this way, or how I can get the second kernel on the communicationStream to also run concurrently with the computeStream?
Thank you very much.
Edit 1: complete rework of the question
Minimal reproducible example
I built a minimal reproducible example. In the end, the code prints the int data to the terminal. The correct last value would be 32778 (= (32*1024-1) + 1 + 10). At the beginning I added an option integer to test 3 different options:
- 0: intended version, with synchronization before the CPU modification of the data
- 1: same as 0, but without the synchronization
- 2: dedicated stream for the memcpys, and no synchronization
#include <iostream>
#include <cuda.h>
#include <cuda_runtime.h>
#include <device_launch_parameters.h>
const int option = 0;
const int numberOfEntities = 2 * 1024 * 1024;
const int smallNumberOfEntities = 32 * 1024;
__global__ void longKernel(float* dataDeviceIn, float* dataDeviceOut, int numberOfEntities)
{
int index = blockIdx.x * blockDim.x + threadIdx.x;
if(index >= numberOfEntities) return;
float tmp = dataDeviceIn[index];
#pragma unroll
for( int i = 0; i < 2000; i++ ) tmp += 1.0;
dataDeviceOut[index] = tmp;
}
__global__ void smallKernel_1( int* smallDeviceData, int numberOfEntities )
{
int index = blockIdx.x * blockDim.x + threadIdx.x;
if(index >= numberOfEntities) return;
smallDeviceData[index] = index;
}
__global__ void smallKernel_2( int* smallDeviceData, int numberOfEntities )
{
int index = blockIdx.x * blockDim.x + threadIdx.x;
if(index >= numberOfEntities) return;
int value = smallDeviceData[index];
value += 10;
smallDeviceData[index] = value;
}
int main(int argc, char **argv)
{
cudaSetDevice(0);
float* dataDeviceIn;
float* dataDeviceOut;
cudaMalloc( &dataDeviceIn , sizeof(float) * numberOfEntities );
cudaMalloc( &dataDeviceOut, sizeof(float) * numberOfEntities );
int* smallDataDevice;
int* smallDataHost;
cudaMalloc ( &smallDataDevice, sizeof(int) * smallNumberOfEntities );
cudaMallocHost( &smallDataHost , sizeof(int) * smallNumberOfEntities );
cudaStream_t streamLong;
cudaStream_t streamSmall;
cudaStream_t streamCopy;
int priority_high, priority_low;
cudaDeviceGetStreamPriorityRange(&priority_low , &priority_high ) ;
cudaStreamCreateWithPriority (&streamLong , cudaStreamNonBlocking, priority_low );
cudaStreamCreateWithPriority (&streamSmall, cudaStreamNonBlocking, priority_high );
cudaStreamCreateWithPriority (&streamCopy , cudaStreamNonBlocking, priority_high );
//////////////////////////////////////////////////////////////////////////
longKernel <<< numberOfEntities / 32, 32, 0, streamLong >>> (dataDeviceIn, dataDeviceOut, numberOfEntities);
//////////////////////////////////////////////////////////////////////////
smallKernel_1 <<< smallNumberOfEntities / 32, 32, 0 , streamSmall >>> (smallDataDevice, smallNumberOfEntities);
if( option <= 1 ) cudaMemcpyAsync( smallDataHost, smallDataDevice, sizeof(int) * smallNumberOfEntities, cudaMemcpyDeviceToHost, streamSmall );
if( option == 2 ) cudaMemcpyAsync( smallDataHost, smallDataDevice, sizeof(int) * smallNumberOfEntities, cudaMemcpyDeviceToHost, streamCopy );
if( option == 0 ) cudaStreamSynchronize( streamSmall );
// some CPU modification of data
for( int i = 0; i < smallNumberOfEntities; i++ ) smallDataHost[i] += 1;
if( option <= 1 ) cudaMemcpyAsync( smallDataDevice, smallDataHost, sizeof(int) * smallNumberOfEntities, cudaMemcpyHostToDevice, streamSmall );
if( option == 2 ) cudaMemcpyAsync( smallDataDevice, smallDataHost, sizeof(int) * smallNumberOfEntities, cudaMemcpyHostToDevice, streamCopy );
smallKernel_2 <<< smallNumberOfEntities / 32, 32, 0 , streamSmall >>> (smallDataDevice, smallNumberOfEntities);
//////////////////////////////////////////////////////////////////////////
cudaDeviceSynchronize();
cudaMemcpy( smallDataHost, smallDataDevice, sizeof(int) * smallNumberOfEntities, cudaMemcpyDeviceToHost );
for( int i = 0; i < smallNumberOfEntities; i++ ) std::cout << smallDataHost[i] << "\n";
return 0;
}
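As a small sanity check (an addition for illustration, not part of the original program), the expected final value mentioned above could also be verified directly instead of scanning the full printout; 32778 = (32*1024-1) + 1 + 10 comes from smallKernel_1 writing the index, the host loop adding 1, and smallKernel_2 adding 10:
const int expectedLast = (smallNumberOfEntities - 1) + 1 + 10; // = 32778 for 32*1024 entries
if( smallDataHost[smallNumberOfEntities - 1] == expectedLast )
    std::cout << "last value OK: " << expectedLast << "\n";
else
    std::cout << "unexpected last value: " << smallDataHost[smallNumberOfEntities - 1] << "\n";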
With this code I see the same behavior as above:
Option 0 (correct result):
Option 1 (wrong result, the +1 from the CPU is missing):
Option 2 (completely wrong result, all 10s, the download happens before smallKernel_1)
Solution:
Running option 0 under Linux (as suggested in Robert's answer) brings the expected behavior!
Here is how I would try to accomplish this:
- Use the high-priority/low-priority stream arrangement, as you suggest.
- Only 2 streams are needed.
- Make sure to pin the host memory, to allow compute/copy overlap.
- Since you don't intend to use CUDA-aware MPI, your MPI transactions are purely host activity. We can therefore use a stream callback to insert this host activity into the high-priority stream.
- To allow the high-priority kernels to easily insert themselves alongside the low-priority kernel, I choose a grid-stride-loop design for the high-priority copy kernels, but a non-grid-stride loop for the low-priority kernel. We want the low-priority kernel to have a larger number of blocks, so that blocks are always launching and retiring, which easily allows the GPU block scheduler to insert high-priority blocks as resources become available.
- The work issuance for each "frame" does not use any synchronization calls. I use a single cudaDeviceSynchronize() per loop/frame to break (separate) the processing of one frame from the next. The arrangement of activities within a frame is handled entirely with CUDA stream semantics, to enforce serialization for activities that depend on each other, but to allow concurrency for activities that don't.
Here is sample code that implements these ideas:
#include <iostream>
#include <unistd.h>
#include <cstdio>
#define cudaCheckErrors(msg) \
do { \
cudaError_t __err = cudaGetLastError(); \
if (__err != cudaSuccess) { \
fprintf(stderr, "Fatal error: %s (%s at %s:%d)\n", \
msg, cudaGetErrorString(__err), \
__FILE__, __LINE__); \
fprintf(stderr, "*** FAILED - ABORTING\n"); \
exit(1); \
} \
} while (0)
typedef double mt;
const int nTPB = 512;
const size_t ds = 100ULL*1048576;
const size_t bs = 1048576ULL;
const int my_intensity = 1;
const int loops = 4;
const size_t host_func_delay_us = 100;
const int max_blocks = 320; // chosen based on GPU, could use runtime calls to set this via cudaGetDeviceProperties
template <typename T>
__global__ void fluxKernel(T * __restrict__ d, const size_t n, const int intensity){
size_t idx = ((size_t)blockDim.x) * blockIdx.x + threadIdx.x;
if (idx < n){
T temp = d[idx];
for (int i = 0; i < intensity; i++)
temp = sin(temp); // just some dummy code to simulate "real work"
d[idx] = temp;
}
}
template <typename T>
__global__ void sendKernel(const T * __restrict__ d, const size_t n, T * __restrict__ b){
for (size_t idx = ((size_t)blockDim.x) * blockIdx.x + threadIdx.x; idx < n; idx += ((size_t)blockDim.x)*gridDim.x)
b[idx] = d[idx];
}
template <typename T>
__global__ void recvKernel(const T * __restrict__ b, const size_t n, T * __restrict__ d){
for (size_t idx = ((size_t)blockDim.x) * blockIdx.x + threadIdx.x; idx < n; idx += ((size_t)blockDim.x)*gridDim.x)
d[idx] = b[idx];
}
void CUDART_CB MyCallback(cudaStream_t stream, cudaError_t status, void *data){
printf("Loop %lu callback\n", (size_t)data);
usleep(host_func_delay_us); // simulate: this is where non-cuda-aware MPI calls would go, operating on h_buf
}
int main(){
// get the range of stream priorities for this device
int priority_high, priority_low;
cudaDeviceGetStreamPriorityRange(&priority_low, &priority_high);
// create streams with highest and lowest available priorities
cudaStream_t st_high, st_low;
cudaStreamCreateWithPriority(&st_high, cudaStreamNonBlocking, priority_high);
cudaStreamCreateWithPriority(&st_low, cudaStreamNonBlocking, priority_low);
// allocations
mt *h_buf, *d_buf, *d_data;
cudaMalloc(&d_data, ds*sizeof(d_data[0]));
cudaMalloc(&d_buf, bs*sizeof(d_buf[0]));
cudaHostAlloc(&h_buf, bs*sizeof(h_buf[0]), cudaHostAllocDefault);
cudaCheckErrors("setup error");
// main processing loop
for (unsigned long i = 0; i < loops; i++){
// issue low-priority
fluxKernel<<<((ds-bs)+nTPB)/nTPB, nTPB,0,st_low>>>(d_data+bs, ds-bs, my_intensity);
// issue high-priority
sendKernel<<<max_blocks,nTPB,0,st_high>>>(d_data, bs, d_buf);
cudaMemcpyAsync(h_buf, d_buf, bs*sizeof(h_buf[0]), cudaMemcpyDeviceToHost, st_high);
cudaStreamAddCallback(st_high, MyCallback, (void*)i, 0);
cudaMemcpyAsync(d_buf, h_buf, bs*sizeof(h_buf[0]), cudaMemcpyHostToDevice, st_high);
recvKernel<<<max_blocks,nTPB,0,st_high>>>(d_buf, bs, d_data);
fluxKernel<<<((bs)+nTPB)/nTPB, nTPB,0,st_high>>>(d_data, bs, my_intensity);
cudaDeviceSynchronize();
cudaCheckErrors("loop error");
}
return 0;
}
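As the comment on max_blocks suggests, the value could also be derived at runtime instead of being hard-coded. A minimal sketch (one reasonable approach under that assumption, reusing nTPB and sendKernel from the code above) using device properties and the occupancy API:
cudaDeviceProp prop;
cudaGetDeviceProperties(&prop, 0);
int blocksPerSM = 0;
// how many blocks of the copy kernel fit on one SM at nTPB threads per block
cudaOccupancyMaxActiveBlocksPerMultiprocessor(&blocksPerSM, sendKernel<mt>, nTPB, 0);
int max_blocks_runtime = prop.multiProcessorCount * blocksPerSM; // candidate replacement for the hard-coded 320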
Here is the visual profiler timeline output (on Linux, Tesla V100):
Note that arranging complex concurrency scenarios under Windows WDDM can be quite challenging. I recommend avoiding that, and this answer does not intend to cover all the challenges there; I suggest using Linux or a GPU in Windows TCC mode for this.
If you try this code on your machine, you may need to adjust some of the various constants to get things to look like this.
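(A side note, not part of the original answer: on CUDA 10 and newer, cudaLaunchHostFunc offers a simpler alternative to cudaStreamAddCallback; the host function receives only the user-data pointer. A minimal sketch of how the callback above could be rewritten with it:)
void CUDART_CB MyHostFunc(void *data){      // no stream/status arguments with cudaLaunchHostFunc
    printf("Loop %lu host func\n", (size_t)data);
    usleep(host_func_delay_us);             // placeholder for the non-CUDA-aware MPI calls
}
// ... in the loop, instead of cudaStreamAddCallback:
cudaLaunchHostFunc(st_high, MyHostFunc, (void*)i);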