如何使用 Alamofire + RxSwift 同步刷新访问令牌

How to synchronously refresh an access token using Alamofire + RxSwift

我的 NetworkManager class 中有这个通用 fetchData() 函数,它能够请求向网络发出授权请求,如果它失败(在多次重试后),则会发出错误将重新启动我的应用程序(请求新登录)。我需要同步调用此重试令牌,我的意思是,如果多个请求失败,则一次应该只有一个请求刷新令牌。如果那个失败,另一个请求必须被丢弃。我已经尝试了一些使用 DispatchGroup / NSRecursiveLock / 以及调用函数 cancelRequests 的方法(在这种情况下,任务计数始终为 0)。我怎样才能使这种行为在这种情况下起作用?


    public func fetchData<Type: Decodable>(fromApi api: TargetType,
                                           decodeFromKeyPath keyPath: String? = nil) -> Single<Response> {
        
        let request = MultiTarget(api)

        return provider.rx.request(request)
                .asRetriableAuthenticated(target: request)
    }

    func cancelAllRequests(){
        if #available(iOS 9.0, *) {
            DefaultAlamofireManager
                .sharedManager
                .session
                .getAllTasks { (tasks) in
                tasks.forEach{ [=10=].cancel() }
            }
        } else {
            DefaultAlamofireManager
                .sharedManager
                .session
                .getTasksWithCompletionHandler { (sessionDataTask, uploadData, downloadData) in
                    
                sessionDataTask.forEach { [=10=].cancel() }
                uploadData.forEach { [=10=].cancel() }
                downloadData.forEach { [=10=].cancel() }
            }
        }
    }


public extension PrimitiveSequence where TraitType == SingleTrait, ElementType == Response {
    
    private var refreshTokenParameters: TokenParameters {
        TokenParameters(clientId: "pdappclient",
                grantType: "refresh_token",
                refreshToken: KeychainManager.shared.refreshToken)
    }

    func retryWithToken(target: MultiTarget) -> Single<E> {
        self.catchError { error -> Single<Response> in
                    if case Moya.MoyaError.statusCode(let response) = error {
                        if self.isTokenExpiredError(error) {
                            return Single.error(error)
                        } else {
                            return self.parseError(response: response)
                        }
                    }
                    return Single.error(error)
                }
                .retryToken(target: target)
                .catchError { error -> Single<Response> in
                    if case Moya.MoyaError.statusCode(let response) = error {
                        return self.parseError(response: response)
                    }
                    return Single.error(InvalidGrantException())
                }
    }

    private func retryToken(target: MultiTarget) -> Single<E> {
        let maxRetries = 1
        return self.retryWhen({ error in
            error
                    .enumerated()
                    .flatMap { (attempt, error) -> Observable<Int> in
                        if attempt >= maxRetries {
                            return Observable.error(error)
                        }
                        if self.isTokenExpiredError(error) {
                            return Observable<Int>.just(attempt + 1)
                        }
                        return Observable.error(error)
                    }
                    .flatMap { _ -> Single<TokenResponse> in
                        self.refreshTokenRequest()
                    }
                    .share()
                    .asObservable()
        })
    }
    
    private func refreshTokenRequest() -> Single<TokenResponse> {
        return NetworkManager.shared.fetchData(fromApi: IdentityServerAPI
            .token(parameters: self.refreshTokenParameters)).do(onSuccess: { tokenResponse in
                    
            KeychainManager.shared.accessToken = tokenResponse.accessToken
            KeychainManager.shared.refreshToken = tokenResponse.refreshToken
        }, onError: { error in
            NetworkManager.shared.cancelAllRequests()
        })
    }

    func parseError<E>(response: Response) -> Single<E> {
        if response.statusCode == 401 {
            // TODO
        }

        let decoder = JSONDecoder()
        if let errors = try? response.map([BaseResponseError].self, atKeyPath: "errors", using: decoder,
                failsOnEmptyData: true) {
            return Single.error(BaseAPIErrorResponse(errors: errors))
        }

        return Single.error(APIError2.unknown)
    }

    func isTokenExpiredError(_ error: Error) -> Bool {
        if let moyaError = error as? MoyaError {
            switch moyaError {
            case .statusCode(let response):
                if response.statusCode != 401 {
                    return false
                } else if response.data.count == 0 {
                    return true
                }
            default:
                break
            }
        }
        return false
    }

    func filterUnauthorized() -> Single<E> {
        flatMap { (response) -> Single<E> in
            if 200...299 ~= response.statusCode {
                return Single.just(response)
            } else if response.statusCode == 404 {
                return Single.just(response)
            } else {
                return Single.error(MoyaError.statusCode(response))
            }
        }
    }

    func asRetriableAuthenticated(target: MultiTarget) -> Single<Element> {
        filterUnauthorized()
                .retryWithToken(target: target)
                .filterStatusCode()
    }

    func filterStatusCode() -> Single<E> {
        flatMap { (response) -> Single<E> in
            if 200...299 ~= response.statusCode {
                return Single.just(response)
            } else {
                return self.parseError(response: response)
            }
        }
    }
}

这是一个 RxSwift 解决方案:RxSwift 和 Handling Invalid Tokens

仅仅postlink并不是最好的,所以我也会post解决方案的核心:

关键是制作一个与 ActivityMonitor class 非常相似但处理令牌刷新的 class...

public final class TokenAcquisitionService<T> {

    /// responds with the current token immediatly and emits a new token whenver a new one is aquired. You can, for example, subscribe to it in order to save the token as it's updated.
    public var token: Observable<T> {
        return _token.asObservable()
    }

    public typealias GetToken = (T) -> Observable<(response: HTTPURLResponse, data: Data)>

    /// Creates a `TokenAcquisitionService` object that will store the most recent authorization token acquired and will acquire new ones as needed.
    ///
    /// - Parameters:
    ///   - initialToken: The token the service should start with. Provide a token from storage or an empty string (object represting a missing token) if one has not been aquired yet.
    ///   - getToken: A function responsable for aquiring new tokens when needed.
    ///   - extractToken: A function that can extract a token from the data returned by `getToken`.
    public init(initialToken: T, getToken: @escaping GetToken, extractToken: @escaping (Data) throws -> T) {
        relay
            .flatMapFirst { getToken([=10=]) }
            .map { (urlResponse) -> T in
                guard urlResponse.response.statusCode / 100 == 2 else { throw TokenAcquisitionError.refusedToken(response: urlResponse.response, data: urlResponse.data) }
                return try extractToken(urlResponse.data)
            }
            .startWith(initialToken)
            .subscribe(_token)
            .disposed(by: disposeBag)
    }

    /// Allows the token to be set imperativly if necessary.
    /// - Parameter token: The new token the service should use. It will immediatly be emitted to any subscribers to the service.
    func setToken(_ token: T) {
        lock.lock()
        _token.onNext(token)
        lock.unlock()
    }

    /// Monitors the source for `.unauthorized` error events and passes all other errors on. When an `.unauthorized` error is seen, `self` will get a new token and emit a signal that it's safe to retry the request.
    ///
    /// - Parameter source: An `Observable` (or like type) that emits errors.
    /// - Returns: A trigger that will emit when it's safe to retry the request.
    func trackErrors<O: ObservableConvertibleType>(for source: O) -> Observable<Void> where O.Element == Error {
        let lock = self.lock
        let relay = self.relay
        let error = source
            .asObservable()
            .map { error in
                guard (error as? TokenAcquisitionError) == .unauthorized else { throw error }
            }
            .flatMap { [unowned self] in  self.token }
            .do(onNext: {
                lock.lock()
                relay.onNext([=10=])
                lock.unlock()
            })
            .filter { _ in false }
            .map { _ in }

        return Observable.merge(token.skip(1).map { _ in }, error)
    }

    private let _token = ReplaySubject<T>.create(bufferSize: 1)
    private let relay = PublishSubject<T>()
    private let lock = NSRecursiveLock()
    private let disposeBag = DisposeBag()
}

extension ObservableConvertibleType where Element == Error {

    /// Monitors self for `.unauthorized` error events and passes all other errors on. When an `.unauthorized` error is seen, the `service` will get a new token and emit a signal that it's safe to retry the request.
    ///
    /// - Parameter service: A `TokenAcquisitionService` object that is being used to store the auth token for the request.
    /// - Returns: A trigger that will emit when it's safe to retry the request.
    public func renewToken<T>(with service: TokenAcquisitionService<T>) -> Observable<Void> {
        return service.trackErrors(for: self)
    }
}

将以上内容放入您的应用程序后,您只需在请求的末尾添加一个 .retryWhen { [=11=].renewToken(with: tokenAcquisitionService) }。如果令牌未经授权,请确保您的请求发出 ResponseError.unauthorized 并且服务将处理重试。

我使用 DispatchWorkItem 找到了解决问题的方法,并使用布尔值控制函数的入口:isTokenRefreshing。也许这不是最优雅的解决方案,但它确实有效。

因此,在我的 NetworkManager class 中,我添加了这两个新属性:

public var savedRequests: [DispatchWorkItem] = []
public var isTokenRefreshing = false

现在,在我的 SingleTrait 扩展中,每当我进入令牌刷新方法时,我都会将布尔值 isTokenRefreshing 设置为 true。所以,如果它是真的,我没有启动另一个请求,而是简单地抛出一个 RefreshTokenProcessInProgressException 并将当前请求保存在我的 savedRequests 数组中。

private func saveRequest(_ block: @escaping () -> Void) {
    // Save request to DispatchWorkItem array
    NetworkManager.shared.savedRequests.append( DispatchWorkItem {
        block()
    })
}

(当然,那个,如果token刷新成功你要记得继续所有保存在array里面的savedRequests,下面的代码里面还没描述)

嗯,我的 SingleTrait 扩展现在是这样的:

import Foundation
import Moya
import RxSwift
import Domain

public extension PrimitiveSequence where TraitType == SingleTrait, ElementType == Response {
    
    private var refreshTokenParameters: TokenParameters {
        TokenParameters(clientId: "pdappclient",
                grantType: "refresh_token",
                refreshToken: KeychainManager.shared.refreshToken)
    }

    func retryWithToken(target: MultiTarget) -> Single<E> {
        return self.catchError { error -> Single<Response> in
                    if case Moya.MoyaError.statusCode(let response) = error {
                        if self.isTokenExpiredError(error) {
                            return Single.error(error)
                        } else {
                            return self.parseError(response: response)
                        }
                    }
                    return Single.error(error)
                }
                .retryToken(target: target)
                .catchError { error -> Single<Response> in
                    if case Moya.MoyaError.statusCode(let response) = error {
                        return self.parseError(response: response)
                    }
                    return Single.error(error)
                }
    }

    private func retryToken(target: MultiTarget) -> Single<E> {
        let maxRetries = 1
        
        return self.retryWhen({ error in
            error
                    .enumerated()
                    .flatMap { (attempt, error) -> Observable<Int> in
                        if attempt >= maxRetries {
                            return Observable.error(error)
                        }
                        if self.isTokenExpiredError(error) {
                            return Observable<Int>.just(attempt + 1)
                        }
                        return Observable.error(error)
                    }
                    .flatMapFirst { _ -> Single<TokenResponse> in
                        if NetworkManager.shared.isTokenRefreshing {
                            self.saveRequest {
                                self.retryToken(target: target)
                            }
                            return Single.error(RefreshTokenProcessInProgressException())
                        } else {
                            return self.refreshTokenRequest()
                        }
                    }
                    .share()
                    .asObservable()
        })
    }
    
    private func refreshTokenRequest() -> Single<TokenResponse> {
        NetworkManager.shared.isTokenRefreshing = true
        
        return NetworkManager.shared.fetchData(fromApi: IdentityServerAPI
            .token(parameters: self.refreshTokenParameters))
            .do(onSuccess: { tokenResponse in
                KeychainManager.shared.accessToken = tokenResponse.accessToken
                KeychainManager.shared.refreshToken = tokenResponse.refreshToken
            }).catchError { error -> Single<TokenResponse> in
                return Single.error(InvalidGrantException())
        }
    }
    
    private func saveRequest(_ block: @escaping () -> Void) {
        // Save request to DispatchWorkItem array
        NetworkManager.shared.savedRequests.append( DispatchWorkItem {
            block()
        })
    }

    func parseError<E>(response: Response) -> Single<E> {
        if response.statusCode == 401 {
            // TODO
        }

        let decoder = JSONDecoder()
        if let errors = try? response.map([BaseResponseError].self, atKeyPath: "errors", using: decoder,
                failsOnEmptyData: true) {
            return Single.error(BaseAPIErrorResponse(errors: errors))
        }

        return Single.error(APIError2.unknown)
    }

    func isTokenExpiredError(_ error: Error) -> Bool {
        if let moyaError = error as? MoyaError {
            switch moyaError {
            case .statusCode(let response):
                if response.statusCode != 401 {
                    return false
                } else if response.data.count == 0 {
                    return true
                }
            default:
                break
            }
        }
        return false
    }

    func filterUnauthorized() -> Single<E> {
        flatMap { (response) -> Single<E> in
            if 200...299 ~= response.statusCode {
                return Single.just(response)
            } else if response.statusCode == 404 {
                return Single.just(response)
            } else {
                return Single.error(MoyaError.statusCode(response))
            }
        }
    }

    func asRetriableAuthenticated(target: MultiTarget) -> Single<Element> {
        filterUnauthorized()
                .retryWithToken(target: target)
                .filterStatusCode()
    }

    func filterStatusCode() -> Single<E> {
        flatMap { (response) -> Single<E> in
            if 200...299 ~= response.statusCode {
                return Single.just(response)
            } else {
                return self.parseError(response: response)
            }
        }
    }
}

在我的例子中,如果令牌刷新失败,在重试 N 次后,我会重新启动应用程序。因此,每当重新启动应用程序时,我都会再次将 isTokenRefreshing 设置为 false。

这是我找到的解决这个问题的方法。如果您有其他方法,请告诉我。