OnSubscribe.call 方法调用了两次
OnSubscribe.call method invoked twice
背景:
我希望在 OnSubscribe 的调用方法中调用 Web 服务。有一个自定义订阅者 class 也是这个 Observable 的订阅者。 (以下代码是相同的近似值)
问题:
看到 OnSubscribe.call 方法被调用了两次。
我不清楚 'Observable.create(...)'、subscribe(...) 和最终将 observable 转换为 Blocking 之间的关系。似乎下面一行添加了行为并以某种方式再次调用 OnSubscribe.call 。我假设最终需要调用 toBlocking 调用 - 就在网络服务器返回结果之前(servlet/controller 本质上是非 Rx)。
observable.toBlocking().forEach(i-> System.out.println(i));
下面的完整代码
public static void main(String args[]){
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> observer) {
System.out.println("In OnSubscribe Create: "+observer.isUnsubscribed());
try {
if (!observer.isUnsubscribed()) {
for (int i = 1; i < 5; i++) {
observer.onNext(i);
}
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
});
observable.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
observable.toBlocking().forEach(i-> System.out.println(i));
}
输出
In OnSubscribe Create: false
Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.
In OnSubscribe Create: false
For Each Printing 1
For Each Printing 2
For Each Printing 3
For Each Printing 4
回答自己的问题:
我通过 forEach 处理结果的方式是错误的。它本身就是一个订阅者,因此最终会再次调用 call 方法。
执行我正在做的事情的正确方法是通过 CountDownLatch。
这也澄清了我的另一个疑问 - 在我们最终等待工作完成的情况下,就在 controller/servlet 返回响应之前意味着使用带超时的锁存器。
下面的代码。
public static void main(String args[]) throws Exception {
CountDownLatch latch = new CountDownLatch(4);
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> observer) {
System.out.println("In OnSubscribe Create: "+observer.isUnsubscribed());
try {
if (!observer.isUnsubscribed()) {
for (int i = 1; i < 5; i++) {
observer.onNext(i);
latch.countDown();
}
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
});
observable.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
latch.await();
System.out.println(" Wait for Latch Over");
}
这可能不是那么重要,因为您已经解决了您的问题,但使用原始 OnSubscribe 创建可观察对象通常容易出错。更好的方法是使用现有的助手之一 类:SyncOnSubscribe/AsyncOnSubscribe (since RxJava 1.0.15) or AbstractOnSubscribe(直到 RxJava v1.1.0)。它们为您提供了一个现成的框架来管理可观察对象的状态和背压,让您(主要)处理自己的逻辑。
toBlocking
可以简化为使用闩锁进行订阅,因此您的两个订阅。
但我认为您使用 toBlocking().forEach()
是正确的,应该保留该部分作为解决方案。
那么问题是您的第二次订阅,通过调用 subscribe
进行。这个 subscribe
真的有必要吗?如果它只是关于日志记录而不是实际修改数据流,您可以使用 doOnNext
、doOnError
和 doOnCompleted
。
如果您的用例比问题中反映的要复杂一些,并且您确实需要等待两个 "parallel" 异步处理,那么您可能需要使用 [=17= 加入并等待] (或类似的)...请记住,如果你想要并行性,RxJava 在这方面是不可知的,你需要使用 Schedulers
、subscribeOn
/observeOn
.[= 来驱动它21=]
背景: 我希望在 OnSubscribe 的调用方法中调用 Web 服务。有一个自定义订阅者 class 也是这个 Observable 的订阅者。 (以下代码是相同的近似值)
问题: 看到 OnSubscribe.call 方法被调用了两次。
我不清楚 'Observable.create(...)'、subscribe(...) 和最终将 observable 转换为 Blocking 之间的关系。似乎下面一行添加了行为并以某种方式再次调用 OnSubscribe.call 。我假设最终需要调用 toBlocking 调用 - 就在网络服务器返回结果之前(servlet/controller 本质上是非 Rx)。
observable.toBlocking().forEach(i-> System.out.println(i));
下面的完整代码
public static void main(String args[]){
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> observer) {
System.out.println("In OnSubscribe Create: "+observer.isUnsubscribed());
try {
if (!observer.isUnsubscribed()) {
for (int i = 1; i < 5; i++) {
observer.onNext(i);
}
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
});
observable.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
observable.toBlocking().forEach(i-> System.out.println(i));
}
输出
In OnSubscribe Create: false
Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.
In OnSubscribe Create: false
For Each Printing 1
For Each Printing 2
For Each Printing 3
For Each Printing 4
回答自己的问题: 我通过 forEach 处理结果的方式是错误的。它本身就是一个订阅者,因此最终会再次调用 call 方法。 执行我正在做的事情的正确方法是通过 CountDownLatch。
这也澄清了我的另一个疑问 - 在我们最终等待工作完成的情况下,就在 controller/servlet 返回响应之前意味着使用带超时的锁存器。
下面的代码。
public static void main(String args[]) throws Exception {
CountDownLatch latch = new CountDownLatch(4);
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> observer) {
System.out.println("In OnSubscribe Create: "+observer.isUnsubscribed());
try {
if (!observer.isUnsubscribed()) {
for (int i = 1; i < 5; i++) {
observer.onNext(i);
latch.countDown();
}
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
});
observable.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
latch.await();
System.out.println(" Wait for Latch Over");
}
这可能不是那么重要,因为您已经解决了您的问题,但使用原始 OnSubscribe 创建可观察对象通常容易出错。更好的方法是使用现有的助手之一 类:SyncOnSubscribe/AsyncOnSubscribe (since RxJava 1.0.15) or AbstractOnSubscribe(直到 RxJava v1.1.0)。它们为您提供了一个现成的框架来管理可观察对象的状态和背压,让您(主要)处理自己的逻辑。
toBlocking
可以简化为使用闩锁进行订阅,因此您的两个订阅。
但我认为您使用 toBlocking().forEach()
是正确的,应该保留该部分作为解决方案。
那么问题是您的第二次订阅,通过调用 subscribe
进行。这个 subscribe
真的有必要吗?如果它只是关于日志记录而不是实际修改数据流,您可以使用 doOnNext
、doOnError
和 doOnCompleted
。
如果您的用例比问题中反映的要复杂一些,并且您确实需要等待两个 "parallel" 异步处理,那么您可能需要使用 [=17= 加入并等待] (或类似的)...请记住,如果你想要并行性,RxJava 在这方面是不可知的,你需要使用 Schedulers
、subscribeOn
/observeOn
.[= 来驱动它21=]