OnSubscribe.call 方法调用了两次

OnSubscribe.call method invoked twice

背景: 我希望在 OnSubscribe 的调用方法中调用 Web 服务。有一个自定义订阅者 class 也是这个 Observable 的订阅者。 (以下代码是相同的近似值)

问题: 看到 OnSubscribe.call 方法被调用了两次。

我不清楚 'Observable.create(...)'、subscribe(...) 和最终将 observable 转换为 Blocking 之间的关系。似乎下面一行添加了行为并以某种方式再次调用 OnSubscribe.call 。我假设最终需要调用 toBlocking 调用 - 就在网络服务器返回结果之前(servlet/controller 本质上是非 Rx)。

observable.toBlocking().forEach(i-> System.out.println(i));

下面的完整代码

 public static void main(String args[]){
   Observable<Integer> observable =  Observable.create(new Observable.OnSubscribe<Integer>() {
       @Override
       public void call(Subscriber<? super Integer> observer) {
           System.out.println("In OnSubscribe Create: "+observer.isUnsubscribed());
           try {
               if (!observer.isUnsubscribed()) {
                   for (int i = 1; i < 5; i++) {
                      observer.onNext(i);
                   }
                   observer.onCompleted();
               }
           } catch (Exception e) {
               observer.onError(e);
           }
       }
});

 observable.subscribe(new Subscriber<Integer>() {
    @Override
    public void onNext(Integer item) {
        System.out.println("Next: " + item);
    }

    @Override
    public void onError(Throwable error) {
        System.err.println("Error: " + error.getMessage());
    }

    @Override
    public void onCompleted() {
        System.out.println("Sequence complete.");
    }
});
    observable.toBlocking().forEach(i-> System.out.println(i));
}

输出

In OnSubscribe Create: false
Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.
In OnSubscribe Create: false
For Each Printing 1
For Each Printing 2
For Each Printing 3
For Each Printing 4

回答自己的问题: 我通过 forEach 处理结果的方式是错误的。它本身就是一个订阅者,因此最终会再次调用 call 方法。 执行我正在做的事情的正确方法是通过 CountDownLatch。

这也澄清了我的另一个疑问 - 在我们最终等待工作完成的情况下,就在 controller/servlet 返回响应之前意味着使用带超时的锁存器。

下面的代码。

public static void main(String args[]) throws Exception {
    CountDownLatch latch = new CountDownLatch(4);
    Observable<Integer> observable =  Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> observer) {
            System.out.println("In OnSubscribe Create: "+observer.isUnsubscribed());
            try {
                if (!observer.isUnsubscribed()) {
                    for (int i = 1; i < 5; i++) {
                        observer.onNext(i);
                        latch.countDown();

                    }
                    observer.onCompleted();
                }
            } catch (Exception e) {
                observer.onError(e);
            }
        }
    });

    observable.subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });

    latch.await();
    System.out.println(" Wait for Latch Over");
}

这可能不是那么重要,因为您已经解决了您的问题,但使用原始 OnSubscribe 创建可观察对象通常容易出错。更好的方法是使用现有的助手之一 类:SyncOnSubscribe/AsyncOnSubscribe (since RxJava 1.0.15) or AbstractOnSubscribe(直到 RxJava v1.1.0)。它们为您提供了一个现成的框架来管理可观察对象的状态和背压,让您(主要)处理自己的逻辑。

toBlocking 可以简化为使用闩锁进行订阅,因此您的两个订阅。

但我认为您使用 toBlocking().forEach() 是正确的,应该保留该部分作为解决方案。

那么问题是您的第二次订阅,通过调用 subscribe 进行。这个 subscribe 真的有必要吗?如果它只是关于日志记录而不是实际修改数据流,您可以使用 doOnNextdoOnErrordoOnCompleted

如果您的用例比问题中反映的要复杂一些,并且您确实需要等待两个 "parallel" 异步处理,那么您可能需要使用 [=17= 加入并等待] (或类似的)...请记住,如果你想要并行性,RxJava 在这方面是不可知的,你需要使用 SchedulerssubscribeOn/observeOn.[= 来驱动它21=]