如何让Combine的flatMap完成全流?

How make Combine's flatMap to complete overall stream?

我有一些这样的代码

func a() -> AnyPublisher<Void, Never> {
    Future<Void, Never> { promise in
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            print(1)
            promise(.success(()))
        }
    }
    .eraseToAnyPublisher()
}

func b() -> AnyPublisher<Void, Never> {
    Future<Void, Never> { promise in
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            print(2)
            promise(.success(()))
        }
    }
    .eraseToAnyPublisher()
}

var tempBag = Set<AnyCancellable>()

let subject = CurrentValueSubject<Int, Never>(1)

subject
    .flatMap({ _ in a() })
    .flatMap({ _ in b() })
    .print()
    .sink(receiveCompletion: { _ in
        tempBag.removeAll()
    }, receiveValue: {  })
    .store(in: &tempBag)

因此,我在流的根目录中有一些无法完成的主题,在 flatMap 运算符中有一些可完成的发布者。我希望整个流在最后一个 flatMap 的发布者完成时完成。所以,我希望控制台看起来像这样:

receive subscription: (FlatMap)
request unlimited
1
2
receive value: (())
receive finished

但实际结果是

receive subscription: (FlatMap)
request unlimited
1
2
receive value: (())

我怎样才能做到这一点?

您遇到的问题是您的 SubjectCurrentValueSubject)永远不会完成,因此整个链永远不会完成。您需要的是一个发出单个值然后在您的序列顶部完成的发布者,以及一个等待它跟踪的所有发布者完成后再完成自身的中间体。

您已经有一个发布者做一件事然后完成...它由 a() 返回。要等到 a()b() 都完成,您可以使用 combineLatest,因为它创建的发布者只有在它组合的所有发布者都完成后才能完成。整个事情看起来像:

a()
.combineLatest(b())
.print()
.sink(receiveCompletion: { _ in
    tempBag.removeAll()
}, receiveValue: { _ in () })
.store(in: &tempBag)

与输出

receive subscription: (CombineLatest)
request unlimited
1
2
receive value: (((), ()))
receive finished