在 RxSwift 中滑动 windows
Sliding windows in RxSwift
来自 RxJava 背景,我无法想出在 RxSwift 中实现滑动的标准方法 windows。例如。我有以下事件序列:
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, ...
让我们想象事件发射在一秒钟内发生两次。我想要做的是将这个序列转换为一系列缓冲区,每个缓冲区包含最后三秒的数据。另外,每个缓冲区每秒发射一次。所以结果看起来像这样:
[1,2,3,4,5,6], [3,4,5,6,7,8], [5,6,7,8,9,10], ...
我会在 RxJava 中做的是我会像这样使用 buffer
方法的重载之一:
stream.buffer(3000, 1000, TimeUnit.MILLISECONDS)
这正是我需要完成的结果:缓冲区序列,每个缓冲区每秒发出一次,并包含最后三秒的数据。
我广泛地检查了 RxSwift 文档,但我没有发现 buffer
运算符的任何重载允许我这样做。我是否遗漏了一些不明显的(对于 RxJava 用户,ofc)运算符?
我最初使用自定义运算符编写解决方案。从那以后,我想出了如何使用标准运算符来完成它。
extension ObservableType {
func buffer(timeSpan: RxTimeInterval, timeShift: RxTimeInterval, scheduler: SchedulerType) -> Observable<[E]> {
let trigger = Observable<Int>.timer(timeSpan, period: timeShift, scheduler: scheduler)
.takeUntil(self.takeLast(1))
let buffer = self
.scan([Date: E]()) { previous, current in
var next = previous
let now = scheduler.now
next[now] = current
return next.filter { [=10=].key > now.addingTimeInterval(-timeSpan) }
}
return trigger.withLatestFrom(buffer)
.map { [=10=].sorted(by: { [=10=].key <= .key }).map { [=10=].value } }
}
}
我将我最初的解决方案留给后代:
编写自己的运算符是这里的解决方案。
extension ObservableType {
func buffer(timeSpan: RxTimeInterval, timeShift: RxTimeInterval, scheduler: SchedulerType) -> Observable<[E]> {
return Observable.create { observer in
var buf: [Date: E] = [:]
let lock = NSRecursiveLock()
let elementDispoable = self.subscribe { event in
lock.lock(); defer { lock.unlock() }
switch event {
case let .next(element):
buf[Date()] = element
case .completed:
observer.onCompleted()
case let .error(error):
observer.onError(error)
}
}
let spanDisposable = scheduler.schedulePeriodic((), startAfter: timeSpan, period: timeShift, action: { state in
lock.lock(); defer { lock.unlock() }
let now = Date()
buf = buf.filter { [=11=].key > now.addingTimeInterval(-timeSpan) }
observer.onNext(buf.sorted(by: { [=11=].key <= .key }).map { [=11=].value })
})
return Disposables.create([spanDisposable, elementDispoable])
}
}
}
来自 RxJava 背景,我无法想出在 RxSwift 中实现滑动的标准方法 windows。例如。我有以下事件序列:
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, ...
让我们想象事件发射在一秒钟内发生两次。我想要做的是将这个序列转换为一系列缓冲区,每个缓冲区包含最后三秒的数据。另外,每个缓冲区每秒发射一次。所以结果看起来像这样:
[1,2,3,4,5,6], [3,4,5,6,7,8], [5,6,7,8,9,10], ...
我会在 RxJava 中做的是我会像这样使用 buffer
方法的重载之一:
stream.buffer(3000, 1000, TimeUnit.MILLISECONDS)
这正是我需要完成的结果:缓冲区序列,每个缓冲区每秒发出一次,并包含最后三秒的数据。
我广泛地检查了 RxSwift 文档,但我没有发现 buffer
运算符的任何重载允许我这样做。我是否遗漏了一些不明显的(对于 RxJava 用户,ofc)运算符?
我最初使用自定义运算符编写解决方案。从那以后,我想出了如何使用标准运算符来完成它。
extension ObservableType {
func buffer(timeSpan: RxTimeInterval, timeShift: RxTimeInterval, scheduler: SchedulerType) -> Observable<[E]> {
let trigger = Observable<Int>.timer(timeSpan, period: timeShift, scheduler: scheduler)
.takeUntil(self.takeLast(1))
let buffer = self
.scan([Date: E]()) { previous, current in
var next = previous
let now = scheduler.now
next[now] = current
return next.filter { [=10=].key > now.addingTimeInterval(-timeSpan) }
}
return trigger.withLatestFrom(buffer)
.map { [=10=].sorted(by: { [=10=].key <= .key }).map { [=10=].value } }
}
}
我将我最初的解决方案留给后代:
编写自己的运算符是这里的解决方案。
extension ObservableType {
func buffer(timeSpan: RxTimeInterval, timeShift: RxTimeInterval, scheduler: SchedulerType) -> Observable<[E]> {
return Observable.create { observer in
var buf: [Date: E] = [:]
let lock = NSRecursiveLock()
let elementDispoable = self.subscribe { event in
lock.lock(); defer { lock.unlock() }
switch event {
case let .next(element):
buf[Date()] = element
case .completed:
observer.onCompleted()
case let .error(error):
observer.onError(error)
}
}
let spanDisposable = scheduler.schedulePeriodic((), startAfter: timeSpan, period: timeShift, action: { state in
lock.lock(); defer { lock.unlock() }
let now = Date()
buf = buf.filter { [=11=].key > now.addingTimeInterval(-timeSpan) }
observer.onNext(buf.sorted(by: { [=11=].key <= .key }).map { [=11=].value })
})
return Disposables.create([spanDisposable, elementDispoable])
}
}
}