信号:在时间间隔内收集值

Signal: Collect values over time interval

这可能是一个微不足道的问题,但我无法为这个看似简单的任务找到解决方案。由于我是 ReactiveSwift 和反应式编程的新手,所以我可能只是错过了一些明显的东西。

基本上我想做的是这样的:

signal.collect(timeInterval: .seconds(5))

我想从信号中收集特定时间段内的所有值。生成的信号将每 x 秒产生一个事件,该事件将包含从第一个信号收集的事件数组。

在 ReactiveSwift 中执行此操作的最佳方法是什么?

ReactiveSwift 中没有用于此任务的内置运算符。相反,您可以使用以下方法编写扩展:

import Foundation
import ReactiveSwift
import Result
public extension Signal {
    public func completeAfter(after: TimeInterval, onScheduler : DateSchedulerProtocol = QueueScheduler() ) -> Signal {
        let pipe : (Signal<(), NoError>, ReactiveSwift.Observer<(), NoError>) = Signal<(), NoError>.pipe()
        onScheduler.schedule(after: Date(timeIntervalSinceNow: after)) {
            pipe.1.sendCompleted()
        }
        return Signal { observer in
            return self.observe { event in
                switch event {
                case let .value(value):
                    observer.send(value: value)
                case .completed:
                    observer.sendCompleted()
                case let .failed(error):
                    observer.send(error: error)
                case .interrupted:
                    observer.sendInterrupted()
                }
            }
        }.take(until: pipe.0)
    }

    public func collectUntil(until: TimeInterval) -> Signal<[Value], Error> {
        return self.completeAfter(after: until).collect()
    }
}

然后使用signal.collectUntil(5)方法。

另一种方法是使用 ReactiveSwift 中的 timer 函数。示例(添加到相同的扩展名,如上):

public func collectUntil2(until: TimeInterval) -> Signal<[Value], Error> {
    var signal: Signal<(), NoError>? = nil
    timer(interval: until, on: QueueScheduler()).startWithSignal { innerSignal, _ in
        signal = innerSignal.map { _ in () }.take(first: 1)
    }
    return self.take(until: signal!).collect()
}

但是,我不喜欢这种方法,因为它是 SignalProducer 类型提取内部信号的伪造性质。

Signal 类型本身也有 timeout 功能,但是由于它会产生错误,因此很难使用它。如何使用它的示例(仍然,添加到相同的扩展名):

public func completeOnError() -> Signal<Value, Error> {
    return Signal { observer in
        return self.observe { event in
            switch(event) {
            case .value(let v): observer.send(value: v)
            case .failed(_): observer.sendCompleted()
            case .interrupted: observer.sendInterrupted()
            case .completed: observer.sendCompleted()
            }
        }
    }
}

public func collectUntil3(until: TimeInterval) -> Signal<[Value], Error> {
    return self
        .timeout(after: until,
                 raising: NSError() as! Error,
                 on: QueueScheduler())
        .completeOnError()
        .collect()
}

P.S. 通过选择 3 个选项中的任何一个,注意传递正确的调度程序或使用正确的调度程序对您的解决方案进行参数化。

基于 (可惜这不是我想要的),我创建了一个扩展来解决我的问题。该扩展遵循 ReactiveSwift collect 函数的结构,以尽可能接近 ReactiveSwift 的意图。

它将收集给定 timeInterval 上所有发送的值,然后将它们作为数组发送。在终止事件中,它还会发送剩余值(如果有的话)。

extension Signal {
    func collect(timeInterval: DispatchTimeInterval,
                 on scheduler: QueueScheduler = QueueScheduler()) -> Signal<[Value], Error> {
        return Signal<[Value], Error> { observer in
            var values: [Value] = []
            let sendAction: () -> Void = {
                observer.send(value: values)

                values.removeAll(keepingCapacity: true)
            }
            let disposable = CompositeDisposable()
            let scheduleDisposable = scheduler.schedule(
                    after: Date(timeInterval: timeInterval.timeInterval, since: scheduler.currentDate),
                    interval: timeInterval,
                    action: sendAction
            )

            disposable += scheduleDisposable
            disposable += self.observe { (event: Event<Value, Error>) in
                if event.isTerminating {
                    if !values.isEmpty {
                        sendAction()
                    }

                    scheduleDisposable?.dispose()
                }

                switch event {
                case let .value(value):
                    values.append(value)
                case .completed:
                    observer.sendCompleted()
                case let .failed(error):
                    observer.send(error: error)
                case .interrupted:
                    observer.sendInterrupted()
                }
            }

            return disposable
        }
    }
}

extension SignalProducer {
    func collect(timeInterval: DispatchTimeInterval,
                 on scheduler: QueueScheduler = QueueScheduler()) -> SignalProducer<[Value], Error> {
        return lift { (signal: ProducedSignal) in
            signal.collect(timeInterval: timeInterval, on: scheduler)
        }
    }
}

extension DispatchTimeInterval {
    var timeInterval: TimeInterval {
        switch self {
        case let .seconds(s):
            return TimeInterval(s)
        case let .milliseconds(ms):
            return TimeInterval(TimeInterval(ms) / 1000.0)
        case let .microseconds(us):
            return TimeInterval(UInt64(us) * NSEC_PER_USEC) / TimeInterval(NSEC_PER_SEC)
        case let .nanoseconds(ns):
            return TimeInterval(ns) / TimeInterval(NSEC_PER_SEC)
        }
    }
}