当内部发布者使用 subscribe(on:) 时,CombineLatest 运算符不会发出

CombineLatest operator is not emitting when inners publishers use subscribe(on:)

我观察到有关 CombineLatest 的意外行为,如果内部发布者有 subscribe(on:),则 CombineLatest 流不会发出任何值。

备注:

    func makePublisher() -> AnyPublisher<Int, Never> {
        Deferred {
            Future { promise in
                DispatchQueue.global(qos: .background).asyncAfter(deadline: .now() + 3) {
                    promise(.success(Int.random(in: 0...3)))
                }
            }
        }
        .subscribe(on: DispatchQueue.global())
        .receive(on: DispatchQueue.main)
        .eraseToAnyPublisher()
    }
    
    var cancellables = Set<AnyCancellable>()
    Publishers.CombineLatest(
        makePublisher(),
        makePublisher()
    )
    .sink { completion in
        print(completion)
    } receiveValue: { (a, b) in
        print(a, b)
    }.store(in: &cancellables)

这是合并错误还是预期行为?你知道如何设置这种内部可以定义自己的订阅调度程序的流吗?

是的,这是一个错误。我们可以将测试用例简化为:

import Combine
import Dispatch

let pub = Just("x")
    .subscribe(on: DispatchQueue.main)

let ticket = pub.combineLatest(pub)
    .sink(
        receiveCompletion: { print([=10=]) },
        receiveValue: { print([=10=]) })

这从不打印任何东西。但是,如果您注释掉 subscribe(on:) 运算符,它会打印预期的内容。如果您保留 subscribe(on:),但插入一些 print() 运算符,您会看到 CombineLatest 运算符从不向上游发送任何需求。

我建议您复制 CombineX reimplementation of CombineLatest 和它需要编译的实用程序(我认为是 LockLockedAtomic 的 CombineX 实现)。我也不知道 CombineX 版本是否有效,但如果它有问题,至少你有源代码并可以尝试修复它。