Empty() 发布者不发送完成
Empty() publisher does not send completion
在此代码中,我希望 Empty()
发布者将完成发送给 .sink
订阅者,但没有发送完成。
func testEmpty () {
let x = XCTestExpectation()
let subject = PassthroughSubject<Int, Never>()
emptyOrSubjectPublisher(subject).sink(receiveCompletion: { completion in
dump(completion)
}, receiveValue: { value in
dump(value)
}).store(in: &cancellables)
subject.send(0)
wait(for: [x], timeout: 10.0)
}
func emptyOrSubjectPublisher (_ subject: PassthroughSubject<Int, Never>) -> AnyPublisher<Int, Never> {
subject
.flatMap { (i: Int) -> AnyPublisher<Int, Never> in
if i == 1 {
return subject.eraseToAnyPublisher()
} else {
return Empty().eraseToAnyPublisher()
}
}
.eraseToAnyPublisher()
}
为什么 emptyOrSubjectPublisher
没有收到完成?
Empty完成了,但是整体pipeline没有完成,因为初始的Subject还没有完成。产生 Empty 的内部管道 (flatMap
) 已经“吞噬”了完成。这是预期的行为。
您可以通过简单地在 flatMap
中生成一个 Just 来更容易地看到这一点,例如Just(100)
:
subject
.flatMap {_ in Just(100) }
.sink(receiveCompletion: { completion in
print(completion)
}, receiveValue: { value in
print(value)
}).store(in: &cancellables)
subject.send(1)
你知道,我也知道,一个 Just 发出一次并完成。但是,尽管 Just 的 value 到达管道,但没有完成。
您可以很容易地看出为什么它会这样工作。如果我们从我们的发布者那里得到一个潜在的值序列,但是在 flatMap
中产生的某个中间发布者有能力完成整个管道并提前结束它,那将是非常错误的。
(看看我的 https://www.apeth.com/UnderstandingCombine/operators/operatorsTransformersBlockers/operatorsflatmap.html 我也表达了同样的观点。)
如果目标是在管道中发送完成,则需要完成的是 subject
。例如,您可以说
func emptyOrSubjectPublisher (_ subject: PassthroughSubject<Int, Never>) -> AnyPublisher<Int, Never> {
subject
.flatMap { (i: Int) -> AnyPublisher<Int, Never> in
if i == 1 {
return subject.eraseToAnyPublisher()
} else {
subject.send(completion: .finished) // <--
return Empty().eraseToAnyPublisher()
}
}
.eraseToAnyPublisher()
}
[但是请注意,你的整个emptyOrSubjectPublisher
是奇特的;目前尚不清楚它的目的是什么。当 i
是 1
时返回 subject
也有点毫无意义,因为 subject
在我们到达这里时已经发布了 1
,并且不是现在要发布更多内容。因此,如果您在开始时发送 1
,您将不会收到 1
作为值,因为您的 flatMap
已经吞下它并产生了一个不会发布的发布者.]
在此代码中,我希望 Empty()
发布者将完成发送给 .sink
订阅者,但没有发送完成。
func testEmpty () {
let x = XCTestExpectation()
let subject = PassthroughSubject<Int, Never>()
emptyOrSubjectPublisher(subject).sink(receiveCompletion: { completion in
dump(completion)
}, receiveValue: { value in
dump(value)
}).store(in: &cancellables)
subject.send(0)
wait(for: [x], timeout: 10.0)
}
func emptyOrSubjectPublisher (_ subject: PassthroughSubject<Int, Never>) -> AnyPublisher<Int, Never> {
subject
.flatMap { (i: Int) -> AnyPublisher<Int, Never> in
if i == 1 {
return subject.eraseToAnyPublisher()
} else {
return Empty().eraseToAnyPublisher()
}
}
.eraseToAnyPublisher()
}
为什么 emptyOrSubjectPublisher
没有收到完成?
Empty完成了,但是整体pipeline没有完成,因为初始的Subject还没有完成。产生 Empty 的内部管道 (flatMap
) 已经“吞噬”了完成。这是预期的行为。
您可以通过简单地在 flatMap
中生成一个 Just 来更容易地看到这一点,例如Just(100)
:
subject
.flatMap {_ in Just(100) }
.sink(receiveCompletion: { completion in
print(completion)
}, receiveValue: { value in
print(value)
}).store(in: &cancellables)
subject.send(1)
你知道,我也知道,一个 Just 发出一次并完成。但是,尽管 Just 的 value 到达管道,但没有完成。
您可以很容易地看出为什么它会这样工作。如果我们从我们的发布者那里得到一个潜在的值序列,但是在 flatMap
中产生的某个中间发布者有能力完成整个管道并提前结束它,那将是非常错误的。
(看看我的 https://www.apeth.com/UnderstandingCombine/operators/operatorsTransformersBlockers/operatorsflatmap.html 我也表达了同样的观点。)
如果目标是在管道中发送完成,则需要完成的是 subject
。例如,您可以说
func emptyOrSubjectPublisher (_ subject: PassthroughSubject<Int, Never>) -> AnyPublisher<Int, Never> {
subject
.flatMap { (i: Int) -> AnyPublisher<Int, Never> in
if i == 1 {
return subject.eraseToAnyPublisher()
} else {
subject.send(completion: .finished) // <--
return Empty().eraseToAnyPublisher()
}
}
.eraseToAnyPublisher()
}
[但是请注意,你的整个emptyOrSubjectPublisher
是奇特的;目前尚不清楚它的目的是什么。当 i
是 1
时返回 subject
也有点毫无意义,因为 subject
在我们到达这里时已经发布了 1
,并且不是现在要发布更多内容。因此,如果您在开始时发送 1
,您将不会收到 1
作为值,因为您的 flatMap
已经吞下它并产生了一个不会发布的发布者.]