RetryWhen 中需要使用 AtomicBoolean

Use of AtomicBoolean required in RetryWhen

Observable.retryWhen 的 Javadoc 示例中,AtomicInteger 用于 counter 而不是更简单的正则 Int。这真的有必要吗?在什么情况下 errors 可以在不同的线程上发出?

我阅读文档和源代码表明 takeWhileflatMap 闭包始终保证 运行 在同一线程上。

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#retryWhen-io.reactivex.functions.Function-

Observable.timer(1, TimeUnit.SECONDS)
     .doOnSubscribe(s -> System.out.println("subscribing"))
     .map(v -> { throw new RuntimeException(); })
     .retryWhen(errors -> {
         AtomicInteger counter = new AtomicInteger();
         return errors
                   .takeWhile(e -> counter.getAndIncrement() != 3)
                   .flatMap(e -> {
                       System.out.println("delay retry by " + counter.get() + " second(s)");
                       return Observable.timer(counter.get(), TimeUnit.SECONDS);
                   });
     })
     .blockingSubscribe(System.out::println, System.out::println);

这不是绝对必要的,但有些人在看到用于计数器的单元素 int 数组时会心脏病发作,因此 AtomicInteger

 Observable.timer(1, TimeUnit.SECONDS)
 .doOnSubscribe(s -> System.out.println("subscribing"))
 .map(v -> { throw new RuntimeException(); })
 .retryWhen(errors -> {
     int[] counter = { 0 };
     return errors
               .takeWhile(e -> counter[0]++ != 3)
               .flatMap(e -> {
                   System.out.println("delay retry by " + counter[0] + " second(s)");
                   return Observable.timer(counter[0], TimeUnit.SECONDS);
               });
 })
 .blockingSubscribe(System.out::println, System.out::println);

Under what circumstances can errors emit on a different thread?

处理程序序列可以有自己的线程,因此无论何时共享对可变状态的外部访问,都应确保访问是线程安全的。同样,在该示例中,这不是必需的,因为在使用计数器期间的特定组合在单个线程上是 运行 并且保证不会自我重叠,因为任何新错误只能在当前序列发出重试信号后发生发生了。