为什么点击事件这里需要onBackpressure()?
Why does it need onBackpressure() here for click events?
我正在尝试对按钮点击绑定的某些操作进行错误处理。对于绑定,我使用 RxAndroid+RxAndroid。似乎它必须与下面的代码一起使用,但它不适用于带有 onBackpressure()
的注释行:
CurrentUser signIn() {
throw new RuntimeException();
}
Integer x = 1;
PublishSubject<Throwable> loginingFailedSubject = PublishSubject.create();
@Override
public void onStart() {
super.onStart();
RxView.clicks(loginButton)
.observeOn(AndroidSchedulers.mainThread())
.doOnNext((v) -> setLoginingWaiting())
.observeOn(Schedulers.newThread())
.map((v) -> signIn())
.lift(new SuppressErrorOperator<>(throwable -> {
Log.e("MyTag", "Oops, failed " + x.toString() + " times!");
++x;
loginingFailedSubject.onNext(throwable);
}))
//.onBackpressureBuffer()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(user -> setLoginedUser(user));
loginingFailedSubject
.observeOn(AndroidSchedulers.mainThread())
.subscribe(throwable -> setLoginingFailed(throwable));
}
这是 SuppressErrorOperator
代码:
public final class SuppressErrorOperator<T> implements
Observable.Operator<T, T> {
final Action1<Throwable> errorHandler;
public SuppressErrorOperator(Action1<Throwable> errorHandler) {
this.errorHandler = errorHandler;
}
public SuppressErrorOperator() {this(null);}
@Override
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
return new Subscriber<T>(subscriber) {
@Override
public void onCompleted() {
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
if (errorHandler != null) {
errorHandler.call(e);
}
}
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
};
}
}
这就是我在 logcat 最后点击 100 次后得到的结果:
Oops, failed 16 times!
它在 exaclty 16 次后停止,在 17 日,它运行到 setLoginingWaiting()
(我看到了,因为这个方法禁用了按钮,这也意味着,每个请求没有人可以点击超过 1 次。或接近那个数)和.. 仅此而已。好像根本没有达到 .lift()
。
但如果我取消注释 .onBackpressureBuffer()
,它现在就可以完美运行了!我读了很多关于背压的文章。我什至花了一整天的时间来理解 Observable
、Subscriber
e.t.c 的源代码。
我知道,16 - 是 Android 缓冲区的固定大小。但是为什么会中招呢?我不经常点击按钮。另外,根本就没有onNext()
,所以缓冲区在任何情况下都不能超过!所有 onError()
被 Operator
.
吞噬
我也知道 observeOn()
通过 pull 协议工作,所以它内部想使用 request()
。如果我在 .subscribe(user -> setLoginedUser(user));
之前最后评论 observeOn()
- 它也会起作用(但当然,这是不可接受的)。
但是为什么需要 onBackpressure()
活着呢?还有,为什么它会毫无例外地死去,比如MissingBackpressureException
之类的?
OperatorObserveOn 有一个大小为 RxRingBuffer.SIZE 的队列(android 上为 16),如果超过此队列的大小,将抛出 MissingBackpressure 异常。
通常为了避免背压问题,您可以限制切换线程、限制事件发射(限制、缓冲区等)或使用 onBackpressureXXX 运算符。
虽然在你的情况下看起来像 - 登录按钮 - 你一次只需要处理一个请求,那么为什么不用 ProgressBar 隐藏按钮或在请求时设置 enable(false)
我想 OperatorObserveOn::ObserveOnSubscriber
中有一个错误。
在 init 方法中,它设置新的生产者并增加 requested
(ObserveOnSubscriber
)内部订阅者的(child)requested
。但是当 ObserveOnSubscriber
是 child 时,它有一个默认值(在 Android -- 16 的情况下) Subscriber::requested
而 requested
的实际值是ObserveOnSubscriber::requested
.
问题是 Subscriber::requested
被使用并且没有更新。所以解决方法是重写 ObserveOnSubscriber
的 init 方法(我刚刚复制了 OperatorObserveOn 的源代码并在与 RxJava 中同名的包中创建了它的副本)。
void init() {
// don't want this code in the constructor because `this` can escape through the
// setProducer call
Subscriber<? super T> localChild = child;
localChild.setProducer(new Producer() {
@Override
public void request(long n) {
if (n > 0L) {
BackpressureUtils.getAndAddRequest(requested, n);
ObserveOnSubscriber.this.request(requested.get());
schedule();
}
}
});
localChild.add(recursiveScheduler);
localChild.add(this);
}
我还在 github 上创建了 issue。
问题是您干扰了流的生命周期。 map
崩溃,但您抑制了异常并且没有值发送到下游。如果下游没有得到任何值,它就不知道应该请求更多,因此整个序列停滞,让缓冲区填满。 onBackpressureBuffer
之所以有效,是因为它请求 Long.MAX_VALUE 并保留来源 运行。
但是请注意,map
不应该像现在这样工作,而是在函数出现第一个错误迹象时取消订阅。
正确的选择是:
RxView.clicks(loginButton)
.observeOn(AndroidSchedulers.mainThread())
.doOnNext((v) -> setLoginingWaiting())
.observeOn(Schedulers.newThread())
.flatMap(v -> {
try {
return Observable.just(signIn());
} catch (Throwable ex) {
Log.e("MyTag", "Oops, failed " + x.toString() + " times!");
++x;
loginingFailedSubject.onNext(ex);
return Observable.empty();
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(user -> setLoginedUser(user));
我正在尝试对按钮点击绑定的某些操作进行错误处理。对于绑定,我使用 RxAndroid+RxAndroid。似乎它必须与下面的代码一起使用,但它不适用于带有 onBackpressure()
的注释行:
CurrentUser signIn() {
throw new RuntimeException();
}
Integer x = 1;
PublishSubject<Throwable> loginingFailedSubject = PublishSubject.create();
@Override
public void onStart() {
super.onStart();
RxView.clicks(loginButton)
.observeOn(AndroidSchedulers.mainThread())
.doOnNext((v) -> setLoginingWaiting())
.observeOn(Schedulers.newThread())
.map((v) -> signIn())
.lift(new SuppressErrorOperator<>(throwable -> {
Log.e("MyTag", "Oops, failed " + x.toString() + " times!");
++x;
loginingFailedSubject.onNext(throwable);
}))
//.onBackpressureBuffer()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(user -> setLoginedUser(user));
loginingFailedSubject
.observeOn(AndroidSchedulers.mainThread())
.subscribe(throwable -> setLoginingFailed(throwable));
}
这是 SuppressErrorOperator
代码:
public final class SuppressErrorOperator<T> implements
Observable.Operator<T, T> {
final Action1<Throwable> errorHandler;
public SuppressErrorOperator(Action1<Throwable> errorHandler) {
this.errorHandler = errorHandler;
}
public SuppressErrorOperator() {this(null);}
@Override
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
return new Subscriber<T>(subscriber) {
@Override
public void onCompleted() {
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
if (errorHandler != null) {
errorHandler.call(e);
}
}
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
};
}
}
这就是我在 logcat 最后点击 100 次后得到的结果:
Oops, failed 16 times!
它在 exaclty 16 次后停止,在 17 日,它运行到 setLoginingWaiting()
(我看到了,因为这个方法禁用了按钮,这也意味着,每个请求没有人可以点击超过 1 次。或接近那个数)和.. 仅此而已。好像根本没有达到 .lift()
。
但如果我取消注释 .onBackpressureBuffer()
,它现在就可以完美运行了!我读了很多关于背压的文章。我什至花了一整天的时间来理解 Observable
、Subscriber
e.t.c 的源代码。
我知道,16 - 是 Android 缓冲区的固定大小。但是为什么会中招呢?我不经常点击按钮。另外,根本就没有onNext()
,所以缓冲区在任何情况下都不能超过!所有 onError()
被 Operator
.
我也知道 observeOn()
通过 pull 协议工作,所以它内部想使用 request()
。如果我在 .subscribe(user -> setLoginedUser(user));
之前最后评论 observeOn()
- 它也会起作用(但当然,这是不可接受的)。
但是为什么需要 onBackpressure()
活着呢?还有,为什么它会毫无例外地死去,比如MissingBackpressureException
之类的?
OperatorObserveOn 有一个大小为 RxRingBuffer.SIZE 的队列(android 上为 16),如果超过此队列的大小,将抛出 MissingBackpressure 异常。
通常为了避免背压问题,您可以限制切换线程、限制事件发射(限制、缓冲区等)或使用 onBackpressureXXX 运算符。
虽然在你的情况下看起来像 - 登录按钮 - 你一次只需要处理一个请求,那么为什么不用 ProgressBar 隐藏按钮或在请求时设置 enable(false)
我想 OperatorObserveOn::ObserveOnSubscriber
中有一个错误。
在 init 方法中,它设置新的生产者并增加 requested
(ObserveOnSubscriber
)内部订阅者的(child)requested
。但是当 ObserveOnSubscriber
是 child 时,它有一个默认值(在 Android -- 16 的情况下) Subscriber::requested
而 requested
的实际值是ObserveOnSubscriber::requested
.
问题是 Subscriber::requested
被使用并且没有更新。所以解决方法是重写 ObserveOnSubscriber
的 init 方法(我刚刚复制了 OperatorObserveOn 的源代码并在与 RxJava 中同名的包中创建了它的副本)。
void init() {
// don't want this code in the constructor because `this` can escape through the
// setProducer call
Subscriber<? super T> localChild = child;
localChild.setProducer(new Producer() {
@Override
public void request(long n) {
if (n > 0L) {
BackpressureUtils.getAndAddRequest(requested, n);
ObserveOnSubscriber.this.request(requested.get());
schedule();
}
}
});
localChild.add(recursiveScheduler);
localChild.add(this);
}
我还在 github 上创建了 issue。
问题是您干扰了流的生命周期。 map
崩溃,但您抑制了异常并且没有值发送到下游。如果下游没有得到任何值,它就不知道应该请求更多,因此整个序列停滞,让缓冲区填满。 onBackpressureBuffer
之所以有效,是因为它请求 Long.MAX_VALUE 并保留来源 运行。
但是请注意,map
不应该像现在这样工作,而是在函数出现第一个错误迹象时取消订阅。
正确的选择是:
RxView.clicks(loginButton)
.observeOn(AndroidSchedulers.mainThread())
.doOnNext((v) -> setLoginingWaiting())
.observeOn(Schedulers.newThread())
.flatMap(v -> {
try {
return Observable.just(signIn());
} catch (Throwable ex) {
Log.e("MyTag", "Oops, failed " + x.toString() + " times!");
++x;
loginingFailedSubject.onNext(ex);
return Observable.empty();
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(user -> setLoginedUser(user));