为什么我的 RxJava Observable 除非阻塞,否则不会发出或完成?
Why does my RxJava Observable not emit or complete unless it's blocking?
背景
我有许多 RxJava Observables(从 Jersey 客户端生成,或者使用 Observable.just(someObject)
存根)。它们都应该只发出一个值。我有一个模拟所有 Jersey 客户端并使用 Observable.just(someObject)
的组件测试,我看到那里的行为与 运行 生产代码时相同。
我有几个 classes 作用于这些可观察量,执行一些计算(和一些副作用 - 我可能会让它们稍后直接 return 值)和 return 空void observables.
有一次,在这样的 class 中,我试图压缩我的几个源可观察对象,然后映射它们 - 如下所示:
public Observable<Void> doCalculation() {
return Observable.zip(
getObservable1(),
getObservable2(),
getObservable3(),
UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
}
// in Unifying Object
public Observable<Void> processToNewObservable() {
// ... do some calculation ...
return Observable.empty();
}
然后计算 classes 全部合并并等待:
// Wait for rule computations to complete
List<Observable<Void>> calculations = ...;
Observable.zip(calculations, results -> results)
.toBlocking().lastOrDefault(null);
问题
问题是,processToNewObservable()
永远不会被执行。通过消除过程,我可以看到问题出在 getObservable1()
- 如果我用 Observable.just(null)
替换它,一切都会按照我想象的那样执行(但在我想要一个真实值的地方有一个空值)。
重申一下,getObservable1()
return在生产代码中是来自 Jersey 客户端的 Observable,但在我的测试中该客户端是 Mockito 模拟 returning Observable.just(someValue)
.
调查
如果我将 getObservable1()
转换为阻塞,然后将第一个值包装在 just()
中,同样,一切都按照我的想象执行(但我不想引入阻塞步骤) :
Observable.zip(
Observable.just(getObservable1().toBlocking().first()),
getObservable2(),
getObservable3(),
UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
我的第一个想法是,也许其他东西正在消耗我的 observable 发出的值,并且 zip
看到它已经完成,因此确定压缩它们的结果应该是一个空的 observable。我已经尝试将 .cache()
添加到我认为相关的每个可观察源上,但是,这并没有改变行为。
我还尝试在 zip 之前在 getObservable1 上添加 next / error / complete / finally 处理程序(不将其转换为阻塞),但是 none 其中执行了:
getObservable1()
.doOnNext(...)
.doOnCompleted(...)
.doOnError(...)
.finallyDo(...);
Observable.zip(
getObservable1(),
getObservable2(),
getObservable3(),
UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
问题
我是 RxJava 的新手,所以我很确定我错过了一些基本的东西。问题是:我能做什么蠢事?如果从我目前所说的来看这不是很明显,我可以做些什么来帮助诊断问题?
Observable 必须发射才能启动链。您必须将您的管道视为 Observable 发出时会发生什么的声明。
您没有分享实际观察到的内容,但 Observable.just() 导致 Observable 立即发出包装对象。
根据评论中的回复,getObservable
中的任何一个都没有 return 任何值但只是完成,或者 Mockito 模拟做错了。以下独立示例对我有用。你能检查它并开始慢慢改变它看看哪里出了问题吗?
Observable.zip(
Observable.just(1),
Observable.just(2),
Observable.just(3),
(a, b, c) -> new Integer[] { a, b, c })
.concatMap(a -> Observable.from(a))
.subscribe(System.out::println)
;
注意:我在这里发现我的答案不是很令人满意,所以我进一步挖掘并发现了一个小得多的复制案例,所以我问了一个新的问题在这里:Why does my RxJava Observable emit only to the first consumer?
我至少已经解决了部分问题(并且,向所有试图回答的人道歉,根据我的解释,我认为您没有太多机会)。
执行这些计算的各种 类 都是 returning Observable.empty()
(按照我原来示例中的 processToNewObservable()
)。据我所知,Observable.zip()
不会订阅它正在压缩的第 N 个可观察对象,直到第 N-1 个可观察对象发出一个值。
我原来的例子声称 getObservable1()
行为不当 - 实际上有点不准确,后来在参数列表中观察到了。据我了解,之所以让它阻塞,然后将该值转换为 Observable 再次起作用,是因为让它阻塞并首先调用强制执行,我得到了我想要的副作用。
如果我将所有计算分类更改为 return Observable.just(null)
,一切正常:所有计算的最终 zip()
类' observables 通过它们全部工作, 所以所有预期的副作用都会发生。
从设计的角度来看,返回 null Void 似乎我肯定做错了什么,但至少这个特定问题得到了回答。
背景
我有许多 RxJava Observables(从 Jersey 客户端生成,或者使用 Observable.just(someObject)
存根)。它们都应该只发出一个值。我有一个模拟所有 Jersey 客户端并使用 Observable.just(someObject)
的组件测试,我看到那里的行为与 运行 生产代码时相同。
我有几个 classes 作用于这些可观察量,执行一些计算(和一些副作用 - 我可能会让它们稍后直接 return 值)和 return 空void observables.
有一次,在这样的 class 中,我试图压缩我的几个源可观察对象,然后映射它们 - 如下所示:
public Observable<Void> doCalculation() {
return Observable.zip(
getObservable1(),
getObservable2(),
getObservable3(),
UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
}
// in Unifying Object
public Observable<Void> processToNewObservable() {
// ... do some calculation ...
return Observable.empty();
}
然后计算 classes 全部合并并等待:
// Wait for rule computations to complete
List<Observable<Void>> calculations = ...;
Observable.zip(calculations, results -> results)
.toBlocking().lastOrDefault(null);
问题
问题是,processToNewObservable()
永远不会被执行。通过消除过程,我可以看到问题出在 getObservable1()
- 如果我用 Observable.just(null)
替换它,一切都会按照我想象的那样执行(但在我想要一个真实值的地方有一个空值)。
重申一下,getObservable1()
return在生产代码中是来自 Jersey 客户端的 Observable,但在我的测试中该客户端是 Mockito 模拟 returning Observable.just(someValue)
.
调查
如果我将 getObservable1()
转换为阻塞,然后将第一个值包装在 just()
中,同样,一切都按照我的想象执行(但我不想引入阻塞步骤) :
Observable.zip(
Observable.just(getObservable1().toBlocking().first()),
getObservable2(),
getObservable3(),
UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
我的第一个想法是,也许其他东西正在消耗我的 observable 发出的值,并且 zip
看到它已经完成,因此确定压缩它们的结果应该是一个空的 observable。我已经尝试将 .cache()
添加到我认为相关的每个可观察源上,但是,这并没有改变行为。
我还尝试在 zip 之前在 getObservable1 上添加 next / error / complete / finally 处理程序(不将其转换为阻塞),但是 none 其中执行了:
getObservable1()
.doOnNext(...)
.doOnCompleted(...)
.doOnError(...)
.finallyDo(...);
Observable.zip(
getObservable1(),
getObservable2(),
getObservable3(),
UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
问题
我是 RxJava 的新手,所以我很确定我错过了一些基本的东西。问题是:我能做什么蠢事?如果从我目前所说的来看这不是很明显,我可以做些什么来帮助诊断问题?
Observable 必须发射才能启动链。您必须将您的管道视为 Observable 发出时会发生什么的声明。
您没有分享实际观察到的内容,但 Observable.just() 导致 Observable 立即发出包装对象。
根据评论中的回复,getObservable
中的任何一个都没有 return 任何值但只是完成,或者 Mockito 模拟做错了。以下独立示例对我有用。你能检查它并开始慢慢改变它看看哪里出了问题吗?
Observable.zip(
Observable.just(1),
Observable.just(2),
Observable.just(3),
(a, b, c) -> new Integer[] { a, b, c })
.concatMap(a -> Observable.from(a))
.subscribe(System.out::println)
;
注意:我在这里发现我的答案不是很令人满意,所以我进一步挖掘并发现了一个小得多的复制案例,所以我问了一个新的问题在这里:Why does my RxJava Observable emit only to the first consumer?
我至少已经解决了部分问题(并且,向所有试图回答的人道歉,根据我的解释,我认为您没有太多机会)。
执行这些计算的各种 类 都是 returning Observable.empty()
(按照我原来示例中的 processToNewObservable()
)。据我所知,Observable.zip()
不会订阅它正在压缩的第 N 个可观察对象,直到第 N-1 个可观察对象发出一个值。
我原来的例子声称 getObservable1()
行为不当 - 实际上有点不准确,后来在参数列表中观察到了。据我了解,之所以让它阻塞,然后将该值转换为 Observable 再次起作用,是因为让它阻塞并首先调用强制执行,我得到了我想要的副作用。
如果我将所有计算分类更改为 return Observable.just(null)
,一切正常:所有计算的最终 zip()
类' observables 通过它们全部工作, 所以所有预期的副作用都会发生。
从设计的角度来看,返回 null Void 似乎我肯定做错了什么,但至少这个特定问题得到了回答。