使用 RxJava2 重新查询和更新订阅者

Requerying and updating subscribers with RxJava2

我的应用程序中有一个由改造服务支持的数据层。 (到目前为止只有网络持久化。当我进一步开发时,我将添加离线优先本地存储)

当我调用服务器时,

为我改造 returns 和 Observable<List<Item>>。这很好用。在我的订阅者中,我在订阅时收到列表,然后可以用 Items.

填充我的 UI

我遇到的问题是:如果列表被修改(通过某种外部机制),我如何才能使可观察对象重新查询改装服务并发出新的项目列表。我知道数据已经过时,但我不确定如何启动重新查询。

这是我的 DataManager

的精简版
class DataManager {

    // Retrofit
    RetrofitItemsService itemsService;

    // The observalble provided by retrofit
    Observable<List<Item>> itemsObservable;

    //ctor
    public DataManager(RetrofitItemsService itemsService) {
        this.itemsService = itemsService;
    }

    /* Creates and stores an observable if one has not been created yet.
     * Returns the observable so that it can be subscribed to by the function caller
     */
    public Observable<List<Item>> getItems(){
        if(itemsObservable == null){
            itemsObservable = itemsService.getItems();
        }

        return itemsObservable;
    }

    /* Adds a new Item to the list.
     */
    public Completable addItem(Item item){
        Completable call = itemsService.addItem(item);

        call.subscribe(()->{
            /*
             < < < Here > > >
             If someone has previously called getItems before this item was added, they now have stale data.

             How can I call something like:

             itemsObservable.refreshAllSubscribers()
            */
        });

        return call;
    }
}

你在这里遇到的问题在于 可观察。您可以 google 找到很多很棒的文章,其中详细描述了差异,所以我只描述基础知识。

Cold observable 为 every 订阅者创建一个新的生产者。这意味着,当两个独立的订阅者订阅 相同的冷可观察对象 时,他们每个人都会收到这些发射的不同实例。它们可能(!)相等,但它们始终是不同的对象。适用于您的情况,每个订阅者都有自己的生产者,它向服务器请求数据并将其传递到流中。每个订阅者都从其自己的生产者那里获得数据。

Hot observable 与其所有观察者共享一个生产者。例如,如果生产者正在遍历对象集合,则在发射过程中加入第二个订阅者意味着它将仅获得之后发射的项目(如果它没有通过 replay 等运算符进行修改)。任何订阅者收到的每个对象在所有观察者中也是相同的实例,因为它来自单个生产者。

所以从表面上看,您需要一个热可观察对象来分发您的数据,这样当您知道它不再有效时,您只需通过这个热可观察对象发出一次,每个观察者都会对一个更新。

幸运的是,将冷 observable 变成热 observable 通常没什么大不了的。您可以创建自己的生产者来模仿这种行为,使用流行的运算符之一,如 share 或者您可以只转换流,使其表现得像一个。

我建议使用 PublishSubject 来刷新数据并将其与原始的冷可观察对象合并,如下所示:

class DataManager {

    .....

    PublishSubject<Boolean> refreshSubject = PublishSubject.create();

    // The observable for retrieving always fresh data
    Observable<List<Item>> itemsObservable;

    //ctor
    public DataManager(RetrofitItemsService itemsService) {
        this.itemsService = itemsService;
        itemsObservable = itemsService.getItems()
                              .mergeWith(refreshSubject.flatMap(refresh -> itemsService.getItems()))
    }


    public Observable<List<Item>> getItems(){
        return itemsObservable;
    }

    /* Adds a new Item to the list.
     */
    public Completable addItem(Item item){
        Completable call = itemsService.addItem(item);

        call.subscribe(()->{
            refreshSubject.onNext(true);
        });

        return call;
    }
}

我猜 itemsService.getItems() returns 是一个单一的元素 Observable 因此消费者无论如何都必须重新订阅才能获得新数据,他们会得到它,因为 Retrofit Observables 是 deferred/lazy 还有。

你可以有一个单独的,"long" Observable,通过 PublishSubject 的帮助,你可以在数据更改时触发:

final Subject<Object> onItemsChanged = PublishSubject.create().toSerialized();

public Observable<Object> itemsChanged() {
    return onItemsChanged;
}

public Completable addItem(Item item){
    Completable call = itemsService.addItem(item);

    // prevent triggering the addItem multiple times
    // Needs RxJava 2 Extensions library for now
    // as there is no Completable.cache() or equivalent in 2.0.3
    CompletableSubject cs = CompletableSubject.create();

    call.doOnComplete(() -> onItemsChanged.onNext("changed"))
    .subscribe(cs);

    return cs;
}