使用协程的 Firebase 实时快照监听器

Firebase realtime snapshot listener using Coroutines

我希望能够在我的 ViewModel 中使用 Kotlin 协程收听 Firebase DB 中的实时更新。

问题是每当在集合中创建新消息时,我的应用程序就会冻结并且无法从该状态恢复。我需要杀死它并重新启动应用程序。

第一次通过,我可以在 UI 上看到以前的消息。第二次调用 SnapshotListener 时会出现此问题。

我的observer()函数

val channel = Channel<List<MessageEntity>>()
firestore.collection(path).addSnapshotListener { data, error ->
    if (error != null) {
        channel.close(error)
    } else {
        if (data != null) {
            val messages = data.toObjects(MessageEntity::class.java)
            //till this point it gets executed^^^^
            channel.sendBlocking(messages)
        } else {
            channel.close(CancellationException("No data received"))
        }
    }
}
return channel

这就是我想要观察消息的方式

launch(Dispatchers.IO) {
        val newMessages =
            messageRepository
                .observer()
                .receive()
    }
}

在我用 send() 替换 sendBlocking() 之后,我仍然没有在频道中收到任何新消息。 SnapshotListener 方被执行

//channel.sendBlocking(messages) was replaced by code bellow
scope.launch(Dispatchers.IO) {
    channel.send(messages)
}
//scope is my viewModel

如何使用 Kotlin 协同程序在 firestore/realtime-dbs 中观察消息?

我最终得到的是我使用了Flow,它是协程1.2.0-alpha-2

的一部分
return flowViaChannel { channel ->
   firestore.collection(path).addSnapshotListener { data, error ->
        if (error != null) {
            channel.close(error)
        } else {
            if (data != null) {
                val messages = data.toObjects(MessageEntity::class.java)
                channel.sendBlocking(messages)
            } else {
                channel.close(CancellationException("No data received"))
            }
        }
    }
    channel.invokeOnClose {
        it?.printStackTrace()
    }
} 

这就是我在 ViewModel 中观察它的方式

launch {
    messageRepository.observe().collect {
        //process
    }
}

关于主题的更多信息 https://medium.com/@elizarov/cold-flows-hot-channels-d74769805f9

我有这些扩展函数,所以我可以简单地以流的形式从查询中获取结果。

Flow 是 Kotlin 协程构造,非常适合此目的。 https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/

@ExperimentalCoroutinesApi
fun CollectionReference.getQuerySnapshotFlow(): Flow<QuerySnapshot?> {
    return callbackFlow {
        val listenerRegistration =
            addSnapshotListener { querySnapshot, firebaseFirestoreException ->
                if (firebaseFirestoreException != null) {
                    cancel(
                        message = "error fetching collection data at path - $path",
                        cause = firebaseFirestoreException
                    )
                    return@addSnapshotListener
                }
                offer(querySnapshot)
            }
        awaitClose {
            Timber.d("cancelling the listener on collection at path - $path")
            listenerRegistration.remove()
        }
    }
}

@ExperimentalCoroutinesApi
fun <T> CollectionReference.getDataFlow(mapper: (QuerySnapshot?) -> T): Flow<T> {
    return getQuerySnapshotFlow()
        .map {
            return@map mapper(it)
        }
}

以下是如何使用上述功能的示例。

@ExperimentalCoroutinesApi
fun getShoppingListItemsFlow(): Flow<List<ShoppingListItem>> {
    return FirebaseFirestore.getInstance()
        .collection("$COLLECTION_SHOPPING_LIST")
        .getDataFlow { querySnapshot ->
            querySnapshot?.documents?.map {
                getShoppingListItemFromSnapshot(it)
            } ?: listOf()
        }
}

// Parses the document snapshot to the desired object
fun getShoppingListItemFromSnapshot(documentSnapshot: DocumentSnapshot) : ShoppingListItem {
        return documentSnapshot.toObject(ShoppingListItem::class.java)!!
    }

并且在您的 ViewModel class(或您的 Fragment)中,确保您从正确的范围调用它,以便在用户离开屏幕时适当地删除侦听器。

viewModelScope.launch {
   getShoppingListItemsFlow().collect{
     // Show on the view.
   }
}

删除回调的扩展函数

对于 Firebase 的 Firestore 数据库,有两种类型的调用。

  1. 一次性请求 - addOnCompleteListener
  2. 实时更新 - addSnapshotListener

一次性请求

对于一次性请求,库 org.jetbrains.kotlinx:kotlinx-coroutines-play-services:X.X.X 提供了一个 await 扩展函数。函数 returns 来自 addOnCompleteListener.

有关最新版本,请参阅 Maven 存储库,kotlinx-coroutines-play-services

资源

实时更新

扩展函数 awaitRealtime 进行检查,包括验证 continuation 的状态,以查看它是否处于 isActive 状态。这很重要,因为当用户的主要内容提要通过生命周期事件、手动刷新提要或从他们的提要中删除内容时,将调用该函数。没有这个检查就会崩溃。

ExtenstionFuction.kt

data class QueryResponse(val packet: QuerySnapshot?, val error: FirebaseFirestoreException?)

suspend fun Query.awaitRealtime() = suspendCancellableCoroutine<QueryResponse> { continuation ->
    addSnapshotListener({ value, error ->
        if (error == null && continuation.isActive)
            continuation.resume(QueryResponse(value, null))
        else if (error != null && continuation.isActive)
            continuation.resume(QueryResponse(null, error))
    })
}

为了处理错误,使用了 try/catch 模式。

Repository.kt

object ContentRepository {
    fun getMainFeedList(isRealtime: Boolean, timeframe: Timestamp) = flow<Lce<PagedListResult>> {
        emit(Loading())
        val labeledSet = HashSet<String>()
        val user = usersDocument.collection(getInstance().currentUser!!.uid)
        syncLabeledContent(user, timeframe, labeledSet, SAVE_COLLECTION, this)
        getLoggedInNonRealtimeContent(timeframe, labeledSet, this)        
    }
    // Realtime updates with 'awaitRealtime' used
    private suspend fun syncLabeledContent(user: CollectionReference, timeframe: Timestamp,
                                       labeledSet: HashSet<String>, collection: String,
                                       lce: FlowCollector<Lce<PagedListResult>>) {
        val response = user.document(COLLECTIONS_DOCUMENT)
            .collection(collection)
            .orderBy(TIMESTAMP, DESCENDING)
            .whereGreaterThanOrEqualTo(TIMESTAMP, timeframe)
            .awaitRealtime()
        if (response.error == null) {
            val contentList = response.packet?.documentChanges?.map { doc ->
                doc.document.toObject(Content::class.java).also { content ->
                    labeledSet.add(content.id)
                }
            }
            database.contentDao().insertContentList(contentList)
        } else lce.emit(Error(PagedListResult(null,
            "Error retrieving user save_collection: ${response.error?.localizedMessage}")))
    }
    // One time updates with 'await' used
    private suspend fun getLoggedInNonRealtimeContent(timeframe: Timestamp,
                                                      labeledSet: HashSet<String>,
                                                      lce: FlowCollector<Lce<PagedListResult>>) =
            try {
                database.contentDao().insertContentList(
                        contentEnCollection.orderBy(TIMESTAMP, DESCENDING)
                                .whereGreaterThanOrEqualTo(TIMESTAMP, timeframe).get().await()
                                .documentChanges
                                ?.map { change -> change.document.toObject(Content::class.java) }
                                ?.filter { content -> !labeledSet.contains(content.id) })
                lce.emit(Lce.Content(PagedListResult(queryMainContentList(timeframe), "")))
            } catch (error: FirebaseFirestoreException) {
                lce.emit(Error(PagedListResult(
                        null,
                        CONTENT_LOGGED_IN_NON_REALTIME_ERROR + "${error.localizedMessage}")))
            }
}

这对我有用:

suspend fun DocumentReference.observe(block: suspend (getNextSnapshot: suspend ()->DocumentSnapshot?)->Unit) {
    val channel = Channel<Pair<DocumentSnapshot?, FirebaseFirestoreException?>>(Channel.UNLIMITED)

    val listenerRegistration = this.addSnapshotListener { value, error ->
        channel.sendBlocking(Pair(value, error))
    }

    try {
        block {
            val (value, error) = channel.receive()

            if (error != null) {
                throw error
            }
            value
        }
    }
    finally {
        channel.close()
        listenerRegistration.remove()
    }
}

然后你可以像这样使用它:

docRef.observe { getNextSnapshot ->
    while (true) {
         val value = getNextSnapshot() ?: continue
         // do whatever you like with the database snapshot
    }
}

如果观察者块抛出错误,或者块完成,或者你的协程被取消,监听器会自动移除。