在 RxJava 中订阅 Observable 时遇到问题

Trouble with subscribe to Observable in RxJava

我的程序无法从 subscribe() 方法中的 Observable 获得结果。

我正在尝试构建简单的控制台应用程序,它向服务器运行请求,然后打印控制台结果。正如我所见,一切正常,但我无法在 subsribe() 中得到结果。应用程序似乎在结果返回到方法之前完成。

这是我的代码 运行 请求:

coffeeShopApi.getCoffeeShops("")
    .subscribeOn(Schedulers.io())
    .subscribe({
         state.onNext(CoffeeShopViewState.CoffeeShopsLoaded(it))
    }, {
         it.printStackTrace()
         state.onNext(CoffeeShopViewState.Error(it.localizedMessage))
    })

执行此代码后,程序以退出代码 0 结束。并且它从主线程中的 main 函数运行。这可能是什么问题?

您的代码存在的问题是您订阅了另一个线程作为当前线程(主线程)。因此,您的主线程在调用单个项目的订阅之前完成。

这是一个带有堆栈跟踪的示例代码:

System.out.println(Thread.currentThread().getName() + ": start...");

Observable<String> stream = Observable.fromIterable(List.of("a", "b", "c"));
stream.subscribeOn(Schedulers.io())
      .subscribe(next -> System.out.println(Thread.currentThread().getName() + ": " + next), 
                 error -> error.printStackTrace());

Thread.sleep(1000L);

System.out.println(Thread.currentThread().getName() + ": stop...");

堆栈跟踪:

main: start...
RxCachedThreadScheduler-1: a
RxCachedThreadScheduler-1: b
RxCachedThreadScheduler-1: c
main: stop...

表明observable是在main线程中创建的。订阅的执行在不同的线程中完成 RxCachedThreadScheduler-1.

在同一线程上执行的示例:

System.out.println(Thread.currentThread().getName() + ": start...");

Observable<String> stream = Observable.fromIterable(List.of("a", "b", "c"));
stream.subscribe(next -> System.out.println(Thread.currentThread().getName() + ": " + next), 
                 error -> error.printStackTrace());

System.out.println(Thread.currentThread().getName() + ": stop...");

堆栈跟踪:

main: start...
main: a
main: b
main: c
main: stop...

根据用例,可能需要等待流执行完成才能进一步处理该方法,例如通过使用相同的线程或 FutureObserver。在其他用例中,将昂贵的计算操作外包到不同的线程是一个可行的解决方案。


问题是它是守护线程吗?答案是。我附上了测试代码:

String message = String.format("%s-isDaemon?%s: start...", Thread.currentThread().getName(), Thread.currentThread().isDaemon());
System.out.println(message);

Observable<String> stream = Observable.fromIterable(List.of("a", "b", "c"));
stream.subscribeOn(Schedulers.io())
      .subscribe(next -> {
                 final String innerMessage = String.format("%s-isDaemon?%s: %s", Thread.currentThread().getName(), Thread.currentThread().isDaemon(), next);
                 System.out.println(innerMessage);
                 },
                 Throwable::printStackTrace);

Thread.sleep(1000L);

message = String.format("%s-isDaemon?%s: stop...", Thread.currentThread().getName(), Thread.currentThread().isDaemon());
System.out.println(message);

stackrace 的输出:

main-isDaemon?false: start...
RxCachedThreadScheduler-1-isDaemon?true: a
RxCachedThreadScheduler-1-isDaemon?true: b
RxCachedThreadScheduler-1-isDaemon?true: c
main-isDaemon?false: stop...