高效地同步排列许多小型 OpenCL 内核

Efficiently synchronously queue many small OpenCL kernels

TLDR:我怎样才能逐个运行多个小内核,而不产生显著的开销?

我正在做一个充当虚拟绿屏的项目。它接受图像输入,寻找与颜色键相似的像素,并用替换颜色替换这些像素。我计划在 Windows 中将生成的图像流输出为虚拟网络摄像头。完整的源代码在 GitHub 上。目前,我使用 OpenCL 的 Java 绑定 (JOCL) 来加速该过程。主要应用程序是用 JavaFX 和 Kotlin 编写的,我对它们比较熟悉,但 OpenCL 内核是用 C 编写的,而我在这方面是新手。

这是我为程序创建的主要“API”。我尝试使 API 接口相对开放,以便将来可以添加直接的 Cuda 支持。

/**
 * OpenCL backend of the filter pipeline, implemented on top of the JOCL bindings.
 *
 * An instance owns one context, one command queue and one program built from the
 * bundled `.cl` sources; all three are released by [close].
 *
 * @param platformIndex index of the platform to use, as listed by [getPlatformsMap].
 * @param deviceIndex index of the device on that platform, as listed by [getDevicesMap].
 * @property localWorkSize optional work-group size handed to the filters; `null` leaves
 * the choice to the OpenCL runtime.
 * @throws ArrayIndexOutOfBoundsException if the platform or device index does not exist.
 */
class OpenClApi constructor(
    platformIndex: Int = 0,
    deviceIndex: Int = 0,
    val localWorkSize: Long? = null
) : AbstractApi {
    companion object : AbstractApi.AbstractApiConsts {
        override val listName = "OpenCl"

        /** Buffer creation flags for the two directions a filter moves data in. */
        enum class ClMemOperation(val flags: Long) {
            // CL_MEM_USE_HOST_PTR instead of CL_MEM_COPY_HOST_PTR speeds up most operations for realtime video
            READ(CL_MEM_READ_ONLY or CL_MEM_USE_HOST_PTR),
            WRITE(CL_MEM_WRITE_ONLY)
        }

        /**
         * Decodes a name buffer filled by `clGet*Info`, dropping the final byte
         * (the terminator). An empty buffer yields an empty string instead of
         * failing with a negative length.
         */
        private fun decodeName(buffer: ByteArray): String = String(buffer, 0, maxOf(buffer.size - 1, 0))

        /** Queries all available platforms (count first, then the ids). */
        private fun getPlatforms(): Array<cl_platform_id?> {
            val numPlatformsArray = IntArray(1)
            clGetPlatformIDs(0, null, numPlatformsArray)
            val platforms = arrayOfNulls<cl_platform_id>(numPlatformsArray[0])
            clGetPlatformIDs(platforms.size, platforms, null)
            return platforms
        }

        // getOrNull so that an out-of-range index reports the descriptive message
        // below instead of the raw array-bounds message.
        private fun getPlatform(platformId: Int) = getPlatforms().getOrNull(platformId)
            ?: throw ArrayIndexOutOfBoundsException("Couldn't find the specified platform")

        /** Maps each platform index to the platform's `CL_PLATFORM_NAME`. */
        fun getPlatformsMap(): Map<Int, String> {
            val platforms = getPlatforms()
            return platforms.indices.associateWith { platformId ->
                val platformFromList = platforms[platformId]
                val size = LongArray(1)
                clGetPlatformInfo(platformFromList, CL_PLATFORM_NAME, 0, null, size)
                val buffer = ByteArray(size[0].toInt())
                clGetPlatformInfo(platformFromList, CL_PLATFORM_NAME, buffer.size.toLong(), Pointer.to(buffer), null)
                decodeName(buffer)
            }
        }

        /** Queries all devices (of any type) on the platform with the given index. */
        private fun getDevices(platformId: Int): Array<cl_device_id?> {
            val platform = getPlatform(platformId)
            val numDevicesArray = IntArray(1)
            clGetDeviceIDs(platform, CL_DEVICE_TYPE_ALL, 0, null, numDevicesArray)
            val numDevices = numDevicesArray[0]
            val devices = arrayOfNulls<cl_device_id>(numDevices)
            clGetDeviceIDs(platform, CL_DEVICE_TYPE_ALL, numDevices, devices, null)
            return devices
        }

        private fun getDevice(platformId: Int, deviceId: Int) = getDevices(platformId).getOrNull(deviceId)
            ?: throw ArrayIndexOutOfBoundsException("Couldn't find the specified platform or device")

        /** Maps each device index on the given platform to the device's `CL_DEVICE_NAME`. */
        fun getDevicesMap(platformId: Int): Map<Int, String> {
            val devices = getDevices(platformId)
            return devices.indices.associateWith { deviceId ->
                val deviceFromList = devices[deviceId]
                val size = LongArray(1)
                clGetDeviceInfo(deviceFromList, CL_DEVICE_NAME, 0, null, size)
                val buffer = ByteArray(size[0].toInt())
                clGetDeviceInfo(deviceFromList, CL_DEVICE_NAME, buffer.size.toLong(), Pointer.to(buffer), null)
                decodeName(buffer)
            }
        }
    }

    init {
        // Initializers and init blocks run in textual order, so this has to come
        // before the properties below for their CL calls to report failures as
        // exceptions rather than silently returning error codes / null handles.
        setExceptionsEnabled(true)
    }

    private val platform: cl_platform_id = getPlatform(platformIndex)

    // The platform property must be registered BEFORE the context is created;
    // previously it was added in a later init block, after clCreateContext had
    // already consumed an empty property list.
    private val contextProperties: cl_context_properties = cl_context_properties().also {
        it.addProperty(CL_CONTEXT_PLATFORM.toLong(), platform)
    }

    private val device: cl_device_id = getDevice(platformIndex, deviceIndex)

    private val context: cl_context = clCreateContext(contextProperties, 1, arrayOf(device), null, null, null)

    val commandQueue: cl_command_queue

    val program: cl_program

    init {
        val properties = cl_queue_properties()
        commandQueue = clCreateCommandQueueWithProperties(context, device, properties, null)
        // All kernel sources are compiled into a single program so that the
        // helpers in one file are visible to the kernels in the others.
        val sources = arrayOf(
            "Util",
            "InitialComparison",
            "NoiseReduction",
            "FlowKey",
            "Splash",
            "SplashPrep"
        ).map { name ->
            checkNotNull(this::class.java.getResource("$name.cl")) {
                "Missing bundled OpenCL source $name.cl"
            }.readText()
        }.toTypedArray()
        program = clCreateProgramWithSource(context, sources.size, sources, null, null)
        clBuildProgram(program, 0, null, null, null, null)
    }

    /** Creates one instance of every OpenCL filter, keyed by the filter's list name. */
    override fun getFilters(): Map<String, AbstractFilter> = mapOf(
        OpenClInitialComparisonFilter.listName to OpenClInitialComparisonFilter(api = this),
        OpenClNoiseReductionFilter.listName to OpenClNoiseReductionFilter(api = this),
        OpenClFlowKeyFilter.listName to OpenClFlowKeyFilter(api = this),
        OpenClSplashFilter.listName to OpenClSplashFilter(api = this),
    )

    /** Releases the program, the command queue and the context, in that order. */
    override fun close() {
        clReleaseProgram(program)
        clReleaseCommandQueue(commandQueue)
        clReleaseContext(context)
    }

    /**
     * Creates a buffer of [size] bytes in this API's context.
     *
     * @param ptr host memory backing the buffer, or `null` for a device-only buffer.
     * @param op selects the creation flags (see [ClMemOperation]).
     * @return the new memory object; the caller is responsible for releasing it.
     */
    fun allocMem(ptr: Pointer?, op: ClMemOperation, size: Int): cl_mem = clCreateBuffer(
        context,
        op.flags,
        size.toLong(),
        ptr,
        null
    )
}

这是一个使用 API 实例处理帧的示例“过滤器”。

/**
 * Filter that runs the `initialComparisonKernel` on a frame: pixels close enough to
 * [colorKey] are replaced by [replacementKey], all other pixels are copied through.
 *
 * @property colorKey three bytes of the colour to look for.
 * @property replacementKey three bytes written in place of matching pixels.
 * @property percentTolerance maximum colour difference (as a fraction) that still counts as a match.
 * @property colorSpace which channel(s) the kernel compares.
 * @property width frame width in pixels, forwarded to the kernel.
 * @property height frame height in pixels, forwarded to the kernel.
 */
class OpenClInitialComparisonFilter @Suppress("LongParameterList") constructor(
    private val api: OpenClApi,
    var colorKey: ByteArray = byteArrayOf(0, 255.toByte(), 0),
    var replacementKey: ByteArray = byteArrayOf(0, 255.toByte(), 0),
    var percentTolerance: Float = 0.025f,
    var colorSpace: ColorSpace = ColorSpace.ALL,
    var width: Int = DEFAULT_WIDTH_PIXELS,
    var height: Int = DEFAULT_HEIGHT_PIXELS
) : AbstractFilter {
    companion object : AbstractFilterConsts {
        override val listName = "Initial Comparison"

        private const val KERNEL_NAME = "initialComparisonKernel"
    }

    /** Current values of the user-adjustable properties. */
    override fun getProperties(): Map<AbstractFilterProperty, Any> = mapOf(
        AbstractFilterProperty.TOLERANCE to percentTolerance,
        AbstractFilterProperty.COLOR_KEY to colorKey,
        AbstractFilterProperty.REPLACEMENT_KEY to replacementKey,
        AbstractFilterProperty.COLOR_SPACE to colorSpace
    )

    /**
     * Updates the property whose list name is [listName].
     *
     * @throws ArrayIndexOutOfBoundsException if the name matches no property of this filter.
     * @throws ClassCastException if [newValue] has the wrong type for that property.
     */
    override fun setProperty(listName: String, newValue: Any) {
        when (listName) {
            AbstractFilterProperty.TOLERANCE.listName -> percentTolerance = newValue as Float
            AbstractFilterProperty.COLOR_KEY.listName -> colorKey = newValue as ByteArray
            AbstractFilterProperty.REPLACEMENT_KEY.listName -> replacementKey = newValue as ByteArray
            AbstractFilterProperty.COLOR_SPACE.listName -> colorSpace = newValue as ColorSpace
            else -> throw ArrayIndexOutOfBoundsException("Couldn't find property $listName")
        }
    }

    /**
     * Runs the kernel over [inputBuffer] and returns a new array of the same size
     * holding the result. Blocks until the device has finished.
     *
     * Every native object created here is released before returning, including
     * when one of the OpenCL calls throws.
     */
    @Suppress("LongMethod")
    override fun apply(inputBuffer: ByteArray): ByteArray {
        val outputBuffer = ByteArray(size = inputBuffer.size)
        val floatOptionsBuffer = floatArrayOf(percentTolerance)
        val intOptionsBuffer = intArrayOf(colorSpace.i, width, height)

        // Release callbacks are collected as the native objects are created and
        // run in the finally block, in creation order, so nothing leaks on failure.
        val releaseActions = mutableListOf<() -> Unit>()
        fun <T> T.releasedAfterUse(release: (T) -> Unit): T = also { resource ->
            releaseActions.add { release(resource) }
        }

        try {
            val inputMem = api.allocMem(
                Pointer.to(inputBuffer),
                ClMemOperation.READ,
                Sizeof.cl_char * inputBuffer.size
            ).releasedAfterUse { clReleaseMemObject(it) }
            val outputMem = api.allocMem(
                null,
                ClMemOperation.WRITE,
                Sizeof.cl_char * outputBuffer.size
            ).releasedAfterUse { clReleaseMemObject(it) }
            val colorKeyMem = api.allocMem(
                Pointer.to(colorKey),
                ClMemOperation.READ,
                Sizeof.cl_char * colorKey.size
            ).releasedAfterUse { clReleaseMemObject(it) }
            val replacementKeyMem = api.allocMem(
                Pointer.to(replacementKey),
                ClMemOperation.READ,
                Sizeof.cl_char * replacementKey.size
            ).releasedAfterUse { clReleaseMemObject(it) }
            val floatOptionsMem = api.allocMem(
                Pointer.to(floatOptionsBuffer),
                ClMemOperation.READ,
                Sizeof.cl_float * floatOptionsBuffer.size
            ).releasedAfterUse { clReleaseMemObject(it) }
            val intOptionsMem = api.allocMem(
                Pointer.to(intOptionsBuffer),
                ClMemOperation.READ,
                Sizeof.cl_int * intOptionsBuffer.size
            ).releasedAfterUse { clReleaseMemObject(it) }

            val kernel = clCreateKernel(api.program, KERNEL_NAME, null)
                .releasedAfterUse { clReleaseKernel(it) }

            // Argument order must match the kernel's parameter list.
            listOf(inputMem, outputMem, colorKeyMem, replacementKeyMem, floatOptionsMem, intOptionsMem)
                .forEachIndexed { index, mem ->
                    clSetKernelArg(kernel, index, Sizeof.cl_mem.toLong(), Pointer.to(mem))
                }

            // With an explicit local size the global size has to be a multiple of it.
            // Rounded up with integer arithmetic: the previous float division loses
            // precision once the buffer exceeds 2^24 bytes.
            // NOTE(review): the padding work-items run past the end of the buffers
            // unless the kernel bounds-checks its global id — confirm.
            val localWorkSize = api.localWorkSize
            val globalWorkSize = if (localWorkSize == null) {
                inputBuffer.size.toLong()
            } else {
                require(localWorkSize > 0) { "localWorkSize must be positive, was $localWorkSize" }
                (inputBuffer.size + localWorkSize - 1) / localWorkSize * localWorkSize
            }

            clEnqueueNDRangeKernel(
                api.commandQueue,
                kernel,
                1,
                null,
                longArrayOf(globalWorkSize),
                localWorkSize?.let { longArrayOf(it) },
                0,
                null,
                null
            )
            // Blocking read (CL_TRUE): returns once the result is in outputBuffer.
            clEnqueueReadBuffer(
                api.commandQueue,
                outputMem,
                CL_TRUE,
                0,
                (inputBuffer.size * Sizeof.cl_char).toLong(),
                Pointer.to(outputBuffer),
                0,
                null,
                null
            )
        } finally {
            releaseActions.forEach { release -> release() }
        }

        return outputBuffer
    }
}

这里是 InitialComparison 内核的一个例子,它寻找并替换相似的像素。

// Selects which channel(s) calcColorDiff compares. Values below 3 are used
// directly as the byte offset of the channel inside a pixel; ALL averages the
// differences of all three channels.
// NOTE(review): the offsets imply a B, G, R byte order per pixel — confirm
// against the host-side frame format.
enum ColorSpace {
    BLUE = 0,
    GREEN = 1,
    RED = 2,
    ALL = 3
};

// Indices into the floatOptions buffer passed to the kernels.
// PERCENT_TOLERANCE is read by initialComparisonKernel, GRADIENT_TOLERANCE by
// flowKeyKernel. The host must fill the buffer in this order.
enum FloatOptions {
    PERCENT_TOLERANCE = 0,
    GRADIENT_TOLERANCE = 1
};

// Indices into the intOptions buffer passed to the kernels. The host must fill
// the buffer in this order. WIDTH and HEIGHT are used by the kernels as pixel
// counts per row / number of rows.
// NOTE(review): BLOCK_SIZE is not read by any kernel visible in this file.
enum IntOptions {
    COLOR_SPACE = 0,
    WIDTH = 1,
    HEIGHT = 2,
    BLOCK_SIZE = 3
};


/*
 * Returns the normalised colour difference, in [0, 1], between the pixel that
 * starts at a[i] and the pixel that starts at b[j] (three bytes each).
 *
 * colorSpace < 3 : difference of that single channel, divided by 255.
 * otherwise      : sum of the three channel differences, divided by 765 (3 * 255).
 *
 * The buffers are declared as char, which is signed, but the bytes hold channel
 * intensities 0..255. Each byte is therefore reinterpreted as unsigned before
 * subtracting; without this, 255 reads as -1 and e.g. 255 vs 0 would count as a
 * difference of 1 instead of 255.
 */
float calcColorDiff(
    const char *a,
    const int i,
    const char *b,
    const int j,
    const int colorSpace
) {
    float colorDiff[3];
    for (int k = 0; k < 3; k++) {
        const int channelA = (unsigned char)a[i + k];
        const int channelB = (unsigned char)b[j + k];
        colorDiff[k] = (float)abs(channelA - channelB);
    }
    if (colorSpace < 3) {
        /* float literals: avoids promoting the arithmetic to double */
        return colorDiff[colorSpace] / 255.0f;
    }
    float percentDiff = 0.0f;
    for (int k = 0; k < 3; k++) {
        percentDiff += colorDiff[k] / 765.0f;
    }
    return percentDiff;
}

/*
 * Copies one pixel (three consecutive bytes) from ink[j..j+2] to
 * canvas[i..i+2], lowest byte first.
 */
void writePixel(
    char *canvas,
    const int i,
    const char *ink,
    const int j
) {
    char *dst = canvas + i;
    const char *src = ink + j;
    dst[0] = src[0];
    dst[1] = src[1];
    dst[2] = src[2];
}

/*
 * One work-item is launched per byte of the frame; only the work-items whose
 * global id is a multiple of 3 (the first byte of a pixel) do any work.
 *
 * For such a pixel: if its colour difference to colorKey is below the
 * PERCENT_TOLERANCE option, replacementKey is written to output, otherwise the
 * input pixel is copied through unchanged.
 */
__kernel void initialComparisonKernel(
    __global const char *input,
    __global char *output,
    __global const char *colorKey,
    __global const char *replacementKey,
    __global const float *floatOptions,
    __global const int *intOptions
) {
    const int gid = get_global_id(0);
    if (gid % 3 != 0) {
        return;
    }

    const float percentTolerance = floatOptions[PERCENT_TOLERANCE];
    const int colorSpace = intOptions[COLOR_SPACE];
    const float percentDiff = calcColorDiff(input, gid, colorKey, 0, colorSpace);

    if (percentDiff < percentTolerance) {
        writePixel(output, gid, replacementKey, 0);
        return;
    }
    writePixel(output, gid, input, gid);
}

效果很好!比在 CPU 上运行快得多,即使与使用 Java 多线程 ExecutorService 的实现相比也是如此。除了比较之外,我还运行两个额外的过滤器:NoiseReduction,它会移除大部分未被其他绿屏像素包围的绿屏像素;以及 FlowKey,它会填充绿屏像素之间的间隙。

/*
 * Returns 1 when the three bytes starting at input[i] are identical to the
 * three bytes of colorKey, and 0 otherwise.
 */
int checkPixelEquality(
    const char *input, 
    const int i, 
    const char *colorKey
) {
    for (int channel = 0; channel < 3; channel++) {
        if (input[i + channel] != colorKey[channel]) {
            return 0;
        }
    }
    return 1;
}

/*
 * Removes isolated key-coloured pixels. Only work-items whose global id is a
 * multiple of 3 (the first byte of a pixel) do any work.
 *
 * A pixel of `input` that equals colorKey keeps the key colour only if at least
 * three of its four direct neighbours (left, right, up, down) also equal
 * colorKey; a neighbour that would lie outside the frame counts as matching.
 * In every other case the pixel is taken from `template` at the same position.
 */
__kernel void noiseReductionKernel(
    __global const char *input,
    __global char *output,
    __global const char *template,
    __global const char *colorKey,
    __global const int *intOptions
) {
    const int gid = get_global_id(0);
    if (gid % 3 != 0) {
        return;
    }

    const int width = intOptions[WIDTH];
    const int height = intOptions[HEIGHT];

    /* Pixels that are not keyed are passed through from the template. */
    if (checkPixelEquality(input, gid, colorKey) != 1) {
        writePixel(output, gid, template, gid);
        return;
    }

    const int pixel = gid / 3;
    const int column = pixel % width;
    const int row = pixel / width;
    const int rowStride = width * 3;

    int keyedNeighbours = 0;
    keyedNeighbours += (column == 0) ? 1 : checkPixelEquality(input, gid - 3, colorKey);
    keyedNeighbours += (column == width - 1) ? 1 : checkPixelEquality(input, gid + 3, colorKey);
    keyedNeighbours += (row == 0) ? 1 : checkPixelEquality(input, gid - rowStride, colorKey);
    keyedNeighbours += (row == height - 1) ? 1 : checkPixelEquality(input, gid + rowStride, colorKey);

    if (keyedNeighbours < 3) {
        writePixel(output, gid, template, gid);
    } else {
        writePixel(output, gid, colorKey, 0);
    }
}

/*
 * Grows the keyed area into neighbouring pixels. Only work-items whose global
 * id is a multiple of 3 (the first byte of a pixel) do any work.
 *
 * - A pixel of `input` that already equals colorKey stays keyed.
 * - A non-keyed pixel becomes keyed if one of its four direct neighbours inside
 *   the frame is keyed in `input` AND the colour difference between this pixel
 *   (in `input`) and that neighbour (in `template`) exceeds the
 *   GRADIENT_TOLERANCE option. Neighbours are tried in the order left, right,
 *   up, down; the first hit wins.
 * - Otherwise the pixel is taken from `template` at the same position.
 *
 * Work-items that do not sit on the first byte of a pixel must not write
 * anything: a three-byte write starting at such an id straddles two pixels,
 * races with the work-items that own those pixels and runs past the end of the
 * buffer for the last ids. The previous version wrote colorKey there.
 */
__kernel void flowKeyKernel(
    __global const char *input,
    __global char *output,
    __global const char *template,
    __global const char *colorKey,
    __global const float *floatOptions,
    __global const int *intOptions
) {
    const int gid = get_global_id(0);
    if (gid % 3 != 0) {
        return;
    }

    const float gradientTolerance = floatOptions[GRADIENT_TOLERANCE];
    const int colorSpace = intOptions[COLOR_SPACE];
    const int width = intOptions[WIDTH];
    const int height = intOptions[HEIGHT];

    /* Already keyed: keep the key colour. */
    if (checkPixelEquality(input, gid, colorKey) != 0) {
        writePixel(output, gid, colorKey, 0);
        return;
    }

    const int column = (gid / 3) % width;
    const int row = (gid / 3) / width;
    const int rowStride = width * 3;

    /* Left neighbour. */
    if (
        column != 0 &&
        checkPixelEquality(input, gid - 3, colorKey) == 1 &&
        calcColorDiff(input, gid, template, gid - 3, colorSpace) > gradientTolerance
    ) {
        writePixel(output, gid, colorKey, 0);
        return;
    }
    /* Right neighbour. */
    if (
        column != width - 1 &&
        checkPixelEquality(input, gid + 3, colorKey) == 1 &&
        calcColorDiff(input, gid, template, gid + 3, colorSpace) > gradientTolerance
    ) {
        writePixel(output, gid, colorKey, 0);
        return;
    }
    /* Neighbour in the previous row. */
    if (
        row != 0 &&
        checkPixelEquality(input, gid - rowStride, colorKey) == 1 &&
        calcColorDiff(input, gid, template, gid - rowStride, colorSpace) > gradientTolerance
    ) {
        writePixel(output, gid, colorKey, 0);
        return;
    }
    /* Neighbour in the next row. */
    if (
        row != height - 1 &&
        checkPixelEquality(input, gid + rowStride, colorKey) == 1 &&
        calcColorDiff(input, gid, template, gid + rowStride, colorSpace) > gradientTolerance
    ) {
        writePixel(output, gid, colorKey, 0);
        return;
    }

    /* No neighbour qualified: pass the template pixel through. */
    writePixel(output, gid, template, gid);
}

问题在于,与通过 clEnqueueNDRangeKernel 将内核排队的开销相比,这些内核的运行时间微不足道。这意味着运行所有内核的开销太大,导致帧延迟。每个过滤器必须一次运行一个,直到图像被完全处理。

我目前对 OpenCL 的理解是,在一个排队的内核中,各个工作组的执行没有任何特定顺序,也不保证并发执行。因为这些过滤器必须在整个图像上一次一个地应用,所以我能想到的唯一选择是将许多小内核排队。

我试过将所有内核聚合成一个大内核(下面的代码)。有两个问题:

  1. 工作组的运行不保证完全并发,这意味着一行像素可能已经完成了所有过滤器,而另一行像素还根本没有开始运行。
  2. 当我为聚合内核实现锁定时,它会冻结,因为并非所有工作组都在同时运行。
// Aggregated kernel: runs the whole filter chain (initial comparison, noise
// reduction, flow key, noise reduction again) for one pixel inside a single
// launch, using tmpActive / tmpStale as scratch buffers.
//
// NOTE(review): the option indices used below (TOLERANCE, FLOW_KEY_TOLERANCE,
// *_ITERATIONS) and the apply* helpers are not defined in the code visible
// here — presumably they live in a different revision of the sources.
// NOTE(review): nothing in this kernel synchronises work-items between stages,
// so a stage may read neighbouring pixels from tmpStale that other work-items
// have not yet updated.
__kernel void openClKernel(
    __global const char *input,
    __global char *output,
    __global const char *colorKey,
    __global const char *replacementKey,
    __global const float *floatOptions,
    __global const int *intOptions,
    __global char *tmpActive,
    __global char *tmpStale
) {
    float tolerance = floatOptions[TOLERANCE];
    float flowKeyTolerance = floatOptions[FLOW_KEY_TOLERANCE];
    int colorSpace = intOptions[COLOR_SPACE];
    int width = intOptions[WIDTH];
    int height = intOptions[HEIGHT];
    int initialNoiseReductionIterations = intOptions[INITIAL_NOISE_REDUCTION_ITERATIONS];
    int flowKeyIterations = intOptions[FLOW_KEY_ITERATIONS];
    int finalNoiseReductionIterations = intOptions[FINAL_NOISE_REDUCTION_ITERATIONS];
    int gid = get_global_id(0);

    // Only the work-item sitting on the first byte of a pixel does any work.
    if (gid % 3 == 0) {
        // Every stage writes its result for this pixel into tmpActive; the
        // pixel is then copied to tmpStale, which is what the next stage reads.
        applyInitialComparison(input, tmpActive, colorKey, replacementKey, tolerance, colorSpace, gid);
        writePixel(tmpStale, gid, tmpActive, gid);
        for (int i = 0; i < initialNoiseReductionIterations; i++) {
            applyNoiseReduction(tmpStale, tmpActive, input, replacementKey, width, height, gid);
            writePixel(tmpStale, gid, tmpActive, gid);
        }
        for (int i = 0; i < flowKeyIterations; i++) {
            applyFlowKey(tmpStale, tmpActive, input, replacementKey, flowKeyTolerance, colorSpace, width, height, gid);
            writePixel(tmpStale, gid, tmpActive, gid);
        }
        for (int i = 0; i < finalNoiseReductionIterations; i++) {
            applyNoiseReduction(tmpStale, tmpActive, input, replacementKey, width, height, gid);
            writePixel(tmpStale, gid, tmpActive, gid);
        }
        // Publish the final state of this pixel.
        writePixel(output, gid, tmpStale, gid);
    }
}

也就是说,聚合内核的运行速度比拆分内核快 1,000 倍(我实现了一个小的帧延迟计数器)。这向我表明,与手头的任务相比,排队内核的开销太大了。

我可以做些什么来优化这个程序?有没有办法高效地将许多小内核排队?有没有办法重构这些内核,使它们能够并发运行?如果需要,还请告诉我如何提高问题的质量。

谢谢!

每次内核启动都有固定的开销,比方说 1 毫秒。开销部分源于指令的加载,但主要源于每个内核结束时所有线程的同步。因此,如果您启动许多每个执行时间为 1 毫秒的小内核,则总时间的一半将作为开销损失掉。如果将许多小内核聚合成一个运行 9 毫秒的内核,那么开销仅为 10%。

因此,在数据移动允许且不会引入竞争条件的前提下,应将尽可能多的小内核聚合为一个。还要确保每个内核的范围尽可能大。

或者,您可以并行使用多个队列。范围小的内核不会使 GPU 饱和,因此 GPU 的一部分在任何时刻都处于空闲状态。如果您有多个并发队列,这些队列中的内核可以并发运行,一起使硬件饱和。然而,您仍然要承担这部分开销损失。