如何使用 RxJava 从另一个流实现过滤器
How to implement a filter from another stream using RxJava
所以我有两个像这样的蒸汽:
--1--2--3--4--5--6-|
-----A-----B-------|
我的目标是拥有这样的直播
--1----3------5--6-|
我尝试过使用像 takeUntil 或 skipUntil 这样的运算符,但我无法生成有用的东西。你能帮我一些忙吗?
谢谢
一般来说,2和A之间存在重合的问题,你必须定义一个window,其中第二个流可以防止第一个流的值被发出。例如,这将等待每个项目 1 毫秒 (2.x):
import java.util.concurrent.TimeUnit;
import io.reactivex.*;
import io.reactivex.schedulers.TestScheduler;
public class Coincidence {
public static void main(String[] args) {
TestScheduler sch = new TestScheduler();
Flowable.interval(100, TimeUnit.MILLISECONDS, sch)
.onBackpressureBuffer()
.compose(coincide(Flowable.interval(300, TimeUnit.MILLISECONDS, sch), sch))
.take(7)
.subscribe(v ->
System.out.printf("%d - %d%n", sch.now(TimeUnit.MILLISECONDS), v));
sch.advanceTimeBy(1001, TimeUnit.MILLISECONDS);
}
static <T, U> FlowableTransformer<T, T> coincide(
Flowable<U> other, Scheduler scheduler) {
return f -> {
return other.publish(g -> {
return f.flatMap(v -> {
return Flowable.just(v)
.delay(1, TimeUnit.MILLISECONDS, scheduler)
.takeUntil(g)
;
}, 1)
.takeUntil(g.ignoreElements())
;
});
};
};
}
我认为 akarnokd 给出了唯一可能的解决方案,如果你需要处理 "coincidence"。
顺便说一下,在大多数情况下,您可能不需要事件同时发生,或者在特定时间发生 window,但您只需要检查某个事件是否发生在项目被发射之前发生。例如,您的场景可以这样调整:
--1-----2----3------4------5--6-|
-------A----------B-------------|
这意味着A(发生在2之后)将阻止它通过。 4和B也是一样。
如果是这种情况,我将按以下方式解决:
合并这两个 Observables
然后使用扫描来确定应该向下游传播哪个项目,并最终过滤结果。
Observable.merge(Observable.just(1, 2, 3, 4, 5, 6).map(integer -> new ValueObject(integer)),
Observable.just("A", "B").map(string -> new BarrierObject(string)))
.scan((myObject, myObject2) -> {
if (myObject instanceof BarrierObject) {
myObject2.setFilter();
}
return myObject2;
})
.filter(myObject -> (myObject instanceof ValueObject) && myObject.isValid())
.map(myObject -> (Integer) myObject.getValue())
.subscribe(integer -> Log.d("test", "value: " + String.valueOf(integer)));
其中 ValueObject
和 BarrierObject
扩展了以下 class:
abstract class MyObject {
boolean toBeFiltered;
Object value;
public void setFilter() {
toBeFiltered = true;
}
public boolean isValid() {
return !toBeFiltered;
}
public Object getValue() {
return value;
}
}
所以我有两个像这样的蒸汽:
--1--2--3--4--5--6-|
-----A-----B-------|
我的目标是拥有这样的直播
--1----3------5--6-|
我尝试过使用像 takeUntil 或 skipUntil 这样的运算符,但我无法生成有用的东西。你能帮我一些忙吗?
谢谢
一般来说,2和A之间存在重合的问题,你必须定义一个window,其中第二个流可以防止第一个流的值被发出。例如,这将等待每个项目 1 毫秒 (2.x):
import java.util.concurrent.TimeUnit;
import io.reactivex.*;
import io.reactivex.schedulers.TestScheduler;
public class Coincidence {
public static void main(String[] args) {
TestScheduler sch = new TestScheduler();
Flowable.interval(100, TimeUnit.MILLISECONDS, sch)
.onBackpressureBuffer()
.compose(coincide(Flowable.interval(300, TimeUnit.MILLISECONDS, sch), sch))
.take(7)
.subscribe(v ->
System.out.printf("%d - %d%n", sch.now(TimeUnit.MILLISECONDS), v));
sch.advanceTimeBy(1001, TimeUnit.MILLISECONDS);
}
static <T, U> FlowableTransformer<T, T> coincide(
Flowable<U> other, Scheduler scheduler) {
return f -> {
return other.publish(g -> {
return f.flatMap(v -> {
return Flowable.just(v)
.delay(1, TimeUnit.MILLISECONDS, scheduler)
.takeUntil(g)
;
}, 1)
.takeUntil(g.ignoreElements())
;
});
};
};
}
我认为 akarnokd 给出了唯一可能的解决方案,如果你需要处理 "coincidence"。
顺便说一下,在大多数情况下,您可能不需要事件同时发生,或者在特定时间发生 window,但您只需要检查某个事件是否发生在项目被发射之前发生。例如,您的场景可以这样调整:
--1-----2----3------4------5--6-|
-------A----------B-------------|
这意味着A(发生在2之后)将阻止它通过。 4和B也是一样。
如果是这种情况,我将按以下方式解决:
合并这两个 Observables
然后使用扫描来确定应该向下游传播哪个项目,并最终过滤结果。
Observable.merge(Observable.just(1, 2, 3, 4, 5, 6).map(integer -> new ValueObject(integer)),
Observable.just("A", "B").map(string -> new BarrierObject(string)))
.scan((myObject, myObject2) -> {
if (myObject instanceof BarrierObject) {
myObject2.setFilter();
}
return myObject2;
})
.filter(myObject -> (myObject instanceof ValueObject) && myObject.isValid())
.map(myObject -> (Integer) myObject.getValue())
.subscribe(integer -> Log.d("test", "value: " + String.valueOf(integer)));
其中 ValueObject
和 BarrierObject
扩展了以下 class:
abstract class MyObject {
boolean toBeFiltered;
Object value;
public void setFilter() {
toBeFiltered = true;
}
public boolean isValid() {
return !toBeFiltered;
}
public Object getValue() {
return value;
}
}