异常后如何恢复流程
How to resume flow after exception
我有以下代码:
val channel = BroadcastChannel<Event>(10)
fun setup() {
scope.launch {
channel.asFlow().
.flatMapLatest { fetchSomeData() }
.catch { emit(DefaultData()) }
.onEach { handleData() }
.collect()
}
}
fun load() {
channel.offer(Event.Load)
}
如果 fetchSomeData
失败并出现异常,它将被 catch
捕获并传递一些默认数据。问题是流程本身被取消并从频道的订阅者中删除。这意味着提供给频道的任何新事件都将被忽略,因为不再有任何订阅者。
有没有办法确保流程在出现异常时不会被取消?
我遇到了同样的问题。我的解决方法是这样的:
/* Custom onEach extension function */
fun <T> Flow<T>.onEachCatching(block: suspend (T) -> Unit) = OnEachCatching(this, block)
class OnEachCatching<T>(private val src: Flow<T>, private val block: suspend (T) -> Unit, bufferCapacity: Int = Channel.CONFLATED) {
private val okValue = Channel<T>(bufferCapacity)
private var failBlock: (suspend (Throwable) -> Unit)? = null
init {
GlobalScope.launch {
src.collect { value ->
runCatching { block(value) }
.onFailure { failBlock?.invoke(it) }
.onSuccess { okValue.send(value) }
}
okValue.close()
}
}
fun onFailure(block: suspend (Throwable) -> Unit) = this.also {
failBlock = block
}
fun resumeFlow() = okValue.consumeAsFlow()
}
用法:
someData
.onEachCatching { handleData() }
.onFailure { emit(DefaultData()) }
.resumeFlow()
.collect()
您应该捕获 fetchSomeData() 的异常,因此将 catch
从主流程移至 fetchSomeData():
scope.launch {
channel.asFlow().
.flatMapLatest { fetchSomeData().catch { emit(DefaultData()} }
.onEach { handleData() }
.collect()
}
我的设计是简单地重启收集流程
private fun startParsingMessages() {
coroutineScope?.launch {
sessionController.subscribeToMessages()
.onEach { /*code block*/ }
.catch {
it.cause
?.let { error -> Timber.e(error) }
?: Timber.e("startSession(): ${it.message}")
startParsingMessages() //here
}
.collect()
}
}
我有以下代码:
val channel = BroadcastChannel<Event>(10)
fun setup() {
scope.launch {
channel.asFlow().
.flatMapLatest { fetchSomeData() }
.catch { emit(DefaultData()) }
.onEach { handleData() }
.collect()
}
}
fun load() {
channel.offer(Event.Load)
}
如果 fetchSomeData
失败并出现异常,它将被 catch
捕获并传递一些默认数据。问题是流程本身被取消并从频道的订阅者中删除。这意味着提供给频道的任何新事件都将被忽略,因为不再有任何订阅者。
有没有办法确保流程在出现异常时不会被取消?
我遇到了同样的问题。我的解决方法是这样的:
/* Custom onEach extension function */
fun <T> Flow<T>.onEachCatching(block: suspend (T) -> Unit) = OnEachCatching(this, block)
class OnEachCatching<T>(private val src: Flow<T>, private val block: suspend (T) -> Unit, bufferCapacity: Int = Channel.CONFLATED) {
private val okValue = Channel<T>(bufferCapacity)
private var failBlock: (suspend (Throwable) -> Unit)? = null
init {
GlobalScope.launch {
src.collect { value ->
runCatching { block(value) }
.onFailure { failBlock?.invoke(it) }
.onSuccess { okValue.send(value) }
}
okValue.close()
}
}
fun onFailure(block: suspend (Throwable) -> Unit) = this.also {
failBlock = block
}
fun resumeFlow() = okValue.consumeAsFlow()
}
用法:
someData
.onEachCatching { handleData() }
.onFailure { emit(DefaultData()) }
.resumeFlow()
.collect()
您应该捕获 fetchSomeData() 的异常,因此将 catch
从主流程移至 fetchSomeData():
scope.launch {
channel.asFlow().
.flatMapLatest { fetchSomeData().catch { emit(DefaultData()} }
.onEach { handleData() }
.collect()
}
我的设计是简单地重启收集流程
private fun startParsingMessages() {
coroutineScope?.launch {
sessionController.subscribeToMessages()
.onEach { /*code block*/ }
.catch {
it.cause
?.let { error -> Timber.e(error) }
?: Timber.e("startSession(): ${it.message}")
startParsingMessages() //here
}
.collect()
}
}