reactivex java 创建一个自定义观察者,在未来的某个随机时间发出事件
reactivex java creating a custom observer that emits events at some random time in the future
我想弄清楚如何在未来某个时间创建异步观察者和 trigger/emit
事件,而无需重建可观察者和订阅者列表。
寻找类似的东西:
MyAsyncObservable o = new MyAsyncObservable();
o.subscribe(s);
o.subscribe(s2);
while(scanner.hasNext()){
o.emit(scanner.nextInt()); // emit T to subscribers.
}
其中 MyAsyncObservable
可能只是 Observable.fromAsync(emitter,buffermode)
而不是
while(scanner.hasNext(){
Observable<Integer> o = Observable.just(scanner.nextInt());
o.subscribe(s);
o.subscribe(s2);
}
如果您的 observable 很冷,只需使用 .delay();参见 f.e。这个:
Observable.just("Hello!").delay(10, TimeUnit.SECONDS).subscribe(System.out::println);
如果它是一个热 observable,只需添加一个缓冲区(假设您不希望在 10 秒内发射超过 10 次:
Observable
.just("Hello!")
.onBackpressureBuffer(10)
.delay(10, TimeUnit.SECONDS)
.subscribe(System.out::println);
编辑:哦,我明白了 - 你想要 Subject
- PublishSubject
或 BehaviorSubject
。创建它们并通过通常的 onNext/onComplete/onError.
向它们提供数据
我想弄清楚如何在未来某个时间创建异步观察者和 trigger/emit
事件,而无需重建可观察者和订阅者列表。
寻找类似的东西:
MyAsyncObservable o = new MyAsyncObservable();
o.subscribe(s);
o.subscribe(s2);
while(scanner.hasNext()){
o.emit(scanner.nextInt()); // emit T to subscribers.
}
其中 MyAsyncObservable
可能只是 Observable.fromAsync(emitter,buffermode)
而不是
while(scanner.hasNext(){
Observable<Integer> o = Observable.just(scanner.nextInt());
o.subscribe(s);
o.subscribe(s2);
}
如果您的 observable 很冷,只需使用 .delay();参见 f.e。这个:
Observable.just("Hello!").delay(10, TimeUnit.SECONDS).subscribe(System.out::println);
如果它是一个热 observable,只需添加一个缓冲区(假设您不希望在 10 秒内发射超过 10 次:
Observable
.just("Hello!")
.onBackpressureBuffer(10)
.delay(10, TimeUnit.SECONDS)
.subscribe(System.out::println);
编辑:哦,我明白了 - 你想要 Subject
- PublishSubject
或 BehaviorSubject
。创建它们并通过通常的 onNext/onComplete/onError.