RxSwift:使用 retry(when:) 实现 refreshToken(令牌刷新)

RxSwift refreshToken use retry(when:)

我为网络请求令牌刷新写了这样的代码。

/// Entry points for issuing unary gRPC calls as RxSwift `Single`s, with a
/// token-refresh-and-retry step when a call fails with an expired token.
enum NetworkingClient {
    /// Wraps one unary call in a `Single`.
    ///
    /// `maker` is invoked on every subscription (including re-subscriptions
    /// triggered by a retry), and the outcome of `call.response` is forwarded
    /// as `.success` or `.error`. Subscription work runs on `requestScheduler`.
    ///
    /// - Parameter maker: Factory that creates the call to execute.
    /// - Returns: A `Single` emitting the response or the call's failure.
    private static func _makeUnaryCall<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
        call maker: @escaping () -> UnaryCall<Request, Response>
        ) -> Single<Response> {
        return Single<Response>.create { (handler) -> Disposable in
            let call = maker()
            call.response
                .whenComplete { result in
                    do {
                        handler(.success(try result.get()))
                    } catch {
                        handler(.error(error))
                    }
                }
            // NOTE(review): the returned disposable is a no-op, so disposing the
            // subscription does not cancel the in-flight call from this block.
            return Disposables.create()
        }
        .subscribeOn(requestScheduler)
    }

    /// Executes a unary call and, if it fails with
    /// `NetworkingError.tokenExpired`, refreshes the token through
    /// `VoidToken.reLogin` and then re-subscribes (re-issuing the call).
    ///
    /// Any other error, or a `tokenExpired` error that carries no token, is
    /// propagated to the subscriber. Errors that reach the subscriber are
    /// logged, and results are delivered on `MainScheduler.instance`.
    ///
    /// - Parameters:
    ///   - serverInfo: Server the call targets; also used for the re-login and for logging.
    ///   - maker: Factory that creates the call; invoked once per attempt.
    /// - Returns: A `Single` emitting the response or the final error.
    static func makeUnaryCall<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
        serverInfo: ServerInfo,
        call maker: @escaping () -> UnaryCall<Request, Response>
        ) -> Single<Response> {
        let gid = serverInfo.gid
        let uid = serverInfo.uid
        return self._makeUnaryCall(call: maker)
            .retryWhen { errorObservable -> Observable<Void> in
                return errorObservable.flatMap { error -> Observable<Void> in
                    // Only an expired-token error that actually carries a token can
                    // be recovered by refreshing; everything else is rethrown
                    // instead of force-unwrapping a possibly-nil token.
                    guard let networkingError = error as? NetworkingError,
                        case .tokenExpired(let token?) = networkingError else {
                        throw error
                    }
                    return VoidToken.reLogin(token: token, serverInfo: serverInfo)
                }
            }
            .do(onError: { (err) in
                log.error("\(gid)-\(uid)-\(serverInfo.host)-\(serverInfo.port)-\(err)")
            })
            .observeOn(MainScheduler.instance)
    }
}

/// Shares one in-flight token refresh between all requests that failed with
/// the same expired access token.
enum VoidToken {
    /// Refresh observables keyed by the expired access token.
    /// NOTE(review): entries are never removed in this block, so the
    /// dictionary grows by one entry per refreshed token.
    static var caches: [String: Observable<Void>] = [:]
    /// Guards every access to `caches`.
    static let lock = NSLock()

    /// Returns the shared refresh sequence for `token`, creating and caching
    /// it on first use.
    ///
    /// - Parameters:
    ///   - token: The expired token; `accessToken` is the cache key and
    ///     `refreshToken` is sent to the login service.
    ///   - serverInfo: Server used to construct the `LoginService`.
    /// - Returns: An observable that emits `()` once the refresh succeeds and
    ///   then completes, or fails with the refresh error.
    static func reLogin(token: Token, serverInfo: ServerInfo) -> Observable<Void> {
        lock.lock()
        // `defer` releases the lock on every exit path, including the
        // cache-hit early return.
        defer { lock.unlock() }

        if let cachedRefresh = caches[token.accessToken] {
            return cachedRefresh
        }

        let loginService = LoginService(serverInfo: serverInfo)
        // Held in an optional captured by the closure below so the reference
        // can be dropped once the refresh completes.
        var service: LoginService? = loginService
        let refreshReq = loginService
            .refreshToken(refreshToken: token.refreshToken)
            .retryWhen({ errorObservable in
                return errorObservable.flatMap { error -> Observable<Void> in
                    // Retry immediately while the server is unavailable; any
                    // other error ends the refresh.
                    if let err = error as? GRPCStatus, err.code == .unavailable {
                        return .just(())
                    }
                    throw error
                }
            })
            .debug("refreshToken")
            .asObservable()
            // NOTE(review): `share` is ref-counted, so when the last subscriber
            // disposes before the refresh finishes, the upstream request is
            // disposed too and a later subscriber starts a new one. With
            // `.forever`, a terminal event (including an error) is replayed to
            // every later subscriber of this cached observable — confirm both
            // behaviours are intended.
            .share(replay: 1, scope: .forever)
            .map { _ in }
            .do(afterCompleted: {
                service = nil
            })
        caches[token.accessToken] = refreshReq
        return refreshReq
    }
}

/// Errors raised by the networking layer that callers handle specially.
enum NetworkingError: Error {
    /// The request was rejected because the token has expired.
    /// The associated value is the expired token, when one is available;
    /// it may be `nil`.
    case tokenExpired(Token?)
}

extension LoginService {
    /// Exchanges `refreshToken` for a new session, stores it in `AccountKit`,
    /// and optionally logs in to the chat service with the new IM token.
    ///
    /// - Parameters:
    ///   - refreshToken: Refresh token sent to the re-login endpoint.
    ///   - loginChatService: When `true` (the default), also logs in to the
    ///     chat service. A failure of that chat login is ignored.
    /// - Returns: A `Single` that emits `()` once the token has been refreshed.
    ///   It fails with the re-login error, or with `SourceContextError` when
    ///   the response has no session, the stored token cannot be read back,
    ///   or no chat service exists for the account.
    func refreshToken(refreshToken: String, loginChatService: Bool = true) -> Single<Void> {
        let gid = self.gid
        let uid = self.uid
        return defaultImplementation
            .reLogin(refreshToken: refreshToken)
            .map { (rsp) -> Token in
                // A response without a session is reported as an error rather
                // than crashing the process.
                guard rsp.hasSess else {
                    throw SourceContextError()
                }
                AccountKit.shared.updateToken(pid: gid + uid, session: rsp.sess)
                guard let token = AccountKit.shared.fetchAuth(gid: gid, uid: uid) else {
                    throw SourceContextError()
                }
                // NOTE(review): this writes the access and IM tokens to the log;
                // consider redacting them.
                log.warn("token refresh success, newToken: \(token.accessToken) newIMToken\(token.imToken)")
                return token
            }
            .flatMap { token -> Single<Void> in
                guard loginChatService else { return Single.just(()) }
                guard let chatService = AccountKit.shared.chatInfos.fetchWrapper(pid: gid + uid) else {
                    throw SourceContextError()
                }
                // Chat login is best-effort: its error is replaced by a success
                // so that it cannot fail the token refresh.
                return chatService.rx.login(imToken: token.imToken, updateSyncTime: false)
                    .map { _ in () }
                    .catchErrorJustReturn(())
            }
    }
}

但是,我得到一个看起来像这样的日志

2021-07-19 16:24:24.208: refreshToken -> subscribed
2021-07-19 16:24:24.494: refreshToken -> isDisposed
2021-07-19 16:24:25.037: refreshToken -> subscribed
2021-07-19 16:24:25.251: refreshToken -> Event error(notLogin)
2021-07-19 16:24:25.251: refreshToken -> isDisposed

我不明白为什么 refreshToken 在没有收到 onNext 事件的情况下就被 dispose 了。
这是一个偶发问题,我不知道是什么原因造成的。
它的日志输出通常应该是这样的:

2021-07-22 10:04:45.794: refreshToken -> subscribed
2021-07-22 10:04:52.254: refreshToken -> Event next(())
2021-07-22 10:04:52.266: refreshToken -> Event completed
2021-07-22 10:04:52.266: refreshToken -> isDisposed

如果你能帮助我,我将不胜感激。

查看调试语句上方的 Observable 链:

let refreshReq = service!
    .refreshToken(refreshToken: token.refreshToken)
    .retryWhen({ errorObservable in
        return errorObservable.flatMap { error -> Observable<Void> in
            if let err = error as? GRPCStatus, err.code == .unavailable {
                return .just(())
            }
            throw error
        }
    })

如果 refreshToken(refreshToken:) 发出的错误不是 code 等于 .unavailable 的 GRPCStatus 错误,retryWhen 就不会重试,而是把该错误原样发出。

你说调试输出显示了一个名为 notLogin 的错误,这显然不是 code 等于 .unavailable 的 GRPCStatus 错误。

另外,查看日志:

2021-07-19 16:24:24.208: refreshToken -> subscribed
2021-07-19 16:24:24.494: refreshToken -> isDisposed
2021-07-19 16:24:25.037: refreshToken -> subscribed
2021-07-19 16:24:25.251: refreshToken -> Event error(notLogin)
2021-07-19 16:24:25.251: refreshToken -> isDisposed

这说明:在订阅 286 毫秒之后,有某处代码 dispose 了 refreshToken 的订阅;543 毫秒之后它被重新订阅(或者被另一个订阅者订阅);再过 214 毫秒,它发出了错误……

在我看来,问题不在您贴出的代码里,而在于某处过早 dispose 了 reLogin observable 的代码。

顺便说一句,我写了一篇关于这个主题的文章,您可能会觉得有用:RxSwift and Handling Invalid Tokens