在 RxJava 中公开 "expensive" 个 Observable 的最佳实践

Best practices for exposing "expensive" Observables in RxJava

我是 RxJava 的新手,正在尝试确定常见的习语和最佳实践。

假设我有一个 Foo class 发出 Bars(目前不完整且过于简单):

class Foo {
    public Subscriber barSubscriber;
    public Observable<Bar> getBarObservable = (...details omitted...)

    private void someMethod() {
        // emit a Bar
        barSubscriber.onNext(bar); 
    }
}

想要订阅那些 Bars 的其他对象通过调用

foo.getBarObservable().subscribe(...);

假设生产和排放 Bars 是 "expensive"。为了避免在没有更多订阅者时这样做,Foo 的 getBarObservable 可以像这样公开一个可连接的、引用计数的 Observable(使用 share()):

class Foo {
    private Subscriber barSubscriber;
    private Observable<Bar> barObservable =  Observable.create(
            new Observable.OnSubscribe<Bar>() {
                @Override
                public void call(Subscriber<? super Bar> subscriber) {
                    Foo.this.subscriber = subscriber;
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            runUntilUnsubscribed();
                        }
                    }).start();

                }
            }
    ).share();

    public Observable<Bar> getBarObservable() {
        return barObservable;
    }

    public void runUntilUnsubscribed(} {
        while(!subscriber.isUnsubscribed()) {

            /* do some heavy stuff that produces a Bar.  If, when a 
               Bar is ready, we still have subscribers, emit the Bar */

            if (!subscriber.isUnsubscribed()) 
                subscriber.onNext(bar);
        }
    }
}

我见过的大多数示例和教程都是在订阅它们的同一块代码中即时创建 Observables 内联,所以我不清楚标准做法是什么在更真实的场景中,Observable 的创建和订阅在两个不同的地方。

  1. 对于像 Foo 这样的 class 不想知道它的订阅者是谁或有多少订阅者,这是正确的方法吗?
  2. 在我看来,这将是一个非常典型的场景 - 是吗?或者,在更高层次上,这不是考虑公开 Observable 的正确方法吗?常规使用这种方法有缺点吗?
  3. 在我看来,每次我想发出 Bar 时,我都需要那个小 if (subscriber == null && !subscriber.isUnsubscribed()) subscriber.onNext(bar); 模式。这也是一个常见的习语,还是有更好的方法?没关系,我不需要空检查...,不确定我在那里想什么。
  1. 是的,这大致是正确的方法。如果 Foo 中的 bar 需要共享给所有订阅者,则使用 .publish().refCount()(或如您所说的 share())。如果没有,那么使用一个普通的 Observable,默认是 "cold"。

  2. 公开 Observables 是一个常见的场景。在一个好的反应式架构中,大多数 classes 只有 Observable 的 getter,因为 setter 本质上不是反应式的。给定一个使用 setter 的程序或 class,您通常可以将其转换为 Observables 和 getter 而不会影响功能。由于某些控制反转,Observable 和 getter 是一种理想的方法。使用 setter,如果 FooBaz 中设置了一个值,您需要随时查看 class Foo 以了解 Baz。但是对于 Observables 和 getter,BazFoo 获取并且 Baz 定义了它自己是如何工作的,并且 Foo 可以忽略 Baz.

  3. 我从来不需要使用那个 if 模式。我也很少需要 Observable.create()。有许多 Observable 创建助手(fromintervalrangejust,仅举几例)和 Observable 转换(例如无所不能的 flatMap) 让你在表达新的 Observables 方面走得更远。 Subjects 还允许您随时随地手动创建 Observables。

您的示例 class 无法正常工作:如果订阅者是 nullsetBar 可能会抛出 NPE,runUntilUnsubscribed 引用了缺失的柱 field/value并且是一个繁忙的循环,会一遍又一遍地发出相同的值。

你说创建一个 Bar 很昂贵,但它的创建似乎在 Foo class 之外,我猜你想将这样的值分配给当前订阅的订户。这就是 PublishSubject 的用途:

class Foo {
    final PublishSubject<Bar> subject = PublishSubject.create();
    public void setBar(Bar bar) {
        subject.onNext(bar);
    }
    public Observable<Bar> getBarObservable() {
        return subject; // .asObservable() if you want to hide the subject
    }
}

如果没有任何订阅者,条形图集就会掉下来并被垃圾收集起来。如果您想保留最后一个值,请使用 BehaviorSubject 而不是 PublishSubject.

否则,如果您需要在订阅者到达时触发昂贵的 Bar 值的创建,您可以使用一些带有 share():

的启动序列
Observable.just(1)
.subscribeOn(Schedulers.computation())
.map(v -> createBar())
.share();

但是 share() 的使用实际上取决于每个 Bar 值的预期生命周期。

例如,如果您想在订阅者到达之前存储柱状图,然后进行一次繁重的计算并发送结果,您可以采用以下结构:

class Foo {
    final BehaviorSubject<Bar> subject = BehaviorSubject.create();
    final Observable<Bar> output = subject
        .observeOn(Schedulers.computation())
        .doOnNext(bar -> expensiveInplaceComputation(bar))
        .take(1)
        .share();

    public void setBar(Bar bar) {
        subject.onNext(bar);
    }
    public Observable<Bar> getBarObservable() {
        return output;
    }
}

有关可运行示例,请参阅 this gist