如何暂停科特林协程直到收到通知

How to suspend kotlin coroutine until notified

我想暂停一个 kotlin 协程,直到从外部调用一个方法,就像旧的 Java object.wait() 和 object.notify() 方法一样。我该怎么做?

此处:Correctly implementing wait and notify in Kotlin is an answer how to implement this with Kotlin threads (blocking). And here: 是如何使用 CompleteableDeferreds 执行此操作的答案,但我不想每次都必须创建一个新的 CompleteableDeferred 实例。

我目前正在这样做:

    var nextIndex = 0

    fun handleNext(): Boolean {
        if (nextIndex < apps.size) {
            //Do the actual work on apps[nextIndex]
            nextIndex++
        }
        //only execute again if nextIndex is a valid index
        return nextIndex < apps.size
    }

    handleNext()

    // The returned function will be called multiple times, which I would like to replace with something like notify()
    return ::handleNext

发件人:https://gitlab.com/SuperFreezZ/SuperFreezZ/blob/master/src/superfreeze/tool/android/backend/Freezer.kt#L69

Channels 可用于此(尽管它们更通用):

When capacity is 0 – it creates RendezvousChannel. This channel does not have any buffer at all. An element is transferred from sender to receiver only when send and receive invocations meet in time (rendezvous), so send suspends until another coroutine invokes receive and receive suspends until another coroutine invokes send.

所以创造

val channel = Channel<Unit>(0)

并为 object.notify() 使用 channel.receive() for object.wait(), and channel.offer(Unit)(如果您想等到另一个协程 receive,则使用 send)。

对于notifyAll,您可以使用BroadcastChannel代替。

你当然可以很容易封装它:

inline class Waiter(private val channel: Channel<Unit> = Channel<Unit>(0)) {

    suspend fun doWait() { channel.receive() }
    fun doNotify() { channel.offer(Unit) }
}

可以为此使用基本的 suspendCoroutine{..} 函数,例如

class SuspendWait() {
  private lateinit var myCont: Continuation<Unit>
  suspend fun sleepAndWait() = suspendCoroutine<Unit>{ cont ->
    myCont = cont
  }

  fun resume() {
    val cont = myCont
    myCont = null
    cont.resume(Unit)
  }
}

很明显,代码有问题,例如myCont 字段未同步,预计 sleepAndWaitresume 之前调用等等,希望现在思路清晰。

kotlinx.coroutines 库中的 Mutex class 有另一个解决方案。

class SuspendWait2 {
  private val mutex = Mutex(locaked = true)
  suspend fun sleepAndWait() = mutex.withLock{}
  fun resume() {
    mutex.unlock()
  }
}

我建议为此使用 CompletableJob

我的用例:

suspend fun onLoad() {
    var job1: CompletableJob? = Job()
    var job2: CompletableJob? = Job()

    lifecycleScope.launch {
        someList.collect {
            doSomething(it)
            job1?.complete()
        }
    }

    lifecycleScope.launch {
        otherList.collect {
            doSomethingElse(it)
            job2?.complete()
        }
    }

    joinAll(job1!!, job2!!) // suspends until both jobs are done

    job1 = null
    job2 = null

    // Do something one time
}