如何暂停科特林协程直到收到通知
How to suspend kotlin coroutine until notified
我想暂停一个 kotlin 协程,直到从外部调用一个方法,就像旧的 Java object.wait() 和 object.notify() 方法一样。我该怎么做?
此处:Correctly implementing wait and notify in Kotlin is an answer how to implement this with Kotlin threads (blocking). And here: 是如何使用 CompleteableDeferreds 执行此操作的答案,但我不想每次都必须创建一个新的 CompleteableDeferred 实例。
我目前正在这样做:
var nextIndex = 0
fun handleNext(): Boolean {
if (nextIndex < apps.size) {
//Do the actual work on apps[nextIndex]
nextIndex++
}
//only execute again if nextIndex is a valid index
return nextIndex < apps.size
}
handleNext()
// The returned function will be called multiple times, which I would like to replace with something like notify()
return ::handleNext
Channels 可用于此(尽管它们更通用):
When capacity is 0 – it creates RendezvousChannel. This channel does not have any buffer at all. An element is transferred from sender to receiver only when send and receive invocations meet in time (rendezvous), so send suspends until another coroutine invokes receive and receive suspends until another coroutine invokes send.
所以创造
val channel = Channel<Unit>(0)
并为 object.notify()
使用 channel.receive()
for object.wait()
, and channel.offer(Unit)
(如果您想等到另一个协程 receive
,则使用 send
)。
对于notifyAll
,您可以使用BroadcastChannel
代替。
你当然可以很容易封装它:
inline class Waiter(private val channel: Channel<Unit> = Channel<Unit>(0)) {
suspend fun doWait() { channel.receive() }
fun doNotify() { channel.offer(Unit) }
}
可以为此使用基本的 suspendCoroutine{..}
函数,例如
class SuspendWait() {
private lateinit var myCont: Continuation<Unit>
suspend fun sleepAndWait() = suspendCoroutine<Unit>{ cont ->
myCont = cont
}
fun resume() {
val cont = myCont
myCont = null
cont.resume(Unit)
}
}
很明显,代码有问题,例如myCont
字段未同步,预计 sleepAndWait
在 resume
之前调用等等,希望现在思路清晰。
kotlinx.coroutines 库中的 Mutex
class 有另一个解决方案。
class SuspendWait2 {
private val mutex = Mutex(locaked = true)
suspend fun sleepAndWait() = mutex.withLock{}
fun resume() {
mutex.unlock()
}
}
我建议为此使用 CompletableJob
。
我的用例:
suspend fun onLoad() {
var job1: CompletableJob? = Job()
var job2: CompletableJob? = Job()
lifecycleScope.launch {
someList.collect {
doSomething(it)
job1?.complete()
}
}
lifecycleScope.launch {
otherList.collect {
doSomethingElse(it)
job2?.complete()
}
}
joinAll(job1!!, job2!!) // suspends until both jobs are done
job1 = null
job2 = null
// Do something one time
}
我想暂停一个 kotlin 协程,直到从外部调用一个方法,就像旧的 Java object.wait() 和 object.notify() 方法一样。我该怎么做?
此处:Correctly implementing wait and notify in Kotlin is an answer how to implement this with Kotlin threads (blocking). And here:
我目前正在这样做:
var nextIndex = 0
fun handleNext(): Boolean {
if (nextIndex < apps.size) {
//Do the actual work on apps[nextIndex]
nextIndex++
}
//only execute again if nextIndex is a valid index
return nextIndex < apps.size
}
handleNext()
// The returned function will be called multiple times, which I would like to replace with something like notify()
return ::handleNext
Channels 可用于此(尽管它们更通用):
When capacity is 0 – it creates RendezvousChannel. This channel does not have any buffer at all. An element is transferred from sender to receiver only when send and receive invocations meet in time (rendezvous), so send suspends until another coroutine invokes receive and receive suspends until another coroutine invokes send.
所以创造
val channel = Channel<Unit>(0)
并为 object.notify()
使用 channel.receive()
for object.wait()
, and channel.offer(Unit)
(如果您想等到另一个协程 receive
,则使用 send
)。
对于notifyAll
,您可以使用BroadcastChannel
代替。
你当然可以很容易封装它:
inline class Waiter(private val channel: Channel<Unit> = Channel<Unit>(0)) {
suspend fun doWait() { channel.receive() }
fun doNotify() { channel.offer(Unit) }
}
可以为此使用基本的 suspendCoroutine{..}
函数,例如
class SuspendWait() {
private lateinit var myCont: Continuation<Unit>
suspend fun sleepAndWait() = suspendCoroutine<Unit>{ cont ->
myCont = cont
}
fun resume() {
val cont = myCont
myCont = null
cont.resume(Unit)
}
}
很明显,代码有问题,例如myCont
字段未同步,预计 sleepAndWait
在 resume
之前调用等等,希望现在思路清晰。
kotlinx.coroutines 库中的 Mutex
class 有另一个解决方案。
class SuspendWait2 {
private val mutex = Mutex(locaked = true)
suspend fun sleepAndWait() = mutex.withLock{}
fun resume() {
mutex.unlock()
}
}
我建议为此使用 CompletableJob
。
我的用例:
suspend fun onLoad() {
var job1: CompletableJob? = Job()
var job2: CompletableJob? = Job()
lifecycleScope.launch {
someList.collect {
doSomething(it)
job1?.complete()
}
}
lifecycleScope.launch {
otherList.collect {
doSomethingElse(it)
job2?.complete()
}
}
joinAll(job1!!, job2!!) // suspends until both jobs are done
job1 = null
job2 = null
// Do something one time
}