Swift 仅针对某些错误类型合并重试

Swift combine retry only for some error types

我有一个自定义管道,我想对一些可恢复的错误代码进行 3 次重试,而且我想为可恢复的错误添加一些短暂的延迟。有人知道我该怎么做吗?

func createRequest(for message: Message) -> AnyPublisher<ResponseMessage, Error> {
    Future<ResponseMessage, Error> { promise in
        .....   
    }
    .tryCatch({ error -> AnyPublisher<ResponseMessage, Error> in
        // If error is a recoverable error retry, otherwise fail directly
        if case let MessageBusError.messageError(responseError) = error {
            if responseError.isRecoverable {
                // Make a next attempt only for recoverable error
                throw error
            }
        }
            //Should fail directly if the error code is not recoverable
        return Fail<ResponseMessage, Error>(error: error)
               .eraseToAnyPublisher()

    })
    .retry(3)
    .eraseToAnyPublisher()
}

通常,我尽量避免构建新的发布器,而是更喜欢从内置运算符组合发布器。我发现在这里做起来相当棘手。也许有人可以提出更好的方法。

Retry 任何 失败时重新订阅,所以为了欺骗它,我将任何 不可恢复的 错误打包成 Result 包含错误的值,但将可恢复的错误作为失败留给 .retry;然后最终将 Result 解压回相应的 value/error.

以下是它在您的情况下的工作方式:

func createRequest(for message: Message)-> AnyPublisher<ResponseMessage, Error> {
    Future<ResponseMessage, Error> { promise in
        .....   
    }

    // pack a value into Result
    .map { v -> Result<ResponseMessage, Error> in .success(v) }
    .tryCatch { error -> AnyPublisher<Result<ResponseMessage, Error>, Error> in
        if case let MessageBusError.messageError(responseError) = error {
            if responseError.isRecoverable {
                // Keep recoverable errors as failures
                throw error
            }
        }
        // pack a non-recoverable error into Result with a failure   
        return Just(.failure(error)).setFailureType(Error.self)
                  .eraseToAnyPublisher()

    }
    .retry(3)

    // unpack back
    .flatMap { result in result.publisher }

    .eraseToAnyPublisher()
}

为了完整性,使用上述方法扩展 Publisher

extension Publisher {
   private func retryOnly<U: Publisher>(
         upstream: U, 
         retries: Int, 
         when predicate: @escaping (U.Failure) -> Bool
      ) -> AnyPublisher<U.Output, U.Failure> {

      upstream
         .map { v -> Result<U.Output, U.Failure> in .success(v) }
         .catch { err -> AnyPublisher<Result<U.Output, U.Failure>, U.Failure> in
            if predicate(err) {
               return Fail(error: err).eraseToAnyPublisher()
            } else {
               return Just(.failure(err))
                  .setFailureType(to: U.Failure.self)
                  .eraseToAnyPublisher()
            }
         }
         .retry(retries)
         .flatMap { result in result.publisher }
         .eraseToAnyPublisher()
   }
    
   func retry(_ retries: Int, when predicate: @escaping (Failure) -> Bool) 
        -> AnyPublisher<Output, Failure> {

      return retryOnly(upstream: self, retries: retries, when: predicate)
   }
}
failingPublisher.retry(3, when: { [=12=] is RecoverableError })

基本上,您需要一个 retryIf 运算符,这样您就可以提供一个闭包来告诉 Combine 哪些错误应该重试,哪些不应该重试。我不知道有这样的运算符,但是自己构建一个并不难。

惯用的方法是为您的运算符使用新类型扩展 Publishers 命名空间,然后扩展 Publisher 以添加对该运算符的支持,以便您可以将它与其他运算符链接起来.

实现可能如下所示:

extension Publishers {
    struct RetryIf<P: Publisher>: Publisher {
        typealias Output = P.Output
        typealias Failure = P.Failure
        
        let publisher: P
        let times: Int
        let condition: (P.Failure) -> Bool
                
        func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
            guard times > 0 else { return publisher.receive(subscriber: subscriber) }
            
            publisher.catch { (error: P.Failure) -> AnyPublisher<Output, Failure> in
                if condition(error)  {
                    return RetryIf(publisher: publisher, times: times - 1, condition: condition).eraseToAnyPublisher()
                } else {
                    return Fail(error: error).eraseToAnyPublisher()
                }
            }.receive(subscriber: subscriber)
        }
    }
}

extension Publisher {
    func retry(times: Int, if condition: @escaping (Failure) -> Bool) -> Publishers.RetryIf<Self> {
        Publishers.RetryIf(publisher: self, times: times, condition: condition)
    }
}

用法:

func createRequest(for message: Message) -> AnyPublisher<ResponseMessage, Error> {
    Deferred {
        Future<ResponseMessage, Error> { promise in
            // future code
        }        
    }
    .retry(times: 3) { error in
        if case let MessageBusError.messageError(responseError) = error, responseError.isRecoverable {
            return true
        }
        return false
    }
    .eraseToAnyPublisher()
}

请注意,我将您的 Future 包裹在 Deferred 中,否则 retry 运算符将毫无意义,因为闭包不会被多次执行。有关该行为的更多详细信息:.


或者,您可以这样编写 Publisher 扩展名:

extension Publisher {
    func retry(_ times: Int, if condition: @escaping (Failure) -> Bool) -> Publishers.RetryIf<Self> {
        Publishers.RetryIf(publisher: self, times: times, condition: condition)
    }
    
    func retry(_ times: Int, unless condition: @escaping (Failure) -> Bool) -> Publishers.RetryIf<Self> {
        retry(times, if: { !condition([=12=]) })
    }
}

,这会启用一些时髦的东西,例如:

extension Error {
    var isRecoverable: Bool { ... }
    var isUnrecoverable: Bool { ... }
}

// retry at most 3 times while receiving recoverable errors
// bail out the first time when encountering an error that is
// not recoverable
somePublisher
    .retry(3, if: \.isRecoverable)

// retry at most 3 times, bail out the first time when
// an unrecoverable the error is encountered
somePublisher
    .retry(3, unless: \.isUnrecoverable)

或者更时髦,ruby风格:

extension Int {
    var times: Int { self }
}

somePublisher
    .retry(3.times, unless: \.isUnrecoverable)