RxJava:combineLatest 的行为

RxJava: behaviour of combineLatest

我有以下代码:

    final Observable<String> a = Observable.just("a1", "a2");   
    final Observable<String> b = Observable.just("b1");

    final Observable<String> c = Observable.combineLatest(a, b, (first, second) -> first + second);

    c.subscribe(res -> System.out.println(res));

预期输出是什么?我本以为

a1b1
a2b1

但实际输出是

a2b1

这有意义吗?生成预期序列的正确运算符是什么?

好问题!似乎可能是一种竞争条件。 combineLatest 在两个源都发出之前不会输出任何内容,并且似乎在 b 生成其输出时,a 已经移动到它的第二个项目。在具有时间间隔的异步事件的 "real world" 应用程序中,您可能会获得所需的行为。

如果你能忍受等待,一个解决方案是稍微延迟 a 的输出。通过更多的工作,您可以仅延迟第一个输出(请参阅 delay 运算符的各种重载)。此外,我刚刚注意到有一个运算符 delaySubscription 可能会起到作用(延迟您对 a 的订阅,直到 b 发出一些东西)。我确信还有其他也许更好的解决方案(我自己还在学习)。

顾名思义,它结合了每个来源的最新值。如果源是同步的或非常快,这可能意味着一个或多个源将 运行 它们完成并且操作员将只记住每个源的最后一个值。您必须通过某种方式交错源值,例如让异步源在项目之间留出足够的时间,并避免多个源的项目紧密重叠。

可以通过多种方式生成预期序列,具体取决于您的初衷。例如,如果您想要所有交叉组合,请使用 flatMap:

a.flatMap(aValue -> b, (aValue, bValue) -> first + second)
.subscribe(System.out::println);

如果 b 的重建成本很高,请将其缓存:

Observable<String> cachedB = b.cache();
a.flatMap(aValue -> cachedB, (aValue, bValue) -> first + second)
.subscribe(System.out::println);