Rx Extensions - 使用延迟来避免执行不必要的可观察对象的正确方法?

Rx Extensions - Proper way to use delay to avoid unnecessary observables from executing?

我正在尝试使用 delay 和 amb 来执行按时间分隔的一系列相同任务。 我想要的只是下载尝试在未来某个时间执行,仅当同一任务在过去失败时才执行。以下是我的设置方式,但与我的预期不同,所有三个下载似乎都没有延迟地执行。

    Observable.amb([
        Observable.catch(redditPageStream, Observable.empty()).delay(0 * 1000),
        Observable.catch(redditPageStream, Observable.empty()).delay(30 * 1000),
        Observable.catch(redditPageStream, Observable.empty()).delay(90 * 1000),
        # Observable.throw(new Error('Failed to retrieve reddit page content')).delay(10000)
        # Observable.create(
            # (observer) ->
                # throw new Error('Failed to retrieve reddit page content')
        # )

    ]).defaultIfEmpty(Observable.throw(new Error('Failed to retrieve reddit page content')))

完整代码可在此处找到。 src

我希望第一个成功的 observable 会抵消那些仍在延迟的。 感谢您的帮助。

delay 实际上并没有停止你正在做的事情的执行,它只是在传播事件时延迟。如果你想延迟执行,你需要做类似的事情:

redditPageStream.delaySubscription(1000)

由于您的源正在立即生成,上述内容将延迟对基础流的实际订阅,以有效地延迟它开始生成的时间。

我建议您使用 retry 运算符之一来处理您的 retry 逻辑,而不是通过 amb 运算符来处理您自己的逻辑。

redditPageStream.delaySubscription(1000).retry(3);

会给你一个恒定的重试延迟,但是如果你想实现线性退避方法,你可以使用 retryWhen() 运算符,它可以让你应用任何你想要的退避逻辑。

redditPageStream.retryWhen(errors => {
  return errors
          //Only take 3 errors
          .take(3)
          //Use timer to implement a linear back off and flatten it
          .flatMap((e, i) => Rx.Observable.timer(i * 30 * 1000));
});

本质上 retryWhen 将创建一个错误的可观察对象,每个通过它的事件都被视为重试尝试。如果您出错或完成流,它将停止重试。