Combine framework retry after delay?

I see how to use .retry directly, to resubscribe after an error, like this:

    URLSession.shared.dataTaskPublisher(for: url)
        .retry(3)

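But that seems awfully simple-minded. What if I think this error might go away if I wait a while? I could insert a .delay operator, but then the delay operates even if there is no error. And there doesn't seem to be a way to apply an operator conditionally (i.e. only when there's an error).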
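
To make that concern concrete, here is a minimal sketch of the naive approach. It assumes a stand-in publisher (`Just`) in place of a real request that succeeds on its first try; the 200 ms interval and the queue label are arbitrary values chosen for illustration.

```swift
import Combine
import Foundation

let queue = DispatchQueue(label: "delay-demo")   // hypothetical queue name
let done = DispatchSemaphore(value: 0)
let start = Date()
var elapsed: TimeInterval = 0

// Stand-in for a request that succeeds immediately (assumption for the demo)
let cancellable = Just("response")
    .setFailureType(to: Error.self)
    .delay(for: .milliseconds(200), scheduler: queue) // applies to every value, not only to failures
    .retry(3)
    .sink(receiveCompletion: { _ in done.signal() },
          receiveValue: { _ in elapsed = Date().timeIntervalSince(start) })

done.wait()
print("value arrived after \(elapsed) seconds")
```

Even though nothing failed, the value only arrives once the delay has passed, which is the behavior the question is trying to avoid.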

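I see how I could work around this by writing a RetryWithDelay operator from scratch, and indeed such an operator has been written by third parties. But is there a way to say "delay if there's an error", purely using the operators we're given?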

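My thought was that I could use .catch, because its function runs only if there is an error. But the function needs to return a publisher, and what publisher would we use? If we return somePublisher.delay(...) followed by .retry, we'd be applying .retry to the wrong publisher, wouldn't we?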

This was a topic of conversation on the Using Combine project repo a while back. The whole thread: https://github.com/heckj/swiftui-notes/issues/164

The long and short of it is that we made an example that I think does what you want, although it does use catch:

let resultPublisher = upstreamPublisher.catch { error -> AnyPublisher<String, Error> in
    return Publishers.Delay(upstream: upstreamPublisher,
                            interval: 3,
                            tolerance: 1,
                            scheduler: DispatchQueue.global())
    // moving retry into this block reduces the number of duplicate requests
    // In effect, there's the original request, and the `retry(2)` here will operate
    // two additional retries on the otherwise one-shot publisher that is initiated with
    // the `Publishers.Delay()` just above. Just starting this publisher with delay makes
    // an additional request, so the total number of requests ends up being 4 (assuming all
    // fail). However, no delay is introduced in this sequence if the original request
    // is successful.
    .retry(2)
    .eraseToAnyPublisher()
}

This references the retry pattern I have in the book/online, which is basically what you describe (but isn't what you asked about).
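
The request count described in the code comment can be checked with a small sketch. It assumes a stand-in upstream (a `Deferred` `Future` that always fails and counts its subscriptions) instead of a real network call; `DemoError`, the queue label, and the 50 ms interval are made up for the demo.

```swift
import Combine
import Foundation

struct DemoError: Error {}   // hypothetical error type for the demo

var attempts = 0

// Stand-in for a one-shot request that always fails; each subscription counts as one request
let upstream = Deferred {
    Future<String, Error> { promise in
        attempts += 1
        promise(.failure(DemoError()))
    }
}

let queue = DispatchQueue(label: "retry-demo")
let done = DispatchSemaphore(value: 0)

let cancellable = upstream
    .catch { _ -> AnyPublisher<String, Error> in
        // Same shape as the example above, with a shorter interval
        Publishers.Delay(upstream: upstream,
                         interval: .milliseconds(50),
                         tolerance: .milliseconds(10),
                         scheduler: queue)
            .retry(2)
            .eraseToAnyPublisher()
    }
    .sink(receiveCompletion: { _ in done.signal() }, receiveValue: { _ in })

done.wait()
print("attempts:", attempts) // attempts: 4
```

That is one original request, one from starting the delayed publisher, and two from `retry(2)`.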

The person I was corresponding with on the issue provided a variant in that thread as an extension, which may be interesting as well:

extension Publisher {
  func retryWithDelay<T, E>()
    -> Publishers.Catch<Self, AnyPublisher<T, E>> where T == Self.Output, E == Self.Failure
  {
    return self.catch { error -> AnyPublisher<T, E> in
      return Publishers.Delay(
        upstream: self,
        interval: 3,
        tolerance: 1,
        scheduler: DispatchQueue.global()).retry(2).eraseToAnyPublisher()
    }
  }
}

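Using .catch is indeed the answer. We simply make a reference to the data task publisher and use that reference as the head of both pipelines: the outer pipeline that does the initial networking, and the inner pipeline produced by the .catch function.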

Let's start by creating the data task publisher and stop:

let pub = URLSession.shared.dataTaskPublisher(for: url).share()

Now I can form the head of the pipeline:

let head = pub.catch {_ in pub.delay(for: 3, scheduler: DispatchQueue.main)}
    .retry(3)

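That should do it! head is now a pipeline that inserts a delay operator only when there is an error. We can then proceed to form the rest of the pipeline, based on head.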

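Observe that we do indeed change publishers midstream; if there is a failure and the catch function runs, the pub that is the upstream of the .delay becomes the publisher, replacing the pub we started out with. However, they are the same object (because I said share), so this is a distinction without a difference.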

I found a few quirks with the implementations in the accepted answer.

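  • First, the first two attempts fire immediately, because the first delay only takes effect after the second attempt.

  • Second, if any of the retry attempts succeeds, the output value is delayed as well, which seems unnecessary.

  • Third, the extension is not flexible enough to let the user decide which scheduler the retry attempts are dispatched to.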

After some back and forth, I ended up with a solution like this:

import Combine

public extension Publisher {
    /**
     Creates a new publisher which will upon failure retry the upstream publisher a provided number of times, with the provided delay between retry attempts.
     If the upstream publisher succeeds the first time this is bypassed and proceeds as normal.

     - Parameters:
        - retries: The number of times to retry the upstream publisher.
        - delay: Delay in seconds between retry attempts.
        - scheduler: The scheduler to dispatch the delayed events.

     - Returns: A new publisher which will retry the upstream publisher with a delay upon failure.

     ~~~
     let url = URL(string: "https://api.myService.com")!

     URLSession.shared.dataTaskPublisher(for: url)
         .retryWithDelay(retries: 4, delay: 5, scheduler: DispatchQueue.global())
         .sink { completion in
             switch completion {
             case .finished:
                 print("Success")
             case .failure(let error):
                 print("The last and final failure after retry attempts: \(error)")
             }
         } receiveValue: { output in
             print("Received value: \(output)")
         }
         .store(in: &cancellables)
     ~~~
     */
    func retryWithDelay<S>(
        retries: Int,
        delay: S.SchedulerTimeType.Stride,
        scheduler: S
    ) -> AnyPublisher<Output, Failure> where S: Scheduler {
        self
            .delayIfFailure(for: delay, scheduler: scheduler)
            .retry(retries)
            .eraseToAnyPublisher()
    }

    private func delayIfFailure<S>(
        for delay: S.SchedulerTimeType.Stride,
        scheduler: S
    ) -> AnyPublisher<Output, Failure> where S: Scheduler {
        self.catch { error in
            Future { completion in
                scheduler.schedule(after: scheduler.now.advanced(by: delay)) {
                    completion(.failure(error))
                }
            }
        }
        .eraseToAnyPublisher()
    }
}
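
To see the behavior end to end, here is a small self-contained sketch. It assumes a stand-in publisher that fails twice and then succeeds; `delayedFailure`, `DemoError`, the queue label, and the 50 ms delay are hypothetical names and values for the demo. The helper has the same shape as `delayIfFailure` above, with the `Future` types written out explicitly.

```swift
import Combine
import Foundation

extension Publisher {
    // Hypothetical helper: forwards values untouched, re-emits a failure only after `delay`
    func delayedFailure<S: Scheduler>(
        for delay: S.SchedulerTimeType.Stride,
        scheduler: S
    ) -> AnyPublisher<Output, Failure> {
        self.catch { error -> Future<Output, Failure> in
            Future { promise in
                scheduler.schedule(after: scheduler.now.advanced(by: delay)) {
                    promise(.failure(error))
                }
            }
        }
        .eraseToAnyPublisher()
    }
}

struct DemoError: Error {}

var attempts = 0
var received: [String] = []

// Stand-in for a request that fails on attempts 1 and 2 and succeeds on attempt 3
let flaky = Deferred {
    Future<String, DemoError> { promise in
        attempts += 1
        if attempts < 3 {
            promise(.failure(DemoError()))
        } else {
            promise(.success("ok"))
        }
    }
}

let queue = DispatchQueue(label: "retry-with-delay-demo")
let done = DispatchSemaphore(value: 0)

let cancellable = flaky
    .delayedFailure(for: .milliseconds(50), scheduler: queue)
    .retry(3)
    .sink(receiveCompletion: { _ in done.signal() },
          receiveValue: { received.append($0) })

done.wait()
print(attempts, received) // 3 ["ok"]
```

Only the two failures wait for the delay; the successful third attempt is delivered right away, and the remaining retry is never used.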

I remembered that the RxSwiftExt library had a really nice implementation of a custom retry + delay operator with many options (linear and exponential delay, plus an option to provide a custom closure), and I tried to recreate it in Combine. The original implementation is in RxSwiftExt.

import Combine
import Foundation

/**
 Provides the retry behavior that will be used - the number of retries and the delay between two subsequent retries.
 - `.immediate`: It will immediately retry for the specified retry count
 - `.delayed`: It will retry for the specified retry count, adding a fixed delay between each retry
 - `.exponentialDelayed`: It will retry for the specified retry count.
 The delay will be incremented by the provided multiplier after each iteration
 (`multiplier = 0.5` corresponds to 50% increase in time between each retry)
 - `.custom`: It will retry for the specified retry count. The delay will be calculated by the provided custom closure.
 The closure's argument is the current retry
 */
enum RetryBehavior<S> where S: Scheduler {
  case immediate(retries: UInt)
  case delayed(retries: UInt, time: TimeInterval)
  case exponentialDelayed(retries: UInt, initial: TimeInterval, multiplier: Double)
  case custom(retries: UInt, delayCalculator: (UInt) -> TimeInterval)
}

fileprivate extension RetryBehavior {
  
  func calculateConditions(_ currentRetry: UInt) -> (maxRetries: UInt, delay: S.SchedulerTimeType.Stride) {
    
    switch self {
    case let .immediate(retries):
      // If immediate, returns 0.0 for delay
      return (maxRetries: retries, delay: .zero)
    case let .delayed(retries, time):
      // Returns the fixed delay specified by the user
      return (maxRetries: retries, delay: .seconds(time))
    case let .exponentialDelayed(retries, initial, multiplier):
      // If it is the first retry the initial delay is used, otherwise it is calculated
      let delay = currentRetry == 1 ? initial : initial * pow(1 + multiplier, Double(currentRetry - 1))
      return (maxRetries: retries, delay: .seconds(delay))
    case let .custom(retries, delayCalculator):
      // Calculates the delay with the custom calculator
      return (maxRetries: retries, delay: .seconds(delayCalculator(currentRetry)))
    }
    
  }
  
}

public typealias RetryPredicate = (Error) -> Bool

extension Publisher {
  /**
   Retries the failed upstream publisher using the given retry behavior.
   - parameter behavior: The retry behavior that will be used in case of an error.
   - parameter shouldRetry: An optional custom closure which uses the downstream error to determine
   if the publisher should retry.
   - parameter tolerance: The allowed tolerance in firing delayed events.
   - parameter scheduler: The scheduler that will be used for delaying the retry.
   - parameter options: Options relevant to the scheduler’s behavior.
   - returns: A publisher that attempts to recreate its subscription to a failed upstream publisher.
   */
  func retry<S>(
    _ behavior: RetryBehavior<S>,
    shouldRetry: RetryPredicate? = nil,
    tolerance: S.SchedulerTimeType.Stride? = nil,
    scheduler: S,
    options: S.SchedulerOptions? = nil
  ) -> AnyPublisher<Output, Failure> where S: Scheduler {
    return retry(
      1,
      behavior: behavior,
      shouldRetry: shouldRetry,
      tolerance: tolerance,
      scheduler: scheduler,
      options: options
    )
  }
  
  private func retry<S>(
    _ currentAttempt: UInt,
    behavior: RetryBehavior<S>,
    shouldRetry: RetryPredicate? = nil,
    tolerance: S.SchedulerTimeType.Stride? = nil,
    scheduler: S,
    options: S.SchedulerOptions? = nil
  ) -> AnyPublisher<Output, Failure> where S: Scheduler {
    
    // This shouldn't happen, in case it does we finish immediately
    guard currentAttempt > 0 else { return Empty<Output, Failure>().eraseToAnyPublisher() }
    
    // Calculate the retry conditions
    let conditions = behavior.calculateConditions(currentAttempt)
    
    return self.catch { error -> AnyPublisher<Output, Failure> in
      
      // If we exceed the maximum retries we return the error
      guard currentAttempt <= conditions.maxRetries else {
        return Fail(error: error).eraseToAnyPublisher()
      }
      
      if let shouldRetry = shouldRetry, shouldRetry(error) == false {
        // If the shouldRetry predicate returns false we also return the error
        return Fail(error: error).eraseToAnyPublisher()
      }
      
      guard conditions.delay != .zero else {
        // If there is no delay, we retry immediately
        return self.retry(
          currentAttempt + 1,
          behavior: behavior,
          shouldRetry: shouldRetry,
          tolerance: tolerance,
          scheduler: scheduler,
          options: options
        )
        .eraseToAnyPublisher()
      }
      
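      // We retry after the specified delay
      return Just(())
        // flatMap on a Never-failing publisher needs this on OS versions before iOS 14 / macOS 11
        .setFailureType(to: Failure.self)
        .delay(for: conditions.delay, tolerance: tolerance, scheduler: scheduler, options: options)
        .flatMap {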
        return self.retry(
          currentAttempt + 1,
          behavior: behavior,
          shouldRetry: shouldRetry,
          tolerance: tolerance,
          scheduler: scheduler,
          options: options
        )
        .eraseToAnyPublisher()
      }
      .eraseToAnyPublisher()
    }
    .eraseToAnyPublisher()
  }
  
}
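
To get a feel for the delays `.exponentialDelayed` produces, the formula from `calculateConditions` can be run on its own. This is only a sketch: `exponentialDelay` is a hypothetical free function that mirrors that branch, and the values `initial: 1` and `multiplier: 0.5` are arbitrary.

```swift
import Foundation

// Mirrors the `.exponentialDelayed` branch: delay = initial * (1 + multiplier)^(retry - 1)
// `retry` is 1-based, as in the code above; passing 0 would underflow the UInt subtraction.
func exponentialDelay(retry: UInt, initial: TimeInterval, multiplier: Double) -> TimeInterval {
    initial * pow(1 + multiplier, Double(retry - 1))
}

let delays = (1...4).map { exponentialDelay(retry: UInt($0), initial: 1, multiplier: 0.5) }
print(delays) // [1.0, 1.5, 2.25, 3.375]
```

Each delay is 50% longer than the previous one, which matches the description of `multiplier = 0.5` in the doc comment.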