在 RxJava 中调用 dispose() 后在 onNext() 中接收项目
Receiving items in onNext() after calling dispose() in RxJava
这是我读到的 dispose()
here:
In a nutshell, when the Disposable (which is implemented by the
TestObserver) gets disposed, the Observer (also TestObserver) will no
longer receive values from the Observable.
这是我的代码:
private fun createObservableWithDisposable() {
Observable
.create { e: ObservableEmitter<String> ->
val worker = Schedulers.io().createWorker()
e.setDisposable(worker)
worker.schedule {
for (i in 1..5) {
if (i == 3) {
worker.dispose()
// https://medium.com/@vanniktech/rxjava-2-disposable-under-the-hood-f842d2373e64
// After calling dispose(), the subscriber no longer receives items passed in OnNext().
// But it doesn't work in my code
}
e.onNext("Event $i on thread ${Thread.currentThread().name}")
}
}
}
.subscribe(
{ s ->
Log.d(TAG, "createObservableWithDisposable onNext msg=$s")
},
{ e ->
Log.d(TAG, "createObservableWithDisposable", e)
},
{
Log.d(TAG, "createObservableWithDisposable onComplete")
}
)
}
这就是我在 Logcat 中看到的:
2019-02-25 08:10:53.414 12071-12101/ru.sample D/RxJavaSamples: createObservableWithDisposable onNext msg=Event 1 on thread RxCachedThreadScheduler-1
2019-02-25 08:10:53.414 12071-12101/ru.sample D/RxJavaSamples: createObservableWithDisposable onNext msg=Event 2 on thread RxCachedThreadScheduler-1
2019-02-25 08:10:53.414 12071-12101/ru.sample D/RxJavaSamples: createObservableWithDisposable onNext msg=Event 3 on thread RxCachedThreadScheduler-1
2019-02-25 08:10:53.414 12071-12101/ru.sample D/RxJavaSamples: createObservableWithDisposable onNext msg=Event 4 on thread RxCachedThreadScheduler-1
2019-02-25 08:10:53.414 12071-12101/ru.sample D/RxJavaSamples: createObservableWithDisposable onNext msg=Event 5 on thread RxCachedThreadScheduler-1
我预计只会看到前两个排放。也就是说,我认为在调用 dispose()
之后 onNext()
不会被调用。
您处置了提供物品的工作人员,而不是订户。
要停止接收物品,请尝试
val compositeDisposable = CompositeDisposable()
compositeDisposable.add(
Observable
.create { e: ObservableEmitter<String> ->
val worker = Schedulers.io().createWorker()
e.setDisposable(worker)
worker.schedule {
for (i in 1..5) {
if (i == 3) {
compositeDisposable.dispose() //changed here
}
e.onNext("Event $i on thread ${Thread.currentThread().name}")
}
}
}
.subscribe(
{ s ->
Log.d(TAG, "createObservableWithDisposable onNext msg=$s")
},
{ e ->
Log.d(TAG, "createObservableWithDisposable", e)
},
{
Log.d(TAG, "createObservableWithDisposable onComplete")
}
)
)
这是我读到的 dispose()
here:
In a nutshell, when the Disposable (which is implemented by the TestObserver) gets disposed, the Observer (also TestObserver) will no longer receive values from the Observable.
这是我的代码:
private fun createObservableWithDisposable() {
Observable
.create { e: ObservableEmitter<String> ->
val worker = Schedulers.io().createWorker()
e.setDisposable(worker)
worker.schedule {
for (i in 1..5) {
if (i == 3) {
worker.dispose()
// https://medium.com/@vanniktech/rxjava-2-disposable-under-the-hood-f842d2373e64
// After calling dispose(), the subscriber no longer receives items passed in OnNext().
// But it doesn't work in my code
}
e.onNext("Event $i on thread ${Thread.currentThread().name}")
}
}
}
.subscribe(
{ s ->
Log.d(TAG, "createObservableWithDisposable onNext msg=$s")
},
{ e ->
Log.d(TAG, "createObservableWithDisposable", e)
},
{
Log.d(TAG, "createObservableWithDisposable onComplete")
}
)
}
这就是我在 Logcat 中看到的:
2019-02-25 08:10:53.414 12071-12101/ru.sample D/RxJavaSamples: createObservableWithDisposable onNext msg=Event 1 on thread RxCachedThreadScheduler-1
2019-02-25 08:10:53.414 12071-12101/ru.sample D/RxJavaSamples: createObservableWithDisposable onNext msg=Event 2 on thread RxCachedThreadScheduler-1
2019-02-25 08:10:53.414 12071-12101/ru.sample D/RxJavaSamples: createObservableWithDisposable onNext msg=Event 3 on thread RxCachedThreadScheduler-1
2019-02-25 08:10:53.414 12071-12101/ru.sample D/RxJavaSamples: createObservableWithDisposable onNext msg=Event 4 on thread RxCachedThreadScheduler-1
2019-02-25 08:10:53.414 12071-12101/ru.sample D/RxJavaSamples: createObservableWithDisposable onNext msg=Event 5 on thread RxCachedThreadScheduler-1
我预计只会看到前两个排放。也就是说,我认为在调用 dispose()
之后 onNext()
不会被调用。
您处置了提供物品的工作人员,而不是订户。
要停止接收物品,请尝试
val compositeDisposable = CompositeDisposable()
compositeDisposable.add(
Observable
.create { e: ObservableEmitter<String> ->
val worker = Schedulers.io().createWorker()
e.setDisposable(worker)
worker.schedule {
for (i in 1..5) {
if (i == 3) {
compositeDisposable.dispose() //changed here
}
e.onNext("Event $i on thread ${Thread.currentThread().name}")
}
}
}
.subscribe(
{ s ->
Log.d(TAG, "createObservableWithDisposable onNext msg=$s")
},
{ e ->
Log.d(TAG, "createObservableWithDisposable", e)
},
{
Log.d(TAG, "createObservableWithDisposable onComplete")
}
)
)