RxJava2 Flowable.create blockingSubscribe 语义与 BackpressureStrategy
RxJava2 Flowable.create blockingSubscribe semantics with BackpressureStrategy
我在理解 Flowable BackpressureStrategy 如何与 blockingSubscribe 方法一起工作时遇到了一些困难 - 或者这对我来说似乎出乎意料,如果有人能向我解释一下,我将不胜感激。
我正在当前主干的 FlowableTests
文件中测试这段代码。
@Test
public void testCreateBackpressureDrop() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
}, BackpressureStrategy.DROP).blockingSubscribe(w);
verify(w, times(1)).onNext(1);
verify(w, times(1)).onNext(3);
verify(w, times(1)).onNext(4);
verify(w, times(1)).onComplete();
}
如果我使用 subscribe(w)
和 BackpressureStragegy.DROP
或 Backpressure.BUFFER
测试通过。但是,如果我使用 blockingSubscribe(w)
,Backpressure.BUFFER
通过,但 Backpressure.DROP
失败,表示 onNext(1)
从未被调用。
谢谢!
这是使用 Mockito 模拟 Subscriber
的典型问题:您必须在其 onSubscribe
中调用 request(N)
:
@SuppressWarnings("unchecked")
public static <T> Subscriber<T> mockSubscriber() {
Subscriber<T> w = mock(Subscriber.class);
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock a) throws Throwable {
Subscription s = a.getArgument(0);
s.request(Long.MAX_VALUE);
return null;
}
}).when(w).onSubscribe((Subscription)any());
return w;
}
blockingSubscribe
的复杂之处在于它在 FlowableOnSubscribe
具有 运行.
之后执行上面的 w.onSubscribe
我在理解 Flowable BackpressureStrategy 如何与 blockingSubscribe 方法一起工作时遇到了一些困难 - 或者这对我来说似乎出乎意料,如果有人能向我解释一下,我将不胜感激。
我正在当前主干的 FlowableTests
文件中测试这段代码。
@Test
public void testCreateBackpressureDrop() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
}, BackpressureStrategy.DROP).blockingSubscribe(w);
verify(w, times(1)).onNext(1);
verify(w, times(1)).onNext(3);
verify(w, times(1)).onNext(4);
verify(w, times(1)).onComplete();
}
如果我使用 subscribe(w)
和 BackpressureStragegy.DROP
或 Backpressure.BUFFER
测试通过。但是,如果我使用 blockingSubscribe(w)
,Backpressure.BUFFER
通过,但 Backpressure.DROP
失败,表示 onNext(1)
从未被调用。
谢谢!
这是使用 Mockito 模拟 Subscriber
的典型问题:您必须在其 onSubscribe
中调用 request(N)
:
@SuppressWarnings("unchecked")
public static <T> Subscriber<T> mockSubscriber() {
Subscriber<T> w = mock(Subscriber.class);
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock a) throws Throwable {
Subscription s = a.getArgument(0);
s.request(Long.MAX_VALUE);
return null;
}
}).when(w).onSubscribe((Subscription)any());
return w;
}
blockingSubscribe
的复杂之处在于它在 FlowableOnSubscribe
具有 运行.
w.onSubscribe