如何创建一个从多个数据更新的 Flowable?
How to create a Flowable that updates from multiple data?
我正在构建一个聊天应用程序,遇到以下情况:在我的数据库中,我有 3 个数据源,但其中两个依赖于第一个。
A -> is a Contact
B -> is the last unread message
C -> is the messages count
我需要先获取联系人,然后使用其 ID 获取另外两个。要求是持续关注联系信息、未读消息或消息计数中的任何数据变化。我怎样才能使用 RxJava 只更新必要的不阻止 UI 来做到这一点? (有些人告诉我,我可以为此使用 Flowable)。
到目前为止我尝试过的:
fun queryAllChats(): Flowable<MutableList<Chat>> =
dao.queryContactsFlowable().flatMap { contacts ->
Flowable.fromIterable(contacts)
.flatMapSingle { contact ->
getLastUnreadMessage(contact.id)
.materialize()
.zipWith(
getUnreadCount(contact.id)
) { msg: Notification<Messages>, unreadCount: Int ->
Chat(contact, msg.value, unreadCount)
}
}.toList().toFlowable()
}.subscribeOn(schedulers.io).observeOn(schedulers.main)
在视图模型中
var test = LiveDataReactiveStreams.fromPublisher(queryAllChats())
但是好像只更新了一次就没有再更新了。
你的代码中的问题是,你只观察联系人列表的变化,并且每个联系人只获取一次最后消息和未读计数。
这应该有效:
fun queryAllChats() = dao.queryContactsFlowable()
.switchMap { contacts ->
val chats = contacts.map { contact ->
Flowable.combineLatest(
getUnreadCount(contact.id), // Flowable
getLastUnreadMessage(contact.id), // Flowable
{ unreadCount, unreadMessages ->
Chat(contact, unreadMessages, unreadCount)
})
}
return@switchMap Flowable.combineLatest(chats) {
it.map { it as Chat }
}
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
我正在构建一个聊天应用程序,遇到以下情况:在我的数据库中,我有 3 个数据源,但其中两个依赖于第一个。
A -> is a Contact
B -> is the last unread message
C -> is the messages count
我需要先获取联系人,然后使用其 ID 获取另外两个。要求是持续关注联系信息、未读消息或消息计数中的任何数据变化。我怎样才能使用 RxJava 只更新必要的不阻止 UI 来做到这一点? (有些人告诉我,我可以为此使用 Flowable)。
到目前为止我尝试过的:
fun queryAllChats(): Flowable<MutableList<Chat>> =
dao.queryContactsFlowable().flatMap { contacts ->
Flowable.fromIterable(contacts)
.flatMapSingle { contact ->
getLastUnreadMessage(contact.id)
.materialize()
.zipWith(
getUnreadCount(contact.id)
) { msg: Notification<Messages>, unreadCount: Int ->
Chat(contact, msg.value, unreadCount)
}
}.toList().toFlowable()
}.subscribeOn(schedulers.io).observeOn(schedulers.main)
在视图模型中
var test = LiveDataReactiveStreams.fromPublisher(queryAllChats())
但是好像只更新了一次就没有再更新了。
你的代码中的问题是,你只观察联系人列表的变化,并且每个联系人只获取一次最后消息和未读计数。
这应该有效:
fun queryAllChats() = dao.queryContactsFlowable()
.switchMap { contacts ->
val chats = contacts.map { contact ->
Flowable.combineLatest(
getUnreadCount(contact.id), // Flowable
getLastUnreadMessage(contact.id), // Flowable
{ unreadCount, unreadMessages ->
Chat(contact, unreadMessages, unreadCount)
})
}
return@switchMap Flowable.combineLatest(chats) {
it.map { it as Chat }
}
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())