暂停协程直到条件为真
Suspend coroutine until condition is true
我有一个用例,我需要连接和断开作为服务的 class。只有在连接服务时才能对服务执行操作。当服务通过回调连接或断开连接时,客户端会收到通知:
class Service {
constructor(callback: ConnectionCallback) { ... }
fun connect() {
// Call callback.onConnected() some time after this method returns.
}
fun disconnect() {
// Call callback.onConnectionSuspended() some time after this method returns.
}
fun isConnected(): Boolean { ... }
fun performAction(actionName: String, callback: ActionCallback) {
// Perform a given action on the service, failing with a fatal exception if called when the service is not connected.
}
interface ConnectionCallback {
fun onConnected() // May be called multiple times
fun onConnectionSuspended() // May be called multiple times
fun onConnectionFailed()
}
}
我想使用 Kotlin 协程为 Service
class(我无法控制)编写一个包装器。
这是 ServiceWrapper
的骨架:
class ServiceWrapper {
private val service = Service(object : ConnectionCallback { ... })
fun connect() {
service.connect()
}
fun disconnect() {
service.disconnect()
}
suspend fun performActionWhenConnected(actionName: String): ActionResult {
suspendUntilConnected()
return suspendCoroutine { continuation ->
service.performAction(actionName, object : ActionCallback() {
override fun onSuccess(result: ActionResult) {
continuation.resume(result)
}
override fun onError() {
continuation.resumeWithException(RuntimeException())
}
}
}
}
}
如何使用协程实现此 suspendUntilConnected()
行为?提前致谢。
您处于暂停状态,为什么不这样:
while (!service.isConnected()) {
delay(1000)
}
您可以在此语句中加入额外的超时条件。
以下是实现方法:
class ServiceWrapper {
@Volatile
private var deferredUntilConnected = CompletableDeferred<Unit>()
private val service = Service(object : ConnectionCallback {
override fun onConnected() {
deferredUntilConnected.complete(Unit)
}
override fun onConnectionSuspended() {
deferredUntilConnected = CompletableDeferred()
}
})
private suspend fun suspendUntilConnected() = deferredUntilConnected.await()
...
}
一般注意事项:仅仅因为服务在某个时间点连接并不能保证它在您使用它时仍会连接。
suspendCoroutine
暂停当前 运行 协程,因此不需要 pre-suspension.
Obtains the current continuation instance inside suspend functions and suspends the currently running coroutine.
- 另一方面,我建议使用
async
和 return 和 Deferred
。暂停并不是 ServiceWrapper
的工作。调用者应该控制何时需要结果,因此,如果需要,何时暂停。
class ServiceWrapper {
...
fun performAction(actionName: String): Deferred<ActionResult> = coroutineScope.async {
suspendCoroutine { continuation ->
service.performAction(actionName, object : ActionCallback() {
override fun onSuccess(result: ActionResult) {
continuation.resume(result)
}
override fun onError() {
continuation.resumeWithException(RuntimeException())
}
}
}
}
}
- 如果由于某种原因,你必须在
suspendCoroutine
之前暂停,我建议使用 CompletableJob
。 Deferred
应该 return 结果。从我的角度来看,使用 Deferred<Unit>
似乎是 anti-pattern。
StateFlow
的另一种方法
- 先定义一些“状态”
enum class ServiceState {
CONNECTED, SUSPENDED, FAILED
}
- 将基于回调的代码转换为流
val connectionState = MutableStateFlow(ServiceState.FAILED)
private val service = Service(object : ConnectionCallback {
override fun onConnected() {
connectionState.value = ServiceState.CONNECTED
}
override fun onConnectionSuspended() {
connectionState.value = ServiceState.SUSPENDED
}
override fun onConnectionFailed() {
connectionState.value = ServiceState.FAILED
}
})
- 复制并粘贴这些实用程序代码
class ConditionalAwait<T>(
private val stateFlow: StateFlow<T>,
private val condition: (T) -> Boolean
) {
suspend fun await(): T {
val nowValue = stateFlow.value
return if (condition(nowValue)) {
nowValue
} else {
stateFlow.first { condition(it) }
}
}
}
suspend fun <T> StateFlow<T>.conditionalAwait(condition: (T) -> Boolean): T =
ConditionalAwait(this, condition).await()
- 使用它~
suspend fun performActionWhenConnected() {
connectionState.conditionalAwait { it == ServiceState.CONNECTED }
// other actions when service is connected
}
- 高级用法
suspend fun performActionWhenConnected() {
val state = connectionState.conditionalAwait {
it == ServiceState.CONNECTED || it == ServiceState.FAILED
} // keep suspended when Service.SUSPENDED
if (state is ServiceState.CONNECTED) {
// other actions when service is connected
} else {
// error handling
}
}
我有一个用例,我需要连接和断开作为服务的 class。只有在连接服务时才能对服务执行操作。当服务通过回调连接或断开连接时,客户端会收到通知:
class Service {
constructor(callback: ConnectionCallback) { ... }
fun connect() {
// Call callback.onConnected() some time after this method returns.
}
fun disconnect() {
// Call callback.onConnectionSuspended() some time after this method returns.
}
fun isConnected(): Boolean { ... }
fun performAction(actionName: String, callback: ActionCallback) {
// Perform a given action on the service, failing with a fatal exception if called when the service is not connected.
}
interface ConnectionCallback {
fun onConnected() // May be called multiple times
fun onConnectionSuspended() // May be called multiple times
fun onConnectionFailed()
}
}
我想使用 Kotlin 协程为 Service
class(我无法控制)编写一个包装器。
这是 ServiceWrapper
的骨架:
class ServiceWrapper {
private val service = Service(object : ConnectionCallback { ... })
fun connect() {
service.connect()
}
fun disconnect() {
service.disconnect()
}
suspend fun performActionWhenConnected(actionName: String): ActionResult {
suspendUntilConnected()
return suspendCoroutine { continuation ->
service.performAction(actionName, object : ActionCallback() {
override fun onSuccess(result: ActionResult) {
continuation.resume(result)
}
override fun onError() {
continuation.resumeWithException(RuntimeException())
}
}
}
}
}
如何使用协程实现此 suspendUntilConnected()
行为?提前致谢。
您处于暂停状态,为什么不这样:
while (!service.isConnected()) {
delay(1000)
}
您可以在此语句中加入额外的超时条件。
以下是实现方法:
class ServiceWrapper {
@Volatile
private var deferredUntilConnected = CompletableDeferred<Unit>()
private val service = Service(object : ConnectionCallback {
override fun onConnected() {
deferredUntilConnected.complete(Unit)
}
override fun onConnectionSuspended() {
deferredUntilConnected = CompletableDeferred()
}
})
private suspend fun suspendUntilConnected() = deferredUntilConnected.await()
...
}
一般注意事项:仅仅因为服务在某个时间点连接并不能保证它在您使用它时仍会连接。
suspendCoroutine
暂停当前 运行 协程,因此不需要 pre-suspension.
Obtains the current continuation instance inside suspend functions and suspends the currently running coroutine.
- 另一方面,我建议使用
async
和 return 和Deferred
。暂停并不是ServiceWrapper
的工作。调用者应该控制何时需要结果,因此,如果需要,何时暂停。
class ServiceWrapper {
...
fun performAction(actionName: String): Deferred<ActionResult> = coroutineScope.async {
suspendCoroutine { continuation ->
service.performAction(actionName, object : ActionCallback() {
override fun onSuccess(result: ActionResult) {
continuation.resume(result)
}
override fun onError() {
continuation.resumeWithException(RuntimeException())
}
}
}
}
}
- 如果由于某种原因,你必须在
suspendCoroutine
之前暂停,我建议使用CompletableJob
。Deferred
应该 return 结果。从我的角度来看,使用Deferred<Unit>
似乎是 anti-pattern。
StateFlow
- 先定义一些“状态”
enum class ServiceState {
CONNECTED, SUSPENDED, FAILED
}
- 将基于回调的代码转换为流
val connectionState = MutableStateFlow(ServiceState.FAILED)
private val service = Service(object : ConnectionCallback {
override fun onConnected() {
connectionState.value = ServiceState.CONNECTED
}
override fun onConnectionSuspended() {
connectionState.value = ServiceState.SUSPENDED
}
override fun onConnectionFailed() {
connectionState.value = ServiceState.FAILED
}
})
- 复制并粘贴这些实用程序代码
class ConditionalAwait<T>(
private val stateFlow: StateFlow<T>,
private val condition: (T) -> Boolean
) {
suspend fun await(): T {
val nowValue = stateFlow.value
return if (condition(nowValue)) {
nowValue
} else {
stateFlow.first { condition(it) }
}
}
}
suspend fun <T> StateFlow<T>.conditionalAwait(condition: (T) -> Boolean): T =
ConditionalAwait(this, condition).await()
- 使用它~
suspend fun performActionWhenConnected() {
connectionState.conditionalAwait { it == ServiceState.CONNECTED }
// other actions when service is connected
}
- 高级用法
suspend fun performActionWhenConnected() {
val state = connectionState.conditionalAwait {
it == ServiceState.CONNECTED || it == ServiceState.FAILED
} // keep suspended when Service.SUSPENDED
if (state is ServiceState.CONNECTED) {
// other actions when service is connected
} else {
// error handling
}
}