当底层数据源有新值时理解 RxJava observable
Understanding RxJava observable when underlying data source has new values
我正在尝试试验 RxJava 可观察和观察者代码。我的 objective 是检查当底层源接收到新数据值时事情是如何工作的。我的代码如下:
List<Integer> numbers = new ArrayList<>();
Runnable r = new Runnable() {
@Override
public void run() {
int i = 100;
while(i < 110) {
numbers.add(i);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
i++;
}
}
};
numbers.add(0);
numbers.add(1);
numbers.add(2);
Observable.fromIterable(numbers)
.observeOn(Schedulers.io())
.subscribe(i -> System.out.println("Received "+i+ " on "+ Thread.currentThread().getName()),
e -> e.printStackTrace());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Thread t = new Thread(r);
t.start();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
所以我有一个数字列表。然后我有一个 runnable,它将新数字添加到这个列表中,添加之间有时间间隔。我还没有开始线程。我将 0,1,2 添加到列表中,然后用它创建一个可观察对象,将观察者调度到池中的一个线程上,最后订阅可观察对象。当订阅发生时,observable 发出值 0、1、2 并调用观察者(执行传递给订阅的 lambda)。然后我在主线程上引入了 1 秒的延迟,然后我使用之前创建的可运行对象生成了一个新线程,并添加了一个最终延迟,这样应用程序就不会立即退出。
我期望的是,当新数字添加到列表中时,必须调用观察者,从而打印消息。但那不会发生。当然,我的理解有误。我还需要将 observable 放在调度程序上吗?
带有onComplete
参数的Observable.fromIterable()
method is a "one time" load of values for an observable each time a subscription is build. What happens "after" building the subscription has no affect anymore. When you use the subscribe(onNext, onError, onComplete)
方法你会看到订阅已经完全消费,三个初始值已经打印出来。
您可以在使用 onNext()
方法的地方使用 Subject
(something like a PublishSubject
) 添加“新值”,而之前构建的订阅仍处于活动状态(且未完成)。这样您就可以先构建订阅,然后继续调用 onNext()
获取主题中的新值,直到完成并调用 onCompleted()
.
我正在尝试试验 RxJava 可观察和观察者代码。我的 objective 是检查当底层源接收到新数据值时事情是如何工作的。我的代码如下:
List<Integer> numbers = new ArrayList<>();
Runnable r = new Runnable() {
@Override
public void run() {
int i = 100;
while(i < 110) {
numbers.add(i);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
i++;
}
}
};
numbers.add(0);
numbers.add(1);
numbers.add(2);
Observable.fromIterable(numbers)
.observeOn(Schedulers.io())
.subscribe(i -> System.out.println("Received "+i+ " on "+ Thread.currentThread().getName()),
e -> e.printStackTrace());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Thread t = new Thread(r);
t.start();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
所以我有一个数字列表。然后我有一个 runnable,它将新数字添加到这个列表中,添加之间有时间间隔。我还没有开始线程。我将 0,1,2 添加到列表中,然后用它创建一个可观察对象,将观察者调度到池中的一个线程上,最后订阅可观察对象。当订阅发生时,observable 发出值 0、1、2 并调用观察者(执行传递给订阅的 lambda)。然后我在主线程上引入了 1 秒的延迟,然后我使用之前创建的可运行对象生成了一个新线程,并添加了一个最终延迟,这样应用程序就不会立即退出。
我期望的是,当新数字添加到列表中时,必须调用观察者,从而打印消息。但那不会发生。当然,我的理解有误。我还需要将 observable 放在调度程序上吗?
带有onComplete
参数的Observable.fromIterable()
method is a "one time" load of values for an observable each time a subscription is build. What happens "after" building the subscription has no affect anymore. When you use the subscribe(onNext, onError, onComplete)
方法你会看到订阅已经完全消费,三个初始值已经打印出来。
您可以在使用 onNext()
方法的地方使用 Subject
(something like a PublishSubject
) 添加“新值”,而之前构建的订阅仍处于活动状态(且未完成)。这样您就可以先构建订阅,然后继续调用 onNext()
获取主题中的新值,直到完成并调用 onCompleted()
.