为什么点击事件这里需要onBackpressure()?

Why does it need onBackpressure() here for click events?

我正在尝试对按钮点击绑定的某些操作进行错误处理。对于绑定,我使用 RxAndroid+RxAndroid。似乎它必须与下面的代码一起使用,但它不适用于带有 onBackpressure() 的注释行:

CurrentUser signIn() {
    throw new RuntimeException();
}
Integer x = 1;
PublishSubject<Throwable> loginingFailedSubject = PublishSubject.create();

@Override
public void onStart() {
    super.onStart();
    RxView.clicks(loginButton)
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext((v) -> setLoginingWaiting())

            .observeOn(Schedulers.newThread())
            .map((v) -> signIn())
            .lift(new SuppressErrorOperator<>(throwable -> {
                Log.e("MyTag", "Oops, failed " + x.toString() + " times!");
                ++x;
                loginingFailedSubject.onNext(throwable);
            }))
            //.onBackpressureBuffer()

            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(user -> setLoginedUser(user));

    loginingFailedSubject
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(throwable -> setLoginingFailed(throwable));
}

这是 SuppressErrorOperator 代码:

public final class SuppressErrorOperator<T> implements 

Observable.Operator<T, T> {
    final Action1<Throwable> errorHandler;

    public SuppressErrorOperator(Action1<Throwable> errorHandler) {
        this.errorHandler = errorHandler;
    }

    public SuppressErrorOperator() {this(null);}

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
        return new Subscriber<T>(subscriber) {
            @Override
            public void onCompleted() {
                subscriber.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                if (errorHandler != null) {
                    errorHandler.call(e);
                }
            }

            @Override
            public void onNext(T t) {
                subscriber.onNext(t);
            }
        };
    }
}

这就是我在 logcat 最后点击 100 次后得到的结果: Oops, failed 16 times!

它在 exaclty 16 次后停止,在 17 日,它运行到 setLoginingWaiting()(我看到了,因为这个方法禁用了按钮,这也意味着,每个请求没有人可以点击超过 1 次。或接近那个数)和.. 仅此而已。好像根本没有达到 .lift()

但如果我取消注释 .onBackpressureBuffer(),它现在就可以完美运行了!我读了很多关于背压的文章。我什至花了一整天的时间来理解 ObservableSubscriber e.t.c 的源代码。

我知道,16 - 是 Android 缓冲区的固定大小。但是为什么会中招呢?我不经常点击按钮。另外,根本就没有onNext(),所以缓冲区在任何情况下都不能超过!所有 onError()Operator.

吞噬

我也知道 observeOn() 通过 pull 协议工作,所以它内部想使用 request()。如果我在 .subscribe(user -> setLoginedUser(user)); 之前最后评论 observeOn() - 它也会起作用(但当然,这是不可接受的)。

但是为什么需要 onBackpressure() 活着呢?还有,为什么它会毫无例外地死去,比如MissingBackpressureException之类的?

OperatorObserveOn 有一个大小为 RxRingBuffer.SIZE 的队列(android 上为 16),如果超过此队列的大小,将抛出 MissingBackpressure 异常。

通常为了避免背压问题,您可以限制切换线程、限制事件发射(限制、缓冲区等)或使用 onBackpressureXXX 运算符。

虽然在你的情况下看起来像 - 登录按钮 - 你一次只需要处理一个请求,那么为什么不用 ProgressBar 隐藏按钮或在请求时设置 enable(false)

我想 OperatorObserveOn::ObserveOnSubscriber 中有一个错误。

在 init 方法中,它设置新的生产者并增加 requestedObserveOnSubscriber)内部订阅者的(child)requested。但是当 ObserveOnSubscriber 是 child 时,它有一个默认值(在 Android -- 16 的情况下) Subscriber::requestedrequested 的实际值是ObserveOnSubscriber::requested.
问题是 Subscriber::requested 被使用并且没有更新。所以解决方法是重写 ObserveOnSubscriber 的 init 方法(我刚刚复制了 OperatorObserveOn 的源代码并在与 RxJava 中同名的包中创建了它的副本)。

    void init() {
        // don't want this code in the constructor because `this` can escape through the 
        // setProducer call
        Subscriber<? super T> localChild = child;

        localChild.setProducer(new Producer() {

            @Override
            public void request(long n) {
                if (n > 0L) {
                    BackpressureUtils.getAndAddRequest(requested, n);
                    ObserveOnSubscriber.this.request(requested.get());
                    schedule();
                }
            }

        });
        localChild.add(recursiveScheduler);
        localChild.add(this);
    }

我还在 github 上创建了 issue

问题是您干扰了流的生命周期。 map 崩溃,但您抑制了异常并且没有值发送到下游。如果下游没有得到任何值,它就不知道应该请求更多,因此整个序列停滞,让缓冲区填满。 onBackpressureBuffer 之所以有效,是因为它请求 Long.MAX_VALUE 并保留来源 运行。

但是请注意,map 不应该像现在这样工作,而是在函数出现第一个错误迹象时取消订阅。

正确的选择是:

RxView.clicks(loginButton)
.observeOn(AndroidSchedulers.mainThread())
.doOnNext((v) -> setLoginingWaiting())
.observeOn(Schedulers.newThread())
.flatMap(v -> {
    try {
        return Observable.just(signIn());
    } catch (Throwable ex) {
        Log.e("MyTag", "Oops, failed " + x.toString() + " times!");
        ++x;
        loginingFailedSubject.onNext(ex);
        return Observable.empty();
    }
 })
 .observeOn(AndroidSchedulers.mainThread())
 .subscribe(user -> setLoginedUser(user));