如何在 callbackFlow 范围内正确处理来自回调的异常?

How do I correctly handle an exception from the callback within a callbackFlow scope?

我写了下面的callbackFlow:

fun getData(id: String) = callbackFlow {
    val listener = object : ValueEventListener {
        // if I throw here, the app crash
        override fun onDataChange(snapshot: DataSnapshot) {
            snapshot.getValue(Data::class.java) ?: throw RuntimeException("Error while reading data")
        }

        override fun onCancelled(error: DatabaseError) {
            throw error.toException()
        }
    }

    // if I throw here, exception is handled correctly by CoroutineExceptionHandler
    val dbRef = getDataRef(id)
    dbRef.addValueEventListener(listener)
    awaitClose { dbRef.removeEventListener(listener) }
}

我是这样收集流量的:

fun getData(id: String) = viewModelScope.launch(errorHandler) {
    db.getData(id).collect {
        // do something
    }
}

errorHandler 在哪里:

val errorHandler: CoroutineExceptionHandler
    get() = CoroutineExceptionHandler { _, throwable ->
        // do something with error
    }

如果我使用 onDataChangeonCancelled 方法抛出异常,应用程序会崩溃。

是否可以让callbackFlow作用域处理异常?我扔错了吗?我应该用适当的密封 class 调用 trySend 而不是抛出异常(并且可能在接收者中决定如何处理它)?

处理这种情况的最佳方法是什么?提前致谢。

在 Firebase API 中使用协程不是强制性的,但它确实使我们的开发更容易。也许还有其他解决方案,但我会这样做:

@ExperimentalCoroutinesApi
fun getDataFromFirebase(id: String) = callbackFlow  {
    val listener = object : ValueEventListener {
        override fun onDataChange(snapshot: DataSnapshot) {
            trySend(Result.Success(snapshot.getValue(Data::class.java)))
        }

        override fun onCancelled(error: DatabaseError) {
            trySend(Result.Error(error))
        }
    }
    val dbRef = getDataRef(id)
    dbRef.addValueEventListener(listener)
    awaitClose {
        dbRef.removeEventListener(listener)        }
}

在 ViewModel class 我会使用:

@ExperimentalCoroutinesApi
fun getData(id: String) = liveData(Dispatchers.IO) {
    repository.getDataFromFirebase(id).collect { response ->
        emit(response)
    }
}

在 activity class 里面我会使用这样的东西:

@ExperimentalCoroutinesApi
private fun getData() {
    viewModel.getData().observe(this) { response ->
        when(response) {
            is Result.Success -> print("Success")
            is Result.Error -> print("Error")
        }
    }
}

回调在 API/database 框架的上下文中执行,因此如果我们在回调中抛出,我们实际上会崩溃 API/database 组件,而不是流程。

要通过流程发送错误,您可以使用@alex-mamo 建议的结果对象。但是,如果您更愿意直接发送异常以在收集端失败,您可以取消生产者范围:

override fun onCancelled(error: DatabaseError) {
    this@callbackFlow.cancel("error message", error)
}

这类似于:

flow {
    delay(1000)
    throw Exception("error")
}

此外,CoroutineExceptionHandler 并不是要取代我们在 Kotlin 中处理异常的方式。它是未处理异常的最后手段处理程序,因此我们可以,例如以特定于我们的应用程序的方式记录它们。在您的情况下,使用常规 try ... catch.

似乎更好