Why does `Publishers.Map` consume upstream values eagerly?

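Suppose I have a custom subscriber that requests one value on subscription, and then requests one additional value three seconds after receiving the previous one: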

import Combine
import Foundation

class MySubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private var subscription: Subscription?

    func receive(subscription: Subscription) {
        print("Subscribed")

        self.subscription = subscription
        subscription.request(.max(1))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        print("Value: \(input)")

        DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(3)) {
            self.subscription?.request(.max(1))
        }

        return .none
    }

    func receive(completion: Subscribers.Completion<Never>) {
        print("Complete")
        subscription = nil
    }
}

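If I use this to subscribe to an infinite range publisher, back pressure is handled gracefully: the publisher waits 3 seconds each time, until it receives the next demand to send a value: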

(1...).publisher.subscribe(MySubscriber())

// Prints values infinitely with ~3 seconds between each:
//
//     Subscribed
//     Value: 1
//     Value: 2
//     Value: 3
//     ...

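However, if I add a `map` operator, `MySubscriber` never even receives a subscription; `map` appears to synchronously request `Demand.unlimited` upon receiving its subscription, and the app spins forever as `map` tries to exhaust the infinite range: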

(1...).publisher
    .map { value in
        print("Map: \(value)")
        return value * 2
    }
    .subscribe(MySubscriber())

// The `map` transform is executed infinitely with no delay:
//
//     Map: 1
//     Map: 2
//     Map: 3
//     ...

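My question is: why does `map` behave this way? I would have expected `map` to simply pass its downstream demand to the upstream. Since `map` is meant for transformation rather than side effects, I don't understand what the use case for its current behavior is.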

EDIT

I implemented a version of `map` to show how I think it ought to work:

extension Publishers {
    struct MapLazily<Upstream: Publisher, Output>: Publisher {
        typealias Failure = Upstream.Failure

        let upstream: Upstream
        let transform: (Upstream.Output) -> Output

        init(upstream: Upstream, transform: @escaping (Upstream.Output) -> Output) {
            self.upstream = upstream
            self.transform = transform
        }

        public func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Upstream.Failure {
            let mapSubscriber = Subscribers.LazyMapSubscriber(downstream: subscriber, transform: transform)
            upstream.receive(subscriber: mapSubscriber)
        }
    }
}

extension Subscribers {
    class LazyMapSubscriber<Input, DownstreamSubscriber: Subscriber>: Subscriber {
        let downstream: DownstreamSubscriber
        let transform: (Input) -> DownstreamSubscriber.Input

        init(downstream: DownstreamSubscriber, transform: @escaping (Input) -> DownstreamSubscriber.Input) {
            self.downstream = downstream
            self.transform = transform
        }

        func receive(subscription: Subscription) {
            downstream.receive(subscription: subscription)
        }

        func receive(_ input: Input) -> Subscribers.Demand {
            downstream.receive(transform(input))
        }

        func receive(completion: Subscribers.Completion<DownstreamSubscriber.Failure>) {
            downstream.receive(completion: completion)
        }
    }
}

extension Publisher {
    func mapLazily<Transformed>(transform: @escaping (Output) -> Transformed) -> AnyPublisher<Transformed, Failure> {
        Publishers.MapLazily(upstream: self, transform: transform).eraseToAnyPublisher()
    }
}

Using this operator, `MySubscriber` receives the subscription immediately, and the `mapLazily` transform is only executed when there is demand:

(1...).publisher
    .mapLazily { value in
        print("Map: \(value)")
        return value * 2
    }
    .subscribe(MySubscriber())

// Only transforms the values when they are demanded by the downstream subscriber every 3 seconds:
//
//     Subscribed
//     Map: 1
//     Value: 2
//     Map: 2
//     Value: 4
//     Map: 3
//     Value: 6
//     Map: 4
//     Value: 8

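My guess is that the particular overload of `map` defined for `Publishers.Sequence` is using some kind of shortcut to improve performance. This breaks for infinite sequences, but even for finite sequences, eagerly exhausting the sequence regardless of downstream demand goes against my intuition. In my view, the following code: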

(1...3).publisher
    .map { value in
        print("Map: \(value)")
        return value * 2
    }
    .subscribe(MySubscriber())

should print:

Subscribed
Map: 1
Value: 2
Map: 2
Value: 4
Map: 3
Value: 6
Complete

but instead prints:

Map: 1
Map: 2
Map: 3
Subscribed
Value: 2
Value: 4
Value: 6
Complete
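
One way to probe that guess, as a rough sketch: if the transform already runs while the operator chain is being built, before any subscriber is attached, then the mapping cannot be driven by downstream demand. The `log` array and the `publisher` constant are hypothetical helpers introduced only for this check. Given the output above, I would expect `log` to hold three entries by the time it is printed, even though nothing has subscribed yet.

```swift
import Combine

// Hypothetical helper: records every call to the transform.
var log: [String] = []

// Build the publisher chain, but do NOT subscribe to it.
let publisher = (1...3).publisher.map { value -> Int in
    log.append("Map: \(value)")
    return value * 2
}

// If `map` were lazy, `log` would still be empty here,
// because no subscriber has requested anything.
print(log)
```

If the log is already populated at this point, the same call on `(1...).publisher` would never return, which matches the infinite spinning described earlier.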

Here's a simpler test that doesn't involve any custom subscribers:

// `storage` is assumed to be a Set<AnyCancellable> declared elsewhere.
(1...).publisher
    //.map { $0 }
    .flatMap(maxPublishers: .max(1)) { (i: Int) -> AnyPublisher<Int, Never> in
        Just<Int>(i)
            .delay(for: 3, scheduler: DispatchQueue.main)
            .eraseToAnyPublisher()
    }
    .sink { print($0) }
    .store(in: &storage)

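It works as expected, but if you uncomment the `.map`, you get nothing, because the `.map` operator accumulates the infinite upstream values without publishing anything.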

Based on your hypothesis that `map` is somehow optimized for a preceding sequence publisher, I tried this workaround:

(1...).publisher.eraseToAnyPublisher()
    .map { $0 }
    // ...

And sure enough, it fixed the problem! By hiding the sequence publisher from the `map` operator, we prevent the optimization.
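
To see which `map` the compiler picks in each case, a small sketch like the following prints the static type of the resulting publisher. The names `direct` and `erased` are made up for illustration. My understanding is that `Publishers.Sequence` declares its own `map` that returns another `Publishers.Sequence`, while the type-erased chain falls back to the generic `Publisher.map`, which returns a `Publishers.Map`; treat that as an assumption to verify on your SDK rather than a guarantee.

```swift
import Combine

// `map` applied directly to the sequence publisher: overload resolution
// can choose the `map` declared on `Publishers.Sequence` itself.
let direct = (1...3).publisher.map { $0 * 2 }

// `map` applied after type erasure: only the generic `Publisher.map`
// is visible, so a wrapping operator type should be produced.
let erased = (1...3).publisher.eraseToAnyPublisher().map { $0 * 2 }

// Print the concrete publisher types for comparison.
print(type(of: direct))
print(type(of: erased))
```

If the first type is still a sequence publisher over an array, the transform must already have been applied to every element, which would explain why an infinite range never gets as far as delivering a subscription.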