Rx Java: Sleep before OnNext (Sleep before emitting from Observable)
Rx Java: Sleep before OnNext (Sleep before emitting from Observable)
根据我的 Observable 中的条件,我想延迟 onNext / onError。我的代码如下:
fun check3(){
val list = arrayListOf(1,2,3,4,5,6,7, null)
val obs = Observable.create<Int> { subscriber ->
list.filter {
it != null
}.map {
if (it!! %2 == 0 ) {
Thread.sleep(3000)
subscriber.onError(IllegalArgumentException("Mod is true"))
} else {
subscriber.onNext(it)
subscriber.onComplete()
}
}
}
}
这里很痛 Thread.sleep(3000)
有更好的方法吗?基本上,如果满足 if(it %2) 条件,我想延迟向订阅者发送 onError 通知
您可以使用concatMap
将休眠变成非阻塞延迟:
Observable.fromIterable(list.filter { it != null })
.concatMap {
if (it!! % 2 == 0) {
return@concatMap Observable.error(IllegalArgumentException("Mod is true"))
.delay(3, TimeUnit.SECONDS, true)
}
Observable.just(it)
}
.take(1)
根据我的 Observable 中的条件,我想延迟 onNext / onError。我的代码如下:
fun check3(){
val list = arrayListOf(1,2,3,4,5,6,7, null)
val obs = Observable.create<Int> { subscriber ->
list.filter {
it != null
}.map {
if (it!! %2 == 0 ) {
Thread.sleep(3000)
subscriber.onError(IllegalArgumentException("Mod is true"))
} else {
subscriber.onNext(it)
subscriber.onComplete()
}
}
}
}
这里很痛 Thread.sleep(3000)
有更好的方法吗?基本上,如果满足 if(it %2) 条件,我想延迟向订阅者发送 onError 通知
您可以使用concatMap
将休眠变成非阻塞延迟:
Observable.fromIterable(list.filter { it != null })
.concatMap {
if (it!! % 2 == 0) {
return@concatMap Observable.error(IllegalArgumentException("Mod is true"))
.delay(3, TimeUnit.SECONDS, true)
}
Observable.just(it)
}
.take(1)