RxJava: combine two optional observables
I have two Observables, let's call them PeanutButter and Jelly. I'd like to combine them into a Sandwich Observable. I can do that using:
Observable<PeanutButter> peanutButterObservable = ...;
Observable<Jelly> jellyObservable = ...;
Observable<Sandwich> sandwichObservable = Observable.combineLatest(
        peanutButterObservable,
        jellyObservable,
        (pb, j) -> makeSandwich(pb, j));
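The problem is that Rx waits for both the first PeanutButter and the first Jelly to be emitted before it emits the first combined Sandwich. Jelly may never be emitted, which means I never get the first Sandwich.
I'd like to combine the two feeds so that a combined item is emitted as soon as either feed emits its first item, regardless of whether the other feed has emitted anything yet. How do I do that in RxJava?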
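One possible approach is to use the startWith operator to make each stream emit a known value on subscription. That way combineLatest() already has a value for both sides and fires whenever either stream emits. You just need to watch for the initial/signal values in the onNext consumer.
Something like this: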
@Test
public void sandwiches() {
    // Prepend a sentinel to each stream so both sides have a value immediately.
    final Observable<String> peanutButters = Observable.just("chunky", "smooth")
            .startWith("--initial--");
    final Observable<String> jellies = Observable.just("strawberry", "blackberry", "raspberry")
            .startWith("--initial--");
    Observable.combineLatest(peanutButters, jellies, (peanutButter, jelly) -> {
                return new Pair<>(peanutButter, jelly);
            })
            .subscribe(
                    next -> {
                        final String peanutButter = next.getFirst();
                        final String jelly = next.getSecond();
                        // A sentinel on one side means that stream has not emitted a real value yet.
                        if (peanutButter.equals("--initial--") && jelly.equals("--initial--")) {
                            // initial emissions
                        } else if (peanutButter.equals("--initial--")) {
                            // jelly emission
                        } else if (jelly.equals("--initial--")) {
                            // peanut butter emission
                        } else {
                            // peanut butter + jelly emissions
                        }
                    },
                    error -> {
                        System.err.println("## onError(" + error.getMessage() + ")");
                    },
                    () -> {
                        System.out.println("## onComplete()");
                    }
            );
}
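The sentinel strings above could be swapped for Optional so that "nothing yet" cannot collide with a real value. The following is only a plain-Java sketch of the bookkeeping that combineLatest does once both sides start out empty. It does not use RxJava at all, and LatestPairModel and its methods are hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical plain-Java model: keeps the latest value per side, each side starting out empty. */
class LatestPairModel {
    private Optional<String> peanutButter = Optional.empty();
    private Optional<String> jelly = Optional.empty();
    private final List<String> emitted = new ArrayList<>();

    /** Called when the peanut butter side produces a value. */
    void onPeanutButter(String value) {
        peanutButter = Optional.of(value);
        emit();
    }

    /** Called when the jelly side produces a value. */
    void onJelly(String value) {
        jelly = Optional.of(value);
        emit();
    }

    /** Records a combined item on every update, even if the other side is still empty. */
    private void emit() {
        emitted.add(peanutButter.orElse("no peanut butter") + " + " + jelly.orElse("no jelly"));
    }

    List<String> emitted() {
        return emitted;
    }
}
```

Calling onJelly("strawberry") and then onPeanutButter("chunky") on a fresh model records "no peanut butter + strawberry" followed by "chunky + strawberry". In the RxJava version, the same idea would presumably mean mapping each stream to Optional and starting it with an empty Optional instead of "--initial--".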
I think this problem can be solved with the merge and scan operators:
public class RxJavaUnitTestJava {

    public Observable<Sandwich> getSandwich(Observable<Jelly> jelly, Observable<PeanutButter> peanutButter) {
        return Observable.merge(jelly, peanutButter)
                .scan(new Sandwich(null, null), (BiFunction<Object, Object, Object>) (prevResult, newItem) -> {
                    Sandwich prevSandwich = (Sandwich) prevResult;
                    if (newItem instanceof Jelly) {
                        System.out.println("emitted: " + ((Jelly) newItem).tag);
                        return new Sandwich((Jelly) newItem, prevSandwich.peanutButter);
                    } else {
                        System.out.println("emitted: " + ((PeanutButter) newItem).tag);
                        return new Sandwich(prevSandwich.jelly, (PeanutButter) newItem);
                    }
                })
                .skip(1) // skip emitting scan's default item
                .cast(Sandwich.class);
    }

    @Test
    public void testGetSandwich() {
        PublishSubject<Jelly> jelly = PublishSubject.create();
        PublishSubject<PeanutButter> peanutButter = PublishSubject.create();
        getSandwich(jelly, peanutButter).subscribe(new Observer<Sandwich>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(Sandwich sandwich) {
                System.out.println("onNext: Sandwich: " + sandwich.toString());
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError: " + e.toString());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
        jelly.onNext(new Jelly("jelly1"));
        jelly.onNext(new Jelly("jelly2"));
        peanutButter.onNext(new PeanutButter("peanutButter1"));
        jelly.onNext(new Jelly("jelly3"));
        peanutButter.onNext(new PeanutButter("peanutButter2"));
    }

    class Jelly {
        String tag;

        public Jelly(String tag) {
            this.tag = tag;
        }
    }

    class PeanutButter {
        String tag;

        public PeanutButter(String tag) {
            this.tag = tag;
        }
    }

    class Sandwich {
        Jelly jelly;
        PeanutButter peanutButter;

        public Sandwich(Jelly jelly, PeanutButter peanutButter) {
            this.jelly = jelly;
            this.peanutButter = peanutButter;
        }

        @Override
        public String toString() {
            String jellyResult = (jelly != null) ? jelly.tag : "no jelly";
            String peanutButterResult = (peanutButter != null) ? peanutButter.tag : "no peanutButter";
            return jellyResult + " | " + peanutButterResult;
        }
    }
}
Output:
onSubscribe
emitted: jelly1
onNext: Sandwich: jelly1 | no peanutButter
emitted: jelly2
onNext: Sandwich: jelly2 | no peanutButter
emitted: peanutButter1
onNext: Sandwich: jelly2 | peanutButter1
emitted: jelly3
onNext: Sandwich: jelly3 | peanutButter1
emitted: peanutButter2
onNext: Sandwich: jelly3 | peanutButter2
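The fact that Jelly, PeanutButter and Sandwich are all independent types makes the casting and nullability handling in scan more complicated. If you have control over these types, this solution can be improved further.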
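As one guess at what such an improvement might look like: if Jelly and PeanutButter shared a common supertype, the accumulator would not need instanceof checks or casts. The sketch below is plain Java with no RxJava dependency. The Ingredient interface, its addTo method, Sandwich.EMPTY and the SandwichScan helper are all hypothetical names introduced for illustration, and the loop only imitates what merge + scan + skip(1) would yield for an already-merged sequence.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical common supertype: each ingredient knows how to update a sandwich. */
interface Ingredient {
    Sandwich addTo(Sandwich previous);
}

final class Jelly implements Ingredient {
    final String tag;

    Jelly(String tag) {
        this.tag = tag;
    }

    @Override
    public Sandwich addTo(Sandwich previous) {
        // Replace the jelly, keep whatever peanut butter was there before.
        return new Sandwich(this, previous.peanutButter);
    }
}

final class PeanutButter implements Ingredient {
    final String tag;

    PeanutButter(String tag) {
        this.tag = tag;
    }

    @Override
    public Sandwich addTo(Sandwich previous) {
        // Replace the peanut butter, keep whatever jelly was there before.
        return new Sandwich(previous.jelly, this);
    }
}

final class Sandwich {
    /** Seed value for the accumulation: nothing on the bread yet. */
    static final Sandwich EMPTY = new Sandwich(null, null);

    final Jelly jelly;
    final PeanutButter peanutButter;

    Sandwich(Jelly jelly, PeanutButter peanutButter) {
        this.jelly = jelly;
        this.peanutButter = peanutButter;
    }

    @Override
    public String toString() {
        String jellyResult = (jelly != null) ? jelly.tag : "no jelly";
        String peanutButterResult = (peanutButter != null) ? peanutButter.tag : "no peanutButter";
        return jellyResult + " | " + peanutButterResult;
    }
}

final class SandwichScan {
    /** Imitates merge + scan + skip(1): one sandwich per incoming ingredient, seed not included. */
    static List<Sandwich> scan(List<Ingredient> merged) {
        List<Sandwich> out = new ArrayList<>();
        Sandwich current = Sandwich.EMPTY;
        for (Ingredient item : merged) {
            current = item.addTo(current);
            out.add(current);
        }
        return out;
    }
}
```

With types shaped like this, the scan accumulator in the RxJava pipeline could plausibly shrink to something like (prev, item) -> item.addTo(prev) over a stream of Ingredient, though that variant is not tested here.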