RxJS Observable - return 如果第一个事件在前 x 分钟内未发出,否则继续 x+y 分钟
RxJS Observable - return if first event not emitted in first x mins, otherwise continue for x+y mins
RxJS/Angular高手,需要你input/pointers
我正在尝试在 Observable 上添加一个逻辑(从 http 调用创建)
如果第一个事件在前 3 分钟内未从 Observable 发出,我想 return(结束对 Observable 的等待并停止订阅)
但是如果第一个事件在前 3 分钟内发出,我想继续订阅并获取总共 10 分钟的数据(从订阅开始的时间算起)
截至目前,使用 takeUntil(timer(1000 * 60 * 10) 等待总共 10 分钟,但想将此等待分成 3 分钟,如果有响应 -> 则仅延长 7 分钟
伪代码
Observable
.pipe(
takeUntil(timer(10* 60 * 1000)),
map (res => {}),
filter()
)
.subscribe(
)
注意:以上调用在 EventSource SSE 调用上回绕
我想使用开箱即用的 RxJS 超时,但不符合我的要求
结合使用超时和 takeUntil 也可以解决您的问题。
看看下面这个简单的例子:
//emit value every 4s
const source = interval(4000);
//after 11 seconds, emit value
const myTimer = timer(11000);
//modify timeout time to see what happens if no value comes from the interval
// i.e. set timeout to 3s
const example = source.pipe(
timeout(5000),
catchError(error => of("Error while request")),
takeUntil(myTimer));
//output: 0,1
const subscribe = example.subscribe(val => console.log(val));
如果 source
没有数据,timeout
将触发。 takeUntil
是限时阅读source
。如果 timer
触发,则 observable 完成。
使用race
operator for the first x mins condition, takeUntil
传递到第二个条件的http请求中:
fistTimerBenchMark$ = timer(3000).pipe(switchMap(x => throwError('ended before 3s')));
//Mock http request
mockHttp$ = timer(2000,2000).pipe(map(x => 'http respones'),takeUntil(timer(10000)));
ngOnInit(){
race(this.fistTimerBenchMark, this.mockHttp$)
.subscribe( x => console.log(x), err => console.log(err))
}
此外,我使用 throwError
来完成第一个 x 分钟条件的可观察对象,因为在可观察对象的管道或订阅者中取消订阅不是一个好的约定。
RxJS/Angular高手,需要你input/pointers
我正在尝试在 Observable 上添加一个逻辑(从 http 调用创建)
如果第一个事件在前 3 分钟内未从 Observable 发出,我想 return(结束对 Observable 的等待并停止订阅)
但是如果第一个事件在前 3 分钟内发出,我想继续订阅并获取总共 10 分钟的数据(从订阅开始的时间算起)
截至目前,使用 takeUntil(timer(1000 * 60 * 10) 等待总共 10 分钟,但想将此等待分成 3 分钟,如果有响应 -> 则仅延长 7 分钟
伪代码
Observable
.pipe(
takeUntil(timer(10* 60 * 1000)),
map (res => {}),
filter()
)
.subscribe(
)
注意:以上调用在 EventSource SSE 调用上回绕
我想使用开箱即用的 RxJS 超时,但不符合我的要求
结合使用超时和 takeUntil 也可以解决您的问题。 看看下面这个简单的例子:
//emit value every 4s
const source = interval(4000);
//after 11 seconds, emit value
const myTimer = timer(11000);
//modify timeout time to see what happens if no value comes from the interval
// i.e. set timeout to 3s
const example = source.pipe(
timeout(5000),
catchError(error => of("Error while request")),
takeUntil(myTimer));
//output: 0,1
const subscribe = example.subscribe(val => console.log(val));
如果 source
没有数据,timeout
将触发。 takeUntil
是限时阅读source
。如果 timer
触发,则 observable 完成。
使用race
operator for the first x mins condition, takeUntil
传递到第二个条件的http请求中:
fistTimerBenchMark$ = timer(3000).pipe(switchMap(x => throwError('ended before 3s')));
//Mock http request
mockHttp$ = timer(2000,2000).pipe(map(x => 'http respones'),takeUntil(timer(10000)));
ngOnInit(){
race(this.fistTimerBenchMark, this.mockHttp$)
.subscribe( x => console.log(x), err => console.log(err))
}
此外,我使用 throwError
来完成第一个 x 分钟条件的可观察对象,因为在可观察对象的管道或订阅者中取消订阅不是一个好的约定。