如何让Combine的flatMap完成全流?
How make Combine's flatMap to complete overall stream?
我有一些这样的代码
func a() -> AnyPublisher<Void, Never> {
Future<Void, Never> { promise in
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
print(1)
promise(.success(()))
}
}
.eraseToAnyPublisher()
}
func b() -> AnyPublisher<Void, Never> {
Future<Void, Never> { promise in
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
print(2)
promise(.success(()))
}
}
.eraseToAnyPublisher()
}
var tempBag = Set<AnyCancellable>()
let subject = CurrentValueSubject<Int, Never>(1)
subject
.flatMap({ _ in a() })
.flatMap({ _ in b() })
.print()
.sink(receiveCompletion: { _ in
tempBag.removeAll()
}, receiveValue: { })
.store(in: &tempBag)
因此,我在流的根目录中有一些无法完成的主题,在 flatMap 运算符中有一些可完成的发布者。我希望整个流在最后一个 flatMap 的发布者完成时完成。所以,我希望控制台看起来像这样:
receive subscription: (FlatMap)
request unlimited
1
2
receive value: (())
receive finished
但实际结果是
receive subscription: (FlatMap)
request unlimited
1
2
receive value: (())
我怎样才能做到这一点?
您遇到的问题是您的 Subject
(CurrentValueSubject
)永远不会完成,因此整个链永远不会完成。您需要的是一个发出单个值然后在您的序列顶部完成的发布者,以及一个等待它跟踪的所有发布者完成后再完成自身的中间体。
您已经有一个发布者做一件事然后完成...它由 a()
返回。要等到 a()
和 b()
都完成,您可以使用 combineLatest
,因为它创建的发布者只有在它组合的所有发布者都完成后才能完成。整个事情看起来像:
a()
.combineLatest(b())
.print()
.sink(receiveCompletion: { _ in
tempBag.removeAll()
}, receiveValue: { _ in () })
.store(in: &tempBag)
与输出
receive subscription: (CombineLatest)
request unlimited
1
2
receive value: (((), ()))
receive finished
我有一些这样的代码
func a() -> AnyPublisher<Void, Never> {
Future<Void, Never> { promise in
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
print(1)
promise(.success(()))
}
}
.eraseToAnyPublisher()
}
func b() -> AnyPublisher<Void, Never> {
Future<Void, Never> { promise in
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
print(2)
promise(.success(()))
}
}
.eraseToAnyPublisher()
}
var tempBag = Set<AnyCancellable>()
let subject = CurrentValueSubject<Int, Never>(1)
subject
.flatMap({ _ in a() })
.flatMap({ _ in b() })
.print()
.sink(receiveCompletion: { _ in
tempBag.removeAll()
}, receiveValue: { })
.store(in: &tempBag)
因此,我在流的根目录中有一些无法完成的主题,在 flatMap 运算符中有一些可完成的发布者。我希望整个流在最后一个 flatMap 的发布者完成时完成。所以,我希望控制台看起来像这样:
receive subscription: (FlatMap)
request unlimited
1
2
receive value: (())
receive finished
但实际结果是
receive subscription: (FlatMap)
request unlimited
1
2
receive value: (())
我怎样才能做到这一点?
您遇到的问题是您的 Subject
(CurrentValueSubject
)永远不会完成,因此整个链永远不会完成。您需要的是一个发出单个值然后在您的序列顶部完成的发布者,以及一个等待它跟踪的所有发布者完成后再完成自身的中间体。
您已经有一个发布者做一件事然后完成...它由 a()
返回。要等到 a()
和 b()
都完成,您可以使用 combineLatest
,因为它创建的发布者只有在它组合的所有发布者都完成后才能完成。整个事情看起来像:
a()
.combineLatest(b())
.print()
.sink(receiveCompletion: { _ in
tempBag.removeAll()
}, receiveValue: { _ in () })
.store(in: &tempBag)
与输出
receive subscription: (CombineLatest)
request unlimited
1
2
receive value: (((), ()))
receive finished