RxJava2 Flowable.create blockingSubscribe 语义与 BackpressureStrategy

RxJava2 Flowable.create blockingSubscribe semantics with BackpressureStrategy

我在理解 Flowable BackpressureStrategy 如何与 blockingSubscribe 方法一起工作时遇到了一些困难 - 或者这对我来说似乎出乎意料,如果有人能向我解释一下,我将不胜感激。

我正在当前主干的 FlowableTests 文件中测试这段代码。

@Test
public void testCreateBackpressureDrop() {
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(3);
            e.onNext(4);
            e.onComplete();
        }
    }, BackpressureStrategy.DROP).blockingSubscribe(w);

    verify(w, times(1)).onNext(1);
    verify(w, times(1)).onNext(3);
    verify(w, times(1)).onNext(4);
    verify(w, times(1)).onComplete();
}

如果我使用 subscribe(w)BackpressureStragegy.DROPBackpressure.BUFFER 测试通过。但是,如果我使用 blockingSubscribe(w)Backpressure.BUFFER 通过,但 Backpressure.DROP 失败,表示 onNext(1) 从未被调用。

谢谢!

这是使用 Mockito 模拟 Subscriber 的典型问题:您必须在其 onSubscribe 中调用 request(N):

@SuppressWarnings("unchecked")
public static <T> Subscriber<T> mockSubscriber() {
    Subscriber<T> w = mock(Subscriber.class);

    Mockito.doAnswer(new Answer<Object>() {
        @Override
        public Object answer(InvocationOnMock a) throws Throwable {
            Subscription s = a.getArgument(0);
            s.request(Long.MAX_VALUE);
            return null;
        }
    }).when(w).onSubscribe((Subscription)any());

    return w;
}

blockingSubscribe 的复杂之处在于它在 FlowableOnSubscribe 具有 运行.

之后执行上面的 w.onSubscribe