如何构建具有自定义数字和不同延迟的 Observable?
How to construct a Observable with custom numbers and different delays?
为了用 rxscala 测试我的反应式程序,我需要构建这样一个 Observable
:
val numberStream: Observable[Int] = Observable.???()
哪个
- 发表人数
1
- 然后等待
1s
- 发表人数
4
- 然后等待
3s
- 发表人数
2
- 然后等待
2s
我有一个丑陋的解决方案,Thread
和 ReplaySubject
:
val subject: Subject[Int] = ReplaySubject()
val numberStream: Observable = subject
new Thread(new Runnable {
def run = {
subject.onNext(1)
Thread.sleep(1000)
subject.onNext(4)
Thread.sleep(3000)
subject.onNext(2)
Thread.sleep(2000)
}
}).start()
有没有更好的解决方案?
您可以将多个具有延迟的 Observable 连接在一起,最终的 Observable 和 empty
具有订阅延迟。
val numberStream = (
Observable.just(1) ++
Observable.just(4).delay(1.second) ++
Observable.just(2).delay(3.second) ++
Observable.empty.delaySubscription(2.second))
附带说明一下,如果您正在测试,您应该使用 TestScheduler
,您可以将其作为第二个参数传递给 delay
。
为了用 rxscala 测试我的反应式程序,我需要构建这样一个 Observable
:
val numberStream: Observable[Int] = Observable.???()
哪个
- 发表人数
1
- 然后等待
1s
- 发表人数
4
- 然后等待
3s
- 发表人数
2
- 然后等待
2s
我有一个丑陋的解决方案,Thread
和 ReplaySubject
:
val subject: Subject[Int] = ReplaySubject()
val numberStream: Observable = subject
new Thread(new Runnable {
def run = {
subject.onNext(1)
Thread.sleep(1000)
subject.onNext(4)
Thread.sleep(3000)
subject.onNext(2)
Thread.sleep(2000)
}
}).start()
有没有更好的解决方案?
您可以将多个具有延迟的 Observable 连接在一起,最终的 Observable 和 empty
具有订阅延迟。
val numberStream = (
Observable.just(1) ++
Observable.just(4).delay(1.second) ++
Observable.just(2).delay(3.second) ++
Observable.empty.delaySubscription(2.second))
附带说明一下,如果您正在测试,您应该使用 TestScheduler
,您可以将其作为第二个参数传递给 delay
。