Rx 运算符,开始时类似于 combineLatest,但随后的行为类似于 withLatestFrom
Rx operator that starts like combineLatest but then acts like withLatestFrom
我正在寻找一个运算符,它通过在 Observables
都发出一个元素(类似于 combineLatest
)之前不发出任何东西来组合两个 Observables
,但随后只发出元素将一个 Observable
中的元素与另一个 Observable
中最近发出的元素相结合(类似于 withLatestFrom
)。结果看起来像这样(y
可观察到的是 "control"):
有这样的运营商吗?
它不是很漂亮,但它有效(在 C# 中):
var xs = new Subject<string>();
var ys = new Subject<int>();
var query =
Observable
.Merge(
xs.Select(x => new { xt = true, yt = false, x, y = default(int) }),
ys.Select(y => new { xt = false, yt = true, x = default(string), y }))
.StartWith(new { xt = false, yt = false, x = default(string), y = default(int) })
.Scan((a, b) => new
{
xt = a.xt && a.yt ? false : a.xt || b.xt,
yt = a.xt && a.yt ? false : a.yt || b.yt,
x = b.xt ? b.x : a.x,
y = b.yt ? b.y : a.y
})
.Where(z => z.xt & z.yt)
.Select(z => z.y + z.x);
query.Subscribe(v => Console.WriteLine(v));
ys.OnNext(1);
ys.OnNext(2);
xs.OnNext("A");
xs.OnNext("B");
xs.OnNext("C");
xs.OnNext("D");
ys.OnNext(3);
xs.OnNext("E");
它给出:
2A
3D
我在 Java 中解决了这个问题,但同样的理论应该适用于你。
您拥有的实际上是两种基本模式; combineLatest
值后跟 withLatestFrom
值。如果 withLatestFrom
先触发,那么您要跳过 combineLatest
值。
首先让 withLatestFrom
可观察:
Observable<Result> wlf = o1.withLatestFrom(o2, f::apply);
接下来我们要创建发出单个值的 combineLatest
observable。我们还想在 wlf
触发时停止这个 observable:
Observable<Result> cl = Observable.combineLatest(o1, o2, f::apply)
.take(1).takeUntil(wlf);
最后将这两个 observable 添加到一起...为了方便,我制作了一个辅助方法来接受任何两个 observable 和一个 bi-function 运算符:
public static <Result,
Param1, Source1 extends Param1,
Param2, Source2 extends Param2>
Observable<Result> combineThenLatestFrom(
final Observable<Source1> o1,
final Observable<Source2> o2,
final BiFunction<Param1, Param2, Result> f
) {
final Observable<Result> base = o1
.withLatestFrom(o2, f::apply);
return Observable
.combineLatest(o1, o2, f::apply)
.take(1).takeUntil(base)
.mergeWith(base);
}
下面是我用来验证方法的测试代码:
public static void main(final String[] args) {
final TestScheduler scheduler = new TestScheduler();
final TestSubject<String> o1 = TestSubject.create(scheduler);
final TestSubject<String> o2 = TestSubject.create(scheduler);
final Observable<String> r = combineThenLatestFrom(o1, o2, (a, b) -> a + b);
r.subscribe(System.out::println);
o1.onNext("1");
o1.onNext("2");
o2.onNext("A");
o2.onNext("B");
o2.onNext("C");
o2.onNext("D");
o1.onNext("3");
o2.onNext("E");
scheduler.triggerActions();
}
输出:
2A
3D
我正在寻找一个运算符,它通过在 Observables
都发出一个元素(类似于 combineLatest
)之前不发出任何东西来组合两个 Observables
,但随后只发出元素将一个 Observable
中的元素与另一个 Observable
中最近发出的元素相结合(类似于 withLatestFrom
)。结果看起来像这样(y
可观察到的是 "control"):
有这样的运营商吗?
它不是很漂亮,但它有效(在 C# 中):
var xs = new Subject<string>();
var ys = new Subject<int>();
var query =
Observable
.Merge(
xs.Select(x => new { xt = true, yt = false, x, y = default(int) }),
ys.Select(y => new { xt = false, yt = true, x = default(string), y }))
.StartWith(new { xt = false, yt = false, x = default(string), y = default(int) })
.Scan((a, b) => new
{
xt = a.xt && a.yt ? false : a.xt || b.xt,
yt = a.xt && a.yt ? false : a.yt || b.yt,
x = b.xt ? b.x : a.x,
y = b.yt ? b.y : a.y
})
.Where(z => z.xt & z.yt)
.Select(z => z.y + z.x);
query.Subscribe(v => Console.WriteLine(v));
ys.OnNext(1);
ys.OnNext(2);
xs.OnNext("A");
xs.OnNext("B");
xs.OnNext("C");
xs.OnNext("D");
ys.OnNext(3);
xs.OnNext("E");
它给出:
2A 3D
我在 Java 中解决了这个问题,但同样的理论应该适用于你。
您拥有的实际上是两种基本模式; combineLatest
值后跟 withLatestFrom
值。如果 withLatestFrom
先触发,那么您要跳过 combineLatest
值。
首先让 withLatestFrom
可观察:
Observable<Result> wlf = o1.withLatestFrom(o2, f::apply);
接下来我们要创建发出单个值的 combineLatest
observable。我们还想在 wlf
触发时停止这个 observable:
Observable<Result> cl = Observable.combineLatest(o1, o2, f::apply)
.take(1).takeUntil(wlf);
最后将这两个 observable 添加到一起...为了方便,我制作了一个辅助方法来接受任何两个 observable 和一个 bi-function 运算符:
public static <Result,
Param1, Source1 extends Param1,
Param2, Source2 extends Param2>
Observable<Result> combineThenLatestFrom(
final Observable<Source1> o1,
final Observable<Source2> o2,
final BiFunction<Param1, Param2, Result> f
) {
final Observable<Result> base = o1
.withLatestFrom(o2, f::apply);
return Observable
.combineLatest(o1, o2, f::apply)
.take(1).takeUntil(base)
.mergeWith(base);
}
下面是我用来验证方法的测试代码:
public static void main(final String[] args) {
final TestScheduler scheduler = new TestScheduler();
final TestSubject<String> o1 = TestSubject.create(scheduler);
final TestSubject<String> o2 = TestSubject.create(scheduler);
final Observable<String> r = combineThenLatestFrom(o1, o2, (a, b) -> a + b);
r.subscribe(System.out::println);
o1.onNext("1");
o1.onNext("2");
o2.onNext("A");
o2.onNext("B");
o2.onNext("C");
o2.onNext("D");
o1.onNext("3");
o2.onNext("E");
scheduler.triggerActions();
}
输出:
2A
3D