如何构建具有自定义数字和不同延迟的 Observable?

How to construct a Observable with custom numbers and different delays?

为了用 rxscala 测试我的反应式程序,我需要构建这样一个 Observable:

val numberStream: Observable[Int] = Observable.???()

哪个

  1. 发表人数1
  2. 然后等待 1s
  3. 发表人数4
  4. 然后等待 3s
  5. 发表人数2
  6. 然后等待 2s

我有一个丑陋的解决方案,ThreadReplaySubject:

val subject: Subject[Int] = ReplaySubject()
val numberStream: Observable = subject

new Thread(new Runnable {
    def run = {
        subject.onNext(1)
        Thread.sleep(1000)
        subject.onNext(4)
        Thread.sleep(3000)
        subject.onNext(2)
        Thread.sleep(2000)
    }
}).start()

有没有更好的解决方案?

您可以将多个具有延迟的 Observable 连接在一起,最终的 Observable 和 empty 具有订阅延迟。

val numberStream = (
     Observable.just(1) ++ 
     Observable.just(4).delay(1.second) ++
     Observable.just(2).delay(3.second) ++
     Observable.empty.delaySubscription(2.second))

附带说明一下,如果您正在测试,您应该使用 TestScheduler,您可以将其作为第二个参数传递给 delay