使用 Flatmap 和多个订阅者时组合多次调用的 Future 块

Combine Future block called multiple times when using Flatmap and multiple subscribers

我已经成功使用BrightFutures in my apps mainly for async network requests. I decided it was time to see if I could migrate to Combine. However what I find is that when I combine two Futures using flatMap with two subscribers my second Future代码块被执行了两次。下面是一些示例代码,将 运行 直接放在 playground 中:

import Combine
import Foundation

extension Publisher {
    func showActivityIndicatorWhileWaiting(message: String) -> AnyCancellable {
        let cancellable = sink(receiveCompletion: { _ in Swift.print("Hide activity indicator") }, receiveValue: { (_) in })
        Swift.print("Busy: \(message)")
        return cancellable
    }
}

enum ServerErrors: Error {
    case authenticationFailed
    case noConnection
    case timeout
}

func authenticate(username: String, password: String) -> Future<Bool, ServerErrors> {
    Future { promise in
        print("Calling server to authenticate")
        DispatchQueue.main.async {
            promise(.success(true))
        }
    }
}

func downloadUserInfo(username: String) -> Future<String, ServerErrors> {
    Future { promise in
        print("Downloading user info")
        DispatchQueue.main.async {
            promise(.success("decoded user data"))
        }
    }
}

func authenticateAndDownloadUserInfo(username: String, password: String) -> some Publisher {
    return authenticate(username: username, password: password).flatMap { (isAuthenticated) -> Future<String, ServerErrors> in
        guard isAuthenticated else {
            return Future {[=10=](.failure(.authenticationFailed)) }
        }
        return downloadUserInfo(username: username)
    }
}

let future = authenticateAndDownloadUserInfo(username: "stack", password: "overflow")
let cancellable2 = future.showActivityIndicatorWhileWaiting(message: "Please wait downloading")
let cancellable1 = future.sink(receiveCompletion: { (completion) in
    switch completion {
    case .finished:
        print("Completed without errors.")
    case .failure(let error):
        print("received error: '\(error)'")
    }
}) { (output) in
    print("received userInfo: '\(output)'")
}

该代码模拟进行两次网络调用,然后 flatmap 将它们作为一个单元组合在一起,要么成功,要么失败。 结果输出为:

Calling server to authenticate
Busy: Please wait downloading
Downloading user info
Downloading user info     <---- unexpected second network call
Hide activity indicator
received userInfo: 'decoded user data'
Completed without errors.

问题是 downloadUserInfo((username:) 似乎被调用了两次。如果我只有一个订阅者,那么 downloadUserInfo((username:) 只会被调用一次。我有一个丑陋的解决方案,将 flatMap 包装在另一个 Future 中,但感觉我遗漏了一些简单的东西。有什么想法吗?

当您使用 let future 创建实际发布者时,附加 .share 运算符,以便您的两个订阅者订阅单个拆分管道。

编辑: 正如我在评论中所说,我会在您的管道中进行一些其他更改。这是建议的重写。其中一些更改是风格上的/装饰性的,作为我如何编写 Combine 代码的说明;你可以接受或离开它。但其他事情几乎 de rigueur。您需要在 Futures 周围使用 Deferred 包装器来防止过早联网(即在订阅发生之前)。您需要 store 您的管道,否则它会在网络开始之前不存在。我还用 .handleEvents 代替了您的第二个订阅者,但如果您将上述解决方案与 .share 一起使用,如果您确实愿意,您仍然可以使用第二个订阅者。这是一个完整的例子;您可以直接将其复制并粘贴到项目中。

class ViewController: UIViewController {
    enum ServerError: Error {
        case authenticationFailed
        case noConnection
        case timeout
    }
    var storage = Set<AnyCancellable>()
    func authenticate(username: String, password: String) -> AnyPublisher<Bool, ServerError> {
        Deferred {
            Future { promise in
                print("Calling server to authenticate")
                DispatchQueue.main.async {
                    promise(.success(true))
                }
            }
        }.eraseToAnyPublisher()
    }
    func downloadUserInfo(username: String) -> AnyPublisher<String, ServerError> {
        Deferred {
            Future { promise in
                print("Downloading user info")
                DispatchQueue.main.async {
                    promise(.success("decoded user data"))
                }
            }
        }.eraseToAnyPublisher()
    }
    func authenticateAndDownloadUserInfo(username: String, password: String) -> AnyPublisher<String, ServerError> {
        let authenticate = self.authenticate(username: username, password: password)
        let pipeline = authenticate.flatMap { isAuthenticated -> AnyPublisher<String, ServerError> in
            if isAuthenticated {
                return self.downloadUserInfo(username: username)
            } else {
                return Fail<String, ServerError>(error: .authenticationFailed).eraseToAnyPublisher()
            }
        }
        return pipeline.eraseToAnyPublisher()
    }
    override func viewDidLoad() {
        super.viewDidLoad()
        authenticateAndDownloadUserInfo(username: "stack", password: "overflow")
            .handleEvents(
                receiveSubscription: { _ in print("start the spinner!") },
                receiveCompletion: { _ in print("stop the spinner!") }
        ).sink(receiveCompletion: {
            switch [=10=] {
            case .finished:
                print("Completed without errors.")
            case .failure(let error):
                print("received error: '\(error)'")
            }
        }) {
            print("received userInfo: '\([=10=])'")
        }.store(in: &self.storage)
    }
}

输出:

start the spinner!
Calling server to authenticate
Downloading user info
received userInfo: 'decoded user data'
stop the spinner!
Completed without errors.