Swift 的 Combine 框架是否有类似于 RXSwift 或 Reactive Swift 中的 sample(on:) 运算符?
Does Swift's Combine framework have a sample(on:) operator similar to those in RXSwift or Reactive Swift?
有谁知道如何在 Combine 中重新创建采样行为?
这是 RXMarbles 中示例行为的图表
sample() 的要点是有两个流,当一个流被触发时,如果另一个流还没有发送,则发送另一个流的最新值。
这是一个 可以 做你想做的游乐场。我没有对其进行大量测试,因此请谨慎行事:
import UIKit
import Combine
import PlaygroundSupport
struct SamplePublisher<DataSeq, Trigger, E> : Publisher
where DataSeq : Publisher,
Trigger : Publisher,
DataSeq.Failure == Trigger.Failure,
E == DataSeq.Failure,
DataSeq.Output : Equatable {
typealias Output = DataSeq.Output
typealias Failure = E
// The two sequences we are observing, the data sequence and the
// trigger sequence. When the trigger fires it will send the
// latest value from the dataSequence UNLESS it hasn't changed
let dataPublisher : DataSeq
let triggerPublisher : Trigger
struct SamplePublisherSubscription : Subscription {
var combineIdentifier = CombineIdentifier()
let dataSubscription : AnyCancellable
let triggerSubscription : Subscription
func request(_ demand: Subscribers.Demand) {
triggerSubscription.request(demand)
}
func cancel() {
dataSubscription.cancel()
triggerSubscription.cancel()
}
}
func receive<S>(subscriber: S) where S : Subscriber, E == S.Failure, DataSeq.Output == S.Input {
var latestData : DataSeq.Output?
var lastSent : DataSeq.Output?
var triggerSubscription : Subscription?
// Compares the latest value sent to the last one that was sent.
// If they don't match then it sends the latest value along.
// IF they do match, or if no value has been sent on the data stream yet
// Don't emit a new value.
func emitIfNeeded() -> Subscribers.Demand {
guard let latest = latestData else { return .unlimited }
if nil == lastSent ||
lastSent! != latest {
lastSent = latest
return subscriber.receive(latest)
} else {
return .unlimited
}
}
// Here we watch the data stream for new values and simply
// record them. If the data stream ends, or erors we
// pass that on to our subscriber.
let dataSubscription = dataPublisher.sink(
receiveCompletion: {
switch [=10=] {
case .finished:
subscriber.receive(completion: .finished)
case .failure(let error):
subscriber.receive(completion: .failure(error))
}
},
receiveValue: {
latestData = [=10=]
})
// The thing that subscribes to the trigger sequence.
// When it receives a value, we emit the latest value from the data stream (if any).
// If the trigger stream ends or errors, that will also end or error this publisher.
let triggerSubscriber = AnySubscriber<Trigger.Output,Trigger.Failure>(
receiveSubscription: { subscription in triggerSubscription = subscription },
receiveValue: { _ in emitIfNeeded() },
receiveCompletion: {
switch [=10=] {
case .finished :
emitIfNeeded()
subscriber.receive(completion: .finished)
case .failure(let error) :
subscriber.receive(completion: .failure(error))
}
})
// subscribe to the trigger sequence
triggerPublisher.subscribe(triggerSubscriber)
// Record relevant information and return the subscription to the subscriber.
subscriber.receive(subscription: SamplePublisherSubscription(
dataSubscription: dataSubscription,
triggerSubscription: triggerSubscription!))
}
}
extension Publisher {
// A utility function that lets you create a stream that is triggered by
// a value being emitted from another stream
func sample<Trigger, E>(trigger: Trigger) -> SamplePublisher<Self, Trigger, E>
where Trigger : Publisher,
Self.Failure == Trigger.Failure,
E == Self.Failure,
Self.Output : Equatable {
return SamplePublisher( dataPublisher : self, triggerPublisher : trigger)
}
}
var count = 0
let timer = Timer.publish(every: 5.0, on: RunLoop.current, in: .common).autoconnect().eraseToAnyPublisher()
let data = Timer.publish(every: 1.0, on: RunLoop.current, in: .common)
.autoconnect()
.scan(0) { total, _ in total + 1}
var subscriptions = Set<AnyCancellable>()
data.sample(trigger: timer).print()
.sink(receiveCompletion: {
debugPrint([=10=])
}, receiveValue: {
debugPrint([=10=])
}).store(in: &subscriptions)
PlaygroundSupport.PlaygroundPage.current.needsIndefiniteExecution = true
CombineExt 库有 withLatestFrom
运算符,它可以做你想做的事,还有许多其他有用的运算符。
有谁知道如何在 Combine 中重新创建采样行为?
这是 RXMarbles 中示例行为的图表
sample() 的要点是有两个流,当一个流被触发时,如果另一个流还没有发送,则发送另一个流的最新值。
这是一个 可以 做你想做的游乐场。我没有对其进行大量测试,因此请谨慎行事:
import UIKit
import Combine
import PlaygroundSupport
struct SamplePublisher<DataSeq, Trigger, E> : Publisher
where DataSeq : Publisher,
Trigger : Publisher,
DataSeq.Failure == Trigger.Failure,
E == DataSeq.Failure,
DataSeq.Output : Equatable {
typealias Output = DataSeq.Output
typealias Failure = E
// The two sequences we are observing, the data sequence and the
// trigger sequence. When the trigger fires it will send the
// latest value from the dataSequence UNLESS it hasn't changed
let dataPublisher : DataSeq
let triggerPublisher : Trigger
struct SamplePublisherSubscription : Subscription {
var combineIdentifier = CombineIdentifier()
let dataSubscription : AnyCancellable
let triggerSubscription : Subscription
func request(_ demand: Subscribers.Demand) {
triggerSubscription.request(demand)
}
func cancel() {
dataSubscription.cancel()
triggerSubscription.cancel()
}
}
func receive<S>(subscriber: S) where S : Subscriber, E == S.Failure, DataSeq.Output == S.Input {
var latestData : DataSeq.Output?
var lastSent : DataSeq.Output?
var triggerSubscription : Subscription?
// Compares the latest value sent to the last one that was sent.
// If they don't match then it sends the latest value along.
// IF they do match, or if no value has been sent on the data stream yet
// Don't emit a new value.
func emitIfNeeded() -> Subscribers.Demand {
guard let latest = latestData else { return .unlimited }
if nil == lastSent ||
lastSent! != latest {
lastSent = latest
return subscriber.receive(latest)
} else {
return .unlimited
}
}
// Here we watch the data stream for new values and simply
// record them. If the data stream ends, or erors we
// pass that on to our subscriber.
let dataSubscription = dataPublisher.sink(
receiveCompletion: {
switch [=10=] {
case .finished:
subscriber.receive(completion: .finished)
case .failure(let error):
subscriber.receive(completion: .failure(error))
}
},
receiveValue: {
latestData = [=10=]
})
// The thing that subscribes to the trigger sequence.
// When it receives a value, we emit the latest value from the data stream (if any).
// If the trigger stream ends or errors, that will also end or error this publisher.
let triggerSubscriber = AnySubscriber<Trigger.Output,Trigger.Failure>(
receiveSubscription: { subscription in triggerSubscription = subscription },
receiveValue: { _ in emitIfNeeded() },
receiveCompletion: {
switch [=10=] {
case .finished :
emitIfNeeded()
subscriber.receive(completion: .finished)
case .failure(let error) :
subscriber.receive(completion: .failure(error))
}
})
// subscribe to the trigger sequence
triggerPublisher.subscribe(triggerSubscriber)
// Record relevant information and return the subscription to the subscriber.
subscriber.receive(subscription: SamplePublisherSubscription(
dataSubscription: dataSubscription,
triggerSubscription: triggerSubscription!))
}
}
extension Publisher {
// A utility function that lets you create a stream that is triggered by
// a value being emitted from another stream
func sample<Trigger, E>(trigger: Trigger) -> SamplePublisher<Self, Trigger, E>
where Trigger : Publisher,
Self.Failure == Trigger.Failure,
E == Self.Failure,
Self.Output : Equatable {
return SamplePublisher( dataPublisher : self, triggerPublisher : trigger)
}
}
var count = 0
let timer = Timer.publish(every: 5.0, on: RunLoop.current, in: .common).autoconnect().eraseToAnyPublisher()
let data = Timer.publish(every: 1.0, on: RunLoop.current, in: .common)
.autoconnect()
.scan(0) { total, _ in total + 1}
var subscriptions = Set<AnyCancellable>()
data.sample(trigger: timer).print()
.sink(receiveCompletion: {
debugPrint([=10=])
}, receiveValue: {
debugPrint([=10=])
}).store(in: &subscriptions)
PlaygroundSupport.PlaygroundPage.current.needsIndefiniteExecution = true
CombineExt 库有 withLatestFrom
运算符,它可以做你想做的事,还有许多其他有用的运算符。