RxJava 2.在处理其他不同的项目时延迟重试

RxJava 2. Retry with delay while proceeding other items that are distinct

我有一个 observable 可以从很多来源获取项目:

Source { List<Item> data }

来源和项目之间的关系是多对多的,在不同的来源中项目可以自我复制。 Item 是应该上传到服务器的实体,服务器不接受重复项。为了实现这一点,我合并了源并通过它们的 id 区分它们的项目,然后将唯一的项目上传到服务器。如下所示:

Observable.merge(source1(), source2(), source3())
            .flatMapIterable(sources -> sources)
            .flatMapIterable(source::getItems)
            .distinct(item -> item.getId())
            .flatMapCompletabale(item -> uploadItem(item))

项目上传可能会出现一些错误,其中一些错误我应该稍后再次尝试上传项目并在 'failed' 等待重试时继续其他项目。

我如何才能推迟重试上传 'failed' 项并在等待尝试时继续其他项?

提前致谢!

只处理一次上传失败,可以在最后一步添加算子:

  .flatMapCompletable(item->uploadItem(item))

应该变成

  .flatMapCompletable(item->uploadItem(item)
                              .retryWhen(throwable -> 
                                  throwable.delay(5, TimeUnit.SECONDS)))

编辑:我从 this Dan Lew blog entry 那里学到了很多关于 retryWhen() 运算符的知识。您会发现几个示例,包括使用 zip() 运算符和 Observable.range(3) 来限制重试次数。

我将此函数放入 retryWhen 方法并使其运行。

public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {

private final int maxRetryCount;
private final int retryDelay;
private int retryCount;
private TimeUnit timeUnit;

public RetryWithDelay(final int maxRetryCount, final int retryDelay, final TimeUnit timeUnit) {
    this.maxRetryCount = maxRetryCount;
    this.retryDelay = retryDelay;
    this.timeUnit = timeUnit;
    this.retryCount = 0;
}

@Override
public Observable<?> apply(final Observable<? extends Throwable> attempts) {
    return attempts.flatMap((Function<Throwable, Observable<?>>) throwable -> {

        if (++retryCount < maxRetryCount) {
            return Observable.timer(retryDelay, timeUnit);
        }

        return Observable.error(throwable);
    });
}
}

我必须修改上面的例子来创建一个 Flowable 来在我的 RxJava2 项目中重试一个 Single:

import io.reactivex.Flowable; import io.reactivex.functions.Function;

import java.util.concurrent.TimeUnit;

public class RetryWithDelay implements Function<Flowable<? extends Throwable>, Flowable<?>> {

    private final int maxRetryCount;
    private final int retryDelay;
    private int retryCount;
    private TimeUnit timeUnit;

    public RetryWithDelay(final int maxRetryCount, final int retryDelay, final TimeUnit timeUnit) {
        this.maxRetryCount = maxRetryCount;
        this.retryDelay = retryDelay;
        this.timeUnit = timeUnit;
        this.retryCount = 0;
    }

    @Override
    public Flowable<?> apply(final Flowable<? extends Throwable> attempts) {

        return attempts.flatMap((Function<Throwable, Flowable<?>>) throwable -> {

            if (++retryCount < maxRetryCount) {
                return Flowable.timer(retryDelay, timeUnit);
            }

            return Flowable.error(throwable);
        });
    } }

并将其应用到我的单曲中:

.retryWhen(new RetryWithDelay(5, 2, TimeUnit.SECONDS))