RxJava - 即使调用了 onNext() 如何阻止 PublishSubject 发布
RxJava - how to stop PublishSubject from publishing even if onNext() is called
我看看我在其中调用以下内容:
// class member
var myPublishSubject = PublishSubject.create<SomeObservable>()
// later on in the class somewhere:
while(true){
myPublishSubject.onNext(someObservable)
}
我想停止发射,但让 while 循环永远持续下去。所以我希望 onNext 调用什么也不做。但我担心如果我调用 myPublishSubject.onComplete() 最终主题将为空并且我将获得 NPE。即使重复调用 onNext() ,是否还有只是让它静音。退订是最好的方法吗?
几个笔记
这种情况非常罕见,但如果您能向我们展示您对 Observable
的真实意图,我们可能会帮助您构建它,即使不是最好,也可能更好。
你能做什么
对于我的示例,我只使用了一个非常简单的标志变量,这可以根据您的项目的任何触发器进行更改。
选项 1
您可以直接对主题发布者
调用onComplete
val maxEmittedItemCount = 10
var currentEmittedItemCount = 0
val someStringValue = "Some observable" // Create whatever observable you have
val publishSubject = PublishSubject.create<String>()
publishSubject.subscribe({
currentEmittedItemCount++
println(it)
}, {
println(it)
})
while (currentEmittedItemCount != maxEmittedItemCount) {
// Print indication that the loop is still running
println("Still looping")
// Publish value on the subject
publishSubject.onNext(someStringValue)
// Test flag for trigger
if (currentEmittedItemCount == maxEmittedItemCount) publishSubject.onComplete()
}
选项 2
您还可以保留对订阅的引用,然后再处理它,这比前一个更语义化,因为它将在处理资源时不调用 onNext(t)
来执行代码块。
lateinit var disposable: Disposable // Will hold reference to the subscription
var maxEmittedItemCount = 10
var currentEmittedItemCount = 0
var someStringValue = "Some observable" // Create whatever observable you have
var publishSubject = PublishSubject.create<String>()
disposable = publishSubject.subscribeWith(object : DisposableObserver<String>() {
override fun onComplete() {
// Print indication of completion for the subject publisher
System.out.println("Complete")
}
override fun onNext(t: String) {
// Test flag count synchonizer
currentEmittedItemCount++
// Print out current emitted item count
System.out.println(currentEmittedItemCount)
// Print current string
System.out.println(t)
}
override fun onError(e: Throwable) {
// Print error
System.out.println(e)
}
})
while (currentEmittedItemCount != maxEmittedItemCount) {
// Publish value on the subject
if (!disposable.isDisposed) publishSubject.onNext(someStringValue)
// Test flag for trigger
if (currentEmittedItemCount == maxEmittedItemCount) {
publishSubject.onComplete() // optional if you need to invoke `onComplete()` block on the subject
disposable.dispose()
}
// Print indication that the loop is still running
System.out.println("Still looping")
}
阅读更多
由于订阅了观察者,我们必须调用取消订阅以避免多次 onNext 调用。
我建议在 subject.onNext() 的工作完成后调用 onComplete:
这里有一个例子
PublishSubject<Integer> source = PublishSubject.create();
source.onNext(1);
source.onComplete();
source.subscribe(getObserver());
然后在观察者中,我们重新创建另一个PublishSubject实例
source.subscribe(new Observer<Boolean>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
source = PublishSubject.create();
}
});
希望这种方法对您的请求有所帮助
我看看我在其中调用以下内容:
// class member
var myPublishSubject = PublishSubject.create<SomeObservable>()
// later on in the class somewhere:
while(true){
myPublishSubject.onNext(someObservable)
}
我想停止发射,但让 while 循环永远持续下去。所以我希望 onNext 调用什么也不做。但我担心如果我调用 myPublishSubject.onComplete() 最终主题将为空并且我将获得 NPE。即使重复调用 onNext() ,是否还有只是让它静音。退订是最好的方法吗?
几个笔记
这种情况非常罕见,但如果您能向我们展示您对 Observable
的真实意图,我们可能会帮助您构建它,即使不是最好,也可能更好。
你能做什么
对于我的示例,我只使用了一个非常简单的标志变量,这可以根据您的项目的任何触发器进行更改。
选项 1
您可以直接对主题发布者
调用onComplete
val maxEmittedItemCount = 10
var currentEmittedItemCount = 0
val someStringValue = "Some observable" // Create whatever observable you have
val publishSubject = PublishSubject.create<String>()
publishSubject.subscribe({
currentEmittedItemCount++
println(it)
}, {
println(it)
})
while (currentEmittedItemCount != maxEmittedItemCount) {
// Print indication that the loop is still running
println("Still looping")
// Publish value on the subject
publishSubject.onNext(someStringValue)
// Test flag for trigger
if (currentEmittedItemCount == maxEmittedItemCount) publishSubject.onComplete()
}
选项 2
您还可以保留对订阅的引用,然后再处理它,这比前一个更语义化,因为它将在处理资源时不调用 onNext(t)
来执行代码块。
lateinit var disposable: Disposable // Will hold reference to the subscription
var maxEmittedItemCount = 10
var currentEmittedItemCount = 0
var someStringValue = "Some observable" // Create whatever observable you have
var publishSubject = PublishSubject.create<String>()
disposable = publishSubject.subscribeWith(object : DisposableObserver<String>() {
override fun onComplete() {
// Print indication of completion for the subject publisher
System.out.println("Complete")
}
override fun onNext(t: String) {
// Test flag count synchonizer
currentEmittedItemCount++
// Print out current emitted item count
System.out.println(currentEmittedItemCount)
// Print current string
System.out.println(t)
}
override fun onError(e: Throwable) {
// Print error
System.out.println(e)
}
})
while (currentEmittedItemCount != maxEmittedItemCount) {
// Publish value on the subject
if (!disposable.isDisposed) publishSubject.onNext(someStringValue)
// Test flag for trigger
if (currentEmittedItemCount == maxEmittedItemCount) {
publishSubject.onComplete() // optional if you need to invoke `onComplete()` block on the subject
disposable.dispose()
}
// Print indication that the loop is still running
System.out.println("Still looping")
}
阅读更多
由于订阅了观察者,我们必须调用取消订阅以避免多次 onNext 调用。
我建议在 subject.onNext() 的工作完成后调用 onComplete:
这里有一个例子
PublishSubject<Integer> source = PublishSubject.create();
source.onNext(1);
source.onComplete();
source.subscribe(getObserver());
然后在观察者中,我们重新创建另一个PublishSubject实例
source.subscribe(new Observer<Boolean>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
source = PublishSubject.create();
}
});
希望这种方法对您的请求有所帮助