Combine framework retry after delay?
I see how to use .retry directly, to resubscribe after an error, like this:
URLSession.shared.dataTaskPublisher(for: url)
    .retry(3)
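But that seems rather simple-minded. What if I think the error might go away if I wait a while? I could insert a .delay operator, but then the delay would apply even when there is no error. And there doesn't seem to be a way to apply an operator conditionally (i.e. only when there is an error).
I see how I could work around this by writing a RetryWithDelay operator from scratch, and indeed third parties have written such an operator. But is there a way to say "delay if there's an error", purely using the operators we're given?
My thought was that I could use .catch, because its function runs only if there is an error. But that function needs to return a publisher, and what publisher would we use? If we return somePublisher.delay(...) followed by .retry, we would be applying .retry to the wrong publisher, wouldn't we?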
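This was a topic of conversation on the Using Combine project repo a while back - the whole thread: https://github.com/heckj/swiftui-notes/issues/164
The long and short of it is that we made an example that I think does what you want, although it does use catch: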
let resultPublisher = upstreamPublisher.catch { error -> AnyPublisher<String, Error> in
    return Publishers.Delay(upstream: upstreamPublisher,
                            interval: 3,
                            tolerance: 1,
                            scheduler: DispatchQueue.global())
        // moving retry into this block reduces the number of duplicate requests
        // In effect, there's the original request, and the `retry(2)` here will operate
        // two additional retries on the otherwise one-shot publisher that is initiated with
        // the `Publishers.Delay()` just above. Just starting this publisher with delay makes
        // an additional request, so the total number of requests ends up being 4 (assuming all
        // fail). However, no delay is introduced in this sequence if the original request
        // is successful.
        .retry(2)
        .eraseToAnyPublisher()
}
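This references the retry pattern I have in the book/online, which is basically what you describe (but wasn't what you asked about).
The person I was corresponding with on the issue provided a variant in that thread as an extension, which may be interesting as well: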
extension Publisher {
    func retryWithDelay<T, E>()
        -> Publishers.Catch<Self, AnyPublisher<T, E>> where T == Self.Output, E == Self.Failure
    {
        return self.catch { error -> AnyPublisher<T, E> in
            return Publishers.Delay(
                upstream: self,
                interval: 3,
                tolerance: 1,
                scheduler: DispatchQueue.global()).retry(2).eraseToAnyPublisher()
        }
    }
}
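Using .catch is indeed the answer. We just keep a reference to the data task publisher and use that reference as the head of both pipelines: the outer pipeline that does the initial networking, and the inner pipeline produced by the .catch function.
Let's start by creating the data task publisher and stopping there: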
let pub = URLSession.shared.dataTaskPublisher(for: url).share()
Now I can form the head of the pipeline:
let head = pub.catch { _ in pub.delay(for: 3, scheduler: DispatchQueue.main) }
    .retry(3)
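That should do it! head is now a pipeline that inserts a delay operator only if there is an error. We can then go on to form the rest of the pipeline, based on head.
Observe that we do indeed change publishers midstream; if there is a failure and the catch function runs, the pub that is the upstream of the .delay becomes the publisher, replacing the pub we started out with. However, they are the same object (because I said share), so that is a distinction without a difference.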
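I found a few quirks in the implementations in the accepted answer.
Firstly, the first two attempts are fired off without a delay, since the first delay only takes effect after the second attempt.
Secondly, if any one of the retry attempts succeeds, the output value is also delayed, which seems unnecessary.
Thirdly, the extension is not flexible enough to let the user decide which scheduler the retry attempts should be dispatched to.
After some tinkering back and forth I ended up with a solution like this: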
import Combine
import Foundation

public extension Publisher {
    /**
     Creates a new publisher which will upon failure retry the upstream publisher a provided number of times, with the provided delay between retry attempts.
     If the upstream publisher succeeds the first time this is bypassed and proceeds as normal.

     - Parameters:
        - retries: The number of times to retry the upstream publisher.
        - delay: Delay in seconds between retry attempts.
        - scheduler: The scheduler to dispatch the delayed events.

     - Returns: A new publisher which will retry the upstream publisher with a delay upon failure.

     ~~~
     let url = URL(string: "https://api.myService.com")!

     URLSession.shared.dataTaskPublisher(for: url)
         .retryWithDelay(retries: 4, delay: 5, scheduler: DispatchQueue.global())
         .sink { completion in
             switch completion {
             case .finished:
                 print("Success ")
             case .failure(let error):
                 print("The last and final failure after retry attempts: \(error)")
             }
         } receiveValue: { output in
             print("Received value: \(output)")
         }
         .store(in: &cancellables)
     ~~~
     */
    func retryWithDelay<S>(
        retries: Int,
        delay: S.SchedulerTimeType.Stride,
        scheduler: S
    ) -> AnyPublisher<Output, Failure> where S: Scheduler {
        self
            .delayIfFailure(for: delay, scheduler: scheduler)
            .retry(retries)
            .eraseToAnyPublisher()
    }

    // Replaces a failure with a Future that re-emits the same error after `delay`,
    // so values pass through untouched and only failures are held back.
    private func delayIfFailure<S>(
        for delay: S.SchedulerTimeType.Stride,
        scheduler: S
    ) -> AnyPublisher<Output, Failure> where S: Scheduler {
        self.catch { error in
            Future { completion in
                scheduler.schedule(after: scheduler.now.advanced(by: delay)) {
                    completion(.failure(error))
                }
            }
        }
        .eraseToAnyPublisher()
    }
}
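To see what timing this produces, here is a minimal sketch in plain Swift (no Combine) that models the worst case. The helper name attemptStartTimes is hypothetical and not part of the answer, and the model assumes every attempt fails instantly, so the only time that passes is the delay applied to each failure before .retry resubscribes.

```swift
import Foundation

/// Hypothetical helper, not part of the answer above: models when each attempt
/// would start if every attempt failed instantly and each failure were held
/// back by `delay` seconds before `retry` resubscribes.
func attemptStartTimes(retries: Int, delay: TimeInterval) -> [TimeInterval] {
    // retry(n) allows the original attempt plus up to n resubscriptions.
    (0...max(retries, 0)).map { TimeInterval($0) * delay }
}

let starts = attemptStartTimes(retries: 4, delay: 5)
print(starts) // [0.0, 5.0, 10.0, 15.0, 20.0]

// In this model the final failure is also held back by one more delay
// before it reaches the subscriber.
let finalFailure = (starts.last ?? 0) + 5
print(finalFailure) // 25.0
```

Under these assumptions the very first retry is already delayed, which is the behavior the first quirk above was about. The flip side is that the last error also arrives one delay later than strictly necessary.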
I remembered that the RxSwiftExt library had a really nice implementation of a custom retry + delay operator with many options (linear and exponential delay, plus an option to provide a custom closure), and I tried to recreate it in Combine. The original implementation is here.
import Combine
import Foundation

/**
 Provides the retry behavior that will be used - the number of retries and the delay between two subsequent retries.
 - `.immediate`: It will immediately retry for the specified retry count
 - `.delayed`: It will retry for the specified retry count, adding a fixed delay between each retry
 - `.exponentialDelayed`: It will retry for the specified retry count.
 The delay will be incremented by the provided multiplier after each iteration
 (`multiplier = 0.5` corresponds to 50% increase in time between each retry)
 - `.custom`: It will retry for the specified retry count. The delay will be calculated by the provided custom closure.
 The closure's argument is the current retry
 */
enum RetryBehavior<S> where S: Scheduler {
    case immediate(retries: UInt)
    case delayed(retries: UInt, time: TimeInterval)
    case exponentialDelayed(retries: UInt, initial: TimeInterval, multiplier: Double)
    case custom(retries: UInt, delayCalculator: (UInt) -> TimeInterval)
}
fileprivate extension RetryBehavior {

    func calculateConditions(_ currentRetry: UInt) -> (maxRetries: UInt, delay: S.SchedulerTimeType.Stride) {
        switch self {
        case let .immediate(retries):
            // If immediate, returns 0.0 for delay
            return (maxRetries: retries, delay: .zero)
        case let .delayed(retries, time):
            // Returns the fixed delay specified by the user
            return (maxRetries: retries, delay: .seconds(time))
        case let .exponentialDelayed(retries, initial, multiplier):
            // If it is the first retry the initial delay is used, otherwise it is calculated
            let delay = currentRetry == 1 ? initial : initial * pow(1 + multiplier, Double(currentRetry - 1))
            return (maxRetries: retries, delay: .seconds(delay))
        case let .custom(retries, delayCalculator):
            // Calculates the delay with the custom calculator
            return (maxRetries: retries, delay: .seconds(delayCalculator(currentRetry)))
        }
    }
}
public typealias RetryPredicate = (Error) -> Bool

extension Publisher {
    /**
     Retries the failed upstream publisher using the given retry behavior.
     - parameter behavior: The retry behavior that will be used in case of an error.
     - parameter shouldRetry: An optional custom closure which uses the downstream error to determine
     if the publisher should retry.
     - parameter tolerance: The allowed tolerance in firing delayed events.
     - parameter scheduler: The scheduler that will be used for delaying the retry.
     - parameter options: Options relevant to the scheduler’s behavior.
     - returns: A publisher that attempts to recreate its subscription to a failed upstream publisher.
     */
    func retry<S>(
        _ behavior: RetryBehavior<S>,
        shouldRetry: RetryPredicate? = nil,
        tolerance: S.SchedulerTimeType.Stride? = nil,
        scheduler: S,
        options: S.SchedulerOptions? = nil
    ) -> AnyPublisher<Output, Failure> where S: Scheduler {
        return retry(
            1,
            behavior: behavior,
            shouldRetry: shouldRetry,
            tolerance: tolerance,
            scheduler: scheduler,
            options: options
        )
    }

    private func retry<S>(
        _ currentAttempt: UInt,
        behavior: RetryBehavior<S>,
        shouldRetry: RetryPredicate? = nil,
        tolerance: S.SchedulerTimeType.Stride? = nil,
        scheduler: S,
        options: S.SchedulerOptions? = nil
    ) -> AnyPublisher<Output, Failure> where S: Scheduler {

        // This shouldn't happen, in case it does we finish immediately
        guard currentAttempt > 0 else { return Empty<Output, Failure>().eraseToAnyPublisher() }

        // Calculate the retry conditions
        let conditions = behavior.calculateConditions(currentAttempt)

        return self.catch { error -> AnyPublisher<Output, Failure> in

            // If we exceed the maximum retries we return the error
            guard currentAttempt <= conditions.maxRetries else {
                return Fail(error: error).eraseToAnyPublisher()
            }

            if let shouldRetry = shouldRetry, shouldRetry(error) == false {
                // If the shouldRetry predicate returns false we also return the error
                return Fail(error: error).eraseToAnyPublisher()
            }

            guard conditions.delay != .zero else {
                // If there is no delay, we retry immediately
                return self.retry(
                    currentAttempt + 1,
                    behavior: behavior,
                    shouldRetry: shouldRetry,
                    tolerance: tolerance,
                    scheduler: scheduler,
                    options: options
                )
                .eraseToAnyPublisher()
            }

            // We retry after the specified delay
            return Just(())
                .delay(for: conditions.delay, tolerance: tolerance, scheduler: scheduler, options: options)
                .flatMap {
                    return self.retry(
                        currentAttempt + 1,
                        behavior: behavior,
                        shouldRetry: shouldRetry,
                        tolerance: tolerance,
                        scheduler: scheduler,
                        options: options
                    )
                    .eraseToAnyPublisher()
                }
                .eraseToAnyPublisher()
        }
        .eraseToAnyPublisher()
    }
}
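To make the `.exponentialDelayed` schedule concrete, here is a small sketch in plain Swift that applies the same formula as calculateConditions, namely initial * (1 + multiplier)^(retry - 1). The free function exponentialDelay is a hypothetical name introduced only for this illustration; it is not part of the operator above.

```swift
import Foundation

/// Hypothetical standalone helper mirroring the `.exponentialDelayed` branch:
/// delay = initial * (1 + multiplier)^(retry - 1), with retries counted from 1.
func exponentialDelay(retry: UInt, initial: TimeInterval, multiplier: Double) -> TimeInterval {
    precondition(retry >= 1, "retries are counted from 1")
    return initial * pow(1 + multiplier, Double(retry - 1))
}

// With initial = 1 second and multiplier = 0.5, each delay is 50% longer than the last.
let delays = (1...4).map { exponentialDelay(retry: UInt($0), initial: 1, multiplier: 0.5) }
print(delays) // [1.0, 1.5, 2.25, 3.375]
```

Note that the delay grows geometrically, so a large retry count combined with a large multiplier can lead to very long waits; capping the result may be worth considering in real use.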