RxJava - 即使调用了 onNext() 如何阻止 PublishSubject 发布

RxJava - how to stop PublishSubject from publishing even if onNext() is called

我看看我在其中调用以下内容:

// class member
var myPublishSubject = PublishSubject.create<SomeObservable>()
// later on in the class somewhere:    
while(true){
   myPublishSubject.onNext(someObservable)
}

我想停止发射,但让 while 循环永远持续下去。所以我希望 onNext 调用什么也不做。但我担心如果我调用 myPublishSubject.onComplete() 最终主题将为空并且我将获得 NPE。即使重复调用 onNext() ,是否还有只是让它静音。退订是最好的方法吗?

几个笔记

这种情况非常罕见,但如果您能向我们展示您对 Observable 的真实意图,我们可能会帮助您构建它,即使不是最好,也可能更好。

你能做什么

对于我的示例,我只使用了一个非常简单的标志变量,这可以根据您的项目的任何触发器进行更改。

选项 1

您可以直接对主题发布者

调用onComplete
val maxEmittedItemCount = 10
var currentEmittedItemCount = 0
val someStringValue = "Some observable" // Create whatever observable you have
val publishSubject = PublishSubject.create<String>()

publishSubject.subscribe({
    currentEmittedItemCount++
    println(it)
}, {
    println(it)
})

while (currentEmittedItemCount != maxEmittedItemCount) {
    // Print indication that the loop is still running
    println("Still looping")

    // Publish value on the subject
    publishSubject.onNext(someStringValue)

    // Test flag for trigger
    if (currentEmittedItemCount == maxEmittedItemCount) publishSubject.onComplete()
}

选项 2

您还可以保留对订阅的引用,然后再处理它,这比前一个更语义化,因为它将在处理资源时不调用 onNext(t) 来执行代码块。

lateinit var disposable: Disposable // Will hold reference to the subscription
var maxEmittedItemCount = 10
var currentEmittedItemCount = 0
var someStringValue = "Some observable" // Create whatever observable you have
var publishSubject = PublishSubject.create<String>()

disposable = publishSubject.subscribeWith(object : DisposableObserver<String>() {
    override fun onComplete() {
        // Print indication of completion for the subject publisher
        System.out.println("Complete")
    }

    override fun onNext(t: String) {
        // Test flag count synchonizer
        currentEmittedItemCount++

        // Print out current emitted item count
        System.out.println(currentEmittedItemCount)

        // Print current string
        System.out.println(t)
    }

    override fun onError(e: Throwable) {
        // Print error
        System.out.println(e)
    }
})

while (currentEmittedItemCount != maxEmittedItemCount) {
    // Publish value on the subject
    if (!disposable.isDisposed) publishSubject.onNext(someStringValue)

    // Test flag for trigger
    if (currentEmittedItemCount == maxEmittedItemCount) {
        publishSubject.onComplete() // optional if you need to invoke `onComplete()` block on the subject
        disposable.dispose()
    }

    // Print indication that the loop is still running
    System.out.println("Still looping")
}

阅读更多

由于订阅了观察者,我们必须调用取消订阅以避免多次 onNext 调用。

我建议在 subject.onNext() 的工作完成后调用 onComplete:

这里有一个例子

PublishSubject<Integer> source = PublishSubject.create();

source.onNext(1);
source.onComplete();

source.subscribe(getObserver());

然后在观察者中,我们重新创建另一个PublishSubject实例

 source.subscribe(new Observer<Boolean>() {
                                        @Override
                                        public void onSubscribe(Disposable d) {

                                    }

                                    @Override
                                    public void onNext(Integer value) {

                                    }

                                    @Override
                                    public void onError(Throwable e) {

                                    }

                                    @Override
                                    public void onComplete() {
                                        source = PublishSubject.create();
                                    }
                                });

希望这种方法对您的请求有所帮助