retryWhen operator 从不重试

retryWhen operator never retries

我正在实施带有重试的数据库更新方法。遵循 retryWhen() 运算符的通用模式,如此处所述:Using Rx Java retryWhen() ..

..但我的重试逻辑从未执行过。我正在调试它,可以看到断点在 place 3 处命中,如下所示,但它永远不会返回到 place 2 处的重试逻辑。在位置 3 之后,它总是会转到 位置 4,这是 onComplete 处理程序。

(代码正在使用 Java 8 个 lambdas)

I've applied a workaround by removing the retryWhen() block altogether and now invoking the updateWithRetrials() recursively from subscribe's > onError() block. That is working but I don't like that approach.

谁能告诉我在使用 retryWhen() 运算符时有什么不正确的地方?

private void updateWithRetrials(some input x)

{

   AtomicBoolean retryingUpdate = new AtomicBoolean(false);

   ...  

   // 1- Start from here
   Observable.<JsonDocument> just(x).map(x1 -> {

       if (retryingUpdate.get())
       {
          //2. retry logic
       }

       //doing sth with x1 here
       ...
       return <some observable>;

   })
   .retryWhen(attempts -> attempts.flatMap(n -> {

       Throwable cause = n.getThrowable();

       if (cause instanceof <errors of interest>)
       {
          // 3 - break-point hits here

          // retry update in 1 sec again
          retryingUpdate.set(true);
          return Observable.timer(1, TimeUnit.SECONDS);
       }

       // fail in all other cases...
       return Observable.error(n.getThrowable());
   }))
   .subscribe(
          doc -> {
                    //.. update was successful   
                 },

          onError -> {
                    //for unhandled errors in retryWhen() block
                  },

              {
                // 4. onComplete block

                 Sysout("Update() call completed.");
              }

     ); //subscribe ends here

}

我想错误发生在 map 中,因为它不会发生在 just 中。这不是 retryWhen 的工作方式。

使用 create 实现您的可观察对象并确保 map 中没有出现错误。如果在创建块中抛出任何错误,将调用 retryWhen 并根据您的重试逻辑重试工作单元。

    Observable.create(subscriber -> {
        // code that may throw exceptions
    }).map(item -> { 
        // code that will not throw any exceptions
    }).retryWhen(...)
      ...

您的问题是由于 Observable.just() 的一些性能优化造成的。

此 Operator 在发出项目后不检查订阅是否未取消,并在所有情况下发送 onComplete。

Observable.retryWhen(并重试)重新订阅错误,但在源发送 onComplete 时终止。

因此,即使重试运算符重新订阅,它也会从之前的订阅中获得 onComplete 并停止。

您可能会看到,下面的代码失败了(与您的一样):

@Test
public void testJustAndRetry() throws Exception {
        AtomicBoolean throwException = new AtomicBoolean(true);
        int value = Observable.just(1).map(v->{
            if( throwException.compareAndSet(true, false) ){
                throw new RuntimeException();
            }
            return v;
        }).retry(1).toBlocking().single();
    }

但如果您"don't forget"检查订阅,它就有效!:

@Test
public void testCustomJust() throws Exception {
    AtomicBoolean throwException = new AtomicBoolean(true);
    int value = Observable.create((Subscriber<? super Integer> s) -> {
                s.onNext(1);
                if (!s.isUnsubscribed()) {
                    s.onCompleted();
                }
            }
    ).map(v -> {
        if (throwException.compareAndSet(true, false)) {
            throw new RuntimeException();
        }
        return v;
    }).retry(1).toBlocking().single();

    Assert.assertEquals(1, value);
}