rxjava complete after retryWhen on completeable
rxjava complete after retryWhen on completeable
我在 Completeable 上使用 retryWhen 运算符,有没有办法告诉它从重试 Flowable 完成?
像这样 -
PublishSubject<?> retrySubject = PublishSubject.create();
public void someFunction() {
someCompletable.retryWhen(new Function<Flowable<Throwable>, Publisher<?>>() {
@Override
public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
return throwableFlowable.flatMap(throwable -> retrySubject.toFlowable(BackpressureStrategy.MISSING));
}
}).subscribe();
}
public void ignoreError(){
retrySubject.onComplete();
}
您不能通过给它一个空源来停止 flatMap
。此外,每个错误都会让越来越多的观察者订阅导致内存泄漏的主题。
使用takeUntil
通过另一个来源的帮助停止序列:
PublishProcessor<Throwable> stopProcessor = PublishProcessor.create();
source.retryWhen(errors ->
errors.takeUntil(
stopProcessor
)
.flatMap(error -> Flowable.timer(1, TimeUnit.SECONDS))
)
stopProcessor.onComplete();
编辑如果你想重复使用同一个主题,你可以抑制停止路径上的项目:
PublishProcessor<Integer> stopProcessor = PublishProcessor.create();
source.retryWhen(errors ->
errors.takeUntil(
stopProcessor.ignoreElements().toFlowable()
)
.flatMap(error -> stopProcessor)
)
// retry
stopProcessor.onNext(1);
// stop
stopProcessor.onComplete();
我在 Completeable 上使用 retryWhen 运算符,有没有办法告诉它从重试 Flowable 完成? 像这样 -
PublishSubject<?> retrySubject = PublishSubject.create();
public void someFunction() {
someCompletable.retryWhen(new Function<Flowable<Throwable>, Publisher<?>>() {
@Override
public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
return throwableFlowable.flatMap(throwable -> retrySubject.toFlowable(BackpressureStrategy.MISSING));
}
}).subscribe();
}
public void ignoreError(){
retrySubject.onComplete();
}
您不能通过给它一个空源来停止 flatMap
。此外,每个错误都会让越来越多的观察者订阅导致内存泄漏的主题。
使用takeUntil
通过另一个来源的帮助停止序列:
PublishProcessor<Throwable> stopProcessor = PublishProcessor.create();
source.retryWhen(errors ->
errors.takeUntil(
stopProcessor
)
.flatMap(error -> Flowable.timer(1, TimeUnit.SECONDS))
)
stopProcessor.onComplete();
编辑如果你想重复使用同一个主题,你可以抑制停止路径上的项目:
PublishProcessor<Integer> stopProcessor = PublishProcessor.create();
source.retryWhen(errors ->
errors.takeUntil(
stopProcessor.ignoreElements().toFlowable()
)
.flatMap(error -> stopProcessor)
)
// retry
stopProcessor.onNext(1);
// stop
stopProcessor.onComplete();