为什么在使用 backpreasurebuffer 时会出现 MissingBackpreasureException?
Why do I get a MissingBackpreasureException while using a backpreasurebuffer?
我想在 RxJava 中实现一个处理队列来下载一些文件。我要下载的文件数量可能会达到 100 个左右。
一切都是在 Android 上使用 RxJava 1.1.1
开发的
我当前的实现看起来像这样:
PublishSubject<URL> publishSubject = PublishSubject.create();
_subject = new SerializedSubject<>(publishSubject);
_subscription = _subject
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe(_getObserver()); // Observer
_subject.onBackpressureBuffer(10000, new Action0() {
@Override
public void call() {
Log.d(TAG, "onBackpressureBuffer called");
return;
}
});
// download a file
_subject.onNext(aValidURL);
其中 _getObserver()
returns 一个新的观察者对象,它通过 "onNext" 方法下载到文件中。
但是,我的问题是我很快得到一个 MissingBackpreasureException
,我不明白。我尝试实现 backpreasurebuffer
,但它似乎并没有被调用。
我做错了什么?
在 RxJava 中,当你应用一个运算符时,你会得到一个新的 Observable 实例,它具有修改后的行为,但原始实例保持不变。在这里,您在 _subject
上调用了 onBackpressureBuffer
但随后不使用其结果,否则该调用不执行任何操作。您需要按顺序应用:
PublishSubject<URL> publishSubject = PublishSubject.create();
_subject = new SerializedSubject<>(publishSubject);
_subscription = _subject
.onBackpressureBuffer(10000, new Action0() {
@Override
public void call() {
Log.d(TAG, "onBackpressureBuffer called");
return;
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe(_getObserver()); // Observer
// download a file
_subject.onNext(aValidURL);
我想在 RxJava 中实现一个处理队列来下载一些文件。我要下载的文件数量可能会达到 100 个左右。
一切都是在 Android 上使用 RxJava 1.1.1
开发的我当前的实现看起来像这样:
PublishSubject<URL> publishSubject = PublishSubject.create();
_subject = new SerializedSubject<>(publishSubject);
_subscription = _subject
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe(_getObserver()); // Observer
_subject.onBackpressureBuffer(10000, new Action0() {
@Override
public void call() {
Log.d(TAG, "onBackpressureBuffer called");
return;
}
});
// download a file
_subject.onNext(aValidURL);
其中 _getObserver()
returns 一个新的观察者对象,它通过 "onNext" 方法下载到文件中。
但是,我的问题是我很快得到一个 MissingBackpreasureException
,我不明白。我尝试实现 backpreasurebuffer
,但它似乎并没有被调用。
我做错了什么?
在 RxJava 中,当你应用一个运算符时,你会得到一个新的 Observable 实例,它具有修改后的行为,但原始实例保持不变。在这里,您在 _subject
上调用了 onBackpressureBuffer
但随后不使用其结果,否则该调用不执行任何操作。您需要按顺序应用:
PublishSubject<URL> publishSubject = PublishSubject.create();
_subject = new SerializedSubject<>(publishSubject);
_subscription = _subject
.onBackpressureBuffer(10000, new Action0() {
@Override
public void call() {
Log.d(TAG, "onBackpressureBuffer called");
return;
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe(_getObserver()); // Observer
// download a file
_subject.onNext(aValidURL);