使用协程的 Firebase 实时快照监听器
Firebase realtime snapshot listener using Coroutines
我希望能够在我的 ViewModel 中使用 Kotlin 协程收听 Firebase DB 中的实时更新。
问题是每当在集合中创建新消息时,我的应用程序就会冻结并且无法从该状态恢复。我需要杀死它并重新启动应用程序。
第一次通过,我可以在 UI 上看到以前的消息。第二次调用 SnapshotListener
时会出现此问题。
我的observer()
函数
val channel = Channel<List<MessageEntity>>()
firestore.collection(path).addSnapshotListener { data, error ->
if (error != null) {
channel.close(error)
} else {
if (data != null) {
val messages = data.toObjects(MessageEntity::class.java)
//till this point it gets executed^^^^
channel.sendBlocking(messages)
} else {
channel.close(CancellationException("No data received"))
}
}
}
return channel
这就是我想要观察消息的方式
launch(Dispatchers.IO) {
val newMessages =
messageRepository
.observer()
.receive()
}
}
在我用 send()
替换 sendBlocking()
之后,我仍然没有在频道中收到任何新消息。 SnapshotListener
方被执行
//channel.sendBlocking(messages) was replaced by code bellow
scope.launch(Dispatchers.IO) {
channel.send(messages)
}
//scope is my viewModel
如何使用 Kotlin 协同程序在 firestore/realtime-dbs 中观察消息?
我最终得到的是我使用了Flow,它是协程1.2.0-alpha-2
的一部分
return flowViaChannel { channel ->
firestore.collection(path).addSnapshotListener { data, error ->
if (error != null) {
channel.close(error)
} else {
if (data != null) {
val messages = data.toObjects(MessageEntity::class.java)
channel.sendBlocking(messages)
} else {
channel.close(CancellationException("No data received"))
}
}
}
channel.invokeOnClose {
it?.printStackTrace()
}
}
这就是我在 ViewModel 中观察它的方式
launch {
messageRepository.observe().collect {
//process
}
}
关于主题的更多信息 https://medium.com/@elizarov/cold-flows-hot-channels-d74769805f9
我有这些扩展函数,所以我可以简单地以流的形式从查询中获取结果。
Flow 是 Kotlin 协程构造,非常适合此目的。
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/
@ExperimentalCoroutinesApi
fun CollectionReference.getQuerySnapshotFlow(): Flow<QuerySnapshot?> {
return callbackFlow {
val listenerRegistration =
addSnapshotListener { querySnapshot, firebaseFirestoreException ->
if (firebaseFirestoreException != null) {
cancel(
message = "error fetching collection data at path - $path",
cause = firebaseFirestoreException
)
return@addSnapshotListener
}
offer(querySnapshot)
}
awaitClose {
Timber.d("cancelling the listener on collection at path - $path")
listenerRegistration.remove()
}
}
}
@ExperimentalCoroutinesApi
fun <T> CollectionReference.getDataFlow(mapper: (QuerySnapshot?) -> T): Flow<T> {
return getQuerySnapshotFlow()
.map {
return@map mapper(it)
}
}
以下是如何使用上述功能的示例。
@ExperimentalCoroutinesApi
fun getShoppingListItemsFlow(): Flow<List<ShoppingListItem>> {
return FirebaseFirestore.getInstance()
.collection("$COLLECTION_SHOPPING_LIST")
.getDataFlow { querySnapshot ->
querySnapshot?.documents?.map {
getShoppingListItemFromSnapshot(it)
} ?: listOf()
}
}
// Parses the document snapshot to the desired object
fun getShoppingListItemFromSnapshot(documentSnapshot: DocumentSnapshot) : ShoppingListItem {
return documentSnapshot.toObject(ShoppingListItem::class.java)!!
}
并且在您的 ViewModel class(或您的 Fragment)中,确保您从正确的范围调用它,以便在用户离开屏幕时适当地删除侦听器。
viewModelScope.launch {
getShoppingListItemsFlow().collect{
// Show on the view.
}
}
删除回调的扩展函数
对于 Firebase 的 Firestore 数据库,有两种类型的调用。
- 一次性请求 -
addOnCompleteListener
- 实时更新 -
addSnapshotListener
一次性请求
对于一次性请求,库 org.jetbrains.kotlinx:kotlinx-coroutines-play-services:X.X.X
提供了一个 await
扩展函数。函数 returns 来自 addOnCompleteListener
.
有关最新版本,请参阅 Maven 存储库,kotlinx-coroutines-play-services。
资源
- Using Firebase on Android with Kotlin Coroutines by Joe Birch
- Using Kotlin Extension Functions and Coroutines with Firebase by Rosário Pereira Fernandes
实时更新
扩展函数 awaitRealtime
进行检查,包括验证 continuation
的状态,以查看它是否处于 isActive
状态。这很重要,因为当用户的主要内容提要通过生命周期事件、手动刷新提要或从他们的提要中删除内容时,将调用该函数。没有这个检查就会崩溃。
ExtenstionFuction.kt
data class QueryResponse(val packet: QuerySnapshot?, val error: FirebaseFirestoreException?)
suspend fun Query.awaitRealtime() = suspendCancellableCoroutine<QueryResponse> { continuation ->
addSnapshotListener({ value, error ->
if (error == null && continuation.isActive)
continuation.resume(QueryResponse(value, null))
else if (error != null && continuation.isActive)
continuation.resume(QueryResponse(null, error))
})
}
为了处理错误,使用了 try
/catch
模式。
Repository.kt
object ContentRepository {
fun getMainFeedList(isRealtime: Boolean, timeframe: Timestamp) = flow<Lce<PagedListResult>> {
emit(Loading())
val labeledSet = HashSet<String>()
val user = usersDocument.collection(getInstance().currentUser!!.uid)
syncLabeledContent(user, timeframe, labeledSet, SAVE_COLLECTION, this)
getLoggedInNonRealtimeContent(timeframe, labeledSet, this)
}
// Realtime updates with 'awaitRealtime' used
private suspend fun syncLabeledContent(user: CollectionReference, timeframe: Timestamp,
labeledSet: HashSet<String>, collection: String,
lce: FlowCollector<Lce<PagedListResult>>) {
val response = user.document(COLLECTIONS_DOCUMENT)
.collection(collection)
.orderBy(TIMESTAMP, DESCENDING)
.whereGreaterThanOrEqualTo(TIMESTAMP, timeframe)
.awaitRealtime()
if (response.error == null) {
val contentList = response.packet?.documentChanges?.map { doc ->
doc.document.toObject(Content::class.java).also { content ->
labeledSet.add(content.id)
}
}
database.contentDao().insertContentList(contentList)
} else lce.emit(Error(PagedListResult(null,
"Error retrieving user save_collection: ${response.error?.localizedMessage}")))
}
// One time updates with 'await' used
private suspend fun getLoggedInNonRealtimeContent(timeframe: Timestamp,
labeledSet: HashSet<String>,
lce: FlowCollector<Lce<PagedListResult>>) =
try {
database.contentDao().insertContentList(
contentEnCollection.orderBy(TIMESTAMP, DESCENDING)
.whereGreaterThanOrEqualTo(TIMESTAMP, timeframe).get().await()
.documentChanges
?.map { change -> change.document.toObject(Content::class.java) }
?.filter { content -> !labeledSet.contains(content.id) })
lce.emit(Lce.Content(PagedListResult(queryMainContentList(timeframe), "")))
} catch (error: FirebaseFirestoreException) {
lce.emit(Error(PagedListResult(
null,
CONTENT_LOGGED_IN_NON_REALTIME_ERROR + "${error.localizedMessage}")))
}
}
这对我有用:
suspend fun DocumentReference.observe(block: suspend (getNextSnapshot: suspend ()->DocumentSnapshot?)->Unit) {
val channel = Channel<Pair<DocumentSnapshot?, FirebaseFirestoreException?>>(Channel.UNLIMITED)
val listenerRegistration = this.addSnapshotListener { value, error ->
channel.sendBlocking(Pair(value, error))
}
try {
block {
val (value, error) = channel.receive()
if (error != null) {
throw error
}
value
}
}
finally {
channel.close()
listenerRegistration.remove()
}
}
然后你可以像这样使用它:
docRef.observe { getNextSnapshot ->
while (true) {
val value = getNextSnapshot() ?: continue
// do whatever you like with the database snapshot
}
}
如果观察者块抛出错误,或者块完成,或者你的协程被取消,监听器会自动移除。
我希望能够在我的 ViewModel 中使用 Kotlin 协程收听 Firebase DB 中的实时更新。
问题是每当在集合中创建新消息时,我的应用程序就会冻结并且无法从该状态恢复。我需要杀死它并重新启动应用程序。
第一次通过,我可以在 UI 上看到以前的消息。第二次调用 SnapshotListener
时会出现此问题。
我的observer()
函数
val channel = Channel<List<MessageEntity>>()
firestore.collection(path).addSnapshotListener { data, error ->
if (error != null) {
channel.close(error)
} else {
if (data != null) {
val messages = data.toObjects(MessageEntity::class.java)
//till this point it gets executed^^^^
channel.sendBlocking(messages)
} else {
channel.close(CancellationException("No data received"))
}
}
}
return channel
这就是我想要观察消息的方式
launch(Dispatchers.IO) {
val newMessages =
messageRepository
.observer()
.receive()
}
}
在我用 send()
替换 sendBlocking()
之后,我仍然没有在频道中收到任何新消息。 SnapshotListener
方被执行
//channel.sendBlocking(messages) was replaced by code bellow
scope.launch(Dispatchers.IO) {
channel.send(messages)
}
//scope is my viewModel
如何使用 Kotlin 协同程序在 firestore/realtime-dbs 中观察消息?
我最终得到的是我使用了Flow,它是协程1.2.0-alpha-2
return flowViaChannel { channel ->
firestore.collection(path).addSnapshotListener { data, error ->
if (error != null) {
channel.close(error)
} else {
if (data != null) {
val messages = data.toObjects(MessageEntity::class.java)
channel.sendBlocking(messages)
} else {
channel.close(CancellationException("No data received"))
}
}
}
channel.invokeOnClose {
it?.printStackTrace()
}
}
这就是我在 ViewModel 中观察它的方式
launch {
messageRepository.observe().collect {
//process
}
}
关于主题的更多信息 https://medium.com/@elizarov/cold-flows-hot-channels-d74769805f9
我有这些扩展函数,所以我可以简单地以流的形式从查询中获取结果。
Flow 是 Kotlin 协程构造,非常适合此目的。 https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/
@ExperimentalCoroutinesApi
fun CollectionReference.getQuerySnapshotFlow(): Flow<QuerySnapshot?> {
return callbackFlow {
val listenerRegistration =
addSnapshotListener { querySnapshot, firebaseFirestoreException ->
if (firebaseFirestoreException != null) {
cancel(
message = "error fetching collection data at path - $path",
cause = firebaseFirestoreException
)
return@addSnapshotListener
}
offer(querySnapshot)
}
awaitClose {
Timber.d("cancelling the listener on collection at path - $path")
listenerRegistration.remove()
}
}
}
@ExperimentalCoroutinesApi
fun <T> CollectionReference.getDataFlow(mapper: (QuerySnapshot?) -> T): Flow<T> {
return getQuerySnapshotFlow()
.map {
return@map mapper(it)
}
}
以下是如何使用上述功能的示例。
@ExperimentalCoroutinesApi
fun getShoppingListItemsFlow(): Flow<List<ShoppingListItem>> {
return FirebaseFirestore.getInstance()
.collection("$COLLECTION_SHOPPING_LIST")
.getDataFlow { querySnapshot ->
querySnapshot?.documents?.map {
getShoppingListItemFromSnapshot(it)
} ?: listOf()
}
}
// Parses the document snapshot to the desired object
fun getShoppingListItemFromSnapshot(documentSnapshot: DocumentSnapshot) : ShoppingListItem {
return documentSnapshot.toObject(ShoppingListItem::class.java)!!
}
并且在您的 ViewModel class(或您的 Fragment)中,确保您从正确的范围调用它,以便在用户离开屏幕时适当地删除侦听器。
viewModelScope.launch {
getShoppingListItemsFlow().collect{
// Show on the view.
}
}
删除回调的扩展函数
对于 Firebase 的 Firestore 数据库,有两种类型的调用。
- 一次性请求 -
addOnCompleteListener
- 实时更新 -
addSnapshotListener
一次性请求
对于一次性请求,库 org.jetbrains.kotlinx:kotlinx-coroutines-play-services:X.X.X
提供了一个 await
扩展函数。函数 returns 来自 addOnCompleteListener
.
有关最新版本,请参阅 Maven 存储库,kotlinx-coroutines-play-services。
资源
- Using Firebase on Android with Kotlin Coroutines by Joe Birch
- Using Kotlin Extension Functions and Coroutines with Firebase by Rosário Pereira Fernandes
实时更新
扩展函数 awaitRealtime
进行检查,包括验证 continuation
的状态,以查看它是否处于 isActive
状态。这很重要,因为当用户的主要内容提要通过生命周期事件、手动刷新提要或从他们的提要中删除内容时,将调用该函数。没有这个检查就会崩溃。
ExtenstionFuction.kt
data class QueryResponse(val packet: QuerySnapshot?, val error: FirebaseFirestoreException?)
suspend fun Query.awaitRealtime() = suspendCancellableCoroutine<QueryResponse> { continuation ->
addSnapshotListener({ value, error ->
if (error == null && continuation.isActive)
continuation.resume(QueryResponse(value, null))
else if (error != null && continuation.isActive)
continuation.resume(QueryResponse(null, error))
})
}
为了处理错误,使用了 try
/catch
模式。
Repository.kt
object ContentRepository {
fun getMainFeedList(isRealtime: Boolean, timeframe: Timestamp) = flow<Lce<PagedListResult>> {
emit(Loading())
val labeledSet = HashSet<String>()
val user = usersDocument.collection(getInstance().currentUser!!.uid)
syncLabeledContent(user, timeframe, labeledSet, SAVE_COLLECTION, this)
getLoggedInNonRealtimeContent(timeframe, labeledSet, this)
}
// Realtime updates with 'awaitRealtime' used
private suspend fun syncLabeledContent(user: CollectionReference, timeframe: Timestamp,
labeledSet: HashSet<String>, collection: String,
lce: FlowCollector<Lce<PagedListResult>>) {
val response = user.document(COLLECTIONS_DOCUMENT)
.collection(collection)
.orderBy(TIMESTAMP, DESCENDING)
.whereGreaterThanOrEqualTo(TIMESTAMP, timeframe)
.awaitRealtime()
if (response.error == null) {
val contentList = response.packet?.documentChanges?.map { doc ->
doc.document.toObject(Content::class.java).also { content ->
labeledSet.add(content.id)
}
}
database.contentDao().insertContentList(contentList)
} else lce.emit(Error(PagedListResult(null,
"Error retrieving user save_collection: ${response.error?.localizedMessage}")))
}
// One time updates with 'await' used
private suspend fun getLoggedInNonRealtimeContent(timeframe: Timestamp,
labeledSet: HashSet<String>,
lce: FlowCollector<Lce<PagedListResult>>) =
try {
database.contentDao().insertContentList(
contentEnCollection.orderBy(TIMESTAMP, DESCENDING)
.whereGreaterThanOrEqualTo(TIMESTAMP, timeframe).get().await()
.documentChanges
?.map { change -> change.document.toObject(Content::class.java) }
?.filter { content -> !labeledSet.contains(content.id) })
lce.emit(Lce.Content(PagedListResult(queryMainContentList(timeframe), "")))
} catch (error: FirebaseFirestoreException) {
lce.emit(Error(PagedListResult(
null,
CONTENT_LOGGED_IN_NON_REALTIME_ERROR + "${error.localizedMessage}")))
}
}
这对我有用:
suspend fun DocumentReference.observe(block: suspend (getNextSnapshot: suspend ()->DocumentSnapshot?)->Unit) {
val channel = Channel<Pair<DocumentSnapshot?, FirebaseFirestoreException?>>(Channel.UNLIMITED)
val listenerRegistration = this.addSnapshotListener { value, error ->
channel.sendBlocking(Pair(value, error))
}
try {
block {
val (value, error) = channel.receive()
if (error != null) {
throw error
}
value
}
}
finally {
channel.close()
listenerRegistration.remove()
}
}
然后你可以像这样使用它:
docRef.observe { getNextSnapshot ->
while (true) {
val value = getNextSnapshot() ?: continue
// do whatever you like with the database snapshot
}
}
如果观察者块抛出错误,或者块完成,或者你的协程被取消,监听器会自动移除。