在 RxJava 中订阅 Observable 时遇到问题
Trouble with subscribe to Observable in RxJava
我的程序无法从 subscribe()
方法中的 Observable
获得结果。
我正在尝试构建简单的控制台应用程序,它向服务器运行请求,然后打印控制台结果。正如我所见,一切正常,但我无法在 subsribe()
中得到结果。应用程序似乎在结果返回到方法之前完成。
这是我的代码 运行 请求:
coffeeShopApi.getCoffeeShops("")
.subscribeOn(Schedulers.io())
.subscribe({
state.onNext(CoffeeShopViewState.CoffeeShopsLoaded(it))
}, {
it.printStackTrace()
state.onNext(CoffeeShopViewState.Error(it.localizedMessage))
})
执行此代码后,程序以退出代码 0 结束。并且它从主线程中的 main 函数运行。这可能是什么问题?
您的代码存在的问题是您订阅了另一个线程作为当前线程(主线程)。因此,您的主线程在调用单个项目的订阅之前完成。
这是一个带有堆栈跟踪的示例代码:
System.out.println(Thread.currentThread().getName() + ": start...");
Observable<String> stream = Observable.fromIterable(List.of("a", "b", "c"));
stream.subscribeOn(Schedulers.io())
.subscribe(next -> System.out.println(Thread.currentThread().getName() + ": " + next),
error -> error.printStackTrace());
Thread.sleep(1000L);
System.out.println(Thread.currentThread().getName() + ": stop...");
堆栈跟踪:
main: start...
RxCachedThreadScheduler-1: a
RxCachedThreadScheduler-1: b
RxCachedThreadScheduler-1: c
main: stop...
表明observable是在main
线程中创建的。订阅的执行在不同的线程中完成 RxCachedThreadScheduler-1
.
在同一线程上执行的示例:
System.out.println(Thread.currentThread().getName() + ": start...");
Observable<String> stream = Observable.fromIterable(List.of("a", "b", "c"));
stream.subscribe(next -> System.out.println(Thread.currentThread().getName() + ": " + next),
error -> error.printStackTrace());
System.out.println(Thread.currentThread().getName() + ": stop...");
堆栈跟踪:
main: start...
main: a
main: b
main: c
main: stop...
根据用例,可能需要等待流执行完成才能进一步处理该方法,例如通过使用相同的线程或 FutureObserver
。在其他用例中,将昂贵的计算操作外包到不同的线程是一个可行的解决方案。
问题是它是守护线程吗?答案是是。我附上了测试代码:
String message = String.format("%s-isDaemon?%s: start...", Thread.currentThread().getName(), Thread.currentThread().isDaemon());
System.out.println(message);
Observable<String> stream = Observable.fromIterable(List.of("a", "b", "c"));
stream.subscribeOn(Schedulers.io())
.subscribe(next -> {
final String innerMessage = String.format("%s-isDaemon?%s: %s", Thread.currentThread().getName(), Thread.currentThread().isDaemon(), next);
System.out.println(innerMessage);
},
Throwable::printStackTrace);
Thread.sleep(1000L);
message = String.format("%s-isDaemon?%s: stop...", Thread.currentThread().getName(), Thread.currentThread().isDaemon());
System.out.println(message);
stackrace 的输出:
main-isDaemon?false: start...
RxCachedThreadScheduler-1-isDaemon?true: a
RxCachedThreadScheduler-1-isDaemon?true: b
RxCachedThreadScheduler-1-isDaemon?true: c
main-isDaemon?false: stop...
我的程序无法从 subscribe()
方法中的 Observable
获得结果。
我正在尝试构建简单的控制台应用程序,它向服务器运行请求,然后打印控制台结果。正如我所见,一切正常,但我无法在 subsribe()
中得到结果。应用程序似乎在结果返回到方法之前完成。
这是我的代码 运行 请求:
coffeeShopApi.getCoffeeShops("")
.subscribeOn(Schedulers.io())
.subscribe({
state.onNext(CoffeeShopViewState.CoffeeShopsLoaded(it))
}, {
it.printStackTrace()
state.onNext(CoffeeShopViewState.Error(it.localizedMessage))
})
执行此代码后,程序以退出代码 0 结束。并且它从主线程中的 main 函数运行。这可能是什么问题?
您的代码存在的问题是您订阅了另一个线程作为当前线程(主线程)。因此,您的主线程在调用单个项目的订阅之前完成。
这是一个带有堆栈跟踪的示例代码:
System.out.println(Thread.currentThread().getName() + ": start...");
Observable<String> stream = Observable.fromIterable(List.of("a", "b", "c"));
stream.subscribeOn(Schedulers.io())
.subscribe(next -> System.out.println(Thread.currentThread().getName() + ": " + next),
error -> error.printStackTrace());
Thread.sleep(1000L);
System.out.println(Thread.currentThread().getName() + ": stop...");
堆栈跟踪:
main: start...
RxCachedThreadScheduler-1: a
RxCachedThreadScheduler-1: b
RxCachedThreadScheduler-1: c
main: stop...
表明observable是在main
线程中创建的。订阅的执行在不同的线程中完成 RxCachedThreadScheduler-1
.
在同一线程上执行的示例:
System.out.println(Thread.currentThread().getName() + ": start...");
Observable<String> stream = Observable.fromIterable(List.of("a", "b", "c"));
stream.subscribe(next -> System.out.println(Thread.currentThread().getName() + ": " + next),
error -> error.printStackTrace());
System.out.println(Thread.currentThread().getName() + ": stop...");
堆栈跟踪:
main: start...
main: a
main: b
main: c
main: stop...
根据用例,可能需要等待流执行完成才能进一步处理该方法,例如通过使用相同的线程或 FutureObserver
。在其他用例中,将昂贵的计算操作外包到不同的线程是一个可行的解决方案。
问题是它是守护线程吗?答案是是。我附上了测试代码:
String message = String.format("%s-isDaemon?%s: start...", Thread.currentThread().getName(), Thread.currentThread().isDaemon());
System.out.println(message);
Observable<String> stream = Observable.fromIterable(List.of("a", "b", "c"));
stream.subscribeOn(Schedulers.io())
.subscribe(next -> {
final String innerMessage = String.format("%s-isDaemon?%s: %s", Thread.currentThread().getName(), Thread.currentThread().isDaemon(), next);
System.out.println(innerMessage);
},
Throwable::printStackTrace);
Thread.sleep(1000L);
message = String.format("%s-isDaemon?%s: stop...", Thread.currentThread().getName(), Thread.currentThread().isDaemon());
System.out.println(message);
stackrace 的输出:
main-isDaemon?false: start...
RxCachedThreadScheduler-1-isDaemon?true: a
RxCachedThreadScheduler-1-isDaemon?true: b
RxCachedThreadScheduler-1-isDaemon?true: c
main-isDaemon?false: stop...