为什么 `Publishers.Map` 会急切地消耗上游值?
Why does `Publishers.Map` consume upstream values eagerly?
假设我有一个自定义订阅者,它在订阅时请求一个值,然后在每次收到值三秒后再请求下一个值:
/// A subscriber of `Int` values that asks for exactly one value when it is
/// subscribed and then asks for one more value three seconds after each value
/// it receives.
class MySubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    /// The subscription received from upstream; reset to `nil` on completion.
    private var subscription: Subscription?

    /// Logs the subscription, keeps a reference to it, and requests the first value.
    func receive(subscription: Subscription) {
        print("Subscribed")
        self.subscription = subscription
        subscription.request(.max(1))
    }

    /// Logs `input` and schedules a request for one more value in three seconds.
    ///
    /// - Returns: `.none`; further demand is only signalled through the
    ///   delayed `request(_:)` call.
    func receive(_ input: Int) -> Subscribers.Demand {
        print("Value: \(input)")
        requestOneMoreValue(after: .seconds(3))
        return .none
    }

    /// Logs the completion and drops the stored subscription.
    func receive(completion: Subscribers.Completion<Never>) {
        print("Complete")
        subscription = nil
    }

    /// Requests a single additional value on the main queue once `delay` has elapsed.
    private func requestOneMoreValue(after delay: DispatchTimeInterval) {
        let deadline = DispatchTime.now() + delay
        DispatchQueue.main.asyncAfter(deadline: deadline) {
            // `self` is captured strongly here; the request is skipped if the
            // subscription has been cleared in the meantime.
            self.subscription?.request(.max(1))
        }
    }
}
如果我用它来订阅一个无限范围的发布者,背压处理得很好,发布者每次等待 3 秒,直到它收到下一个发送值的请求:
// Subscribe `MySubscriber` directly to the publisher of the unbounded range `1...`,
// with no operator in between.
(1...).publisher.subscribe(MySubscriber())
// Prints values infinitely with ~3 seconds between each:
//
// Subscribed
// Value: 1
// Value: 2
// Value: 3
// ...
但是,如果我添加一个 `map` 运算符,那么 `MySubscriber` 甚至都不会收到订阅;`map` 似乎在收到其订阅后同步请求 `Demand.unlimited`,并且应用程序无限空转,因为 `map` 试图耗尽这个无限范围:
// Same unbounded range, now with a `map` that logs each value and doubles it
// before it reaches `MySubscriber`.
(1...).publisher
.map { value in
print("Map: \(value)")
return value * 2
}
.subscribe(MySubscriber())
// The `map` transform is executed infinitely with no delay:
//
// Map: 1
// Map: 2
// Map: 3
// ...
我的问题是,为什么 `map` 会这样?我原以为 `map` 只是将其下游需求传递给上游。由于 `map` 应该用于转换而不是副作用,我不明白其当前行为的用例是什么。
编辑
我实现了一个 `map` 的版本来展示我认为它应该如何工作:
extension Publishers {
    /// A publisher that applies `transform` to every value from `upstream`.
    ///
    /// It does not manage demand itself: the downstream subscriber is wrapped in
    /// a `Subscribers.LazyMapSubscriber`, which is then attached to `upstream`.
    struct MapLazily<Upstream: Publisher, Output>: Publisher {
        typealias Failure = Upstream.Failure

        /// The publisher whose values are transformed.
        let upstream: Upstream

        /// The closure applied to each upstream value.
        let transform: (Upstream.Output) -> Output

        init(upstream: Upstream, transform: @escaping (Upstream.Output) -> Output) {
            self.upstream = upstream
            self.transform = transform
        }

        /// Wraps `subscriber` in a mapping subscriber and attaches that wrapper to `upstream`.
        public func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Upstream.Failure {
            upstream.receive(
                subscriber: Subscribers.LazyMapSubscriber<Upstream.Output, S>(
                    downstream: subscriber,
                    transform: transform
                )
            )
        }
    }
}
extension Subscribers {
    /// A subscriber that sits between an upstream publisher and `downstream`.
    ///
    /// The subscription and the completion are handed to `downstream` unchanged;
    /// each input is passed through `transform` first, and the demand returned
    /// by `downstream` is returned to the caller as-is.
    class LazyMapSubscriber<Input, DownstreamSubscriber: Subscriber>: Subscriber {
        typealias Failure = DownstreamSubscriber.Failure

        /// The subscriber that receives the transformed values.
        let downstream: DownstreamSubscriber

        /// Converts an upstream value into the downstream's input type.
        let transform: (Input) -> DownstreamSubscriber.Input

        init(downstream: DownstreamSubscriber, transform: @escaping (Input) -> DownstreamSubscriber.Input) {
            self.downstream = downstream
            self.transform = transform
        }

        /// Hands the upstream subscription straight to `downstream`.
        func receive(subscription: Subscription) {
            downstream.receive(subscription: subscription)
        }

        /// Transforms `input`, delivers it downstream, and returns the downstream's demand.
        func receive(_ input: Input) -> Subscribers.Demand {
            let transformed = transform(input)
            return downstream.receive(transformed)
        }

        /// Hands the completion straight to `downstream`.
        func receive(completion: Subscribers.Completion<DownstreamSubscriber.Failure>) {
            downstream.receive(completion: completion)
        }
    }
}
extension Publisher {
    /// Wraps this publisher in `Publishers.MapLazily` with the given `transform`
    /// and returns the result type-erased.
    ///
    /// - Parameter transform: The closure applied to each value of this publisher.
    /// - Returns: A type-erased publisher of the transformed values.
    func mapLazily<Transformed>(transform: @escaping (Output) -> Transformed) -> AnyPublisher<Transformed, Failure> {
        let lazyMap = Publishers.MapLazily(upstream: self, transform: transform)
        return AnyPublisher(lazyMap)
    }
}
使用此运算符,`MySubscriber` 会立即收到订阅,并且 `mapLazily` 的转换仅在有需求时才执行:
// Same pipeline as the `map` example, using `mapLazily` in place of `map`.
(1...).publisher
.mapLazily { value in
print("Map: \(value)")
return value * 2
}
.subscribe(MySubscriber())
// Only transforms the values when they are demanded by the downstream subscriber every 3 seconds:
//
// Subscribed
// Map: 1
// Value: 2
// Map: 2
// Value: 4
// Map: 3
// Value: 6
// Map: 4
// Value: 8
我的猜测是,为 `Publishers.Sequence` 定义的 `map` 特定重载使用了某种快捷方式来提高性能。这会破坏无限序列;但即使对于有限序列,不顾下游需求而急切地耗尽序列也违背了我的直觉。在我看来,以下代码:
// Finite three-element range passed through a logging, doubling `map` and
// consumed by `MySubscriber`.
(1...3).publisher
.map { value in
print("Map: \(value)")
return value * 2
}
.subscribe(MySubscriber())
应该打印:
Subscribed
Map: 1
Value: 2
Map: 2
Value: 4
Map: 3
Value: 6
Complete
而是打印:
Map: 1
Map: 2
Map: 3
Subscribed
Value: 2
Value: 4
Value: 6
Complete
这是一个不涉及任何自定义订阅者的更简单的测试:
// Back-pressure check using only built-in operators: `flatMap` is limited to
// one inner publisher at a time, and each inner publisher delays its single
// value by 3 seconds on the main queue.
(1...).publisher
    //.map { $0 }
    .flatMap(maxPublishers: .max(1)) { (i: Int) -> AnyPublisher<Int, Never> in
        Just<Int>(i)
            .delay(for: 3, scheduler: DispatchQueue.main)
            .eraseToAnyPublisher()
    }
    .sink { print($0) }
    .store(in: &storage)
它按预期工作;但是如果你取消对 `.map` 的注释,你将什么也得不到,因为 `.map` 运算符会不断累积无限的上游值而不发布任何内容。
基于你的假设——`map` 以某种方式针对其前面的序列发布者做了优化——我尝试了以下解决方法:
// Type-erase the sequence publisher before applying `map`, so that `map` is
// applied to an `AnyPublisher` rather than to the sequence publisher itself.
(1...).publisher.eraseToAnyPublisher()
    .map { $0 }
    // ...
果然,它解决了问题!通过对 `map` 运算符隐藏序列发布者,我们阻止了这种优化。
假设我有一个自定义订阅者,它在订阅时请求一个值,然后在每次收到值三秒后再请求下一个值:
/// A subscriber of `Int` values that asks for exactly one value when it is
/// subscribed and then asks for one more value three seconds after each value
/// it receives.
class MySubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    /// The subscription received from upstream; reset to `nil` on completion.
    private var subscription: Subscription?

    /// Logs the subscription, keeps a reference to it, and requests the first value.
    func receive(subscription: Subscription) {
        print("Subscribed")
        self.subscription = subscription
        subscription.request(.max(1))
    }

    /// Logs `input` and schedules a request for one more value in three seconds.
    ///
    /// - Returns: `.none`; further demand is only signalled through the
    ///   delayed `request(_:)` call.
    func receive(_ input: Int) -> Subscribers.Demand {
        print("Value: \(input)")
        requestOneMoreValue(after: .seconds(3))
        return .none
    }

    /// Logs the completion and drops the stored subscription.
    func receive(completion: Subscribers.Completion<Never>) {
        print("Complete")
        subscription = nil
    }

    /// Requests a single additional value on the main queue once `delay` has elapsed.
    private func requestOneMoreValue(after delay: DispatchTimeInterval) {
        let deadline = DispatchTime.now() + delay
        DispatchQueue.main.asyncAfter(deadline: deadline) {
            // `self` is captured strongly here; the request is skipped if the
            // subscription has been cleared in the meantime.
            self.subscription?.request(.max(1))
        }
    }
}
如果我用它来订阅一个无限范围的发布者,背压处理得很好,发布者每次等待 3 秒,直到它收到下一个发送值的请求:
// Subscribe `MySubscriber` directly to the publisher of the unbounded range `1...`,
// with no operator in between.
(1...).publisher.subscribe(MySubscriber())
// Prints values infinitely with ~3 seconds between each:
//
// Subscribed
// Value: 1
// Value: 2
// Value: 3
// ...
但是,如果我添加一个 `map` 运算符,那么 `MySubscriber` 甚至都不会收到订阅;`map` 似乎在收到其订阅后同步请求 `Demand.unlimited`,并且应用程序无限空转,因为 `map` 试图耗尽这个无限范围:
// Same unbounded range, now with a `map` that logs each value and doubles it
// before it reaches `MySubscriber`.
(1...).publisher
.map { value in
print("Map: \(value)")
return value * 2
}
.subscribe(MySubscriber())
// The `map` transform is executed infinitely with no delay:
//
// Map: 1
// Map: 2
// Map: 3
// ...
我的问题是,为什么 `map` 会这样?我原以为 `map` 只是将其下游需求传递给上游。由于 `map` 应该用于转换而不是副作用,我不明白其当前行为的用例是什么。
编辑
我实现了一个 `map` 的版本来展示我认为它应该如何工作:
extension Publishers {
    /// A publisher that applies `transform` to every value from `upstream`.
    ///
    /// It does not manage demand itself: the downstream subscriber is wrapped in
    /// a `Subscribers.LazyMapSubscriber`, which is then attached to `upstream`.
    struct MapLazily<Upstream: Publisher, Output>: Publisher {
        typealias Failure = Upstream.Failure

        /// The publisher whose values are transformed.
        let upstream: Upstream

        /// The closure applied to each upstream value.
        let transform: (Upstream.Output) -> Output

        init(upstream: Upstream, transform: @escaping (Upstream.Output) -> Output) {
            self.upstream = upstream
            self.transform = transform
        }

        /// Wraps `subscriber` in a mapping subscriber and attaches that wrapper to `upstream`.
        public func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Upstream.Failure {
            upstream.receive(
                subscriber: Subscribers.LazyMapSubscriber<Upstream.Output, S>(
                    downstream: subscriber,
                    transform: transform
                )
            )
        }
    }
}
extension Subscribers {
    /// A subscriber that sits between an upstream publisher and `downstream`.
    ///
    /// The subscription and the completion are handed to `downstream` unchanged;
    /// each input is passed through `transform` first, and the demand returned
    /// by `downstream` is returned to the caller as-is.
    class LazyMapSubscriber<Input, DownstreamSubscriber: Subscriber>: Subscriber {
        typealias Failure = DownstreamSubscriber.Failure

        /// The subscriber that receives the transformed values.
        let downstream: DownstreamSubscriber

        /// Converts an upstream value into the downstream's input type.
        let transform: (Input) -> DownstreamSubscriber.Input

        init(downstream: DownstreamSubscriber, transform: @escaping (Input) -> DownstreamSubscriber.Input) {
            self.downstream = downstream
            self.transform = transform
        }

        /// Hands the upstream subscription straight to `downstream`.
        func receive(subscription: Subscription) {
            downstream.receive(subscription: subscription)
        }

        /// Transforms `input`, delivers it downstream, and returns the downstream's demand.
        func receive(_ input: Input) -> Subscribers.Demand {
            let transformed = transform(input)
            return downstream.receive(transformed)
        }

        /// Hands the completion straight to `downstream`.
        func receive(completion: Subscribers.Completion<DownstreamSubscriber.Failure>) {
            downstream.receive(completion: completion)
        }
    }
}
extension Publisher {
    /// Wraps this publisher in `Publishers.MapLazily` with the given `transform`
    /// and returns the result type-erased.
    ///
    /// - Parameter transform: The closure applied to each value of this publisher.
    /// - Returns: A type-erased publisher of the transformed values.
    func mapLazily<Transformed>(transform: @escaping (Output) -> Transformed) -> AnyPublisher<Transformed, Failure> {
        let lazyMap = Publishers.MapLazily(upstream: self, transform: transform)
        return AnyPublisher(lazyMap)
    }
}
使用此运算符,`MySubscriber` 会立即收到订阅,并且 `mapLazily` 的转换仅在有需求时才执行:
// Same pipeline as the `map` example, using `mapLazily` in place of `map`.
(1...).publisher
.mapLazily { value in
print("Map: \(value)")
return value * 2
}
.subscribe(MySubscriber())
// Only transforms the values when they are demanded by the downstream subscriber every 3 seconds:
//
// Subscribed
// Map: 1
// Value: 2
// Map: 2
// Value: 4
// Map: 3
// Value: 6
// Map: 4
// Value: 8
我的猜测是,为 `Publishers.Sequence` 定义的 `map` 特定重载使用了某种快捷方式来提高性能。这会破坏无限序列;但即使对于有限序列,不顾下游需求而急切地耗尽序列也违背了我的直觉。在我看来,以下代码:
// Finite three-element range passed through a logging, doubling `map` and
// consumed by `MySubscriber`.
(1...3).publisher
.map { value in
print("Map: \(value)")
return value * 2
}
.subscribe(MySubscriber())
应该打印:
Subscribed
Map: 1
Value: 2
Map: 2
Value: 4
Map: 3
Value: 6
Complete
而是打印:
Map: 1
Map: 2
Map: 3
Subscribed
Value: 2
Value: 4
Value: 6
Complete
这是一个不涉及任何自定义订阅者的更简单的测试:
// Back-pressure check using only built-in operators: `flatMap` is limited to
// one inner publisher at a time, and each inner publisher delays its single
// value by 3 seconds on the main queue.
(1...).publisher
    //.map { $0 }
    .flatMap(maxPublishers: .max(1)) { (i: Int) -> AnyPublisher<Int, Never> in
        Just<Int>(i)
            .delay(for: 3, scheduler: DispatchQueue.main)
            .eraseToAnyPublisher()
    }
    .sink { print($0) }
    .store(in: &storage)
它按预期工作;但是如果你取消对 `.map` 的注释,你将什么也得不到,因为 `.map` 运算符会不断累积无限的上游值而不发布任何内容。
基于你的假设——`map` 以某种方式针对其前面的序列发布者做了优化——我尝试了以下解决方法:
// Type-erase the sequence publisher before applying `map`, so that `map` is
// applied to an `AnyPublisher` rather than to the sequence publisher itself.
(1...).publisher.eraseToAnyPublisher()
    .map { $0 }
    // ...
果然,它解决了问题!通过对 `map` 运算符隐藏序列发布者,我们阻止了这种优化。