Rx Extensions - 使用延迟来避免执行不必要的可观察对象的正确方法?
Rx Extensions - Proper way to use delay to avoid unnecessary observables from executing?
我正在尝试使用 delay 和 amb 来执行按时间分隔的一系列相同任务。
我想要的只是下载尝试在未来某个时间执行,仅当同一任务在过去失败时才执行。以下是我的设置方式,但与我的预期不同,所有三个下载似乎都没有延迟地执行。
Observable.amb([
Observable.catch(redditPageStream, Observable.empty()).delay(0 * 1000),
Observable.catch(redditPageStream, Observable.empty()).delay(30 * 1000),
Observable.catch(redditPageStream, Observable.empty()).delay(90 * 1000),
# Observable.throw(new Error('Failed to retrieve reddit page content')).delay(10000)
# Observable.create(
# (observer) ->
# throw new Error('Failed to retrieve reddit page content')
# )
]).defaultIfEmpty(Observable.throw(new Error('Failed to retrieve reddit page content')))
完整代码可在此处找到。 src
我希望第一个成功的 observable 会抵消那些仍在延迟的。
感谢您的帮助。
delay
实际上并没有停止你正在做的事情的执行,它只是在传播事件时延迟。如果你想延迟执行,你需要做类似的事情:
redditPageStream.delaySubscription(1000)
由于您的源正在立即生成,上述内容将延迟对基础流的实际订阅,以有效地延迟它开始生成的时间。
我建议您使用 retry
运算符之一来处理您的 retry
逻辑,而不是通过 amb
运算符来处理您自己的逻辑。
redditPageStream.delaySubscription(1000).retry(3);
会给你一个恒定的重试延迟,但是如果你想实现线性退避方法,你可以使用 retryWhen()
运算符,它可以让你应用任何你想要的退避逻辑。
redditPageStream.retryWhen(errors => {
return errors
//Only take 3 errors
.take(3)
//Use timer to implement a linear back off and flatten it
.flatMap((e, i) => Rx.Observable.timer(i * 30 * 1000));
});
本质上 retryWhen
将创建一个错误的可观察对象,每个通过它的事件都被视为重试尝试。如果您出错或完成流,它将停止重试。
我正在尝试使用 delay 和 amb 来执行按时间分隔的一系列相同任务。 我想要的只是下载尝试在未来某个时间执行,仅当同一任务在过去失败时才执行。以下是我的设置方式,但与我的预期不同,所有三个下载似乎都没有延迟地执行。
Observable.amb([
Observable.catch(redditPageStream, Observable.empty()).delay(0 * 1000),
Observable.catch(redditPageStream, Observable.empty()).delay(30 * 1000),
Observable.catch(redditPageStream, Observable.empty()).delay(90 * 1000),
# Observable.throw(new Error('Failed to retrieve reddit page content')).delay(10000)
# Observable.create(
# (observer) ->
# throw new Error('Failed to retrieve reddit page content')
# )
]).defaultIfEmpty(Observable.throw(new Error('Failed to retrieve reddit page content')))
完整代码可在此处找到。 src
我希望第一个成功的 observable 会抵消那些仍在延迟的。 感谢您的帮助。
delay
实际上并没有停止你正在做的事情的执行,它只是在传播事件时延迟。如果你想延迟执行,你需要做类似的事情:
redditPageStream.delaySubscription(1000)
由于您的源正在立即生成,上述内容将延迟对基础流的实际订阅,以有效地延迟它开始生成的时间。
我建议您使用 retry
运算符之一来处理您的 retry
逻辑,而不是通过 amb
运算符来处理您自己的逻辑。
redditPageStream.delaySubscription(1000).retry(3);
会给你一个恒定的重试延迟,但是如果你想实现线性退避方法,你可以使用 retryWhen()
运算符,它可以让你应用任何你想要的退避逻辑。
redditPageStream.retryWhen(errors => {
return errors
//Only take 3 errors
.take(3)
//Use timer to implement a linear back off and flatten it
.flatMap((e, i) => Rx.Observable.timer(i * 30 * 1000));
});
本质上 retryWhen
将创建一个错误的可观察对象,每个通过它的事件都被视为重试尝试。如果您出错或完成流,它将停止重试。