为什么在使用 backpreasurebuffer 时会出现 MissingBackpreasureException?

Why do I get a MissingBackpreasureException while using a backpreasurebuffer?

我想在 RxJava 中实现一个处理队列来下载一些文件。我要下载的文件数量可能会达到 100 个左右。

一切都是在 Android 上使用 RxJava 1.1.1

开发的

我当前的实现看起来像这样:

PublishSubject<URL> publishSubject = PublishSubject.create();
_subject = new SerializedSubject<>(publishSubject);

_subscription = _subject
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .subscribe(_getObserver());  // Observer

_subject.onBackpressureBuffer(10000, new Action0() {
    @Override
    public void call() {
        Log.d(TAG, "onBackpressureBuffer called");
        return;
    }
});

// download a file
_subject.onNext(aValidURL);

其中 _getObserver() returns 一个新的观察者对象,它通过 "onNext" 方法下载到文件中。

但是,我的问题是我很快得到一个 MissingBackpreasureException,我不明白。我尝试实现 backpreasurebuffer,但它似乎并没有被调用。

我做错了什么?

在 RxJava 中,当你应用一个运算符时,你会得到一个新的 Observable 实例,它具有修改后的行为,但原始实例保持不变。在这里,您在 _subject 上调用了 onBackpressureBuffer 但随后不使用其结果,否则该调用不执行任何操作。您需要按顺序应用:

PublishSubject<URL> publishSubject = PublishSubject.create();
_subject = new SerializedSubject<>(publishSubject);

_subscription = _subject
                .onBackpressureBuffer(10000, new Action0() {
                    @Override
                    public void call() {
                        Log.d(TAG, "onBackpressureBuffer called");
                        return;
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .subscribe(_getObserver());  // Observer

// download a file
_subject.onNext(aValidURL);