RxAndroid:创建简单的 Hot Observable

RxAndroid: Create Simple Hot Observable

我正在创建一个在订阅时发出整数的 Observable。我现在的实现已经设置好,因此订阅它的行为会从一开始就触发生成,如下所示:

private Observable createObservable() {
    return Observable.create (
        new Observable.OnSubscribe<Integer>() {

            @Override
            public void call(Subscriber<? super Integer> sub) {

                for (int i = 1; i < MAX_PROGRESS + 1; i++) {
                    sub.onNext(i);
                    SystemClock.sleep(1000);
                }
                sub.onCompleted();
            }
        }
    );
}

我的理解是这是一个冷的 Observable。我希望生成的序列与任何订阅者无关,并且当订阅者订阅时,希望他们接收在订阅时恰好是最新的值。 IOW,把它变成一个热 Observable。我宁愿不将 Observable 子类化,因为它将它绑定到一个具体的 Integer 中,而实际上实际类型会有所不同。

在您的示例中,您每次调用函数时都使用 "Observable.create"。 "Hot" 订阅从 Observable 保存一些实例。您还需要使用一些 Rx 方法(缓存(),重试())在代码中它看起来像:

    public Observable<Bitmap> mObservable;
    Subscriber<Bitmap> mSubscriber;
    Subscription mSubscription;
    Bitmap loadedBitmap;

....

@Override
    public View onCreateView(LayoutInflater inflater, ViewGroup container,
                             Bundle savedInstanceState) {

        if (savedInstanceState != null) {
            loadedBitmap = savedInstanceState.getParcelable("LoadedBitmap");
            imageView.setImageBitmap(loadedBitmap);
        }
        else {
            runNewObservable();
        }
        runSubscribe();
        return mainView;
    }

....

    private void runNewObservable () {
        mObservable =
                Observable.create(new Observable.OnSubscribe<Bitmap>() {
            @Override
            public void call(Subscriber<? super Bitmap> subscriber) {
                subscriber.onNext(new LoadingImage().loadImageFrom(imageURL));
                subscriber.onCompleted();
            }
        })
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .cache();

    }



    private void runNewSubscribe () {
        mSubscriber = new Subscriber<Bitmap>() {

            @Override
            public void onCompleted() { }

            @Override
            public void onError(Throwable e) { }

            @Override
            public void onNext(Bitmap bitmap) {
                loadedBitmap = bitmap;
                imageView.setImageBitmap(bitmap);
            }
        };
    }

    @Override
    public void onResume() {
        super.onResume();
        mSubscription = mObservable.subscribe(mSubscriber);
    }


    @Override
    public void onSaveInstanceState(Bundle outState) {
        super.onSaveInstanceState(outState);
        outState.putParcelable("LoadedBitmap", loadedBitmap);
    }



    @Override
    public void onStop() {
        super.onStop();
        mSubscription.unsubscribe();
    }

....

如你所见。我仅使用 mSubscription.subscribe() 和 mSubscription.unsubscribe() 并仅在 saveInstanceState 为 null 时创建 Observable。

查看 rx.subjects.BehaviorSubject<T>。如果您不熟悉 rx.subjects.Subjects,我能想到的最通用的描述方式是它们打破了 A 点和 B 点之间订阅的连续性。如何同时成为 Observer<T>;可以接受来自多个来源的 onNext()s(警告:需要外部线程安全)。另一方面,一个主题也是一个 Observable<T>,因此多个 Observer<T> 可以订阅,并且 onNext() 进入将被多播到每个下游 Observer<T>.

如果你的代码看起来像

Observable<T> src = ...;
Subscriber<T> dst;
src.subscribe(dst);

使用BehaviorSubject的方法是

Observable<T> src = ...;
BehaviorSubject<T> subject = BehaviorSubject.create(defaultValue);
src.subscribe(subject);

立即订阅源,主题将以最快的速度呈现。 BehaviorSubject 只保留最近的值并删除 defaultValue 和所有以前的值。

// safe to do multiple times.
Subscriber<T> dst;
subject.subscribe(dst);

订阅 dst 在订阅后立即从 src(或 defaultValue)接收最新值,然后是所有后续值,直到 dst 取消订阅。

警告:受试者有过度使用的倾向,所以确保你需要一个。