RxJava:如何模拟 withLatestFrom?

RxJava: how to emulate withLatestFrom?

根据文档,Java 实现中缺少 withLatestFrom(与 combineLatest 略有不同)。关于如何模拟它的任何想法?

一些非常简单和天真的实现是:

@SuppressWarnings("unchecked")
public static <T, U, V> Observable<T> combineLatestFrom(
  Observable<U> o1,
  Observable<V> o2,
  Func2<U, V, T> f) {

    final Object nothing = new Object();
    return Observable.create(s -> {

        AtomicReference<V> val2 = new AtomicReference<V>((V) nothing);

        o1.subscribe(v -> {
          val2.getAndUpdate(current -> {
            if (current != nothing) {
              s.onNext(f.call(v, current));
            }
            return current;
          });   
        }, s::onError, s::onCompleted);

        o2.subscribe(val2::set, s::onError);

    });
}

并且这个方法可以这样使用:

combineLatestFrom(numbers, letters, (n, l) -> n + l)
  .subscribe(System.out::println);

如果这里的数字和字母是大理石图中的 Observables -> 结果将如预期。

给定 a 作为主要可观察对象,b 作为 'latest from' 可观察对象,这个伪 java8 lambda 应该做你想做的事:

a.publish(a' -> b.switchMap(y -> a'.map(x -> x + y)))

这首先将a发布为a',这使得它可以在不重新启动流的情况下重复订阅。然后,每次从 b 发出新项目时,它都会重新订阅正在进行的 a 流,它将 b 的最新输出与 a 的每个输出相结合。

你可以很容易地将它包装成 RxJava 的 Transformer 的实现,像这样(也是半伪的,所以检查我的语法):

 public class WithLatestFrom<T, U, V> implements Transformer<T, V> {
   private final Func2<T, U, V> function;
   private final Observable<U> latest;

   private WithLatestFrom<T, U, V>(final Observable<U> latest, Func2<T, U, V> function) {
     this.function = function;
     this.latest = latest;
   }

   public static <T, U, V> WithLatestFrom<T, U, V> with(
       final Observable<U> latest, Func2<T, U, V> function) {
     return new WithLatestFrom<T, U, V>(latest, function);
   }

   @Override
   public Observable<V> call(final Observable<T> source) {
     return source.publish((publishedSource) -> latest.switchMap((y) ->
         publishedSource.map((x) -> function.call(x, y)));
   }

}

然后您可以在您的代码中重用它,例如:

a.compose(WithLatestFrom.with(b, (x, y) -> x + y));