RxSwift - 如何限制缓冲区的时间跨度

RxSwift - How to throttle buffer's time span

我正在尝试重新创建一个代码片段,该代码片段基本上可以计算连续点击按钮的次数。代码在 RxJS 中,出于学习目的,我正在尝试将其转换为 RxSwift,但无法弄清楚缓冲区和节流部分。

You can see the js code on jsfiddle

目前我有这个

  tapButton.rx.tap      
  .buffer(timeSpan: 0.25, count: 10, scheduler: MainScheduler.instance)
  .map {[=10=].count}
  .filter { [=10=] >= 2 }
  .subscribe(onNext: { events in
    print(events)
  }).addDisposableTo(disposeBag)

而且我无法弄清楚如何延迟直到点击结束并收集自上次发射以来的所有值,就像在 RxJS 示例中一样。

您遇到的问题是因为 RxSwift buffer 运算符不像 RxJS buffer 运算符那样工作。它更像 RxJS bufferWithTimeOrCount 运算符。

目前,从 3.4.0 版开始,没有与 buffer 运算符等效的运算符。它的签名类似于 func buffer(_ boundary: Observer<BoundaryType>) -> Observable<[E]>

这是一个有趣的问题。我最终制作了一个缓冲区运算符,我在这个答案的底部提供了它。以下是我如何写出安德烈代码中定义的解决方案:

    let trigger = button.rx.tap.debounce(0.25, scheduler: MainScheduler.instance)
    let clickStream = button.rx.tap.asObservable()
        .buffer(trigger)
        .map { [=10=].count }
        .map { [=10=] == 1 ? "click" : "\([=10=])x clicks" }

    let clearStream = clickStream
        .debounce(10.0, scheduler: MainScheduler.instance)
        .map { _ in "" }

    Observable.merge([clickStream, clearStream])
        .bind(to: label.rx.text)
        .disposed(by: bag)

以上代码应该放在视图控制器的viewDidLoad方法中。我做了一个大的改变和一个小的改变。小的变化是我使用了 debounce 而不是 throttle。同样,我认为 RxJS 的节流阀与 RxSwift 的节流阀工作方式不同。比较大的变化是我把他的multiClickStream和singleClickStream结合起来了。我不完全确定他为什么要制作两个独立的流...

我所做的另一项更改是将所有影响标签的可观察对象滚动到标签可以绑定到的一个可观察对象中,而不是拥有不同的可观察对象。我觉得这样更干净。

下面是我定义的缓冲操作符

extension Observable {

    /// collects elements from the source sequence until the boundary sequence fires. Then it emits the elements as an array and begins collecting again.
    func buffer<U>(_ boundary: Observable<U>) -> Observable<[E]> {
        return Observable<[E]>.create { observer in
            var buffer: [E] = []
            let lock = NSRecursiveLock()
            let boundaryDisposable = boundary.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next:
                    observer.onNext(buffer)
                    buffer = []
                default:
                    break
                }
            }
            let disposable = self.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next(let element):
                    buffer.append(element)
                case .completed:
                    observer.onNext(buffer)
                    observer.onCompleted()
                case .error(let error):
                    observer.onError(error)
                    buffer = []
                }
            }
            return Disposables.create([disposable, boundaryDisposable])
        }
    }
}