两个有序可观察量的完全外部连接
Full outer join of two ordered observables
假设我们有两个 observable Observable<Integer> o1
和 Observable<Integer> o2
并且每个 observable 都产生严格递增的序列。
任务是对这两个可观察对象执行等效的完全外部连接。例如加入
Observable.just(0, 2, 3, 6)
Observable.just(1, 2, 3, 4, 5, 6)
应该生产
[ [0, _], [_, 1], [2, 2], [3, 3], [_, 4], [_, 5], [6, 6] ]
连接应该是高效的,并且可以很好地处理非常大或无限的流。
在pull 场景下很容易解决。有没有惯用的 rx 方法来实现这个?
没有单一的运算符,但可以从标准和扩展运算符组合行为:
static abstract class Pair implements Comparable<Pair> {
int value;
@Override
public int compareTo(Pair o) {
return Integer.compare(value, o.value);
}
}
static final class Left extends Pair {
Left(int value) {
this.value = value;
}
@Override
public String toString() {
return "[" + value + ", _]";
}
}
static final class Right extends Pair {
Right(int value) {
this.value = value;
}
@Override
public String toString() {
return "[_, " + value + "]";
}
}
static final class Both extends Pair {
Both(int value) {
this.value = value;
}
@Override
public int hashCode() {
return value;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof Both) {
return ((Both)obj).value == value;
}
return false;
}
@Override
public String toString() {
return "[" + value + ", " + value + "]";
}
}
@SuppressWarnings("unchecked")
@Test
public void test() {
Flowable<Integer> a = Flowable.just(0, 2, 3, 6);
Flowable<Integer> b = Flowable.just(1, 2, 3, 4, 5, 6);
Flowable.defer(() -> {
boolean[] skip = { false };
return Flowables.<Pair>orderedMerge(
a.<Pair>map(Left::new), b.<Pair>map(Right::new)
)
.distinctUntilChanged()
.buffer(2, 1)
.flatMapIterable(buf -> {
if (skip[0]) {
skip[0] = false;
return Collections.emptyList();
}
if (buf.size() == 2) {
if (buf.get(0).value == buf.get(1).value) {
skip[0] = true;
return Collections.singletonList(new Both(buf.get(0).value));
}
return buf.subList(0, 1);
}
return buf;
});
})
.subscribe(System.out::println);
}
其中 Flowables.orderedMerge
在 RxJava 2 Extensions library.
假设我们有两个 observable Observable<Integer> o1
和 Observable<Integer> o2
并且每个 observable 都产生严格递增的序列。
任务是对这两个可观察对象执行等效的完全外部连接。例如加入
Observable.just(0, 2, 3, 6)
Observable.just(1, 2, 3, 4, 5, 6)
应该生产
[ [0, _], [_, 1], [2, 2], [3, 3], [_, 4], [_, 5], [6, 6] ]
连接应该是高效的,并且可以很好地处理非常大或无限的流。
在pull 场景下很容易解决。有没有惯用的 rx 方法来实现这个?
没有单一的运算符,但可以从标准和扩展运算符组合行为:
static abstract class Pair implements Comparable<Pair> {
int value;
@Override
public int compareTo(Pair o) {
return Integer.compare(value, o.value);
}
}
static final class Left extends Pair {
Left(int value) {
this.value = value;
}
@Override
public String toString() {
return "[" + value + ", _]";
}
}
static final class Right extends Pair {
Right(int value) {
this.value = value;
}
@Override
public String toString() {
return "[_, " + value + "]";
}
}
static final class Both extends Pair {
Both(int value) {
this.value = value;
}
@Override
public int hashCode() {
return value;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof Both) {
return ((Both)obj).value == value;
}
return false;
}
@Override
public String toString() {
return "[" + value + ", " + value + "]";
}
}
@SuppressWarnings("unchecked")
@Test
public void test() {
Flowable<Integer> a = Flowable.just(0, 2, 3, 6);
Flowable<Integer> b = Flowable.just(1, 2, 3, 4, 5, 6);
Flowable.defer(() -> {
boolean[] skip = { false };
return Flowables.<Pair>orderedMerge(
a.<Pair>map(Left::new), b.<Pair>map(Right::new)
)
.distinctUntilChanged()
.buffer(2, 1)
.flatMapIterable(buf -> {
if (skip[0]) {
skip[0] = false;
return Collections.emptyList();
}
if (buf.size() == 2) {
if (buf.get(0).value == buf.get(1).value) {
skip[0] = true;
return Collections.singletonList(new Both(buf.get(0).value));
}
return buf.subList(0, 1);
}
return buf;
});
})
.subscribe(System.out::println);
}
其中 Flowables.orderedMerge
在 RxJava 2 Extensions library.