我可以在达到限制时使用 retryWhen 和 return observable 吗?
can I use retryWhen and return an observable when a limit is reached?
我正在尝试使用 java Rx(版本 1)进行重试。
我想做一个 retryWhen
而不是简单的 retry()
因为我想在达到限制时 return 一个具有特定值的可观察对象,而不是仅仅抛出异常。
所以,检查这个 https://blog.danlew.net/2016/01/25/rxjavas-repeatwhen-and-retrywhen-explained/ and this Catch error if retryWhen:s retries runs out 我能够构建一些有助于我实现目标的东西。
// this is only to simulate the real method that will possibly throw an exception
public static Observable<String> test() {
Observable<String> var = Observable.error(new IOException());
return var;
}
Observable<String> test = test().retryWhen(attempts -> {
return attempts.zipWith(Observable.range(1, 3), (throwable, attempt) -> {
if (attempt == 3) {
LOG.info("attempting");
return Observable.just("completed with error");
} else {
return attempt;
}
});
});
test.doOnError(x -> System.out.println("do on error message")).subscribe(s -> {
System.out.println(s);
});
当我在本地 运行 执行此操作时,我看到了尝试 3 次的记录(如预期的那样)。
我没有看到 println "do on error message"
(如预期)
但我没有看到我期待的completed with error
,这让我怀疑我是否真的return我想要或不想要的观察者,是什么我做错了吗?
我也不明白为什么它允许我在 zipWith 中 return 一个 observable 和一个整数。有什么想法吗?
而且,是否可以从我自己的可观察定义中抛出 exception/error?像这样:
Observable<String> test = test().retry(3).map(value -> {
// some logic to define what to do
Observable.error(new Exception("error");
});
首先,
I also don't understand why it allows me to return an observable and an integer inside zipWith.
zipWith
中 lambda 的签名是 (Throwable, Integer) -> Object
,这意味着任何东西都是有效的 return,因为它是 Object
的 child。之所以如此,是因为此函数定义了如何组合两个 objects(在本例中为 Throwable
和 Integer
,并且任何 Object
都是有效组合(或缺少其中)。
回到你的主要问题。重要的是要记住 retryWhen
实际上在做什么。这有点难以理解(至少对我而言),但基本上每当 retryWhen
主体中的观察者发出时,都会导致上游 Observable
成为 re-subscribed。这不控制下游排放。
docs 中的示例(一个 RxJava 2 片段,但观点仍然适用)表明:
Observable.create((ObservableEmitter<? super String> s) -> {
System.out.println("subscribing");
s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
System.out.println("delay retry by " + i + " second(s)");
return Observable.timer(i, TimeUnit.SECONDS);
});
}).blockingForEach(System.out::println);
在此示例中,retryWhen
块中的 return 在我们重新订阅初始源时进行控制。在这种情况下,我们说我们想要延迟 re-subscription i
秒。
考虑到这一点,retryWhen
可能不是您最初寻求的解决方案。另一种解决方案可能是使用 retry
无论您想尝试订阅多少次(或者 retryWhen
如果您想要更自定义的 re-subscription),然后使用 onErrorResumeNext
. See also .
举个例子:
Observable.create((ObservableEmitter<String> s) -> s.onError(new RuntimeException("always fails")))
.retry(3)
.onErrorResumeNext(throwable -> {
return Observable.just("hi");
})
.subscribe(System.out::println, System.out::println);
结果输出为 hi
。这里的关键是 onErrorResumeNext
允许我们将发出的异常转换为其他东西。几乎就像 map
的异常。
我正在尝试使用 java Rx(版本 1)进行重试。
我想做一个 retryWhen
而不是简单的 retry()
因为我想在达到限制时 return 一个具有特定值的可观察对象,而不是仅仅抛出异常。
所以,检查这个 https://blog.danlew.net/2016/01/25/rxjavas-repeatwhen-and-retrywhen-explained/ and this Catch error if retryWhen:s retries runs out 我能够构建一些有助于我实现目标的东西。
// this is only to simulate the real method that will possibly throw an exception
public static Observable<String> test() {
Observable<String> var = Observable.error(new IOException());
return var;
}
Observable<String> test = test().retryWhen(attempts -> {
return attempts.zipWith(Observable.range(1, 3), (throwable, attempt) -> {
if (attempt == 3) {
LOG.info("attempting");
return Observable.just("completed with error");
} else {
return attempt;
}
});
});
test.doOnError(x -> System.out.println("do on error message")).subscribe(s -> {
System.out.println(s);
});
当我在本地 运行 执行此操作时,我看到了尝试 3 次的记录(如预期的那样)。
我没有看到 println "do on error message"
(如预期)
但我没有看到我期待的completed with error
,这让我怀疑我是否真的return我想要或不想要的观察者,是什么我做错了吗?
我也不明白为什么它允许我在 zipWith 中 return 一个 observable 和一个整数。有什么想法吗?
而且,是否可以从我自己的可观察定义中抛出 exception/error?像这样:
Observable<String> test = test().retry(3).map(value -> {
// some logic to define what to do
Observable.error(new Exception("error");
});
首先,
I also don't understand why it allows me to return an observable and an integer inside zipWith.
zipWith
中 lambda 的签名是 (Throwable, Integer) -> Object
,这意味着任何东西都是有效的 return,因为它是 Object
的 child。之所以如此,是因为此函数定义了如何组合两个 objects(在本例中为 Throwable
和 Integer
,并且任何 Object
都是有效组合(或缺少其中)。
回到你的主要问题。重要的是要记住 retryWhen
实际上在做什么。这有点难以理解(至少对我而言),但基本上每当 retryWhen
主体中的观察者发出时,都会导致上游 Observable
成为 re-subscribed。这不控制下游排放。
docs 中的示例(一个 RxJava 2 片段,但观点仍然适用)表明:
Observable.create((ObservableEmitter<? super String> s) -> {
System.out.println("subscribing");
s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
System.out.println("delay retry by " + i + " second(s)");
return Observable.timer(i, TimeUnit.SECONDS);
});
}).blockingForEach(System.out::println);
在此示例中,retryWhen
块中的 return 在我们重新订阅初始源时进行控制。在这种情况下,我们说我们想要延迟 re-subscription i
秒。
考虑到这一点,retryWhen
可能不是您最初寻求的解决方案。另一种解决方案可能是使用 retry
无论您想尝试订阅多少次(或者 retryWhen
如果您想要更自定义的 re-subscription),然后使用 onErrorResumeNext
. See also
举个例子:
Observable.create((ObservableEmitter<String> s) -> s.onError(new RuntimeException("always fails")))
.retry(3)
.onErrorResumeNext(throwable -> {
return Observable.just("hi");
})
.subscribe(System.out::println, System.out::println);
结果输出为 hi
。这里的关键是 onErrorResumeNext
允许我们将发出的异常转换为其他东西。几乎就像 map
的异常。