如何仅在收到第一条消息后才应用 Combine 运算符?

How do you apply a Combine operator only after the first message has been received?

Combine 中,仅使用内置运算符,是否可以跳过第一个值的运算符,然后将该运算符应用于所有后续值?

考虑以下因素:

publisher
  .debounce(...)
  .sink(...)

在这种安排中,debounce 将等待指定的超时结束,然后再将值传递给 sink。但是,很多时候您只想 debounce 在第一个元素之后启动。例如,如果用户试图过滤联系人列表,他们很可能只在文本字段中输入了一个字母。如果是这种情况,应用程序可能应该立即开始过滤,而不必等待 debounce 超时。

我知道 Drop 发布者,但我似乎无法找到它们的组合来执行更多 "skip" 操作,例如 sink接收每个值,但 debounce 在第一个值上被忽略。

类似于以下内容:

publisher
  .if_first_element_passthrough_to_sink(...), else_debounce(...)
  .sink(...)

内置运算符可以实现这样的功能吗?

澄清

一些澄清,因为我原来的帖子并不像它应该的那样清楚......下面 Asperi 提供的答案非常接近,但理想情况下总是交付序列中的第一个元素,然后 debounce 会开始。

假设用户输入以下内容:

A B C ... (pauses typing for a few seconds) ... D ... (pauses) ... E F G

我想要的是:

如果我正确理解了您的需求,它可以基于 Concatenate 实现,如下所示(伪代码):

let originalPublisher = ...
let publisher = Publishers.Concatenate(
        prefix: originalPublisher.first(),
        suffix: originalPublisher.debounce(for: 0.5, scheduler: RunLoop.main))
    .eraseToAnyPublisher()

所以,prefix 只是从原始发布者向下游发送第一个元素并完成,之后 suffix 只是使用 [=12 传递所有后续元素=].

debounce 的特定情况下,您可能更喜欢 throttle 的行为。它立即发送第一个元素,然后每个 interval.

发送不超过一个元素

无论如何,可以使用 Combine 内置函数吗?是的,有一些困难。 应该吗?也许……

这是您的目标的弹珠图:

每次一个值进入 kennyc-debouncer 时,它都会启动一个计时器(由阴影区域表示)。如果计时器为 运行 时值到达,kennyc-debouncer 会保存该值并重新启动计时器。当计时器到期时,如果在计时器 运行 期间有任何值到达,kennyc-debouncer 会立即发出最新值。

scan 运算符允许我们保持状态,每次输入到达时我们都会改变。我们需要将两种输入发送到 scan:来自上游发布者的输出和定时器触发。因此,让我们为这些输入定义一个类型:

fileprivate enum DebounceEvent<Value> {
    case value(Value)
    case timerFired
}

我们的 scan 转换中需要什么样的状态?我们肯定需要调度程序、时间间隔和调度程序选项,以便我们可以设置计时器。

我们还需要一个 PassthroughSubject,我们可以使用它来将计时器触发转换为 scan 运算符的输入。

我们实际上无法取消并重新启动计时器,因此,当计时器触发时,我们将查看它是否应该重新启动。如果是这样,我们将启动另一个计时器。所以我们需要知道定时器是否运行,当定时器触发时发送什么输出,如果需要重启定时器的重启时间。

由于scan的输出是整个状态值,我们还需要状态包含要发送到下游的输出值(如果有的话)。

这是状态类型:

fileprivate struct DebounceState<Value, S: Scheduler> {
    let scheduler: S
    let interval: S.SchedulerTimeType.Stride
    let options: S.SchedulerOptions?

    let subject = PassthroughSubject<Void, Never>()

    enum TimerState {
        case notRunning
        case running(PendingOutput?)

        struct PendingOutput {
            var value: Value
            var earliestDeliveryTime: S.SchedulerTimeType
        }
    }

    var output: Value? = nil
    var timerState: TimerState = .notRunning
}

现在让我们看看如何实际使用 scan 和其他一些运算符来实现 kennyc 版本的去抖动:

extension Publisher {
    func kennycDebounce<S: Scheduler>(
        for dueTime: S.SchedulerTimeType.Stride,
        scheduler: S,
        options: S.SchedulerOptions? = nil
    ) -> AnyPublisher<Output, Failure>
    {
        let initialState = DebounceState<Output, S>(
            scheduler: scheduler,
            interval: dueTime,
            options: options)
        let timerEvents = initialState.subject
            .map { _ in DebounceEvent<Output>.timerFired }
            .setFailureType(to: Failure.self)
        return self
            .map { DebounceEvent.value([=12=]) }
            .merge(with: timerEvents)
            .scan(initialState) { [=12=].updated(with: ) }
            .compactMap { [=12=].output }
            .eraseToAnyPublisher()
    }
}

我们首先为 scan 运算符构造初始状态。

然后,我们创建一个发布者,将状态 PassthroughSubjectVoid 输出转换为 .timerFired 事件。

最后,我们构建了完整的管道,它有四个阶段:

  1. 将上游输出(来自 self)转换为 .value 事件。

  2. 将值事件与计时器事件合并。

  3. 使用 scan 使用值和计时器事件更新去抖动状态。实际工作是在 updated(with:) 方法中完成的,我们将在下面添加到 DebounceState

  4. 将完整状态映射到我们想要传递给下游的值,并丢弃空值(当上游事件被去抖抑制时发生)。

剩下的就是写updated(with:)方法了。它查看每个传入事件的类型(valuetimerFired)和计时器的状态,以决定新状态应该是什么,并在必要时设置一个新计时器。

extension DebounceState {
    func updated(with event: DebounceEvent<Value>) -> DebounceState<Value, S> {
        var answer = self
        switch (event, timerState) {
        case (.value(let value), .notRunning):
            answer.output = value
            answer.timerState = .running(nil)
            scheduler.schedule(after: scheduler.now.advanced(by: interval), tolerance: .zero, options: options) { [subject] in subject.send() }
        case (.value(let value), .running(_)):
            answer.output = nil
            answer.timerState = .running(.init(value: value, earliestDeliveryTime: scheduler.now.advanced(by: interval)))
        case (.timerFired, .running(nil)):
            answer.output = nil
            answer.timerState = .notRunning
        case (.timerFired, .running(.some(let pendingOutput))):
            let now = scheduler.now
            if pendingOutput.earliestDeliveryTime <= now {
                answer.output = pendingOutput.value
                answer.timerState = .notRunning
            } else {
                answer.output = nil
                scheduler.schedule(after: pendingOutput.earliestDeliveryTime, tolerance: .zero, options: options) { [subject] in subject.send() }
            }
        case (.timerFired, .notRunning):
            // Impossible!
            answer.output = nil
        }
        return answer
    }
}

有用吗?让我们测试一下:

import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true

let subject = PassthroughSubject<String, Never>()
let q = DispatchQueue.main
let start = DispatchTime.now()
let cfStart = CFAbsoluteTimeGetCurrent()
q.asyncAfter(deadline: start + .milliseconds(100)) { subject.send("A") }
// A should be delivered at start + 100ms.
q.asyncAfter(deadline: start + .milliseconds(200)) { subject.send("B") }
q.asyncAfter(deadline: start + .milliseconds(300)) { subject.send("C") }
// C should be delivered at start + 800ms.
q.asyncAfter(deadline: start + .milliseconds(1100)) { subject.send("D") }
// D should be delivered at start + 1100ms.
q.asyncAfter(deadline: start + .milliseconds(1800)) { subject.send("E") }
// E should be delivered at start + 1800ms.
q.asyncAfter(deadline: start + .milliseconds(1900)) { subject.send("F") }
q.asyncAfter(deadline: start + .milliseconds(2000)) { subject.send("G") }
// G should be delivered at start + 2500ms.

let ticket = subject
    .kennycDebounce(for: .milliseconds(500), scheduler: q)
    .sink {
        print("\([=14=]) \(((CFAbsoluteTimeGetCurrent() - cfStart) * 1000).rounded())") }

输出:

A 107.0
C 847.0
D 1167.0
E 1915.0
G 2714.0

我不知道为什么后面的活动这么晚。它可能只是游乐场的副作用。