Rx(Java 的响应式扩展)带时间间隔的 Zip 运算符
Rx (Reactive Extensions for Java) Zip operator with time interval
我是 RxJava 的新手,我已经研究了一段时间的运算符。
我看到这个在短时间间隔 (1s) 后发出项目的小例子:
Observable<String> data = Observable.just("one", "two", "three", "four", "five");
Observable.zip(data, Observable.interval(1, TimeUnit.SECONDS), (d, t) -> {
return d + " " + t;
}).toBlocking().forEach(System.out::println);
这有效,但是当我删除将源变成 BlockingObservable 的 toBlocking()
时,程序执行并结束时没有输出。
我通常会看弹珠图来正确理解事物:
http://reactivex.io/documentation/operators/zip.html
最后一句话说:它只会发出与发出最少项目的源 Observable 发出的项目数量一样多的项目。
这是否意味着 data
Observable 在不到 1 秒的时间内发出所有项目并在打印每个 Observable 的前两个项目之前结束?因为每个 Observable 本身都是异步的?
我需要清楚地了解正在发生的事情,以及是否有其他方法可以处理类似的情况。有人吗?
基本上你的主程序在可观察对象有机会发出任何东西之前退出,这就是你看不到任何输出的原因。修复它的方法是以某种方式阻塞,直到 Observable
发出所有项目,这是使用 CountDownLatch
:
的一种方法
CountDownLatch latch = new CountDownLatch(1);
Observable<String> data = Observable.just("one", "two", "three", "four", "five");
Observable.zip(data, Observable.interval(1, TimeUnit.SECONDS), (d, t) -> {
return d + " " + t;
}).finallyDo(latch::countDown).forEach(System.out::println);
latch.await(10, TimeUnit.SECONDS);
Observable.interval
在幕后使用 Scheduler
。它将从另一个线程发出。与此同时,主线程已经完成了所有的组合,即将退出。大概您在 main
方法中有此代码,这就是您的程序退出的原因。
在真实系统中这应该不是问题(除非你的真实系统是一个包含这段代码的 main
方法)。
在示例程序中,您可以通过从标准输入读取一个字节来导致主线程阻塞。像这样:
Observable<String> data = Observable.just("one", "two", "three", "four", "five");
Observable.zip(data, Observable.interval(1, TimeUnit.SECONDS), (d, t) -> d + " " + t)
.subscribe(System.out::println);
System.in.read();
我是 RxJava 的新手,我已经研究了一段时间的运算符。
我看到这个在短时间间隔 (1s) 后发出项目的小例子:
Observable<String> data = Observable.just("one", "two", "three", "four", "five");
Observable.zip(data, Observable.interval(1, TimeUnit.SECONDS), (d, t) -> {
return d + " " + t;
}).toBlocking().forEach(System.out::println);
这有效,但是当我删除将源变成 BlockingObservable 的 toBlocking()
时,程序执行并结束时没有输出。
我通常会看弹珠图来正确理解事物: http://reactivex.io/documentation/operators/zip.html
最后一句话说:它只会发出与发出最少项目的源 Observable 发出的项目数量一样多的项目。
这是否意味着 data
Observable 在不到 1 秒的时间内发出所有项目并在打印每个 Observable 的前两个项目之前结束?因为每个 Observable 本身都是异步的?
我需要清楚地了解正在发生的事情,以及是否有其他方法可以处理类似的情况。有人吗?
基本上你的主程序在可观察对象有机会发出任何东西之前退出,这就是你看不到任何输出的原因。修复它的方法是以某种方式阻塞,直到 Observable
发出所有项目,这是使用 CountDownLatch
:
CountDownLatch latch = new CountDownLatch(1);
Observable<String> data = Observable.just("one", "two", "three", "four", "five");
Observable.zip(data, Observable.interval(1, TimeUnit.SECONDS), (d, t) -> {
return d + " " + t;
}).finallyDo(latch::countDown).forEach(System.out::println);
latch.await(10, TimeUnit.SECONDS);
Observable.interval
在幕后使用 Scheduler
。它将从另一个线程发出。与此同时,主线程已经完成了所有的组合,即将退出。大概您在 main
方法中有此代码,这就是您的程序退出的原因。
在真实系统中这应该不是问题(除非你的真实系统是一个包含这段代码的 main
方法)。
在示例程序中,您可以通过从标准输入读取一个字节来导致主线程阻塞。像这样:
Observable<String> data = Observable.just("one", "two", "three", "four", "five");
Observable.zip(data, Observable.interval(1, TimeUnit.SECONDS), (d, t) -> d + " " + t)
.subscribe(System.out::println);
System.in.read();