What would cause SingleEmitter.onSuccess() to generate a NoSuchElementException?

I have a Single flow organized as follows:

getSomething() // returns Single<>
.flatMap(something -> {
   // various things
   return Single.defer( () -> {
     // various other things
     return Single.<SomeType>create(emitter -> {
        // some more stuff
        someCallbackApi(result -> {
           if (result.isError()) {
             emitter.onError( result.getCause() );
           } else {
             // guaranteed non-null data
             emitter.onSuccess( result.getData() ); // this generates NoSuchElement
           }
        });
     });
   })
   .retryWhen( ... )
   .flatMap( data -> handle(data) )
   .retryWhen( ... );
})
.retryWhen( ... )
.onErrorResumeNext(error -> process(error))
.subscribe(data -> handleSuccess(data), error -> handleError(error));

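In the test case, the callback API Single is retried several times (the number is set by the test case), and every time, on the last retry, the call to emitter.onSuccess() generates the exception below. What is going on? I have not been able to restructure or change the downstream operators or subscribers to avoid the problem.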

java.util.NoSuchElementException: null
    at io.reactivex.internal.operators.flowable.FlowableSingleSingle$SingleElementSubscriber.onComplete(FlowableSingleSingle.java:116)
    at io.reactivex.subscribers.SerializedSubscriber.onComplete(SerializedSubscriber.java:168)
    at io.reactivex.internal.operators.flowable.FlowableRepeatWhen$WhenReceiver.onComplete(FlowableRepeatWhen.java:118)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drainLoop(FlowableFlatMap.java:426)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drain(FlowableFlatMap.java:366)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onComplete(FlowableFlatMap.java:338)
    at io.reactivex.internal.operators.flowable.FlowableZip$ZipCoordinator.drain(FlowableZip.java:210)
    at io.reactivex.internal.operators.flowable.FlowableZip$ZipSubscriber.onNext(FlowableZip.java:381)
    at io.reactivex.processors.UnicastProcessor.drainFused(UnicastProcessor.java:363)
    at io.reactivex.processors.UnicastProcessor.drain(UnicastProcessor.java:396)
    at io.reactivex.processors.UnicastProcessor.onNext(UnicastProcessor.java:458)
    at io.reactivex.processors.SerializedProcessor.onNext(SerializedProcessor.java:103)
    at io.reactivex.internal.operators.flowable.FlowableRepeatWhen$WhenSourceSubscriber.again(FlowableRepeatWhen.java:171)
    at io.reactivex.internal.operators.flowable.FlowableRetryWhen$RetryWhenSubscriber.onError(FlowableRetryWhen.java:76)
    at io.reactivex.internal.operators.single.SingleToFlowable$SingleToFlowableObserver.onError(SingleToFlowable.java:67)
    at io.reactivex.internal.operators.single.SingleFlatMap$SingleFlatMapCallback$FlatMapSingleObserver.onError(SingleFlatMap.java:116)
    at io.reactivex.internal.operators.flowable.FlowableSingleSingle$SingleElementSubscriber.onError(FlowableSingleSingle.java:97)
    at io.reactivex.subscribers.SerializedSubscriber.onError(SerializedSubscriber.java:142)
    at io.reactivex.internal.operators.flowable.FlowableRepeatWhen$WhenReceiver.onError(FlowableRepeatWhen.java:112)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.checkTerminate(FlowableFlatMap.java:567)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drainLoop(FlowableFlatMap.java:374)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drain(FlowableFlatMap.java:366)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.innerError(FlowableFlatMap.java:606)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap$InnerSubscriber.onError(FlowableFlatMap.java:672)
    at io.reactivex.internal.subscriptions.EmptySubscription.error(EmptySubscription.java:55)
    at io.reactivex.internal.operators.flowable.FlowableError.subscribeActual(FlowableError.java:40)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.Flowable.subscribe(Flowable.java:14865)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onNext(FlowableFlatMap.java:163)
    at io.reactivex.internal.operators.flowable.FlowableZip$ZipCoordinator.drain(FlowableZip.java:249)
    at io.reactivex.internal.operators.flowable.FlowableZip$ZipSubscriber.onNext(FlowableZip.java:381)
    at io.reactivex.processors.UnicastProcessor.drainFused(UnicastProcessor.java:363)
    at io.reactivex.processors.UnicastProcessor.drain(UnicastProcessor.java:396)
    at io.reactivex.processors.UnicastProcessor.onNext(UnicastProcessor.java:458)
    at io.reactivex.processors.SerializedProcessor.onNext(SerializedProcessor.java:103)
    at io.reactivex.internal.operators.flowable.FlowableRepeatWhen$WhenSourceSubscriber.again(FlowableRepeatWhen.java:171)
    at io.reactivex.internal.operators.flowable.FlowableRetryWhen$RetryWhenSubscriber.onError(FlowableRetryWhen.java:76)
    at io.reactivex.internal.operators.single.SingleToFlowable$SingleToFlowableObserver.onError(SingleToFlowable.java:67)
    at io.reactivex.internal.operators.single.SingleFlatMap$SingleFlatMapCallback$FlatMapSingleObserver.onError(SingleFlatMap.java:116)
    at io.reactivex.internal.disposables.EmptyDisposable.error(EmptyDisposable.java:78)
    at io.reactivex.internal.operators.single.SingleError.subscribeActual(SingleError.java:42)
    at io.reactivex.Single.subscribe(Single.java:3603)
    at io.reactivex.internal.operators.single.SingleFlatMap$SingleFlatMapCallback.onSuccess(SingleFlatMap.java:84)
    at io.reactivex.internal.operators.flowable.FlowableSingleSingle$SingleElementSubscriber.onComplete(FlowableSingleSingle.java:114)
    at io.reactivex.subscribers.SerializedSubscriber.onComplete(SerializedSubscriber.java:168)
    at io.reactivex.internal.operators.flowable.FlowableRetryWhen$RetryWhenSubscriber.onComplete(FlowableRetryWhen.java:82)
    at io.reactivex.internal.subscriptions.DeferredScalarSubscription.complete(DeferredScalarSubscription.java:134)
    at io.reactivex.internal.operators.single.SingleToFlowable$SingleToFlowableObserver.onSuccess(SingleToFlowable.java:62)
    at io.reactivex.internal.operators.single.SingleCreate$Emitter.onSuccess(SingleCreate.java:67)

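SOLVED: Many thanks to @dano for pointing out how retryWhen behaves when used with Single. In this case, the outermost retryWhen operator had a faulty termination condition, roughly like this: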

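.retryWhen(errors -> errors.zipWith( Flowable.range(1, maxRetries), ...)
                           .flatMap( zipped -> {
  // Faulty: range() emits only 1..maxRetries, so this condition is always true
  // and the error branch below is never reached.
  if (zipped.retryCount() <= maxRetries) {
    return Flowable.just(0L);
  }
  return Flowable.error( new Exception() );
}))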

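Flowable.range() completes as soon as it has produced its last number. That completes the Publisher returned by the handler, which causes the Single to signal NoSuchElementException. Simply increasing the count argument of Flowable.range() by one is enough to fix the problem: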

.retryWhen(errors -> errors.zipWith( Flowable.range(1, maxRetries + 1), ...)
                           .flatMap( zipped -> {
  // The extra range element lets retry number maxRetries + 1 reach the error branch.
  if (zipped.retryCount() <= maxRetries) {
    return Flowable.just(0L);
  }
  return Flowable.error( new Exception() );
}))
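
For anyone who wants something runnable, here is a minimal self-contained sketch of the corrected handler, assuming RxJava 2.x is on the classpath. The names BoundedRetrySketch, retryUpTo and flaky are made up for illustration, and a Map entry stands in for whatever pair type the elided zipper in my real code produces. Unlike the snippet above, the sketch rethrows the last error instead of a fresh Exception, which is a design choice rather than a requirement.

```java
import io.reactivex.Flowable;
import io.reactivex.Single;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedRetrySketch {

    // Hypothetical helper: retries `source` up to maxRetries times,
    // then fails with the last error that was seen.
    static <T> Single<T> retryUpTo(Single<T> source, int maxRetries) {
        return source.retryWhen(errors -> errors
            // maxRetries + 1 elements: the extra one keeps the zip open long
            // enough for the final error to be turned into an onError signal.
            .zipWith(Flowable.range(1, maxRetries + 1),
                     (error, attempt) ->
                         new SimpleImmutableEntry<Throwable, Integer>(error, attempt))
            .flatMap(zipped -> {
                if (zipped.getValue() <= maxRetries) {
                    return Flowable.just(0L);                 // any value triggers a retry
                }
                return Flowable.<Long>error(zipped.getKey()); // give up with the real cause
            }));
    }

    // Hypothetical flaky source: fails `failures` times, then succeeds.
    static Single<String> flaky(AtomicInteger calls, int failures) {
        return Single.fromCallable(() -> {
            int n = calls.incrementAndGet();
            if (n <= failures) {
                throw new IllegalStateException("attempt " + n + " failed");
            }
            return "ok after " + n + " attempts";
        });
    }

    public static void main(String[] args) {
        // Two failures with three retries allowed: the third attempt succeeds.
        System.out.println(retryUpTo(flaky(new AtomicInteger(), 2), 3).blockingGet());

        // Ten failures with three retries allowed: the fourth error is passed through.
        retryUpTo(flaky(new AtomicInteger(), 10), 3)
            .subscribe(System.out::println, e -> System.out.println("gave up: " + e));
    }
}
```

With Flowable.range(1, maxRetries) in place of the range above, the second call would be expected to end in NoSuchElementException rather than the IllegalStateException from the source.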

This happens because of the way you implemented the callback passed to retryWhen. The retryWhen documentation states (emphasis mine):

Re-subscribes to the current Single if and when the Publisher returned by the handler function signals a value.

If the Publisher signals an onComplete, the resulting Single will signal a NoSuchElementException.

One of the Flowable instances you return from your retryWhen handlers is signalling onComplete, and that is what results in the NoSuchElementException.

Here is a very simple example that produces the same error:

Single.error(new Exception("hey"))
    .retryWhen(e -> Flowable.just(1))
    .subscribe(System.out::println, e -> e.printStackTrace());

The stack trace it produces starts with these frames, the same as yours:

java.util.NoSuchElementException
    at io.reactivex.internal.operators.flowable.FlowableSingleSingle$SingleElementSubscriber.onComplete(FlowableSingleSingle.java:116)
    at io.reactivex.subscribers.SerializedSubscriber.onComplete(SerializedSubscriber.java:168)
    at io.reactivex.internal.operators.flowable.FlowableRepeatWhen$WhenReceiver.onComplete(FlowableRepeatWhen.java:118)

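You didn't include any of the code inside your retryWhen calls, so I can't say exactly what you're doing wrong, but generally you want to chain whatever you do onto the Flowable that is passed in. So my example above would look like this, if we really wanted to retry forever: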

Single.error(new Exception("hey"))
    .retryWhen(e -> e.flatMap(ign -> Flowable.just(1)))
    .subscribe(System.out::println, e -> e.printStackTrace());
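
Because Single.error never succeeds, the retry-forever version above keeps resubscribing and never terminates. To compare the two handlers side by side, here is a small sketch with a source that eventually succeeds, assuming RxJava 2.x. The class and method names (RetryHandlerContrast, failTwice, detached, chained) are made up for this illustration.

```java
import io.reactivex.Flowable;
import io.reactivex.Single;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryHandlerContrast {

    // Hypothetical source for illustration: fails twice, then succeeds.
    static Single<String> failTwice(AtomicInteger calls) {
        return Single.fromCallable(() -> {
            if (calls.incrementAndGet() <= 2) {
                throw new Exception("hey");
            }
            return "done";
        });
    }

    // The handler ignores the error stream: just(1) emits one value and then
    // completes, and that completion surfaces as NoSuchElementException.
    static Single<String> detached(AtomicInteger calls) {
        return failTwice(calls).retryWhen(e -> Flowable.just(1));
    }

    // The handler is chained onto the error stream: one retry signal per error,
    // and it does not complete on its own.
    static Single<String> chained(AtomicInteger calls) {
        return failTwice(calls).retryWhen(e -> e.flatMap(ign -> Flowable.just(1)));
    }

    public static void main(String[] args) {
        detached(new AtomicInteger())
            .subscribe(System.out::println, err -> System.out.println("detached: " + err));
        chained(new AtomicInteger())
            .subscribe(System.out::println, err -> System.out.println("chained: " + err));
    }
}
```

The detached variant should fail with NoSuchElementException even though a retry would eventually have succeeded, while the chained variant should deliver the value once the source stops failing.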