RxJava CompletableSubject toFlowable/ toObservable 不发射
RxJava CompletableSubject toFlowable/ toObservable doesn't emit
所以我有一个生产者/发布者是 CompletableSubject
我想要一个只读版本供订阅者/观察者使用
但是 toFLowable()
和 toObservable
都没有发出任何东西
我错过了什么?
fun main() {
val publisher = CompletableSubject.create()
val readOnlyStream = publisher.toFlowable<Any>()
println("1 ${publisher.hasComplete()}")
readOnlyStream.subscribe { item ->
println("yay, got it $item")
}
println("2 ${publisher.hasComplete()}")
publisher.onComplete()
println("3 ${publisher.hasComplete()}")
Thread.sleep(3000L)
}
输出:
1 false
2 false
3 true
我发现了奇怪的解决方法
fun main() {
val publisher = CompletableSubject.create()
val readOnlyStream = publisher.toSingle {
"doneee"
}
println("1 ${publisher.hasComplete()}")
readOnlyStream.subscribe { item ->
println("yay, got it $item")
}
println("2 ${publisher.hasComplete()}")
publisher.onComplete()
println("3 ${publisher.hasComplete()}")
Thread.sleep(3000L)
}
输出:
1 false
2 false
yay, got it doneee
3 true
正如 akarnokd 在评论中所说,readOnlyStream.subscribe { item ->
正在为一个项目添加回调,并且从未发出一个项目。
然而,这并不意味着 onComplete 事件没有传播到 Flowable。 onComplete 事件可以这样订阅:
readOnlyStream.subscribe(
{}, // onNext, called for each item
{}, // onError, called once when there is an error
{ println("yay, got onComplete") }, // onComplete, called once when completed
)
completableSubject 的订阅方法没有 onNext 方法,因为 Completables 不发出任何项目,只有 onError 或 onComplete 事件。
所以我有一个生产者/发布者是 CompletableSubject
我想要一个只读版本供订阅者/观察者使用
但是 toFLowable()
和 toObservable
都没有发出任何东西
我错过了什么?
fun main() {
val publisher = CompletableSubject.create()
val readOnlyStream = publisher.toFlowable<Any>()
println("1 ${publisher.hasComplete()}")
readOnlyStream.subscribe { item ->
println("yay, got it $item")
}
println("2 ${publisher.hasComplete()}")
publisher.onComplete()
println("3 ${publisher.hasComplete()}")
Thread.sleep(3000L)
}
输出:
1 false
2 false
3 true
我发现了奇怪的解决方法
fun main() {
val publisher = CompletableSubject.create()
val readOnlyStream = publisher.toSingle {
"doneee"
}
println("1 ${publisher.hasComplete()}")
readOnlyStream.subscribe { item ->
println("yay, got it $item")
}
println("2 ${publisher.hasComplete()}")
publisher.onComplete()
println("3 ${publisher.hasComplete()}")
Thread.sleep(3000L)
}
输出:
1 false
2 false
yay, got it doneee
3 true
正如 akarnokd 在评论中所说,readOnlyStream.subscribe { item ->
正在为一个项目添加回调,并且从未发出一个项目。
然而,这并不意味着 onComplete 事件没有传播到 Flowable。 onComplete 事件可以这样订阅:
readOnlyStream.subscribe(
{}, // onNext, called for each item
{}, // onError, called once when there is an error
{ println("yay, got onComplete") }, // onComplete, called once when completed
)
completableSubject 的订阅方法没有 onNext 方法,因为 Completables 不发出任何项目,只有 onError 或 onComplete 事件。