RxJS Observable - return 如果第一个事件在前 x 分钟内未发出,否则继续 x+y 分钟

RxJS Observable - return if first event not emitted in first x mins, otherwise continue for x+y mins

RxJS/Angular高手,需要你input/pointers

我正在尝试在 Observable 上添加一个逻辑(从 http 调用创建)

如果第一个事件在前 3 分钟内未从 Observable 发出,我想 return(结束对 Observable 的等待并停止订阅)

但是如果第一个事件在前 3 分钟内发出,我想继续订阅并获取总共 10 分钟的数据(从订阅开始的时间算起)

截至目前,使用 takeUntil(timer(1000 * 60 * 10) 等待总共 10 分钟,但想将此等待分成 3 分钟,如果有响应 -> 则仅延长 7 分钟

伪代码

Observable
.pipe(
    takeUntil(timer(10* 60 * 1000)),
    map (res => {}),
    filter()
)
.subscribe(

)

注意:以上调用在 EventSource SSE 调用上回绕

我想使用开箱即用的 RxJS 超时,但不符合我的要求

结合使用超时和 takeUntil 也可以解决您的问题。 看看下面这个简单的例子:

    //emit value every 4s
    const source = interval(4000);
    //after 11 seconds, emit value
    const myTimer = timer(11000);
    //modify timeout time to see what happens if no value comes from the interval
    // i.e. set timeout to 3s
    const example = source.pipe(
      timeout(5000),
      catchError(error => of("Error while request")),
      takeUntil(myTimer));

    //output: 0,1
    const subscribe = example.subscribe(val => console.log(val));

如果 source 没有数据,timeout 将触发。 takeUntil是限时阅读source。如果 timer 触发,则 observable 完成。

A working stackblitz example

使用race operator for the first x mins condition, takeUntil 传递到第二个条件的http请求中:

 fistTimerBenchMark$ = timer(3000).pipe(switchMap(x => throwError('ended before 3s')));

 //Mock http request
 mockHttp$ = timer(2000,2000).pipe(map(x => 'http respones'),takeUntil(timer(10000)));

 ngOnInit(){
      race(this.fistTimerBenchMark, this.mockHttp$)
      .subscribe( x => console.log(x),  err => console.log(err))
 }

此外,我使用 throwError 来完成第一个 x 分钟条件的可观察对象,因为在可观察对象的管道或订阅者中取消订阅不是一个好的约定。