RxJava 滑动 Window
RxJava Sliding Window
我有一个发射数据的 observable,我想最初缓冲它三秒钟,然后在初始缓冲后必须有一秒钟的滑动。这更像是 buffer(timespan,unit,skip)
跳过的地方是时间跨度。
样本:
ObservableData,TimeStamp : (5,1),(10,1.5),(30,2.8),(40,3.2),(60,3.8),(90,4.2)
ExpectedList : {5,10,30},{10,30,40,60},{30,40,60,90}
我可以通过创建自定义运算符来实现这一点。我只想知道有没有什么方法可以不依赖自定义运算符。
我觉得可以用内置运算符解决。下面的代码演示了其中一种方法,尽管当用于热源或非轻量级冷源时事情会变得棘手 - 我鼓励您将它用于教育/获得想法的目的,而不是生产用途
@Test
fun slidingWindow() {
val events = Flowable.just(
Data(5, 1.0),
Data(10, 1.5),
Data(30, 2.8),
Data(40, 3.2),
Data(60, 3.8),
Data(90, 4.2))
.observeOn(Schedulers.io())
val windows = window(windowSize = 3, slideSize = 1, data = events).toList().blockingGet()
Assert.assertNotNull(windows)
Assert.assertFalse(windows.isEmpty())
}
fun window(windowSize: Int, slideSize: Int, data: Flowable<Data>): Flowable<List<Int>> = window(
from = 0,
to = windowSize,
slideSize = slideSize,
data = data)
fun window(from: Int, to: Int, slideSize: Int, data: Flowable<Data>): Flowable<List<Int>> {
val window = data.takeWhile { it.time <= to }.skipWhile { it.time < from }.map { it.data }
val tail = data.skipWhile { it.time <= from + slideSize }
val nonEmptyWindow = window.toList().filter { !it.isEmpty() }
val nextWindow = nonEmptyWindow.flatMapPublisher {
window(from + slideSize, to + slideSize, slideSize, tail).observeOn(Schedulers.io())
}
return nonEmptyWindow.toFlowable().concatWith(nextWindow)
}
data class Data(val data: Int, val time: Double)
上面的测试产生
[[5, 10, 30],
[10, 30, 40, 60],
[30, 40, 60, 90],
[40, 60, 90],
[90]]
这可以通过运算符timestamp()
和scan()
来完成,扫描并检查它们内部的缓冲区元素。
这个想法的例子(它不缓冲最初的 3 秒),带有 compose()
运算符。
/**
* Sliding overlapping window. If exactCount of elements are within timespan
* then groups elements into List otherwise filtered out
*/
fun <T> slidingWindow(
timespan: Long,
timespanUnit: TimeUnit,
exactCount: Int
): (Observable<T>) -> Observable<List<T>> {
return { from ->
from.timestamp(timespanUnit)
.scan(
ArrayDeque<Timed<T>>(exactCount + 1),
{ queue: ArrayDeque<Timed<T>>, t: Timed<T>
->
queue.addLast(t)
if (queue.size > exactCount) queue.removeFirst()
queue.removeAll { timed -> queue.last().time() - timed.time() > timespan }
queue
}
)
.filter { it.size == exactCount }
.map { it.toTypedArray() }
.map { it.map { timed -> timed.value() } }
}
}
@Test
fun slidingWindow() {
val testSequence = "asXZYe".asIterable()
val timespan = 5000L
Observable.fromIterable(testSequence)
.compose(slidingWindow(timespan, TimeUnit.MILLISECONDS, 3))
.test()
.assertValues(
arrayListOf('a', 's', 'X'),
arrayListOf('s', 'X', 'Z'),
arrayListOf('X', 'Z', 'Y'),
arrayListOf('Z', 'Y', 'e')
)
}
我有一个发射数据的 observable,我想最初缓冲它三秒钟,然后在初始缓冲后必须有一秒钟的滑动。这更像是 buffer(timespan,unit,skip)
跳过的地方是时间跨度。
样本:
ObservableData,TimeStamp : (5,1),(10,1.5),(30,2.8),(40,3.2),(60,3.8),(90,4.2)
ExpectedList : {5,10,30},{10,30,40,60},{30,40,60,90}
我可以通过创建自定义运算符来实现这一点。我只想知道有没有什么方法可以不依赖自定义运算符。
我觉得可以用内置运算符解决。下面的代码演示了其中一种方法,尽管当用于热源或非轻量级冷源时事情会变得棘手 - 我鼓励您将它用于教育/获得想法的目的,而不是生产用途
@Test
fun slidingWindow() {
val events = Flowable.just(
Data(5, 1.0),
Data(10, 1.5),
Data(30, 2.8),
Data(40, 3.2),
Data(60, 3.8),
Data(90, 4.2))
.observeOn(Schedulers.io())
val windows = window(windowSize = 3, slideSize = 1, data = events).toList().blockingGet()
Assert.assertNotNull(windows)
Assert.assertFalse(windows.isEmpty())
}
fun window(windowSize: Int, slideSize: Int, data: Flowable<Data>): Flowable<List<Int>> = window(
from = 0,
to = windowSize,
slideSize = slideSize,
data = data)
fun window(from: Int, to: Int, slideSize: Int, data: Flowable<Data>): Flowable<List<Int>> {
val window = data.takeWhile { it.time <= to }.skipWhile { it.time < from }.map { it.data }
val tail = data.skipWhile { it.time <= from + slideSize }
val nonEmptyWindow = window.toList().filter { !it.isEmpty() }
val nextWindow = nonEmptyWindow.flatMapPublisher {
window(from + slideSize, to + slideSize, slideSize, tail).observeOn(Schedulers.io())
}
return nonEmptyWindow.toFlowable().concatWith(nextWindow)
}
data class Data(val data: Int, val time: Double)
上面的测试产生
[[5, 10, 30],
[10, 30, 40, 60],
[30, 40, 60, 90],
[40, 60, 90],
[90]]
这可以通过运算符timestamp()
和scan()
来完成,扫描并检查它们内部的缓冲区元素。
这个想法的例子(它不缓冲最初的 3 秒),带有 compose()
运算符。
/**
* Sliding overlapping window. If exactCount of elements are within timespan
* then groups elements into List otherwise filtered out
*/
fun <T> slidingWindow(
timespan: Long,
timespanUnit: TimeUnit,
exactCount: Int
): (Observable<T>) -> Observable<List<T>> {
return { from ->
from.timestamp(timespanUnit)
.scan(
ArrayDeque<Timed<T>>(exactCount + 1),
{ queue: ArrayDeque<Timed<T>>, t: Timed<T>
->
queue.addLast(t)
if (queue.size > exactCount) queue.removeFirst()
queue.removeAll { timed -> queue.last().time() - timed.time() > timespan }
queue
}
)
.filter { it.size == exactCount }
.map { it.toTypedArray() }
.map { it.map { timed -> timed.value() } }
}
}
@Test
fun slidingWindow() {
val testSequence = "asXZYe".asIterable()
val timespan = 5000L
Observable.fromIterable(testSequence)
.compose(slidingWindow(timespan, TimeUnit.MILLISECONDS, 3))
.test()
.assertValues(
arrayListOf('a', 's', 'X'),
arrayListOf('s', 'X', 'Z'),
arrayListOf('X', 'Z', 'Y'),
arrayListOf('Z', 'Y', 'e')
)
}