How to gracefully access the results of asynchronous functions in Kotlin coroutines synchronously with Mutex?

I want to process some data on an IO thread and need to access the final data once processing has finished. Here is my Process sample code:

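import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

private class Process {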
    private val scope = CoroutineScope(Dispatchers.IO)
    private val value: AtomicInteger = AtomicInteger(0)
    private val mutex = Mutex()

    /**
     * Run a background task that changes the value.
     */
    fun doIt() {
        scope.launch {
            println("SCOPE - start at ${System.currentTimeMillis()}")
            mutex.withLock {
                println("Process - doIt start delay ${System.currentTimeMillis()}")
                delay(10)
                value.incrementAndGet()
                println("Process - value increase to $value")
            }
            println("SCOPE - end at ${System.currentTimeMillis()}")
        }
    }

    /**
     * Get the value synchronously.
     */
    fun getValue(): Int {
        return runBlocking(scope.coroutineContext) {
            mutex.withLock {
                println("Process - getValue ${System.currentTimeMillis()}")
                value.get()
            }
        }
    }

    /**
     * Reset value.
     */
    fun reset() {
        runBlocking(scope.coroutineContext) {
            mutex.withLock {
                value.set(0)
                println("Process - value reset to ${value.get()}")
            }
        }
    }
}

Usage of Process:

fun main() {
    val process = Process()
    repeat(100) {
        println("-----------------------------")
        println("Start - ${System.currentTimeMillis()}")
        process.reset()
        process.doIt()
        val result = run {
            val count = process.getValue()
            if (count == 1) {
                println("Count - $count ${System.currentTimeMillis()}")
                true
            } else {
                println("Mutex failed in $it with $count")
                false
            }
        }
        if (!result) return@main
    }
}

The log I expect is 100 fragments like this one:

-----------------------------
Start - 1645717971970
Process - value reset to 0
SCOPE - start at 1645717972011
Process - doIt start delay 1645717972011
Process - value increase to 1
SCOPE - end at 1645717972034
Process - getValue 1645717972034
Count - 1 1645717972035

But the test always breaks off in the third or fourth iteration:

-----------------------------
Start - 1645717971970
Process - value reset to 0
SCOPE - start at 1645717972011
Process - doIt start delay 1645717972011
Process - value increase to 1
SCOPE - end at 1645717972034
Process - getValue 1645717972034
Count - 1 1645717972035
-----------------------------
Start - 1645717972035
Process - value reset to 0
SCOPE - start at 1645717972036
Process - doIt start delay 1645717972036
Process - value increase to 1
SCOPE - end at 1645717972049
Process - getValue 1645717972049
Count - 1 1645717972050
-----------------------------
Start - 1645717972050
Process - value reset to 0
SCOPE - start at 1645717972050
Process - getValue 1645717972051
Process - doIt start delay 1645717972051
Mutex failed in 2 with 0

Can anyone tell me how I should synchronize in this case so that I get the result of asynchronous code that has already been started?

Have you considered using a Deferred result instead of AtomicInteger?

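// These members replace value and mutex inside Process.
var result: Deferred<Int> = CompletableDeferred(0)

fun doIt() {
    // async needs a CoroutineScope; the class's existing scope is assumed here.
    // The block must return the new Int value.
    result = scope.async { /* get new value */ }
}

fun getValue() = runBlocking { result.await() }

fun reset() {
    result = CompletableDeferred(0)
}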

If you need to handle cancelling the previous job on every call to doIt, you may need to introduce some more complexity.
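
For reference, here is a minimal, self-contained sketch of that idea, including one possible way to cancel an unfinished previous job. In the original code, launch returns immediately, so the coroutine in getValue can acquire the mutex before the launched one does, which is what the failing log shows. Awaiting the Deferred avoids that race because getValue waits for the task itself rather than for a lock. The class name DeferredProcess, the delay(10) standing in for real work, and the constant result 1 are illustrative assumptions, not part of the original code. The sketch also assumes that doIt, reset and getValue are all called from one thread, as in the question's main.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical variant of Process: the Deferred is the synchronization point.
class DeferredProcess {
    private val scope = CoroutineScope(Dispatchers.IO)

    // Assumption: written from a single caller thread only.
    // @Volatile just makes the latest reference visible to other readers.
    @Volatile
    private var result: Deferred<Int> = CompletableDeferred(0)

    /** Start the background task, cancelling an unfinished previous one. */
    fun doIt() {
        result.cancel() // no effect if the previous task already completed
        result = scope.async {
            delay(10) // stands in for the real work
            1         // the new value
        }
    }

    /** Block the caller until the most recently started task has finished. */
    fun getValue(): Int = runBlocking { result.await() }

    /** Drop any running task and go back to an already completed 0. */
    fun reset() {
        result.cancel()
        result = CompletableDeferred(0)
    }
}

fun main() {
    val process = DeferredProcess()
    repeat(100) { iteration ->
        process.reset()
        process.doIt()
        val count = process.getValue()
        check(count == 1) { "Unexpected count $count in iteration $iteration" }
    }
    println("All 100 iterations returned 1")
}
```

Note that awaiting a Deferred that was cancelled throws CancellationException, so getValue should only ever read the most recent reference, as it does here. If several threads may call doIt or reset concurrently, the swap of result would need additional protection.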