如何 doOnNext 只消费 PublishSubject 的前 n 项?

How to doOnNext only consumes the first n items of a PublishSubject?

我有一个PublishSubject:

val myPublishSubject = remember {
    PublishSubject.create<Long>().apply {
        doOnNext {
            Logger.debug(TAG) { "Got new Long $it" }
        }
            .takeUntil(someObservable)
            .subscribe()
    }
}

我有一个服务可以获取长值流,在另一个函数的某处:

while(notFinished){
    val newLong = getSomeLong()
    myPublishSubject.onNext(newLong)
}

此处,上层 doOnNext{} 正在永久记录。例如,我怎样才能只允许前 20 个 Long 值? .take(20) 没有成功!

take(20) 应该可以,此测试通过:

@Test
fun consumeOnlyFirst20ValueTest() {
    val ps = PublishSubject.create<Long>()

    CoroutineScope(Dispatchers.Default).launch {
        for (i in 1L..1000L) {
            ps.onNext(i)
            delay(50)
        }
    }

    var i = 0
    ps.take(20)
            .doOnNext {
                i++
            }.subscribeOn(Schedulers.io())
            .subscribe()

    Thread.sleep(6000)
    assertEquals(20, i)
}