如何 doOnNext 只消费 PublishSubject 的前 n 项?
How to doOnNext only consumes the first n items of a PublishSubject?
我有一个PublishSubject
:
val myPublishSubject = remember {
PublishSubject.create<Long>().apply {
doOnNext {
Logger.debug(TAG) { "Got new Long $it" }
}
.takeUntil(someObservable)
.subscribe()
}
}
我有一个服务可以获取长值流,在另一个函数的某处:
while(notFinished){
val newLong = getSomeLong()
myPublishSubject.onNext(newLong)
}
此处,上层 doOnNext{}
正在永久记录。例如,我怎样才能只允许前 20 个 Long 值? .take(20)
没有成功!
take(20)
应该可以,此测试通过:
@Test
fun consumeOnlyFirst20ValueTest() {
val ps = PublishSubject.create<Long>()
CoroutineScope(Dispatchers.Default).launch {
for (i in 1L..1000L) {
ps.onNext(i)
delay(50)
}
}
var i = 0
ps.take(20)
.doOnNext {
i++
}.subscribeOn(Schedulers.io())
.subscribe()
Thread.sleep(6000)
assertEquals(20, i)
}
我有一个PublishSubject
:
val myPublishSubject = remember {
PublishSubject.create<Long>().apply {
doOnNext {
Logger.debug(TAG) { "Got new Long $it" }
}
.takeUntil(someObservable)
.subscribe()
}
}
我有一个服务可以获取长值流,在另一个函数的某处:
while(notFinished){
val newLong = getSomeLong()
myPublishSubject.onNext(newLong)
}
此处,上层 doOnNext{}
正在永久记录。例如,我怎样才能只允许前 20 个 Long 值? .take(20)
没有成功!
take(20)
应该可以,此测试通过:
@Test
fun consumeOnlyFirst20ValueTest() {
val ps = PublishSubject.create<Long>()
CoroutineScope(Dispatchers.Default).launch {
for (i in 1L..1000L) {
ps.onNext(i)
delay(50)
}
}
var i = 0
ps.take(20)
.doOnNext {
i++
}.subscribeOn(Schedulers.io())
.subscribe()
Thread.sleep(6000)
assertEquals(20, i)
}