RxJava: combine two optional observables

I have two Observables; let's call them PeanutButter and Jelly. I'd like to combine them into a Sandwich Observable. I can do that using:

Observable<PeanutButter> peanutButterObservable = ...;
Observable<Jelly> jellyObservable = ...;
Observable<Sandwich> sandwichObservable = Observable.combineLatest(
    peanutButterObservable,
    jellyObservable,
    (pb, j) -> makeSandwich(pb, j));

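The problem is that combineLatest waits until both the first PeanutButter and the first Jelly have been emitted before it emits the first combined Sandwich. Jelly may never be emitted, which means I never get the first Sandwich.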

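I'd like to combine the two feeds so that a combined item is emitted as soon as either feed emits its first item, regardless of whether the other feed has emitted anything yet. How do I do that in RxJava?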

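One possible approach is to use the startWith operator so that each stream emits a known value upon subscription. That way combineLatest() fires as soon as either stream emits a real value. You just have to watch for the initial/signal values in the onNext consumer.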

Something like this:

@Test
public void sandwiches() {
    final Observable<String> peanutButters = Observable.just("chunky", "smooth")
        .startWith("--initial--");

    final Observable<String> jellies = Observable.just("strawberry", "blackberry", "raspberry")
        .startWith("--initial--");

    Observable.combineLatest(peanutButters, jellies, (peanutButter, jelly) -> {
        return new Pair<>(peanutButter, jelly);
    })
    .subscribe(
        next -> {
            final String peanutButter = next.getFirst();
            final String jelly = next.getSecond();

            if(peanutButter.equals("--initial--") && jelly.equals("--initial--")) {
                // initial emissions
            } else if(peanutButter.equals("--initial--")) {
                // jelly emission
            } else if(jelly.equals("--initial--")) {
                // peanut butter emission
            } else {
                // peanut butter + jelly emissions
            }
        },
        error -> {
            System.err.println("## onError(" + error.getMessage() + ")");
        },
        () -> {
            System.out.println("## onComplete()");
        }
    );
}
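
A magic string such as "--initial--" can collide with a real value. One way to make the check less fragile, assuming Java 8 or later, is to let Optional.empty() act as the signal: each stream would be mapped to Optional.of(value) and started with Optional.empty() before being passed to combineLatest. The sketch below shows only the four-way check from the consumer above, without RxJava; the class name EmissionKind and the method classify are hypothetical and not part of any library.

```java
import java.util.Optional;

// Hypothetical helper (not part of RxJava): classifies one combined emission.
final class EmissionKind {
    private EmissionKind() {}

    // Optional.empty() plays the role of the "--initial--" signal value.
    static String classify(Optional<String> peanutButter, Optional<String> jelly) {
        if (!peanutButter.isPresent() && !jelly.isPresent()) {
            return "initial";
        } else if (!peanutButter.isPresent()) {
            return "jelly only: " + jelly.get();
        } else if (!jelly.isPresent()) {
            return "peanut butter only: " + peanutButter.get();
        } else {
            return "sandwich: " + peanutButter.get() + " + " + jelly.get();
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(Optional.empty(), Optional.empty()));
        System.out.println(classify(Optional.empty(), Optional.of("strawberry")));
        System.out.println(classify(Optional.of("chunky"), Optional.empty()));
        System.out.println(classify(Optional.of("chunky"), Optional.of("strawberry")));
    }
}
```

Because the signal is now a distinct value of the element type rather than a reserved string, a real peanut butter or jelly can never be mistaken for it.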

I think this problem can be approached by using the merge and scan operators:

public class RxJavaUnitTestJava {
    public Observable<Sandwich> getSandwich(Observable<Jelly> jelly, Observable<PeanutButter> peanutButter) {
        return Observable.merge(jelly, peanutButter)
                .scan(new Sandwich(null, null), (BiFunction<Object, Object, Object>) (prevResult, newItem) -> {
                    Sandwich prevSandwich = (Sandwich) prevResult;

                    if (newItem instanceof Jelly) {
                        System.out.println("emitted: " + ((Jelly) newItem).tag);
                        return new Sandwich((Jelly) newItem, prevSandwich.peanutButter);
                    } else {
                        System.out.println("emitted: " + ((PeanutButter) newItem).tag);
                        return new Sandwich(prevSandwich.jelly, (PeanutButter) newItem);
                    }
                })
                .skip(1) // skip emitting scan's default item
                .cast(Sandwich.class);
    }

    @Test
    public void testGetSandwich() {
        PublishSubject<Jelly> jelly = PublishSubject.create();
        PublishSubject<PeanutButter> peanutButter = PublishSubject.create();

        getSandwich(jelly, peanutButter).subscribe(new Observer<Sandwich>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(Sandwich sandwich) {
                System.out.println("onNext: Sandwich: " + sandwich.toString());
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError: " + e.toString());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });

        jelly.onNext(new Jelly("jelly1"));
        jelly.onNext(new Jelly("jelly2"));
        peanutButter.onNext(new PeanutButter("peanutButter1"));
        jelly.onNext(new Jelly("jelly3"));
        peanutButter.onNext(new PeanutButter("peanutButter2"));
    }

    class Jelly {
        String tag;

        public Jelly(String tag) { 
            this.tag = tag;
        }
    }

    class PeanutButter {
        String tag;

        public PeanutButter(String tag) { 
            this.tag = tag; 
        }
    }

    class Sandwich {
        Jelly jelly;
        PeanutButter peanutButter;

        public Sandwich(Jelly jelly, PeanutButter peanutButter) {
            this.jelly = jelly;
            this.peanutButter = peanutButter;
        }

        @Override
        public String toString() {
            String jellyResult = (jelly != null) ? jelly.tag : "no jelly";
            String peanutButterResult = (peanutButter != null) ? peanutButter.tag : "no peanutButter";

            return jellyResult + " | " + peanutButterResult;
        }
    }
}

Output:

onSubscribe
emitted: jelly1
onNext: Sandwich: jelly1 | no peanutButter
emitted: jelly2
onNext: Sandwich: jelly2 | no peanutButter
emitted: peanutButter1
onNext: Sandwich: jelly2 | peanutButter1
emitted: jelly3
onNext: Sandwich: jelly3 | peanutButter1
emitted: peanutButter2
onNext: Sandwich: jelly3 | peanutButter2

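The fact that Jelly, PeanutButter, and Sandwich are all independent types makes the casting and nullability handling in scan more complicated. If you have control over these types, this solution can be improved further.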
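
As a rough illustration of that last point, here is a minimal sketch that assumes you can give Jelly and PeanutButter a common supertype. The Ingredient interface, its addTo method, Sandwich.EMPTY, and the SandwichFold class are all hypothetical names introduced for this example. A plain loop stands in for merge + scan so the snippet runs without RxJava; the same accumulator, (sandwich, item) -> item.addTo(sandwich), is what you would hand to scan on the merged stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical common supertype for everything that can go into a sandwich.
interface Ingredient {
    // Returns a new Sandwich with this ingredient replacing the previous one of its kind.
    Sandwich addTo(Sandwich previous);
}

final class Jelly implements Ingredient {
    final String tag;
    Jelly(String tag) { this.tag = tag; }
    @Override public Sandwich addTo(Sandwich previous) {
        return new Sandwich(this, previous.peanutButter);
    }
}

final class PeanutButter implements Ingredient {
    final String tag;
    PeanutButter(String tag) { this.tag = tag; }
    @Override public Sandwich addTo(Sandwich previous) {
        return new Sandwich(previous.jelly, this);
    }
}

final class Sandwich {
    // Seed value for the fold; it is never reported itself.
    static final Sandwich EMPTY = new Sandwich(null, null);
    final Jelly jelly;
    final PeanutButter peanutButter;
    Sandwich(Jelly jelly, PeanutButter peanutButter) {
        this.jelly = jelly;
        this.peanutButter = peanutButter;
    }
    @Override public String toString() {
        String j = (jelly != null) ? jelly.tag : "no jelly";
        String pb = (peanutButter != null) ? peanutButter.tag : "no peanutButter";
        return j + " | " + pb;
    }
}

final class SandwichFold {
    // Stand-in for merge + scan: applies each item in arrival order
    // and records every intermediate sandwich. No instanceof, no casts.
    static List<String> run(List<Ingredient> arrivals) {
        List<String> emitted = new ArrayList<>();
        Sandwich state = Sandwich.EMPTY;
        for (Ingredient item : arrivals) {
            state = item.addTo(state);
            emitted.add(state.toString());
        }
        return emitted;
    }

    public static void main(String[] args) {
        run(Arrays.<Ingredient>asList(
                new Jelly("jelly1"), new Jelly("jelly2"), new PeanutButter("peanutButter1"),
                new Jelly("jelly3"), new PeanutButter("peanutButter2")))
            .forEach(System.out::println);
    }
}
```

With the same arrival order as the test above, this prints the same five sandwiches as the onNext lines in the output, and the accumulator no longer needs instanceof checks or casts. The null fields remain; replacing them with Optional would be a further step.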