RxJava CompletableSubject toFlowable/ toObservable 不发射

RxJava CompletableSubject toFlowable/ toObservable doesn't emit

所以我有一个生产者/发布者是 CompletableSubject

我想要一个只读版本供订阅者/观察者使用 但是 toFLowable()toObservable 都没有发出任何东西

我错过了什么?

fun main() {
    val publisher = CompletableSubject.create()

    val readOnlyStream = publisher.toFlowable<Any>()

    println("1 ${publisher.hasComplete()}")
    readOnlyStream.subscribe { item ->
        println("yay, got it $item")
    }

    println("2 ${publisher.hasComplete()}")
    publisher.onComplete()
    println("3 ${publisher.hasComplete()}")

    Thread.sleep(3000L)
}

输出:

1 false
2 false
3 true

我发现了奇怪的解决方法

fun main() {
    val publisher = CompletableSubject.create()

    val readOnlyStream = publisher.toSingle {
        "doneee"
    }

    println("1 ${publisher.hasComplete()}")
    readOnlyStream.subscribe { item ->
        println("yay, got it $item")
    }

    println("2 ${publisher.hasComplete()}")
    publisher.onComplete()
    println("3 ${publisher.hasComplete()}")

    Thread.sleep(3000L)
}

输出:

1 false
2 false
yay, got it doneee
3 true

正如 akarnokd 在评论中所说,readOnlyStream.subscribe { item -> 正在为一个项目添加回调,并且从未发出一个项目。 然而,这并不意味着 onComplete 事件没有传播到 Flowable。 onComplete 事件可以这样订阅:

readOnlyStream.subscribe(
    {}, // onNext, called for each item
    {}, // onError, called once when there is an error
    { println("yay, got onComplete") }, // onComplete, called once when completed
)

completableSubject 的订阅方法没有 onNext 方法,因为 Completables 不发出任何项目,只有 onError 或 onComplete 事件。