尝试轮询服务器但是 rxjs 方法可能不正确,因为算法中断
attempting to poll a server however rxjs methods may be incorrect as algorithm breaks
所以我正在尝试轮询我的服务器。我试图每 5 秒轮询一次我的服务器。一分钟后投票超时。我在调试方法中有一些控制台日志,但即使在等待 5 秒后仍然触发的唯一 console.log 是 'validate stream start'
我按照 this 教程进行了操作,但没有创建单独的服务,因为我的应用程序中的这一页只需要它。
我敢打赌这是对这些 rxjs 运算符工作方式的简单错误或误解。
我做错了什么?
startastream(){
this.startastreamboolan = true;
let count = 12;
this.http.get(environment.shochat_content_creator_set_valid_stream_start)
.subscribe((req: any)=>{
console.log('validate stream start');
timer(5000).pipe(
switchMap(() =>{
console.log('timer start');
if(count > 0){
return this.http.get(environment.check_if_stream_is_active_on_mux)
.subscribe(
(req: any)=>{
this.streamready = true;
return 0;
},
error => {
count = count -1;
console.log(count);
});
}
}));
});
}
定时器
创建一个 Observable,它在 dueTime
后开始发射,并在之后的每个 period
时间后发射不断增加的数字。
timer(dueTime: number | Date = 0, periodOrScheduler?: number | SchedulerLike, scheduler?: SchedulerLike): Observable<number>
Its like interval
, but you can specify when should the emissions start.
在您的代码中,您忘记了 timer
运算符的第二个参数,这意味着它只会等待 5s
然后只发出一次。此外,您没有订阅 timer
,这就是它不再继续的原因。如果你想每隔 5s
轮询你的服务器,你需要使用 :
timer(0, 5000)
在这里,计时器不会等待,而是会每隔 5s
直接开始发射值。
接下来,我看到您正在使用 switchMap
创建内部流,这很好。但是,如果您的服务器处理请求的时间超过 5s
,您可能需要使用 mergeMap
,因为它不会取消之前正在进行的内部流,这取决于您。
这里的错误是switchMap
(或mergeMap
)接受了一个回调函数,必须 return一个Observable
,然后运营商会自己订阅它。在这里,您 return 正在使用 Subscription
。
以下是您可以执行的操作:
const start$ = this.http.get(startUrl).pipe(
tap(() => console.log('Stream start'))
);
const poll$ = this.http.get(pollUrl).pipe(
tap(() => (this.streamReady = true)),
catchError(error => {
console.log(error);
return EMPTY;
})
);
start$.pipe(
switchMap(() => timer(0, 5000).pipe(
tap(() => console.log('Polling every 5s')),
mergeMap(() => poll$)
))
).subscribe();
我在这里创建了两个 Observables
。 start$
负责启动流,poll$
负责轮询您的服务器。这里的想法是启动流,然后切换到 timer
内部流,它将每隔 5s
发出一次,然后再次切换到另一个将轮询服务器的内部流。
我没有在我的示例中包含 count
,因此使用此代码流将永远 运行。为此,您应该查看 takeUntil
运算符。
希望对您有所帮助,欢迎提问!
所以我正在尝试轮询我的服务器。我试图每 5 秒轮询一次我的服务器。一分钟后投票超时。我在调试方法中有一些控制台日志,但即使在等待 5 秒后仍然触发的唯一 console.log 是 'validate stream start'
我按照 this 教程进行了操作,但没有创建单独的服务,因为我的应用程序中的这一页只需要它。
我敢打赌这是对这些 rxjs 运算符工作方式的简单错误或误解。
我做错了什么?
startastream(){
this.startastreamboolan = true;
let count = 12;
this.http.get(environment.shochat_content_creator_set_valid_stream_start)
.subscribe((req: any)=>{
console.log('validate stream start');
timer(5000).pipe(
switchMap(() =>{
console.log('timer start');
if(count > 0){
return this.http.get(environment.check_if_stream_is_active_on_mux)
.subscribe(
(req: any)=>{
this.streamready = true;
return 0;
},
error => {
count = count -1;
console.log(count);
});
}
}));
});
}
定时器
创建一个 Observable,它在 dueTime
后开始发射,并在之后的每个 period
时间后发射不断增加的数字。
timer(dueTime: number | Date = 0, periodOrScheduler?: number | SchedulerLike, scheduler?: SchedulerLike): Observable<number>
Its like
interval
, but you can specify when should the emissions start.
在您的代码中,您忘记了 timer
运算符的第二个参数,这意味着它只会等待 5s
然后只发出一次。此外,您没有订阅 timer
,这就是它不再继续的原因。如果你想每隔 5s
轮询你的服务器,你需要使用 :
timer(0, 5000)
在这里,计时器不会等待,而是会每隔 5s
直接开始发射值。
接下来,我看到您正在使用 switchMap
创建内部流,这很好。但是,如果您的服务器处理请求的时间超过 5s
,您可能需要使用 mergeMap
,因为它不会取消之前正在进行的内部流,这取决于您。
这里的错误是switchMap
(或mergeMap
)接受了一个回调函数,必须 return一个Observable
,然后运营商会自己订阅它。在这里,您 return 正在使用 Subscription
。
以下是您可以执行的操作:
const start$ = this.http.get(startUrl).pipe(
tap(() => console.log('Stream start'))
);
const poll$ = this.http.get(pollUrl).pipe(
tap(() => (this.streamReady = true)),
catchError(error => {
console.log(error);
return EMPTY;
})
);
start$.pipe(
switchMap(() => timer(0, 5000).pipe(
tap(() => console.log('Polling every 5s')),
mergeMap(() => poll$)
))
).subscribe();
我在这里创建了两个 Observables
。 start$
负责启动流,poll$
负责轮询您的服务器。这里的想法是启动流,然后切换到 timer
内部流,它将每隔 5s
发出一次,然后再次切换到另一个将轮询服务器的内部流。
我没有在我的示例中包含 count
,因此使用此代码流将永远 运行。为此,您应该查看 takeUntil
运算符。
希望对您有所帮助,欢迎提问!