使用 Flatmap 和多个订阅者时组合多次调用的 Future 块
Combine Future block called multiple times when using Flatmap and multiple subscribers
我已经成功使用BrightFutures in my apps mainly for async network requests. I decided it was time to see if I could migrate to Combine. However what I find is that when I combine two Futures using flatMap with two subscribers my second Future代码块被执行了两次。下面是一些示例代码,将 运行 直接放在 playground 中:
import Combine
import Foundation
extension Publisher {
func showActivityIndicatorWhileWaiting(message: String) -> AnyCancellable {
let cancellable = sink(receiveCompletion: { _ in Swift.print("Hide activity indicator") }, receiveValue: { (_) in })
Swift.print("Busy: \(message)")
return cancellable
}
}
enum ServerErrors: Error {
case authenticationFailed
case noConnection
case timeout
}
func authenticate(username: String, password: String) -> Future<Bool, ServerErrors> {
Future { promise in
print("Calling server to authenticate")
DispatchQueue.main.async {
promise(.success(true))
}
}
}
func downloadUserInfo(username: String) -> Future<String, ServerErrors> {
Future { promise in
print("Downloading user info")
DispatchQueue.main.async {
promise(.success("decoded user data"))
}
}
}
func authenticateAndDownloadUserInfo(username: String, password: String) -> some Publisher {
return authenticate(username: username, password: password).flatMap { (isAuthenticated) -> Future<String, ServerErrors> in
guard isAuthenticated else {
return Future {[=10=](.failure(.authenticationFailed)) }
}
return downloadUserInfo(username: username)
}
}
let future = authenticateAndDownloadUserInfo(username: "stack", password: "overflow")
let cancellable2 = future.showActivityIndicatorWhileWaiting(message: "Please wait downloading")
let cancellable1 = future.sink(receiveCompletion: { (completion) in
switch completion {
case .finished:
print("Completed without errors.")
case .failure(let error):
print("received error: '\(error)'")
}
}) { (output) in
print("received userInfo: '\(output)'")
}
该代码模拟进行两次网络调用,然后 flatmap
将它们作为一个单元组合在一起,要么成功,要么失败。
结果输出为:
Calling server to authenticate
Busy: Please wait downloading
Downloading user info
Downloading user info
<---- unexpected second network call
Hide activity indicator
received userInfo: 'decoded user data'
Completed without errors.
问题是 downloadUserInfo((username:)
似乎被调用了两次。如果我只有一个订阅者,那么 downloadUserInfo((username:)
只会被调用一次。我有一个丑陋的解决方案,将 flatMap
包装在另一个 Future
中,但感觉我遗漏了一些简单的东西。有什么想法吗?
当您使用 let future
创建实际发布者时,附加 .share
运算符,以便您的两个订阅者订阅单个拆分管道。
编辑: 正如我在评论中所说,我会在您的管道中进行一些其他更改。这是建议的重写。其中一些更改是风格上的/装饰性的,作为我如何编写 Combine 代码的说明;你可以接受或离开它。但其他事情几乎 de rigueur。您需要在 Futures 周围使用 Deferred 包装器来防止过早联网(即在订阅发生之前)。您需要 store
您的管道,否则它会在网络开始之前不存在。我还用 .handleEvents
代替了您的第二个订阅者,但如果您将上述解决方案与 .share
一起使用,如果您确实愿意,您仍然可以使用第二个订阅者。这是一个完整的例子;您可以直接将其复制并粘贴到项目中。
class ViewController: UIViewController {
enum ServerError: Error {
case authenticationFailed
case noConnection
case timeout
}
var storage = Set<AnyCancellable>()
func authenticate(username: String, password: String) -> AnyPublisher<Bool, ServerError> {
Deferred {
Future { promise in
print("Calling server to authenticate")
DispatchQueue.main.async {
promise(.success(true))
}
}
}.eraseToAnyPublisher()
}
func downloadUserInfo(username: String) -> AnyPublisher<String, ServerError> {
Deferred {
Future { promise in
print("Downloading user info")
DispatchQueue.main.async {
promise(.success("decoded user data"))
}
}
}.eraseToAnyPublisher()
}
func authenticateAndDownloadUserInfo(username: String, password: String) -> AnyPublisher<String, ServerError> {
let authenticate = self.authenticate(username: username, password: password)
let pipeline = authenticate.flatMap { isAuthenticated -> AnyPublisher<String, ServerError> in
if isAuthenticated {
return self.downloadUserInfo(username: username)
} else {
return Fail<String, ServerError>(error: .authenticationFailed).eraseToAnyPublisher()
}
}
return pipeline.eraseToAnyPublisher()
}
override func viewDidLoad() {
super.viewDidLoad()
authenticateAndDownloadUserInfo(username: "stack", password: "overflow")
.handleEvents(
receiveSubscription: { _ in print("start the spinner!") },
receiveCompletion: { _ in print("stop the spinner!") }
).sink(receiveCompletion: {
switch [=10=] {
case .finished:
print("Completed without errors.")
case .failure(let error):
print("received error: '\(error)'")
}
}) {
print("received userInfo: '\([=10=])'")
}.store(in: &self.storage)
}
}
输出:
start the spinner!
Calling server to authenticate
Downloading user info
received userInfo: 'decoded user data'
stop the spinner!
Completed without errors.
我已经成功使用BrightFutures in my apps mainly for async network requests. I decided it was time to see if I could migrate to Combine. However what I find is that when I combine two Futures using flatMap with two subscribers my second Future代码块被执行了两次。下面是一些示例代码,将 运行 直接放在 playground 中:
import Combine
import Foundation
extension Publisher {
func showActivityIndicatorWhileWaiting(message: String) -> AnyCancellable {
let cancellable = sink(receiveCompletion: { _ in Swift.print("Hide activity indicator") }, receiveValue: { (_) in })
Swift.print("Busy: \(message)")
return cancellable
}
}
enum ServerErrors: Error {
case authenticationFailed
case noConnection
case timeout
}
func authenticate(username: String, password: String) -> Future<Bool, ServerErrors> {
Future { promise in
print("Calling server to authenticate")
DispatchQueue.main.async {
promise(.success(true))
}
}
}
func downloadUserInfo(username: String) -> Future<String, ServerErrors> {
Future { promise in
print("Downloading user info")
DispatchQueue.main.async {
promise(.success("decoded user data"))
}
}
}
func authenticateAndDownloadUserInfo(username: String, password: String) -> some Publisher {
return authenticate(username: username, password: password).flatMap { (isAuthenticated) -> Future<String, ServerErrors> in
guard isAuthenticated else {
return Future {[=10=](.failure(.authenticationFailed)) }
}
return downloadUserInfo(username: username)
}
}
let future = authenticateAndDownloadUserInfo(username: "stack", password: "overflow")
let cancellable2 = future.showActivityIndicatorWhileWaiting(message: "Please wait downloading")
let cancellable1 = future.sink(receiveCompletion: { (completion) in
switch completion {
case .finished:
print("Completed without errors.")
case .failure(let error):
print("received error: '\(error)'")
}
}) { (output) in
print("received userInfo: '\(output)'")
}
该代码模拟进行两次网络调用,然后 flatmap
将它们作为一个单元组合在一起,要么成功,要么失败。
结果输出为:
Calling server to authenticate
Busy: Please wait downloading
Downloading user info
Downloading user info
<---- unexpected second network call
Hide activity indicator
received userInfo: 'decoded user data'
Completed without errors.
问题是 downloadUserInfo((username:)
似乎被调用了两次。如果我只有一个订阅者,那么 downloadUserInfo((username:)
只会被调用一次。我有一个丑陋的解决方案,将 flatMap
包装在另一个 Future
中,但感觉我遗漏了一些简单的东西。有什么想法吗?
当您使用 let future
创建实际发布者时,附加 .share
运算符,以便您的两个订阅者订阅单个拆分管道。
编辑: 正如我在评论中所说,我会在您的管道中进行一些其他更改。这是建议的重写。其中一些更改是风格上的/装饰性的,作为我如何编写 Combine 代码的说明;你可以接受或离开它。但其他事情几乎 de rigueur。您需要在 Futures 周围使用 Deferred 包装器来防止过早联网(即在订阅发生之前)。您需要 store
您的管道,否则它会在网络开始之前不存在。我还用 .handleEvents
代替了您的第二个订阅者,但如果您将上述解决方案与 .share
一起使用,如果您确实愿意,您仍然可以使用第二个订阅者。这是一个完整的例子;您可以直接将其复制并粘贴到项目中。
class ViewController: UIViewController {
enum ServerError: Error {
case authenticationFailed
case noConnection
case timeout
}
var storage = Set<AnyCancellable>()
func authenticate(username: String, password: String) -> AnyPublisher<Bool, ServerError> {
Deferred {
Future { promise in
print("Calling server to authenticate")
DispatchQueue.main.async {
promise(.success(true))
}
}
}.eraseToAnyPublisher()
}
func downloadUserInfo(username: String) -> AnyPublisher<String, ServerError> {
Deferred {
Future { promise in
print("Downloading user info")
DispatchQueue.main.async {
promise(.success("decoded user data"))
}
}
}.eraseToAnyPublisher()
}
func authenticateAndDownloadUserInfo(username: String, password: String) -> AnyPublisher<String, ServerError> {
let authenticate = self.authenticate(username: username, password: password)
let pipeline = authenticate.flatMap { isAuthenticated -> AnyPublisher<String, ServerError> in
if isAuthenticated {
return self.downloadUserInfo(username: username)
} else {
return Fail<String, ServerError>(error: .authenticationFailed).eraseToAnyPublisher()
}
}
return pipeline.eraseToAnyPublisher()
}
override func viewDidLoad() {
super.viewDidLoad()
authenticateAndDownloadUserInfo(username: "stack", password: "overflow")
.handleEvents(
receiveSubscription: { _ in print("start the spinner!") },
receiveCompletion: { _ in print("stop the spinner!") }
).sink(receiveCompletion: {
switch [=10=] {
case .finished:
print("Completed without errors.")
case .failure(let error):
print("received error: '\(error)'")
}
}) {
print("received userInfo: '\([=10=])'")
}.store(in: &self.storage)
}
}
输出:
start the spinner!
Calling server to authenticate
Downloading user info
received userInfo: 'decoded user data'
stop the spinner!
Completed without errors.