RxJava:如何模拟 withLatestFrom?
RxJava: how to emulate withLatestFrom?
根据文档,Java 实现中缺少 withLatestFrom
(与 combineLatest
略有不同)。关于如何模拟它的任何想法?
一些非常简单和天真的实现是:
@SuppressWarnings("unchecked")
public static <T, U, V> Observable<T> combineLatestFrom(
Observable<U> o1,
Observable<V> o2,
Func2<U, V, T> f) {
final Object nothing = new Object();
return Observable.create(s -> {
AtomicReference<V> val2 = new AtomicReference<V>((V) nothing);
o1.subscribe(v -> {
val2.getAndUpdate(current -> {
if (current != nothing) {
s.onNext(f.call(v, current));
}
return current;
});
}, s::onError, s::onCompleted);
o2.subscribe(val2::set, s::onError);
});
}
并且这个方法可以这样使用:
combineLatestFrom(numbers, letters, (n, l) -> n + l)
.subscribe(System.out::println);
如果这里的数字和字母是大理石图中的 Observables -> 结果将如预期。
给定 a
作为主要可观察对象,b
作为 'latest from' 可观察对象,这个伪 java8 lambda 应该做你想做的事:
a.publish(a' -> b.switchMap(y -> a'.map(x -> x + y)))
这首先将a
发布为a'
,这使得它可以在不重新启动流的情况下重复订阅。然后,每次从 b
发出新项目时,它都会重新订阅正在进行的 a
流,它将 b
的最新输出与 a
的每个输出相结合。
你可以很容易地将它包装成 RxJava 的 Transformer
的实现,像这样(也是半伪的,所以检查我的语法):
public class WithLatestFrom<T, U, V> implements Transformer<T, V> {
private final Func2<T, U, V> function;
private final Observable<U> latest;
private WithLatestFrom<T, U, V>(final Observable<U> latest, Func2<T, U, V> function) {
this.function = function;
this.latest = latest;
}
public static <T, U, V> WithLatestFrom<T, U, V> with(
final Observable<U> latest, Func2<T, U, V> function) {
return new WithLatestFrom<T, U, V>(latest, function);
}
@Override
public Observable<V> call(final Observable<T> source) {
return source.publish((publishedSource) -> latest.switchMap((y) ->
publishedSource.map((x) -> function.call(x, y)));
}
}
然后您可以在您的代码中重用它,例如:
a.compose(WithLatestFrom.with(b, (x, y) -> x + y));
根据文档,Java 实现中缺少 withLatestFrom
(与 combineLatest
略有不同)。关于如何模拟它的任何想法?
一些非常简单和天真的实现是:
@SuppressWarnings("unchecked")
public static <T, U, V> Observable<T> combineLatestFrom(
Observable<U> o1,
Observable<V> o2,
Func2<U, V, T> f) {
final Object nothing = new Object();
return Observable.create(s -> {
AtomicReference<V> val2 = new AtomicReference<V>((V) nothing);
o1.subscribe(v -> {
val2.getAndUpdate(current -> {
if (current != nothing) {
s.onNext(f.call(v, current));
}
return current;
});
}, s::onError, s::onCompleted);
o2.subscribe(val2::set, s::onError);
});
}
并且这个方法可以这样使用:
combineLatestFrom(numbers, letters, (n, l) -> n + l)
.subscribe(System.out::println);
如果这里的数字和字母是大理石图中的 Observables -> 结果将如预期。
给定 a
作为主要可观察对象,b
作为 'latest from' 可观察对象,这个伪 java8 lambda 应该做你想做的事:
a.publish(a' -> b.switchMap(y -> a'.map(x -> x + y)))
这首先将a
发布为a'
,这使得它可以在不重新启动流的情况下重复订阅。然后,每次从 b
发出新项目时,它都会重新订阅正在进行的 a
流,它将 b
的最新输出与 a
的每个输出相结合。
你可以很容易地将它包装成 RxJava 的 Transformer
的实现,像这样(也是半伪的,所以检查我的语法):
public class WithLatestFrom<T, U, V> implements Transformer<T, V> {
private final Func2<T, U, V> function;
private final Observable<U> latest;
private WithLatestFrom<T, U, V>(final Observable<U> latest, Func2<T, U, V> function) {
this.function = function;
this.latest = latest;
}
public static <T, U, V> WithLatestFrom<T, U, V> with(
final Observable<U> latest, Func2<T, U, V> function) {
return new WithLatestFrom<T, U, V>(latest, function);
}
@Override
public Observable<V> call(final Observable<T> source) {
return source.publish((publishedSource) -> latest.switchMap((y) ->
publishedSource.map((x) -> function.call(x, y)));
}
}
然后您可以在您的代码中重用它,例如:
a.compose(WithLatestFrom.with(b, (x, y) -> x + y));