当内部发布者使用 subscribe(on:) 时,CombineLatest 运算符不会发出
CombineLatest operator is not emitting when inners publishers use subscribe(on:)
我观察到有关 CombineLatest 的意外行为,如果内部发布者有 subscribe(on:)
,则 CombineLatest 流不会发出任何值。
备注:
- 使用 Zip 运算符工作
- 将 subscribe(on:) / receive(on:) 移动到 combineLatest 流也可以。但是在这个特定的用例中,内部发布者正在定义他们的 subscribe/receive
因为在其他地方(重新)使用了。
- 只对一个内部发布者添加 subscribe(on:)/receive(on:) 也可以,所以问题就出在两者都有的时候。
func makePublisher() -> AnyPublisher<Int, Never> {
Deferred {
Future { promise in
DispatchQueue.global(qos: .background).asyncAfter(deadline: .now() + 3) {
promise(.success(Int.random(in: 0...3)))
}
}
}
.subscribe(on: DispatchQueue.global())
.receive(on: DispatchQueue.main)
.eraseToAnyPublisher()
}
var cancellables = Set<AnyCancellable>()
Publishers.CombineLatest(
makePublisher(),
makePublisher()
)
.sink { completion in
print(completion)
} receiveValue: { (a, b) in
print(a, b)
}.store(in: &cancellables)
这是合并错误还是预期行为?你知道如何设置这种内部可以定义自己的订阅调度程序的流吗?
是的,这是一个错误。我们可以将测试用例简化为:
import Combine
import Dispatch
let pub = Just("x")
.subscribe(on: DispatchQueue.main)
let ticket = pub.combineLatest(pub)
.sink(
receiveCompletion: { print([=10=]) },
receiveValue: { print([=10=]) })
这从不打印任何东西。但是,如果您注释掉 subscribe(on:)
运算符,它会打印预期的内容。如果您保留 subscribe(on:)
,但插入一些 print()
运算符,您会看到 CombineLatest
运算符从不向上游发送任何需求。
我建议您复制 CombineX reimplementation of CombineLatest
和它需要编译的实用程序(我认为是 Lock
和 LockedAtomic
的 CombineX 实现)。我也不知道 CombineX 版本是否有效,但如果它有问题,至少你有源代码并可以尝试修复它。
我观察到有关 CombineLatest 的意外行为,如果内部发布者有 subscribe(on:)
,则 CombineLatest 流不会发出任何值。
备注:
- 使用 Zip 运算符工作
- 将 subscribe(on:) / receive(on:) 移动到 combineLatest 流也可以。但是在这个特定的用例中,内部发布者正在定义他们的 subscribe/receive 因为在其他地方(重新)使用了。
- 只对一个内部发布者添加 subscribe(on:)/receive(on:) 也可以,所以问题就出在两者都有的时候。
func makePublisher() -> AnyPublisher<Int, Never> {
Deferred {
Future { promise in
DispatchQueue.global(qos: .background).asyncAfter(deadline: .now() + 3) {
promise(.success(Int.random(in: 0...3)))
}
}
}
.subscribe(on: DispatchQueue.global())
.receive(on: DispatchQueue.main)
.eraseToAnyPublisher()
}
var cancellables = Set<AnyCancellable>()
Publishers.CombineLatest(
makePublisher(),
makePublisher()
)
.sink { completion in
print(completion)
} receiveValue: { (a, b) in
print(a, b)
}.store(in: &cancellables)
这是合并错误还是预期行为?你知道如何设置这种内部可以定义自己的订阅调度程序的流吗?
是的,这是一个错误。我们可以将测试用例简化为:
import Combine
import Dispatch
let pub = Just("x")
.subscribe(on: DispatchQueue.main)
let ticket = pub.combineLatest(pub)
.sink(
receiveCompletion: { print([=10=]) },
receiveValue: { print([=10=]) })
这从不打印任何东西。但是,如果您注释掉 subscribe(on:)
运算符,它会打印预期的内容。如果您保留 subscribe(on:)
,但插入一些 print()
运算符,您会看到 CombineLatest
运算符从不向上游发送任何需求。
我建议您复制 CombineX reimplementation of CombineLatest
和它需要编译的实用程序(我认为是 Lock
和 LockedAtomic
的 CombineX 实现)。我也不知道 CombineX 版本是否有效,但如果它有问题,至少你有源代码并可以尝试修复它。