根据 Java rx 中的最后两个当前值进行过滤
Filtering based on the last two current values in Java rx
我正在尝试使用 java 响应式扩展构建一个简单的应用程序。我有两个连续发出温度值的流,我想检测并过滤掉可能是错误的感测温度尖峰,为此我也需要考虑先例值,以便我可以像这样考虑变化:
我仍然无法在文档中找到合适的运算符。有人知道我该如何完成任务吗?我应该自定义运算符吗?
这些是我的流:
double min = 50, max = 75, spikeFreq = 0.01;
Observable<Double> tempStream1 = Observable.create((
Subscriber<? super Double> subscriber) -> {
new TempStream(subscriber, min, max, spikeFreq).start();
});
Observable<Double> tempStream2 = Observable.create((
Subscriber<? super Double> subscriber) -> {
new TempStream(subscriber, min, max, spikeFreq).start();
});
public class TempStream extends Thread{
private Subscriber<? super Double> subscriber;
private TempSensor sensor;
public TempStream(Subscriber<? super Double> subscriber, double min,
double max, double spikeFreq) {
this.subscriber = subscriber;
sensor = new TempSensor(min, max, spikeFreq);
}
@Override
public void run() {
Random gen = new Random(System.currentTimeMillis());
while (!subscriber.isUnsubscribed()) {
try {
subscriber.onNext(sensor.getCurrentValue());
Thread.sleep(1000 + gen.nextInt() % 1000);
} catch (Exception ex) {
subscriber.onError(ex);
}
}
subscriber.onCompleted();
}
}
也许 buffer
运算符 (http://reactivex.io/documentation/operators/buffer.html) 在这种情况下可能会有所帮助。
您想将 buffer
与 count = 2
和 skip = 1
一起使用。这样,您将在流中获得一个元素的 "lookahead"。
例如:
stream.buffer(2,1).filter(buf -> buf.size() == 2 && buf.get(0) - buf.get(1) < max);
请注意,此示例还检查是否缓冲了两个值,因为在流完成时可能只发出一个值。
可以使用scan
运算符来实现:
val max = `your value`
stream
.scan(Pair(Int.MAX_VALUE, Int.MAX_VALUE)) { t1, t2 -> t1.second to t2 }
.skip(1)
.filter { (previous, current) -> current - previous < max }
.map { it.second }
所以在当前之前构建项目对。
- 跳过初始值
- 第一个元素
previous
将是 Int.MAX_VALUE
,它总是通过过滤器 current - previous < max
。
- 其他元素是真实的 prev/current 对
- 获取通过过滤器的对的当前项目
不太清楚,但封装:
/** emits first item, then emits only if current value passes [filter] */
fun <T : Any> Observable<T>.filterWithPrevious(filter: (previous: T, current: T) -> Boolean): Observable<T> =
scan(Pair(null, null)) { t1: Pair<T?, T?>, t2: T -> t1.second to t2 }
.skip(1)
.filter { (first, second) -> first?.let { filter(it, second!!) } ?: true }
.map { it.second!! }
然后:
stream.filterWithPrevious { previous, current -> current - previous < max }
PS:我已经在 kotlin 上写了它,因为 java 它会是一样的,但是很冗长。
我正在尝试使用 java 响应式扩展构建一个简单的应用程序。我有两个连续发出温度值的流,我想检测并过滤掉可能是错误的感测温度尖峰,为此我也需要考虑先例值,以便我可以像这样考虑变化:
我仍然无法在文档中找到合适的运算符。有人知道我该如何完成任务吗?我应该自定义运算符吗?
这些是我的流:
double min = 50, max = 75, spikeFreq = 0.01;
Observable<Double> tempStream1 = Observable.create((
Subscriber<? super Double> subscriber) -> {
new TempStream(subscriber, min, max, spikeFreq).start();
});
Observable<Double> tempStream2 = Observable.create((
Subscriber<? super Double> subscriber) -> {
new TempStream(subscriber, min, max, spikeFreq).start();
});
public class TempStream extends Thread{
private Subscriber<? super Double> subscriber;
private TempSensor sensor;
public TempStream(Subscriber<? super Double> subscriber, double min,
double max, double spikeFreq) {
this.subscriber = subscriber;
sensor = new TempSensor(min, max, spikeFreq);
}
@Override
public void run() {
Random gen = new Random(System.currentTimeMillis());
while (!subscriber.isUnsubscribed()) {
try {
subscriber.onNext(sensor.getCurrentValue());
Thread.sleep(1000 + gen.nextInt() % 1000);
} catch (Exception ex) {
subscriber.onError(ex);
}
}
subscriber.onCompleted();
}
}
也许 buffer
运算符 (http://reactivex.io/documentation/operators/buffer.html) 在这种情况下可能会有所帮助。
您想将 buffer
与 count = 2
和 skip = 1
一起使用。这样,您将在流中获得一个元素的 "lookahead"。
例如:
stream.buffer(2,1).filter(buf -> buf.size() == 2 && buf.get(0) - buf.get(1) < max);
请注意,此示例还检查是否缓冲了两个值,因为在流完成时可能只发出一个值。
可以使用scan
运算符来实现:
val max = `your value`
stream
.scan(Pair(Int.MAX_VALUE, Int.MAX_VALUE)) { t1, t2 -> t1.second to t2 }
.skip(1)
.filter { (previous, current) -> current - previous < max }
.map { it.second }
所以在当前之前构建项目对。
- 跳过初始值
- 第一个元素
previous
将是Int.MAX_VALUE
,它总是通过过滤器current - previous < max
。 - 其他元素是真实的 prev/current 对
- 获取通过过滤器的对的当前项目
不太清楚,但封装:
/** emits first item, then emits only if current value passes [filter] */
fun <T : Any> Observable<T>.filterWithPrevious(filter: (previous: T, current: T) -> Boolean): Observable<T> =
scan(Pair(null, null)) { t1: Pair<T?, T?>, t2: T -> t1.second to t2 }
.skip(1)
.filter { (first, second) -> first?.let { filter(it, second!!) } ?: true }
.map { it.second!! }
然后:
stream.filterWithPrevious { previous, current -> current - previous < max }
PS:我已经在 kotlin 上写了它,因为 java 它会是一样的,但是很冗长。