RxJava 2.在处理其他不同的项目时延迟重试
RxJava 2. Retry with delay while proceeding other items that are distinct
我有一个 observable 可以从很多来源获取项目:
Source { List<Item> data }
来源和项目之间的关系是多对多的,在不同的来源中项目可以自我复制。 Item 是应该上传到服务器的实体,服务器不接受重复项。为了实现这一点,我合并了源并通过它们的 id 区分它们的项目,然后将唯一的项目上传到服务器。如下所示:
Observable.merge(source1(), source2(), source3())
.flatMapIterable(sources -> sources)
.flatMapIterable(source::getItems)
.distinct(item -> item.getId())
.flatMapCompletabale(item -> uploadItem(item))
项目上传可能会出现一些错误,其中一些错误我应该稍后再次尝试上传项目并在 'failed' 等待重试时继续其他项目。
我如何才能推迟重试上传 'failed' 项并在等待尝试时继续其他项?
提前致谢!
只处理一次上传失败,可以在最后一步添加算子:
.flatMapCompletable(item->uploadItem(item))
应该变成
.flatMapCompletable(item->uploadItem(item)
.retryWhen(throwable ->
throwable.delay(5, TimeUnit.SECONDS)))
编辑:我从 this Dan Lew blog entry 那里学到了很多关于 retryWhen()
运算符的知识。您会发现几个示例,包括使用 zip()
运算符和 Observable.range(3)
来限制重试次数。
我将此函数放入 retryWhen 方法并使其运行。
public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {
private final int maxRetryCount;
private final int retryDelay;
private int retryCount;
private TimeUnit timeUnit;
public RetryWithDelay(final int maxRetryCount, final int retryDelay, final TimeUnit timeUnit) {
this.maxRetryCount = maxRetryCount;
this.retryDelay = retryDelay;
this.timeUnit = timeUnit;
this.retryCount = 0;
}
@Override
public Observable<?> apply(final Observable<? extends Throwable> attempts) {
return attempts.flatMap((Function<Throwable, Observable<?>>) throwable -> {
if (++retryCount < maxRetryCount) {
return Observable.timer(retryDelay, timeUnit);
}
return Observable.error(throwable);
});
}
}
我必须修改上面的例子来创建一个 Flowable 来在我的 RxJava2 项目中重试一个 Single:
import io.reactivex.Flowable; import io.reactivex.functions.Function;
import java.util.concurrent.TimeUnit;
public class RetryWithDelay implements Function<Flowable<? extends Throwable>, Flowable<?>> {
private final int maxRetryCount;
private final int retryDelay;
private int retryCount;
private TimeUnit timeUnit;
public RetryWithDelay(final int maxRetryCount, final int retryDelay, final TimeUnit timeUnit) {
this.maxRetryCount = maxRetryCount;
this.retryDelay = retryDelay;
this.timeUnit = timeUnit;
this.retryCount = 0;
}
@Override
public Flowable<?> apply(final Flowable<? extends Throwable> attempts) {
return attempts.flatMap((Function<Throwable, Flowable<?>>) throwable -> {
if (++retryCount < maxRetryCount) {
return Flowable.timer(retryDelay, timeUnit);
}
return Flowable.error(throwable);
});
} }
并将其应用到我的单曲中:
.retryWhen(new RetryWithDelay(5, 2, TimeUnit.SECONDS))
我有一个 observable 可以从很多来源获取项目:
Source { List<Item> data }
来源和项目之间的关系是多对多的,在不同的来源中项目可以自我复制。 Item 是应该上传到服务器的实体,服务器不接受重复项。为了实现这一点,我合并了源并通过它们的 id 区分它们的项目,然后将唯一的项目上传到服务器。如下所示:
Observable.merge(source1(), source2(), source3())
.flatMapIterable(sources -> sources)
.flatMapIterable(source::getItems)
.distinct(item -> item.getId())
.flatMapCompletabale(item -> uploadItem(item))
项目上传可能会出现一些错误,其中一些错误我应该稍后再次尝试上传项目并在 'failed' 等待重试时继续其他项目。
我如何才能推迟重试上传 'failed' 项并在等待尝试时继续其他项?
提前致谢!
只处理一次上传失败,可以在最后一步添加算子:
.flatMapCompletable(item->uploadItem(item))
应该变成
.flatMapCompletable(item->uploadItem(item)
.retryWhen(throwable ->
throwable.delay(5, TimeUnit.SECONDS)))
编辑:我从 this Dan Lew blog entry 那里学到了很多关于 retryWhen()
运算符的知识。您会发现几个示例,包括使用 zip()
运算符和 Observable.range(3)
来限制重试次数。
我将此函数放入 retryWhen 方法并使其运行。
public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {
private final int maxRetryCount;
private final int retryDelay;
private int retryCount;
private TimeUnit timeUnit;
public RetryWithDelay(final int maxRetryCount, final int retryDelay, final TimeUnit timeUnit) {
this.maxRetryCount = maxRetryCount;
this.retryDelay = retryDelay;
this.timeUnit = timeUnit;
this.retryCount = 0;
}
@Override
public Observable<?> apply(final Observable<? extends Throwable> attempts) {
return attempts.flatMap((Function<Throwable, Observable<?>>) throwable -> {
if (++retryCount < maxRetryCount) {
return Observable.timer(retryDelay, timeUnit);
}
return Observable.error(throwable);
});
}
}
我必须修改上面的例子来创建一个 Flowable 来在我的 RxJava2 项目中重试一个 Single:
import io.reactivex.Flowable; import io.reactivex.functions.Function;
import java.util.concurrent.TimeUnit;
public class RetryWithDelay implements Function<Flowable<? extends Throwable>, Flowable<?>> {
private final int maxRetryCount;
private final int retryDelay;
private int retryCount;
private TimeUnit timeUnit;
public RetryWithDelay(final int maxRetryCount, final int retryDelay, final TimeUnit timeUnit) {
this.maxRetryCount = maxRetryCount;
this.retryDelay = retryDelay;
this.timeUnit = timeUnit;
this.retryCount = 0;
}
@Override
public Flowable<?> apply(final Flowable<? extends Throwable> attempts) {
return attempts.flatMap((Function<Throwable, Flowable<?>>) throwable -> {
if (++retryCount < maxRetryCount) {
return Flowable.timer(retryDelay, timeUnit);
}
return Flowable.error(throwable);
});
} }
并将其应用到我的单曲中:
.retryWhen(new RetryWithDelay(5, 2, TimeUnit.SECONDS))