如何仅在收到第一条消息后才应用 Combine 运算符?
How do you apply a Combine operator only after the first message has been received?
在 Combine
中,仅使用内置运算符,是否可以跳过第一个值的运算符,然后将该运算符应用于所有后续值?
考虑以下因素:
publisher
.debounce(...)
.sink(...)
在这种安排中,debounce
将等待指定的超时结束,然后再将值传递给 sink
。但是,很多时候您只想 debounce
在第一个元素之后启动。例如,如果用户试图过滤联系人列表,他们很可能只在文本字段中输入了一个字母。如果是这种情况,应用程序可能应该立即开始过滤,而不必等待 debounce
超时。
我知道 Drop
发布者,但我似乎无法找到它们的组合来执行更多 "skip" 操作,例如 sink
接收每个值,但 debounce
在第一个值上被忽略。
类似于以下内容:
publisher
.if_first_element_passthrough_to_sink(...), else_debounce(...)
.sink(...)
内置运算符可以实现这样的功能吗?
澄清
一些澄清,因为我原来的帖子并不像它应该的那样清楚......下面 Asperi 提供的答案非常接近,但理想情况下总是交付序列中的第一个元素,然后 debounce
会开始。
假设用户输入以下内容:
A B C ... (pauses typing for a few seconds) ... D ... (pauses) ... E F G
我想要的是:
A
、D
、E
立即发货。
B C
使用 debounce
合并为 C
F G
使用 debounce
合并为 G
如果我正确理解了您的需求,它可以基于 Concatenate
实现,如下所示(伪代码):
let originalPublisher = ...
let publisher = Publishers.Concatenate(
prefix: originalPublisher.first(),
suffix: originalPublisher.debounce(for: 0.5, scheduler: RunLoop.main))
.eraseToAnyPublisher()
所以,prefix 只是从原始发布者向下游发送第一个元素并完成,之后 suffix 只是使用 [=12 传递所有后续元素=].
在 debounce
的特定情况下,您可能更喜欢 throttle
的行为。它立即发送第一个元素,然后每个 interval
.
发送不超过一个元素
无论如何,可以使用 Combine 内置函数吗?是的,有一些困难。 应该吗?也许……
这是您的目标的弹珠图:
每次一个值进入 kennyc-debouncer 时,它都会启动一个计时器(由阴影区域表示)。如果计时器为 运行 时值到达,kennyc-debouncer 会保存该值并重新启动计时器。当计时器到期时,如果在计时器 运行 期间有任何值到达,kennyc-debouncer 会立即发出最新值。
scan
运算符允许我们保持状态,每次输入到达时我们都会改变。我们需要将两种输入发送到 scan
:来自上游发布者的输出和定时器触发。因此,让我们为这些输入定义一个类型:
fileprivate enum DebounceEvent<Value> {
case value(Value)
case timerFired
}
我们的 scan
转换中需要什么样的状态?我们肯定需要调度程序、时间间隔和调度程序选项,以便我们可以设置计时器。
我们还需要一个 PassthroughSubject
,我们可以使用它来将计时器触发转换为 scan
运算符的输入。
我们实际上无法取消并重新启动计时器,因此,当计时器触发时,我们将查看它是否应该重新启动。如果是这样,我们将启动另一个计时器。所以我们需要知道定时器是否运行,当定时器触发时发送什么输出,如果需要重启定时器的重启时间。
由于scan
的输出是整个状态值,我们还需要状态包含要发送到下游的输出值(如果有的话)。
这是状态类型:
fileprivate struct DebounceState<Value, S: Scheduler> {
let scheduler: S
let interval: S.SchedulerTimeType.Stride
let options: S.SchedulerOptions?
let subject = PassthroughSubject<Void, Never>()
enum TimerState {
case notRunning
case running(PendingOutput?)
struct PendingOutput {
var value: Value
var earliestDeliveryTime: S.SchedulerTimeType
}
}
var output: Value? = nil
var timerState: TimerState = .notRunning
}
现在让我们看看如何实际使用 scan
和其他一些运算符来实现 kennyc 版本的去抖动:
extension Publisher {
func kennycDebounce<S: Scheduler>(
for dueTime: S.SchedulerTimeType.Stride,
scheduler: S,
options: S.SchedulerOptions? = nil
) -> AnyPublisher<Output, Failure>
{
let initialState = DebounceState<Output, S>(
scheduler: scheduler,
interval: dueTime,
options: options)
let timerEvents = initialState.subject
.map { _ in DebounceEvent<Output>.timerFired }
.setFailureType(to: Failure.self)
return self
.map { DebounceEvent.value([=12=]) }
.merge(with: timerEvents)
.scan(initialState) { [=12=].updated(with: ) }
.compactMap { [=12=].output }
.eraseToAnyPublisher()
}
}
我们首先为 scan
运算符构造初始状态。
然后,我们创建一个发布者,将状态 PassthroughSubject
的 Void
输出转换为 .timerFired
事件。
最后,我们构建了完整的管道,它有四个阶段:
将上游输出(来自 self
)转换为 .value
事件。
将值事件与计时器事件合并。
使用 scan
使用值和计时器事件更新去抖动状态。实际工作是在 updated(with:)
方法中完成的,我们将在下面添加到 DebounceState
。
将完整状态映射到我们想要传递给下游的值,并丢弃空值(当上游事件被去抖抑制时发生)。
剩下的就是写updated(with:)
方法了。它查看每个传入事件的类型(value
或 timerFired
)和计时器的状态,以决定新状态应该是什么,并在必要时设置一个新计时器。
extension DebounceState {
func updated(with event: DebounceEvent<Value>) -> DebounceState<Value, S> {
var answer = self
switch (event, timerState) {
case (.value(let value), .notRunning):
answer.output = value
answer.timerState = .running(nil)
scheduler.schedule(after: scheduler.now.advanced(by: interval), tolerance: .zero, options: options) { [subject] in subject.send() }
case (.value(let value), .running(_)):
answer.output = nil
answer.timerState = .running(.init(value: value, earliestDeliveryTime: scheduler.now.advanced(by: interval)))
case (.timerFired, .running(nil)):
answer.output = nil
answer.timerState = .notRunning
case (.timerFired, .running(.some(let pendingOutput))):
let now = scheduler.now
if pendingOutput.earliestDeliveryTime <= now {
answer.output = pendingOutput.value
answer.timerState = .notRunning
} else {
answer.output = nil
scheduler.schedule(after: pendingOutput.earliestDeliveryTime, tolerance: .zero, options: options) { [subject] in subject.send() }
}
case (.timerFired, .notRunning):
// Impossible!
answer.output = nil
}
return answer
}
}
有用吗?让我们测试一下:
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
let subject = PassthroughSubject<String, Never>()
let q = DispatchQueue.main
let start = DispatchTime.now()
let cfStart = CFAbsoluteTimeGetCurrent()
q.asyncAfter(deadline: start + .milliseconds(100)) { subject.send("A") }
// A should be delivered at start + 100ms.
q.asyncAfter(deadline: start + .milliseconds(200)) { subject.send("B") }
q.asyncAfter(deadline: start + .milliseconds(300)) { subject.send("C") }
// C should be delivered at start + 800ms.
q.asyncAfter(deadline: start + .milliseconds(1100)) { subject.send("D") }
// D should be delivered at start + 1100ms.
q.asyncAfter(deadline: start + .milliseconds(1800)) { subject.send("E") }
// E should be delivered at start + 1800ms.
q.asyncAfter(deadline: start + .milliseconds(1900)) { subject.send("F") }
q.asyncAfter(deadline: start + .milliseconds(2000)) { subject.send("G") }
// G should be delivered at start + 2500ms.
let ticket = subject
.kennycDebounce(for: .milliseconds(500), scheduler: q)
.sink {
print("\([=14=]) \(((CFAbsoluteTimeGetCurrent() - cfStart) * 1000).rounded())") }
输出:
A 107.0
C 847.0
D 1167.0
E 1915.0
G 2714.0
我不知道为什么后面的活动这么晚。它可能只是游乐场的副作用。
在 Combine
中,仅使用内置运算符,是否可以跳过第一个值的运算符,然后将该运算符应用于所有后续值?
考虑以下因素:
publisher
.debounce(...)
.sink(...)
在这种安排中,debounce
将等待指定的超时结束,然后再将值传递给 sink
。但是,很多时候您只想 debounce
在第一个元素之后启动。例如,如果用户试图过滤联系人列表,他们很可能只在文本字段中输入了一个字母。如果是这种情况,应用程序可能应该立即开始过滤,而不必等待 debounce
超时。
我知道 Drop
发布者,但我似乎无法找到它们的组合来执行更多 "skip" 操作,例如 sink
接收每个值,但 debounce
在第一个值上被忽略。
类似于以下内容:
publisher
.if_first_element_passthrough_to_sink(...), else_debounce(...)
.sink(...)
内置运算符可以实现这样的功能吗?
澄清
一些澄清,因为我原来的帖子并不像它应该的那样清楚......下面 Asperi 提供的答案非常接近,但理想情况下总是交付序列中的第一个元素,然后 debounce
会开始。
假设用户输入以下内容:
A B C ... (pauses typing for a few seconds) ... D ... (pauses) ... E F G
我想要的是:
A
、D
、E
立即发货。B C
使用debounce
合并为 F G
使用debounce
合并为
C
G
如果我正确理解了您的需求,它可以基于 Concatenate
实现,如下所示(伪代码):
let originalPublisher = ...
let publisher = Publishers.Concatenate(
prefix: originalPublisher.first(),
suffix: originalPublisher.debounce(for: 0.5, scheduler: RunLoop.main))
.eraseToAnyPublisher()
所以,prefix 只是从原始发布者向下游发送第一个元素并完成,之后 suffix 只是使用 [=12 传递所有后续元素=].
在 debounce
的特定情况下,您可能更喜欢 throttle
的行为。它立即发送第一个元素,然后每个 interval
.
无论如何,可以使用 Combine 内置函数吗?是的,有一些困难。 应该吗?也许……
这是您的目标的弹珠图:
每次一个值进入 kennyc-debouncer 时,它都会启动一个计时器(由阴影区域表示)。如果计时器为 运行 时值到达,kennyc-debouncer 会保存该值并重新启动计时器。当计时器到期时,如果在计时器 运行 期间有任何值到达,kennyc-debouncer 会立即发出最新值。
scan
运算符允许我们保持状态,每次输入到达时我们都会改变。我们需要将两种输入发送到 scan
:来自上游发布者的输出和定时器触发。因此,让我们为这些输入定义一个类型:
fileprivate enum DebounceEvent<Value> {
case value(Value)
case timerFired
}
我们的 scan
转换中需要什么样的状态?我们肯定需要调度程序、时间间隔和调度程序选项,以便我们可以设置计时器。
我们还需要一个 PassthroughSubject
,我们可以使用它来将计时器触发转换为 scan
运算符的输入。
我们实际上无法取消并重新启动计时器,因此,当计时器触发时,我们将查看它是否应该重新启动。如果是这样,我们将启动另一个计时器。所以我们需要知道定时器是否运行,当定时器触发时发送什么输出,如果需要重启定时器的重启时间。
由于scan
的输出是整个状态值,我们还需要状态包含要发送到下游的输出值(如果有的话)。
这是状态类型:
fileprivate struct DebounceState<Value, S: Scheduler> {
let scheduler: S
let interval: S.SchedulerTimeType.Stride
let options: S.SchedulerOptions?
let subject = PassthroughSubject<Void, Never>()
enum TimerState {
case notRunning
case running(PendingOutput?)
struct PendingOutput {
var value: Value
var earliestDeliveryTime: S.SchedulerTimeType
}
}
var output: Value? = nil
var timerState: TimerState = .notRunning
}
现在让我们看看如何实际使用 scan
和其他一些运算符来实现 kennyc 版本的去抖动:
extension Publisher {
func kennycDebounce<S: Scheduler>(
for dueTime: S.SchedulerTimeType.Stride,
scheduler: S,
options: S.SchedulerOptions? = nil
) -> AnyPublisher<Output, Failure>
{
let initialState = DebounceState<Output, S>(
scheduler: scheduler,
interval: dueTime,
options: options)
let timerEvents = initialState.subject
.map { _ in DebounceEvent<Output>.timerFired }
.setFailureType(to: Failure.self)
return self
.map { DebounceEvent.value([=12=]) }
.merge(with: timerEvents)
.scan(initialState) { [=12=].updated(with: ) }
.compactMap { [=12=].output }
.eraseToAnyPublisher()
}
}
我们首先为 scan
运算符构造初始状态。
然后,我们创建一个发布者,将状态 PassthroughSubject
的 Void
输出转换为 .timerFired
事件。
最后,我们构建了完整的管道,它有四个阶段:
将上游输出(来自
self
)转换为.value
事件。将值事件与计时器事件合并。
使用
scan
使用值和计时器事件更新去抖动状态。实际工作是在updated(with:)
方法中完成的,我们将在下面添加到DebounceState
。将完整状态映射到我们想要传递给下游的值,并丢弃空值(当上游事件被去抖抑制时发生)。
剩下的就是写updated(with:)
方法了。它查看每个传入事件的类型(value
或 timerFired
)和计时器的状态,以决定新状态应该是什么,并在必要时设置一个新计时器。
extension DebounceState {
func updated(with event: DebounceEvent<Value>) -> DebounceState<Value, S> {
var answer = self
switch (event, timerState) {
case (.value(let value), .notRunning):
answer.output = value
answer.timerState = .running(nil)
scheduler.schedule(after: scheduler.now.advanced(by: interval), tolerance: .zero, options: options) { [subject] in subject.send() }
case (.value(let value), .running(_)):
answer.output = nil
answer.timerState = .running(.init(value: value, earliestDeliveryTime: scheduler.now.advanced(by: interval)))
case (.timerFired, .running(nil)):
answer.output = nil
answer.timerState = .notRunning
case (.timerFired, .running(.some(let pendingOutput))):
let now = scheduler.now
if pendingOutput.earliestDeliveryTime <= now {
answer.output = pendingOutput.value
answer.timerState = .notRunning
} else {
answer.output = nil
scheduler.schedule(after: pendingOutput.earliestDeliveryTime, tolerance: .zero, options: options) { [subject] in subject.send() }
}
case (.timerFired, .notRunning):
// Impossible!
answer.output = nil
}
return answer
}
}
有用吗?让我们测试一下:
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
let subject = PassthroughSubject<String, Never>()
let q = DispatchQueue.main
let start = DispatchTime.now()
let cfStart = CFAbsoluteTimeGetCurrent()
q.asyncAfter(deadline: start + .milliseconds(100)) { subject.send("A") }
// A should be delivered at start + 100ms.
q.asyncAfter(deadline: start + .milliseconds(200)) { subject.send("B") }
q.asyncAfter(deadline: start + .milliseconds(300)) { subject.send("C") }
// C should be delivered at start + 800ms.
q.asyncAfter(deadline: start + .milliseconds(1100)) { subject.send("D") }
// D should be delivered at start + 1100ms.
q.asyncAfter(deadline: start + .milliseconds(1800)) { subject.send("E") }
// E should be delivered at start + 1800ms.
q.asyncAfter(deadline: start + .milliseconds(1900)) { subject.send("F") }
q.asyncAfter(deadline: start + .milliseconds(2000)) { subject.send("G") }
// G should be delivered at start + 2500ms.
let ticket = subject
.kennycDebounce(for: .milliseconds(500), scheduler: q)
.sink {
print("\([=14=]) \(((CFAbsoluteTimeGetCurrent() - cfStart) * 1000).rounded())") }
输出:
A 107.0
C 847.0
D 1167.0
E 1915.0
G 2714.0
我不知道为什么后面的活动这么晚。它可能只是游乐场的副作用。