如何在 takeUntil rxjs 函数之后采取行动
how to take an action after takeUntil rxjs function
因此,如果函数 运行 运行的时间足够长以触发计时器上 运行 的 takenUntil 函数,我将尝试使布尔值为真。
这是代码
start = this.http.get(environment.shochat_content_creator_set_valid_stream_start).pipe(
tap(() => console.log('Stream start'))
);
poll = this.http.get(environment.check_if_stream_is_active_on_mux).pipe(
tap(() => {
this.streamready = true;
return 0;
}
),
catchError(error => {
console.log(error);
return EMPTY;
})
);
startastream(){
const endtimer = timer(60000);
this.streampollsubscription = this.start.pipe(
switchMap(() => timer(0, 5000).pipe(
tap(() => console.log('Polling every 5s')),
mergeMap(() => this.poll)
)),
takeUntil(endtimer)
).subscribe();
}
本质上,如果 takeUntil 确实被触发,我希望将一个布尔值设置为 true。
timeout = true;
我一直在看这个 Whosebug post
但事情并没有我想的那么清楚。
takeUntil
完成可观察对象,因此要在完成后执行操作,您可以在几个地方执行此操作:
- 在
completion
处理程序中:
this.start.pipe(
switchMap(() => timer(0, 5000).pipe(
tap(() => console.log('Polling every 5s')),
mergeMap(() => this.poll))
),
takeUntil(endtimer)
)
.subscribe(
next => console.log('handling next value'),
error => console.log('handling error'),
() => this.timeout = true // <--- this fires after observable completes
);
- 使用
finalize
运算符:
this.start.pipe(
switchMap(() => timer(0, 5000).pipe(
tap(() => console.log('Polling every 5s')),
mergeMap(() => this.poll))
),
takeUntil(endtimer),
finalize(() => this.timeout = true)
)
.subscribe();
注意:这些解决方案并不完全您所要求的。当 takeUntil
触发时它们确实会触发,但它们也会因流完成的任何其他原因而触发。我认为这种区别对你的情况并不重要,但想在整个问题的背景下提及。
正如 Sang Dang 在评论中提到的那样,您也可以从“计时器关闭时(而不是我到目前为止提到的,” 当 observable 完成时"),您可以通过简单地向计时器添加 tap
来完成。
const endtimer = timer(60000).pipe(
tap(() => this.timeout = true)
);
您可以使用 merge 运算符并重复使用 takeUntil
条件以在条件 (endtimer
) 触发时创建映射的 true
值:
const { Subject, merge } = rxjs;
const { takeUntil, mapTo } = rxjs.operators;
const source$ = new Subject();
const condition$ = new Subject();
// Use the merge to work around your completed observable
const result$ = merge(
// When your condition fires map this event to true
condition$.pipe(mapTo(true)),
// Your already existing pipe in wich the takeUntil lives
source$.pipe(
takeUntil(condition$)
)
)
result$.subscribe(console.log)
source$.next(1);
source$.next(2);
condition$.next();
source$.next(3);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.3/rxjs.umd.min.js"></script>
仅供参考:我不确定此解决方案是否适用于您的应用程序上下文,因为您没有显示 timeout
的声明和设置位置。
因此,如果函数 运行 运行的时间足够长以触发计时器上 运行 的 takenUntil 函数,我将尝试使布尔值为真。
这是代码
start = this.http.get(environment.shochat_content_creator_set_valid_stream_start).pipe(
tap(() => console.log('Stream start'))
);
poll = this.http.get(environment.check_if_stream_is_active_on_mux).pipe(
tap(() => {
this.streamready = true;
return 0;
}
),
catchError(error => {
console.log(error);
return EMPTY;
})
);
startastream(){
const endtimer = timer(60000);
this.streampollsubscription = this.start.pipe(
switchMap(() => timer(0, 5000).pipe(
tap(() => console.log('Polling every 5s')),
mergeMap(() => this.poll)
)),
takeUntil(endtimer)
).subscribe();
}
本质上,如果 takeUntil 确实被触发,我希望将一个布尔值设置为 true。
timeout = true;
我一直在看这个 Whosebug post
但事情并没有我想的那么清楚。
takeUntil
完成可观察对象,因此要在完成后执行操作,您可以在几个地方执行此操作:
- 在
completion
处理程序中:
this.start.pipe(
switchMap(() => timer(0, 5000).pipe(
tap(() => console.log('Polling every 5s')),
mergeMap(() => this.poll))
),
takeUntil(endtimer)
)
.subscribe(
next => console.log('handling next value'),
error => console.log('handling error'),
() => this.timeout = true // <--- this fires after observable completes
);
- 使用
finalize
运算符:
this.start.pipe(
switchMap(() => timer(0, 5000).pipe(
tap(() => console.log('Polling every 5s')),
mergeMap(() => this.poll))
),
takeUntil(endtimer),
finalize(() => this.timeout = true)
)
.subscribe();
注意:这些解决方案并不完全您所要求的。当 takeUntil
触发时它们确实会触发,但它们也会因流完成的任何其他原因而触发。我认为这种区别对你的情况并不重要,但想在整个问题的背景下提及。
正如 Sang Dang 在评论中提到的那样,您也可以从“计时器关闭时(而不是我到目前为止提到的,” 当 observable 完成时"),您可以通过简单地向计时器添加 tap
来完成。
const endtimer = timer(60000).pipe(
tap(() => this.timeout = true)
);
您可以使用 merge 运算符并重复使用 takeUntil
条件以在条件 (endtimer
) 触发时创建映射的 true
值:
const { Subject, merge } = rxjs;
const { takeUntil, mapTo } = rxjs.operators;
const source$ = new Subject();
const condition$ = new Subject();
// Use the merge to work around your completed observable
const result$ = merge(
// When your condition fires map this event to true
condition$.pipe(mapTo(true)),
// Your already existing pipe in wich the takeUntil lives
source$.pipe(
takeUntil(condition$)
)
)
result$.subscribe(console.log)
source$.next(1);
source$.next(2);
condition$.next();
source$.next(3);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.3/rxjs.umd.min.js"></script>
仅供参考:我不确定此解决方案是否适用于您的应用程序上下文,因为您没有显示 timeout
的声明和设置位置。