FlowableOperator 本身是否支持背压?
Does a FlowableOperator inherently supports backpressure?
我已经按照 RxJava2 wiki (https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#operator-targeting-lift) 中的描述实现了 FlowableOperator
除了我在 onNext()
操作中执行了一些类似这样的测试:
public final class MyOperator implements FlowableOperator<Integer, Integer> {
...
static final class Op implements FlowableSubscriber<Integer>, Subscription {
@Override
public void onNext(Integer v) {
if (v % 2 == 0) {
child.onNext(v * v);
}
}
...
}
}
这个操作符是链的一部分,我在其中创建了一个 Flowable
背压下降。本质上,它看起来几乎是这样的:
Flowable.<Integer>create(emitter -> myAction(), DROP)
.filter(v -> v > 2)
.lift(new MyOperator())
.subscribe(n -> doSomething(n));
我遇到了以下问题:
- 出现背压,所以
doSomething(n)
无法处理即将到来的上游
- 由于选择了背压策略,项目被丢弃
- 但是 doSomething(n) 在执行 drop 并且 doSomething(n) 已准备好处理新项目后从未收到新项目
回看了David Karnok的精彩博客post http://akarnokd.blogspot.fr/2015/05/pitfalls-of-operator-implementations.html,看来我需要在onNext()
方法中加一个request(1)
。但那是 RxJava1...
所以,我的问题是:RxJava2 中的这个修复是否足以处理我的背压问题?或者我的操作员是否必须实施所有关于 Atomics 的东西,排出 https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#atomics-serialization-deferred-actions 中描述的东西才能正确处理我的背压问题?
注意:我添加了 request(1)
并且它似乎有效。但我无法弄清楚这是否足够,或者我的操作员是否需要队列排空和原子学等棘手的东西。
提前致谢!
是的,你必须做一些棘手的事情...
我会避免编写运算符,除非您非常确定自己在做什么?几乎所有的事情都可以用默认的运算符来实现...
Writing operators, source-like (fromEmitter) or intermediate-like
(flatMap) has always been a hard task to do in RxJava. There are many
rules to obey, many cases to consider but at the same time, many
(legal) shortcuts to take to build a well performing code. Now writing
an operator specifically for 2.x is 10 times harder than for 1.x. If
you want to exploit all the advanced, 4th generation features, that's
even 2-3 times harder on top (so 30 times harder in total).
解释了一些棘手的事情:https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0
Does a FlowableOperator inherently supports backpressure?
FlowableOperator
是为给定下游 Subscriber
调用的接口,应该 return 一个新的 Subscriber
包装下游并调制传递的 Reactive Streams 事件在一个或两个方向上。背压支持是 Subscriber
实现的责任,而不是这个特定的功能接口。它本来可以 Function<Subscriber, Subscriber>
但一个单独的命名接口被认为更有用并且不太容易发生重载冲突。
need to add a request(1) in the onNext() [...]
But I can't figure out whether it's enough or whether my operator needs the tricky stuff of queue-drain and atomics.
是的,你也必须在 RxJava 2 中这样做。由于 RxJava 2 的 Subscriber
不是 class,它没有 v1 的便利 request
方法。您必须在 onSubscribe
中保存 Subscription
并在 onNext
中的适当路径上调用 upstream.request(1)
。对于你的情况,应该足够了。
我已经用一个新的部分更新了 wiki 来明确解释这个案例:
https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#replenishing
final class FilterOddSubscriber implements FlowableSubscriber<Integer>, Subscription {
final Subscriber<? super Integer> downstream;
Subscription upstream;
// ...
@Override
public void onSubscribe(Subscription s) {
if (upstream != null) {
s.cancel();
} else {
upstream = s; // <-------------------------
downstream.onSubscribe(this);
}
}
@Override
public void onNext(Integer item) {
if (item % 2 != 0) {
downstream.onNext(item);
} else {
upstream.request(1); // <-------------------------
}
}
@Override
public void request(long n) {
upstream.request(n);
}
// the rest omitted for brevity
}
我已经按照 RxJava2 wiki (https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#operator-targeting-lift) 中的描述实现了 FlowableOperator
除了我在 onNext()
操作中执行了一些类似这样的测试:
public final class MyOperator implements FlowableOperator<Integer, Integer> {
...
static final class Op implements FlowableSubscriber<Integer>, Subscription {
@Override
public void onNext(Integer v) {
if (v % 2 == 0) {
child.onNext(v * v);
}
}
...
}
}
这个操作符是链的一部分,我在其中创建了一个 Flowable
背压下降。本质上,它看起来几乎是这样的:
Flowable.<Integer>create(emitter -> myAction(), DROP)
.filter(v -> v > 2)
.lift(new MyOperator())
.subscribe(n -> doSomething(n));
我遇到了以下问题:
- 出现背压,所以
doSomething(n)
无法处理即将到来的上游 - 由于选择了背压策略,项目被丢弃
- 但是 doSomething(n) 在执行 drop 并且 doSomething(n) 已准备好处理新项目后从未收到新项目
回看了David Karnok的精彩博客post http://akarnokd.blogspot.fr/2015/05/pitfalls-of-operator-implementations.html,看来我需要在onNext()
方法中加一个request(1)
。但那是 RxJava1...
所以,我的问题是:RxJava2 中的这个修复是否足以处理我的背压问题?或者我的操作员是否必须实施所有关于 Atomics 的东西,排出 https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#atomics-serialization-deferred-actions 中描述的东西才能正确处理我的背压问题?
注意:我添加了 request(1)
并且它似乎有效。但我无法弄清楚这是否足够,或者我的操作员是否需要队列排空和原子学等棘手的东西。
提前致谢!
是的,你必须做一些棘手的事情...
我会避免编写运算符,除非您非常确定自己在做什么?几乎所有的事情都可以用默认的运算符来实现...
Writing operators, source-like (fromEmitter) or intermediate-like (flatMap) has always been a hard task to do in RxJava. There are many rules to obey, many cases to consider but at the same time, many (legal) shortcuts to take to build a well performing code. Now writing an operator specifically for 2.x is 10 times harder than for 1.x. If you want to exploit all the advanced, 4th generation features, that's even 2-3 times harder on top (so 30 times harder in total).
解释了一些棘手的事情:https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0
Does a FlowableOperator inherently supports backpressure?
FlowableOperator
是为给定下游 Subscriber
调用的接口,应该 return 一个新的 Subscriber
包装下游并调制传递的 Reactive Streams 事件在一个或两个方向上。背压支持是 Subscriber
实现的责任,而不是这个特定的功能接口。它本来可以 Function<Subscriber, Subscriber>
但一个单独的命名接口被认为更有用并且不太容易发生重载冲突。
need to add a request(1) in the onNext() [...] But I can't figure out whether it's enough or whether my operator needs the tricky stuff of queue-drain and atomics.
是的,你也必须在 RxJava 2 中这样做。由于 RxJava 2 的 Subscriber
不是 class,它没有 v1 的便利 request
方法。您必须在 onSubscribe
中保存 Subscription
并在 onNext
中的适当路径上调用 upstream.request(1)
。对于你的情况,应该足够了。
我已经用一个新的部分更新了 wiki 来明确解释这个案例:
https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#replenishing
final class FilterOddSubscriber implements FlowableSubscriber<Integer>, Subscription {
final Subscriber<? super Integer> downstream;
Subscription upstream;
// ...
@Override
public void onSubscribe(Subscription s) {
if (upstream != null) {
s.cancel();
} else {
upstream = s; // <-------------------------
downstream.onSubscribe(this);
}
}
@Override
public void onNext(Integer item) {
if (item % 2 != 0) {
downstream.onNext(item);
} else {
upstream.request(1); // <-------------------------
}
}
@Override
public void request(long n) {
upstream.request(n);
}
// the rest omitted for brevity
}