FlowableOperator 本身是否支持背压?

Does a FlowableOperator inherently supports backpressure?

我已经按照 RxJava2 wiki (https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#operator-targeting-lift) 中的描述实现了 FlowableOperator 除了我在 onNext() 操作中执行了一些类似这样的测试:

public final class MyOperator implements FlowableOperator<Integer, Integer> {

...

static final class Op implements FlowableSubscriber<Integer>, Subscription {

    @Override
    public void onNext(Integer v) {
        if (v % 2 == 0) {
          child.onNext(v * v);
        }  
    }
   ...
  }
}

这个操作符是链的一部分,我在其中创建了一个 Flowable 背压下降。本质上,它看起来几乎是这样的:

Flowable.<Integer>create(emitter -> myAction(), DROP)
        .filter(v -> v > 2)
        .lift(new MyOperator())
        .subscribe(n -> doSomething(n));

我遇到了以下问题:

回看了David Karnok的精彩博客post http://akarnokd.blogspot.fr/2015/05/pitfalls-of-operator-implementations.html,看来我需要在onNext()方法中加一个request(1)。但那是 RxJava1...

所以,我的问题是:RxJava2 中的这个修复是否足以处理我的背压问题?或者我的操作员是否必须实施所有关于 Atomics 的东西,排出 https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#atomics-serialization-deferred-actions 中描述的东西才能正确处理我的背压问题?

注意:我添加了 request(1) 并且它似乎有效。但我无法弄清楚这是否足够,或者我的操作员是否需要队列排空和原子学等棘手的东西。

提前致谢!

是的,你必须做一些棘手的事情...

我会避免编写运算符,除非您非常确定自己在做什么?几乎所有的事情都可以用默认的运算符来实现...

Writing operators, source-like (fromEmitter) or intermediate-like (flatMap) has always been a hard task to do in RxJava. There are many rules to obey, many cases to consider but at the same time, many (legal) shortcuts to take to build a well performing code. Now writing an operator specifically for 2.x is 10 times harder than for 1.x. If you want to exploit all the advanced, 4th generation features, that's even 2-3 times harder on top (so 30 times harder in total).

解释了一些棘手的事情:https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0

Does a FlowableOperator inherently supports backpressure?

FlowableOperator 是为给定下游 Subscriber 调用的接口,应该 return 一个新的 Subscriber 包装下游并调制传递的 Reactive Streams 事件在一个或两个方向上。背压支持是 Subscriber 实现的责任,而不是这个特定的功能接口。它本来可以 Function<Subscriber, Subscriber> 但一个单独的命名接口被认为更有用并且不太容易发生重载冲突。

need to add a request(1) in the onNext() [...] But I can't figure out whether it's enough or whether my operator needs the tricky stuff of queue-drain and atomics.

是的,你也必须在 RxJava 2 中这样做。由于 RxJava 2 的 Subscriber 不是 class,它没有 v1 的便利 request 方法。您必须在 onSubscribe 中保存 Subscription 并在 onNext 中的适当路径上调用 upstream.request(1)。对于你的情况,应该足够了。

我已经用一个新的部分更新了 wiki 来明确解释这个案例:

https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#replenishing

final class FilterOddSubscriber implements FlowableSubscriber<Integer>, Subscription {

    final Subscriber<? super Integer> downstream;

    Subscription upstream;

    // ...

    @Override
    public void onSubscribe(Subscription s) {
        if (upstream != null) {
            s.cancel();
        } else {
            upstream = s;                    // <-------------------------
            downstream.onSubscribe(this);
        }
    }

    @Override
    public void onNext(Integer item) {
        if (item % 2 != 0) {
           downstream.onNext(item);
        } else {
           upstream.request(1);              // <-------------------------
        }
    }

    @Override
    public void request(long n) {
        upstream.request(n);
    }

    // the rest omitted for brevity
}