Combine`s subscribe(on:options:) 运算符

Combine`s subscribe(on:options:) operator

我对 subscribe(on:options:) 运算符有疑问。如果有人能帮我解决这个问题,我将不胜感激。

那么我们从文档中得到了什么:

Specifies the scheduler on which to perform subscribe, cancel, and request operations. In contrast with receive(on:options:), which affects downstream messages, subscribe(on:options:) changes the execution context of upstream messages.

此外,我从不同的文章中得到的是,除非我们明确指定 Scheduler 来接收我们的下游消息(使用 receive(on:options:)),否则消息将在 [=17= 上发送] 用于接收订阅。

此信息与我在执行过程中实际获得的信息不一致。

我有下一个代码:

Just("Some text")
    .map { _ in
        print("Map: \(Thread.isMainThread)")
    }
    .subscribe(on: DispatchQueue.global())
    .sink { _ in
        print("Sink: \(Thread.isMainThread)")
    }
    .store(in: &subscriptions)

我希望下一个输出:

Map: false
Sink: false

但我得到的是:

Map: true
Sink: false

当我使用 Sequence 发布者时,同样的事情发生了。

如果我交换 map 运算符和 subscribe 运算符的位置,我会收到我想要的:

Just("Some text")
    .subscribe(on: DispatchQueue.global())
    .map { _ in
        print("Map: \(Thread.isMainThread)")
    }
    .sink { _ in
        print("Sink: \(Thread.isMainThread)")
    }
    .store(in: &subscriptions)

输出:

Map: false
Sink: false

有趣的是,当我使用与我的自定义发布商的第一个列表相同的运算符顺序时,我收到了我想要的行为:

struct TestJust<Output>: Publisher {
    typealias Failure = Never
    
    private let value: Output
    
    init(_ output: Output) {
        self.value = output
    }
    
    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        subscriber.receive(subscription: Subscriptions.empty)
        _ = subscriber.receive(value)
        subscriber.receive(completion: .finished)
    }
}

TestJust("Some text")
    .map { _ in
        print("Map: \(Thread.isMainThread)")
    }
    .subscribe(on: DispatchQueue.global())
    .sink { _ in
        print("Sink: \(Thread.isMainThread)")
    }
    .store(in: &subscriptions)

输出:

Map: false
Sink: false

所以我认为要么我完全误解了所有这些机制,要么一些发布者故意选择线程来发布值(JustSequence -> MainURLSession.DataTaskPublisher -> Some of Background),这对我来说没有意义,因为在这种情况下为什么我们需要这个 subscribe(on:options:) for.

你能帮我理解我错过了什么吗?提前谢谢你。

首先要了解的是,消息既向上 管道流动,又向下 管道流动。流向向上管道(“上游”)的消息是:

  • 订阅实际表现(接收订阅)

  • 订阅者向上游发布者请求新值

  • 取消消息(这些从最终订阅者向上渗透)

流向 管道(“下游”)的消息是:

  • 完成,包括失败(错误)或正常完成(报告发布者发出了它的最后一个值)

好吧,正如文档明确指出的那样,subscribe(on:) 是关于前者的:向上游流动的消息。但是您实际上并没有在测试中跟踪 任何 那些 消息,因此 none 的结果反映了关于它们的任何信息!在订阅点上方插入适当的 handleEvents 运算符以查看内容向上流向管道(例如,实现其 receiveRequest: 参数):

Just("Some text")
    .handleEvents(receiveRequest: {
        _ in print("Handle1: \(Thread.isMainThread)")
    })
    .map // etc.

与此同时,您应该假设消息将在其上流动的线程下游(即值和完成)。你说:

Also, what I got from different articles is that unless we explicitly specify the Scheduler to receive our downstream messages on (using receive(on:options:)), messages will be send on the Scheduler used for receiving a subscription.

但这似乎是一个虚假的假设。并且您的代码没有任何内容以明确的方式确定下游发送线程。正如你所说的那样,你 可以 通过 receive(on:) 控制它,但如果你不这样做,我会说你必须 什么都不假设 关于此事。一些发布者确实会在后台线程上产生一个值,例如数据任务发布者,这是非常有意义的(数据任务完成处理程序也会发生同样的事情)。其他人没有。

可以假设的是receive(on:)以外的运算符通常不会改变值传递线程。但是操作员是否以及如何使用订阅线程来确定接收线程,这是您不应该假设的事情。要控制接收线程,就控制它!调用 receive(on:) 或什么都不假设。

举个例子,如果你把开场改成

Just("Some text")
    .receive(on: DispatchQueue.main)

然后你的 map 和你的 sink 都会报告它们正在主线程上接收值。为什么?因为您控制了接收线程。无论您在任何 subscribe(on:) 命令中说什么,这都有效。完全是两码事。

也许如果你调用subscribe(on:)但你不调用receive(on:),一些关于下游发送线程的事情是由subscribe(on:)线程决定的,但我肯定不会'不要依赖于任何硬性规定;文档中没有这么说!相反,不要那样做。如果您实施 subscribe(on:),也实施 receive(on:),以便 负责发生的事情。