Combine`s subscribe(on:options:) 运算符
Combine`s subscribe(on:options:) operator
我对 subscribe(on:options:) 运算符有疑问。如果有人能帮我解决这个问题,我将不胜感激。
那么我们从文档中得到了什么:
Specifies the scheduler on which to perform subscribe, cancel, and request operations.
In contrast with receive(on:options:), which affects downstream messages, subscribe(on:options:) changes the execution context of upstream messages.
此外,我从不同的文章中得到的是,除非我们明确指定 Scheduler
来接收我们的下游消息(使用 receive(on:options:)
),否则消息将在 [=17= 上发送] 用于接收订阅。
此信息与我在执行过程中实际获得的信息不一致。
我有下一个代码:
Just("Some text")
.map { _ in
print("Map: \(Thread.isMainThread)")
}
.subscribe(on: DispatchQueue.global())
.sink { _ in
print("Sink: \(Thread.isMainThread)")
}
.store(in: &subscriptions)
我希望下一个输出:
Map: false
Sink: false
但我得到的是:
Map: true
Sink: false
当我使用 Sequence
发布者时,同样的事情发生了。
如果我交换 map
运算符和 subscribe
运算符的位置,我会收到我想要的:
Just("Some text")
.subscribe(on: DispatchQueue.global())
.map { _ in
print("Map: \(Thread.isMainThread)")
}
.sink { _ in
print("Sink: \(Thread.isMainThread)")
}
.store(in: &subscriptions)
输出:
Map: false
Sink: false
有趣的是,当我使用与我的自定义发布商的第一个列表相同的运算符顺序时,我收到了我想要的行为:
struct TestJust<Output>: Publisher {
typealias Failure = Never
private let value: Output
init(_ output: Output) {
self.value = output
}
func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
subscriber.receive(subscription: Subscriptions.empty)
_ = subscriber.receive(value)
subscriber.receive(completion: .finished)
}
}
TestJust("Some text")
.map { _ in
print("Map: \(Thread.isMainThread)")
}
.subscribe(on: DispatchQueue.global())
.sink { _ in
print("Sink: \(Thread.isMainThread)")
}
.store(in: &subscriptions)
输出:
Map: false
Sink: false
所以我认为要么我完全误解了所有这些机制,要么一些发布者故意选择线程来发布值(Just
,Sequence
-> Main
,URLSession.DataTaskPublisher
-> Some of Background
),这对我来说没有意义,因为在这种情况下为什么我们需要这个 subscribe(on:options:)
for.
你能帮我理解我错过了什么吗?提前谢谢你。
首先要了解的是,消息既向上 管道流动,又向下 管道流动。流向向上管道(“上游”)的消息是:
订阅实际表现(接收订阅)
订阅者向上游发布者请求新值
取消消息(这些从最终订阅者向上渗透)
流向 管道(“下游”)的消息是:
值
完成,包括失败(错误)或正常完成(报告发布者发出了它的最后一个值)
好吧,正如文档明确指出的那样,subscribe(on:)
是关于前者的:向上游流动的消息。但是您实际上并没有在测试中跟踪 任何 的 那些 消息,因此 none 的结果反映了关于它们的任何信息!在订阅点上方插入适当的 handleEvents
运算符以查看内容向上流向管道(例如,实现其 receiveRequest:
参数):
Just("Some text")
.handleEvents(receiveRequest: {
_ in print("Handle1: \(Thread.isMainThread)")
})
.map // etc.
与此同时,您应该不假设消息将在其上流动的线程下游(即值和完成)。你说:
Also, what I got from different articles is that unless we explicitly specify the Scheduler to receive our downstream messages on (using receive(on:options:)
), messages will be send on the Scheduler used for receiving a subscription.
但这似乎是一个虚假的假设。并且您的代码没有任何内容以明确的方式确定下游发送线程。正如你所说的那样,你 可以 通过 receive(on:)
控制它,但如果你不这样做,我会说你必须 什么都不假设 关于此事。一些发布者确实会在后台线程上产生一个值,例如数据任务发布者,这是非常有意义的(数据任务完成处理程序也会发生同样的事情)。其他人没有。
您可以假设的是receive(on:)
以外的运算符通常不会改变值传递线程。但是操作员是否以及如何使用订阅线程来确定接收线程,这是您不应该假设的事情。要控制接收线程,就控制它!调用 receive(on:)
或什么都不假设。
举个例子,如果你把开场改成
Just("Some text")
.receive(on: DispatchQueue.main)
然后你的 map
和你的 sink
都会报告它们正在主线程上接收值。为什么?因为您控制了接收线程。无论您在任何 subscribe(on:)
命令中说什么,这都有效。完全是两码事。
也许如果你调用subscribe(on:)
但你不调用receive(on:)
,一些关于下游发送线程的事情是由subscribe(on:)
线程决定的,但我肯定不会'不要依赖于任何硬性规定;文档中没有这么说!相反,不要那样做。如果您实施 subscribe(on:)
,也实施 receive(on:)
,以便 您 负责发生的事情。
我对 subscribe(on:options:) 运算符有疑问。如果有人能帮我解决这个问题,我将不胜感激。
那么我们从文档中得到了什么:
Specifies the scheduler on which to perform subscribe, cancel, and request operations. In contrast with receive(on:options:), which affects downstream messages, subscribe(on:options:) changes the execution context of upstream messages.
此外,我从不同的文章中得到的是,除非我们明确指定 Scheduler
来接收我们的下游消息(使用 receive(on:options:)
),否则消息将在 [=17= 上发送] 用于接收订阅。
此信息与我在执行过程中实际获得的信息不一致。
我有下一个代码:
Just("Some text")
.map { _ in
print("Map: \(Thread.isMainThread)")
}
.subscribe(on: DispatchQueue.global())
.sink { _ in
print("Sink: \(Thread.isMainThread)")
}
.store(in: &subscriptions)
我希望下一个输出:
Map: false
Sink: false
但我得到的是:
Map: true
Sink: false
当我使用 Sequence
发布者时,同样的事情发生了。
如果我交换 map
运算符和 subscribe
运算符的位置,我会收到我想要的:
Just("Some text")
.subscribe(on: DispatchQueue.global())
.map { _ in
print("Map: \(Thread.isMainThread)")
}
.sink { _ in
print("Sink: \(Thread.isMainThread)")
}
.store(in: &subscriptions)
输出:
Map: false
Sink: false
有趣的是,当我使用与我的自定义发布商的第一个列表相同的运算符顺序时,我收到了我想要的行为:
struct TestJust<Output>: Publisher {
typealias Failure = Never
private let value: Output
init(_ output: Output) {
self.value = output
}
func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
subscriber.receive(subscription: Subscriptions.empty)
_ = subscriber.receive(value)
subscriber.receive(completion: .finished)
}
}
TestJust("Some text")
.map { _ in
print("Map: \(Thread.isMainThread)")
}
.subscribe(on: DispatchQueue.global())
.sink { _ in
print("Sink: \(Thread.isMainThread)")
}
.store(in: &subscriptions)
输出:
Map: false
Sink: false
所以我认为要么我完全误解了所有这些机制,要么一些发布者故意选择线程来发布值(Just
,Sequence
-> Main
,URLSession.DataTaskPublisher
-> Some of Background
),这对我来说没有意义,因为在这种情况下为什么我们需要这个 subscribe(on:options:)
for.
你能帮我理解我错过了什么吗?提前谢谢你。
首先要了解的是,消息既向上 管道流动,又向下 管道流动。流向向上管道(“上游”)的消息是:
订阅实际表现(接收订阅)
订阅者向上游发布者请求新值
取消消息(这些从最终订阅者向上渗透)
流向 管道(“下游”)的消息是:
值
完成,包括失败(错误)或正常完成(报告发布者发出了它的最后一个值)
好吧,正如文档明确指出的那样,subscribe(on:)
是关于前者的:向上游流动的消息。但是您实际上并没有在测试中跟踪 任何 的 那些 消息,因此 none 的结果反映了关于它们的任何信息!在订阅点上方插入适当的 handleEvents
运算符以查看内容向上流向管道(例如,实现其 receiveRequest:
参数):
Just("Some text")
.handleEvents(receiveRequest: {
_ in print("Handle1: \(Thread.isMainThread)")
})
.map // etc.
与此同时,您应该不假设消息将在其上流动的线程下游(即值和完成)。你说:
Also, what I got from different articles is that unless we explicitly specify the Scheduler to receive our downstream messages on (using
receive(on:options:)
), messages will be send on the Scheduler used for receiving a subscription.
但这似乎是一个虚假的假设。并且您的代码没有任何内容以明确的方式确定下游发送线程。正如你所说的那样,你 可以 通过 receive(on:)
控制它,但如果你不这样做,我会说你必须 什么都不假设 关于此事。一些发布者确实会在后台线程上产生一个值,例如数据任务发布者,这是非常有意义的(数据任务完成处理程序也会发生同样的事情)。其他人没有。
您可以假设的是receive(on:)
以外的运算符通常不会改变值传递线程。但是操作员是否以及如何使用订阅线程来确定接收线程,这是您不应该假设的事情。要控制接收线程,就控制它!调用 receive(on:)
或什么都不假设。
举个例子,如果你把开场改成
Just("Some text")
.receive(on: DispatchQueue.main)
然后你的 map
和你的 sink
都会报告它们正在主线程上接收值。为什么?因为您控制了接收线程。无论您在任何 subscribe(on:)
命令中说什么,这都有效。完全是两码事。
也许如果你调用subscribe(on:)
但你不调用receive(on:)
,一些关于下游发送线程的事情是由subscribe(on:)
线程决定的,但我肯定不会'不要依赖于任何硬性规定;文档中没有这么说!相反,不要那样做。如果您实施 subscribe(on:)
,也实施 receive(on:)
,以便 您 负责发生的事情。