Kotlin 作为协程之间的消息队列流动

Kotlin flows as a message queue between coroutines

我正在尝试使用 Kotlin 的 Flow class 作为消息队列,将数据从生产者(相机)传输到一组工作人员(图像分析器)运行在单独的协程上运行。

在我的案例中,制片人是一台相机,运行 比工人快得多。应通过丢弃数据来处理背压,以便图像分析器始终对来自相机的最新图像进行操作。

当使用通道时,此解决方案有效,但看起来很乱,并且没有为我提供在相机和分析仪之间转换数据的简单方法(如 flow.map)。

class ImageAnalyzer<Result> {
    fun analyze(image: Bitmap): Result {
        // perform some work on the image and return a Result. This can take a long time.
    }
}

class CameraAdapter {

    private val imageChannel = Channel<Bitmap>(capacity = Channel.RENDEZVOUS)
    private val imageReceiveMutex = Mutex()

    // additional code to make this camera work and listen to lifecycle events of the enclosing activity.

    protected fun sendImageToStream(image: CameraOutput) {
        // use channel.offer to ensure the latest images are processed
        runBlocking { imageChannel.offer(image) }
    }

    @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
    fun onDestroy() {
        runBlocking { imageChannel.close() }
    }

    /**
     * Get the stream of images from the camera.
     */
    fun getImageStream(): ReceiveChannel<Bitmap> = imageChannel
}

class ImageProcessor<Result>(workers: List<ImageAnalyzer<Result>>) {
    private val analysisResults = Channel<Result>(capacity = Channel.RENDEZVOUS)
    private val cancelMutex = Mutex()

   var finished = false // this can be set elsewhere when enough images have been analyzed

    fun subscribeTo(channel: ReceiveChannel<Bitmap>, processingCoroutineScope: CoroutineScope) {
        // omit some checks to make sure this is not already subscribed

        processingCoroutineScope.launch {
            val workerScope = this
            workers.forEachIndexed { index, worker ->
                launch(Dispatchers.Default) {
                    startWorker(channel, workerScope, index, worker)
                }
            }
        }
    }

    private suspend fun startWorker(
        channel: ReceiveChannel<Bitmap>,
        workerScope: CoroutineScope,
        workerId: Int,
        worker: ImageAnalyzer
    ) {
        for (bitmap in channel) {
            analysisResults.send(worker.analyze(bitmap))

            cancelMutex.withLock {
                if (finished && workerScope.isActive) {
                    workerScope.cancel()
                }
            }
        }
    }
}

class ExampleApplication : CoroutineScope {
    private val cameraAdapter: CameraAdapter = ...
    private val imageProcessor: ImageProcessor<Result> = ...

    fun analyzeCameraStream() {
        imageProcessor.subscribeTo(cameraAdapter.getImageStream())
    }
}

执行此操作的正确方法是什么?我想使用 ChannelFlow 而不是 Channel 在相机和 ImageProcessor 之间传递数据。这将允许我在将图像发送到分析器之前调用 flow.map 将元数据添加到图像中。但是,这样做时,每个 ImageAnalyzer 都会获得同一图像的副本,而不是并行处理不同的图像。是否可以将 Flow 用作消息队列而不是广播器?

我用流处理了这个!在整个序列中保持由通道支持的流很重要,这样每个工作人员都可以选择独特的图像进行操作。我已经通过单元测试确认了此功能。

这是我为后代更新的代码:

class ImageAnalyzer<Result> {
    fun analyze(image: Bitmap): Result {
        // perform some work on the image and return a Result. This can take a long time.
    }
}

class CameraAdapter {

    private val imageStream = Channel<Bitmap>(capacity = Channel.RENDEZVOUS)
    private val imageReceiveMutex = Mutex()

    // additional code to make this camera work and listen to lifecycle events of the enclosing activity.

    protected fun sendImageToStream(image: CameraOutput) {
        // use channel.offer to enforce the drop back pressure strategy
        runBlocking { imageChannel.offer(image) }
    }

    @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
    fun onDestroy() {
        runBlocking { imageChannel.close() }
    }

    /**
     * Get the stream of images from the camera.
     */
    fun getImageStream(): Flow<Bitmap> = imageChannel.receiveAsFlow()
}

class ImageProcessor<Result>(workers: List<ImageAnalyzer<Result>>) {
    private val analysisResults = Channel<Result>(capacity = Channel.RENDEZVOUS)
    private val cancelMutex = Mutex()

   var finished = false // this can be set elsewhere when enough images have been analyzed

    fun subscribeTo(flow: Flow<Bitmap>, processingCoroutineScope: CoroutineScope): Job {
        // omit some checks to make sure this is not already subscribed

        return processingCoroutineScope.launch {
            val workerScope = this
            workers.forEachIndexed { index, worker ->
                launch(Dispatchers.Default) {
                    startWorker(flow, workerScope, index, worker)
                }
            }
        }
    }

    private suspend fun startWorker(
        flow: Flow<Bitmap>,
        workerScope: CoroutineScope,
        workerId: Int,
        worker: ImageAnalyzer
    ) {
        while (workerScope.isActive) {
            flow.collect { bitmap ->
                analysisResults.send(worker.analyze(bitmap))

                cancelMutex.withLock {
                    if (finished && workerScope.isActive) {
                        workerScope.cancel()
                    }
                }
            }
        }
    }

    fun getAnalysisResults(): Flow<Result> = analysisResults.receiveAsFlow()
}

class ExampleApplication : CoroutineScope {
    private val cameraAdapter: CameraAdapter = ...
    private val imageProcessor: ImageProcessor<Result> = ...

    fun analyzeCameraStream() {
        imageProcessor.subscribeTo(cameraAdapter.getImageStream())
    }
}

看来,只要流量有渠道支持,每个订阅者都会得到一张独一无二的图片。