异常后如何恢复流程

How to resume flow after exception

我有以下代码:

val channel = BroadcastChannel<Event>(10)

fun setup() {
    scope.launch {
        channel.asFlow().
            .flatMapLatest { fetchSomeData() }
            .catch { emit(DefaultData()) }
            .onEach { handleData() }
            .collect()

    }
}

fun load() {
    channel.offer(Event.Load)      
}

如果 fetchSomeData 失败并出现异常,它将被 catch 捕获并传递一些默认数据。问题是流程本身被取消并从频道的订阅者中删除。这意味着提供给频道的任何新事件都将被忽略,因为不再有任何订阅者。

有没有办法确保流程在出现异常时不会被取消?

我遇到了同样的问题。我的解决方法是这样的:

/* Custom onEach extension function */
fun <T> Flow<T>.onEachCatching(block: suspend (T) -> Unit) = OnEachCatching(this, block)

class OnEachCatching<T>(private val src: Flow<T>, private val block: suspend (T) -> Unit, bufferCapacity: Int = Channel.CONFLATED) {

    private val okValue = Channel<T>(bufferCapacity)

    private var failBlock: (suspend (Throwable) -> Unit)? = null

    init {
        GlobalScope.launch {
            src.collect { value ->
                runCatching { block(value) }
                    .onFailure { failBlock?.invoke(it) }
                    .onSuccess { okValue.send(value) }
            }

            okValue.close()
        }
    }

    fun onFailure(block: suspend (Throwable) -> Unit) = this.also {
        failBlock = block
    }

    fun resumeFlow() = okValue.consumeAsFlow()
}

用法:

someData
    .onEachCatching { handleData() }
    .onFailure { emit(DefaultData()) }
    .resumeFlow()
    .collect()

您应该捕获 fetchSomeData() 的异常,因此将 catch 从主流程移至 fetchSomeData():

    scope.launch {
        channel.asFlow().
            .flatMapLatest { fetchSomeData().catch { emit(DefaultData()} }
            .onEach { handleData() }
            .collect()

    }

我的设计是简单地重启收集流程

private fun startParsingMessages() {
    coroutineScope?.launch {
        sessionController.subscribeToMessages()
            .onEach { /*code block*/ }
            .catch {
                it.cause
                    ?.let { error -> Timber.e(error) }
                    ?: Timber.e("startSession(): ${it.message}")

                startParsingMessages() //here
            }
            .collect()
    }
}