在不调用 onComplete 的情况下创建 RX Observable 是否有效?

Creating a RX Observable without calling onComplete is it Valid?

我不是 Rx 方面的专家,抱歉,如果这是一个微不足道的问题 so.I 我正在做一个轮询操作,我必须等待更新,我为此创建了 Rx Observable;但是,我从不调用 onComplete。但在 onDestroy 中我退订了。请看下面的代码。

@Reusable
class PollingExample @Inject constructor() {
    var itemObservable: Observable<List<Item>>
        private set
    private lateinit var itemObservableEmitter: WeakReference<ObservableEmitter<List<Item>>>

    init {
        itemObservable = Observable.create { e -> itemObservableEmitter = WeakReference(e) }
    }

    fun submitData(items: List<Item>) {
        itemObservableEmitter.get()?.onNext(items)
    }
}

这样做是否有效

经过一些搜索我认为只要我不想调用 onError()onComplete() 最好使用这个库 https://github.com/JakeWharton/RxRelay 它保证一切都将继续工作而无需担心意外触发终端状态