Kotlin:使用通道做一个简单的工作计数器

Kotlin: Using channel to make a simple work counter

我想制作一个线程安全的计数器,用于我的一些服务来计算并发工作。

比如说,有一个可以处理多个请求的 http 服务,并且有一个 属性 isWorking 稍后用于显示微调器。

这是我的计数器实现:

class CounterImpl @Inject constructor(
    @IoDispatcher private val ioDispatcher: CoroutineDispatcher,
    @MainDispatcher private val mainDispatcher: CoroutineDispatcher,
    private val log: Log
) : Counter {
    private val channel = Channel<Int>()
    private val _isWorking = MutableLiveData(false)
    override val isWorking = _isWorking.asFlow()

    init {
        MainScope().launch {
            channel.consumeAsFlow().onEach {
                log(FILE, "counter got a $it", LogType.Debug)
            }.scan(0) { acc, value -> acc + value }.map { it > 0}
                .collect {
                    withContext(mainDispatcher) {
                        _isWorking.value = it
                    }
                }
        }
    }

    override suspend fun invoke(increment: Boolean) {
        log(FILE, "counter invoked with $increment", LogType.Debug)
        channel.send(if (increment) 1 else -1)
    }
}

所以问题是有时对通道的最后一次发送调用没有到达代码的 consumeAsFlow 部分。

以下是所发生情况的示例日志:

[Debug] Counter: counter invoked with true
[Debug] Counter: counter invoked with false
[Debug] Counter: counter got a 1

在这里,使用 true 调用一次 invoke 并且有一行表示 counter got a 1 对应于该真实(增量)调用。但是还有一个 false 的调用,我希望有一个相应的 counter got a 0 行。但是那个从来没有出现过。

我也试过用 for c in channel 迭代频道,如果这是你的想法。

您不需要 Channel 来执行此操作,MutableStateFlow 就足够了,因为您正在执行的操作(incrementing/decrementing 一个数字)没有副作用。

    val count = MutableStateFlow(0)

    fun update(increment: Boolean) {
        count.update { it + if (increment) 1 else -1 }
    }

注意:update {} lambda 中的函数可能会执行多次,因此它不能有副作用。