Swift 的 Combine 框架是否有类似于 RxSwift 或 ReactiveSwift 中的 sample(on:) 运算符?

Does Swift's Combine framework have a sample(on:) operator similar to those in RxSwift or ReactiveSwift?

有谁知道如何在 Combine 中重新创建采样行为?

这是 RxMarbles 中 sample 运算符行为的弹珠图(marble diagram)。

sample() 的要点是有两个流:数据流和触发流。当触发流发出事件时,如果数据流的最新值尚未被发送过,则将该最新值发送出去。

下面是一个或许能实现你所需功能的 Playground。我没有对其进行大量测试,因此请谨慎使用:

import UIKit
import Combine
import PlaygroundSupport

/// A publisher that samples `dataPublisher` each time `triggerPublisher` emits.
///
/// When the trigger fires, the most recent value received from the data
/// publisher is forwarded downstream, unless it is equal to the value that was
/// forwarded last time or no data value has arrived yet. Completion or failure
/// of either upstream terminates this publisher; when the trigger finishes
/// normally, one last sample is attempted first.
///
/// Note: `latestData`, `lastSent` and the termination flag inside
/// `receive(subscriber:)` are not synchronized, so both upstreams should
/// deliver their events on the same thread or serial queue.
struct SamplePublisher<DataSeq, Trigger, E> : Publisher
where DataSeq : Publisher,
      Trigger : Publisher,
      DataSeq.Failure == Trigger.Failure,
      E == DataSeq.Failure,
      DataSeq.Output : Equatable {

    typealias Output = DataSeq.Output
    typealias Failure = E

    // The two sequences we are observing: the data sequence and the
    // trigger sequence. When the trigger fires we send the latest value
    // from the data sequence UNLESS it hasn't changed since the last send.
    let dataPublisher : DataSeq
    let triggerPublisher : Trigger

    /// The subscription handed to the downstream subscriber.
    ///
    /// Downstream demand is forwarded to the trigger subscription only; the
    /// data publisher is consumed through a sink. Cancelling tears down both
    /// upstream subscriptions.
    struct SamplePublisherSubscription : Subscription {
        var combineIdentifier = CombineIdentifier()

        let dataSubscription : AnyCancellable
        let triggerSubscription : Subscription

        func request(_ demand: Subscribers.Demand) {
            triggerSubscription.request(demand)
        }

        func cancel() {
            dataSubscription.cancel()
            triggerSubscription.cancel()
        }
    }

    /// Stand-in for the trigger's subscription that can be handed downstream
    /// before the trigger publisher has actually delivered its subscription.
    ///
    /// Demand requested before `attach(_:)` is accumulated and replayed once
    /// the real subscription arrives, so nothing has to be force-unwrapped when
    /// the trigger delivers its subscription asynchronously.
    private final class DeferredSubscription : Subscription {
        private let lock = NSLock()
        private var upstream : Subscription?
        private var pendingDemand : Subscribers.Demand = .none
        private var isCancelled = false

        /// Installs the real upstream subscription and replays buffered demand.
        func attach(_ subscription: Subscription) {
            lock.lock()
            if isCancelled || upstream != nil {
                // Already cancelled, or a subscription is already attached:
                // the extra subscription must not be kept alive.
                lock.unlock()
                subscription.cancel()
                return
            }
            upstream = subscription
            let demand = pendingDemand
            pendingDemand = .none
            lock.unlock()

            if demand != Subscribers.Demand.none {
                subscription.request(demand)
            }
        }

        func request(_ demand: Subscribers.Demand) {
            lock.lock()
            guard let current = upstream else {
                if !isCancelled {
                    pendingDemand += demand
                }
                lock.unlock()
                return
            }
            lock.unlock()
            current.request(demand)
        }

        func cancel() {
            lock.lock()
            isCancelled = true
            let current = upstream
            upstream = nil
            pendingDemand = .none
            lock.unlock()
            current?.cancel()
        }
    }

    /// Attaches `subscriber`, subscribing to both the data and trigger publishers.
    ///
    /// - Parameter subscriber: The downstream subscriber that receives sampled
    ///   values and the terminal event.
    func receive<S>(subscriber: S) where S : Subscriber, E == S.Failure, DataSeq.Output == S.Input {
        var latestData : DataSeq.Output?
        var lastSent : DataSeq.Output?
        var isFinished = false
        let triggerSubscription = DeferredSubscription()

        // Delivers a terminal event downstream at most once. Both upstreams
        // forward their completion, so without this guard the subscriber could
        // receive two completions.
        func finish(_ completion: Subscribers.Completion<E>) {
            guard !isFinished else { return }
            isFinished = true
            subscriber.receive(completion: completion)
        }

        // Compares the latest data value to the last one that was sent.
        // If they differ, the latest value is sent along and the subscriber's
        // additional demand is returned. If they match, or no value has arrived
        // on the data stream yet, nothing is emitted and one unit of demand is
        // returned to replace the trigger value that was just consumed.
        // (Returning `.unlimited` here would override downstream backpressure.)
        func emitIfNeeded() -> Subscribers.Demand {
            guard !isFinished else { return .none }
            guard let latest = latestData, lastSent != latest else { return .max(1) }

            lastSent = latest
            return subscriber.receive(latest)
        }

        // Here we watch the data stream for new values and simply
        // record them. If the data stream ends or errors, we
        // pass that on to our subscriber.
        let dataSubscription = dataPublisher.sink(
            receiveCompletion: { finish($0) },
            receiveValue: { latestData = $0 })

        // The thing that subscribes to the trigger sequence.
        // When it receives a value, we emit the latest value from the data stream (if any).
        // If the trigger stream ends or errors, that will also end or error this publisher.
        let triggerSubscriber = AnySubscriber<Trigger.Output,Trigger.Failure>(
            receiveSubscription: { triggerSubscription.attach($0) },
            receiveValue: { _ in emitIfNeeded() },
            receiveCompletion: { completion in
                if case .finished = completion {
                    // Flush one last sample before finishing. This final value
                    // is delivered without consulting downstream demand.
                    _ = emitIfNeeded()
                }
                finish(completion)
            })

        // Subscribe to the trigger sequence.
        triggerPublisher.subscribe(triggerSubscriber)

        // Record relevant information and return the subscription to the subscriber.
        subscriber.receive(subscription: SamplePublisherSubscription(
            dataSubscription: dataSubscription,
            triggerSubscription: triggerSubscription))
    }
}

extension Publisher {

    /// Builds a `SamplePublisher` that uses this publisher as its data
    /// sequence and `trigger` as its trigger sequence.
    ///
    /// - Parameter trigger: The publisher whose emissions cause the latest
    ///   value of this publisher to be sampled. It must share this
    ///   publisher's `Failure` type.
    /// - Returns: A `SamplePublisher` wrapping `self` and `trigger`.
    func sample<Trigger, E>(trigger: Trigger) -> SamplePublisher<Self, Trigger, E>
    where Trigger : Publisher,
          Self.Failure == Trigger.Failure,
          E == Self.Failure,
          Self.Output : Equatable {
        let sampler = SamplePublisher<Self, Trigger, E>(
            dataPublisher: self,
            triggerPublisher: trigger)
        return sampler
    }
}

// Demo: sample a fast counter with a slower trigger and print what comes out.

// NOTE(review): `count` is not referenced anywhere in the visible code.
var count = 0

// Trigger stream: fires every 5 seconds.
let timer = Timer.publish(every: 5.0, on: RunLoop.current, in: .common).autoconnect().eraseToAnyPublisher()

// Data stream: a running count that increases by one every second.
let data = Timer.publish(every: 1.0, on: RunLoop.current, in: .common)
    .autoconnect()
    .scan(0) { total, _ in total + 1}


// Holds the sink's cancellable so the pipeline is not torn down immediately.
var subscriptions = Set<AnyCancellable>()
data.sample(trigger: timer).print()
    .sink(receiveCompletion: {
        debugPrint($0)
    }, receiveValue: {
        debugPrint($0)
    }).store(in: &subscriptions)

// Keep the playground page running so the timers can fire.
PlaygroundSupport.PlaygroundPage.current.needsIndefiniteExecution = true

CombineExt 库有 withLatestFrom 运算符,它可以做你想做的事,还有许多其他有用的运算符。