如何创建一个从多个数据更新的 Flowable?

How to create a Flowable that updates from multiple data?

我正在构建一个聊天应用程序,遇到以下情况:在我的数据库中,我有 3 个数据源,但其中两个依赖于第一个。

A -> is a Contact
B -> is the last unread message
C -> is the messages count

我需要先获取联系人,然后使用其 ID 获取另外两个。要求是持续关注联系信息、未读消息或消息计数中的任何数据变化。我怎样才能使用 RxJava 只更新必要的不阻止 UI 来做到这一点? (有些人告诉我,我可以为此使用 Flowable)。

到目前为止我尝试过的:

fun queryAllChats(): Flowable<MutableList<Chat>> =
dao.queryContactsFlowable().flatMap { contacts ->
        Flowable.fromIterable(contacts)
            .flatMapSingle { contact ->
                getLastUnreadMessage(contact.id)
                    .materialize()
                    .zipWith(
                        getUnreadCount(contact.id)
                    ) { msg: Notification<Messages>, unreadCount: Int ->
                        Chat(contact, msg.value, unreadCount)
                    }
            }.toList().toFlowable()
    }.subscribeOn(schedulers.io).observeOn(schedulers.main)

在视图模型中

var test = LiveDataReactiveStreams.fromPublisher(queryAllChats())

但是好像只更新了一次就没有再更新了。

你的代码中的问题是,你只观察联系人列表的变化,并且每个联系人只获取一次最后消息和未读计数。

这应该有效:

  fun queryAllChats() = dao.queryContactsFlowable()
    .switchMap { contacts ->
      val chats = contacts.map { contact ->
        Flowable.combineLatest(
          getUnreadCount(contact.id), // Flowable
          getLastUnreadMessage(contact.id), // Flowable
          { unreadCount, unreadMessages ->
            Chat(contact, unreadMessages, unreadCount)
          })
      }
      return@switchMap Flowable.combineLatest(chats) {
        it.map { it as Chat }
      }
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())