创建发出侦听器回调结果的 Kotlin 协程延迟对象

Creating Kotlin coroutine Deferred object that emitts listener callback results

我目前正在一个项目中从 RxJava 切换到 Kotlin 协程,用协程等价物替换所有 Single 和 Observable return 类型。我仍然在为以下结构而苦苦挣扎:一个接口(例如存储库的接口)提供数据查询访问和 returns 一个 RxJava Single。该实现使用 Single.create 创建一个 Single 对象,并使用 onSuccess/onError 发出结果。现在检索数据的实现需要做的是创建一个带有回调的监听器并将该监听器注册到某个东西。然后,创建的侦听器的回调将在自制单曲上调用 onSuccess/onError。例如。使用 firebase(虽然我的问题不是特定于 firebase):

interface Repository {
    fun getData(query: Query): Single<DataSnapshot?>
}

fun getData(query: Query): Single<DataSnapshot?> = Single.create { emitter ->
    query.addListenerForSingleValueEvent(object : ValueEventListener {
        override fun onCancelled(error: DatabaseError?) {
            emitter.onError(Exception())
        }

        override fun onDataChange(data: DataSnapshot?) {
            emitter.onSuccess(data)
        }
    })
}

现在我想要的是接口方法 returning coroutine Deferred。必须如何创建实现,以便还可以注册一个带有回调的侦听器,然后由 Deferred 传递其结果?我看不到这些协程构建器的方法,如异步、启动等。做 onSuccess/onError 会做的事情。

interface Repository {
    fun getData(query: Query): Deferred<DataSnapshot?>
}

我的建议如下:

interface Repository {
    suspend fun getData(query: Query): Result<DataSnapshot>
}

其中结果可以是密封的 class,包含成功和错误情况:

sealed class Result<T> {
    class Success<T>(result: T) : Result<T>()
    class Error<T>(error: String) : Result<T>()
}

这样,在getData的实现端你可以做:

return Success(yourData)

return Error("Something went wrong")

一般来说,在处理协程时你应该避免返回 deferreds 并尝试使用它们 "as synchronous methods".

编辑: 现在我明白了这个问题,我希望这有助于解决它:

//This is as generic as it gets, you could use it on any Query, no need to retype it
suspend fun Query.await(): DataSnapshot = suspendCoroutine{cont ->
    addListenerForSingleValueEvent(object : ValueEventListener{
        override fun onCancelled(error: DatabaseError?) {
            cont.resumeWithException(error?: Exception("Unknown Error"))
        }

        override fun onDataChange(data: DataSnapshot?) {
            if(data != null){
                cont.resume(data)
            } else {
                cont.resumeWithException(Exception("Null data"))
            }

        }
    })
}
//this is your actual implementation
suspend fun getData(query: Query):DataSnapshot =
        query.await()

此代码假设 DatabaseError 扩展了 Exception 或 Throwable。如果不是,您需要为其创建包装器类型或使用我的原始解决方案并在这两种情况下都使用常规简历。

我认为我正在寻找的最接近的东西是 ReceiveChannel。想出了这个解决方案:

override fun getData(query: Query): ReceiveChannel<Datasnapshot?> = GlobalScope.produce {
    query.addListenerForSingleValueEvent(object : ValueEventListener {
        override fun onDataChange(data: DataSnapshot?) {
            launch { send(data) } }
        }

        override fun onCancelled(error: DatabaseError?) {
            throw Exception()
        }
    })
}

不确定这是否有点矫枉过正,还有更好的方法,欢迎提出建议