RXJS 扩展从不退出递归调用
RXJS expand never exit from recursive call
我在将文件上传到 firebase 后创建了对侦听 GCP 触发器日志的递归调用,这基本上运行良好。
我的问题是,我的递归扩展函数永远不会退出。
正如您从代码中看到的那样,我不止一次检查了这些值,似乎一切都很好,我也在控制台日志中看到了 ENTERED
消息,但是递归调用从未完成,没有任何错误。
如果条件为真,我想中断或强制中断递归调用,我该怎么办?
public getLog(filePath: string): Observable<object[]> {
try {
...
return this.getLogChunk()
.pipe(
expand((data: any) => {
if (!environment.production && data && data.entries && data.entries.length > 0) {
console.groupCollapsed('GCP Service Info [getLog]');
console.info('[fileName]', fileName);
console.info('[some - Finished]', data.entries.some((x: any) => x.textPayload.includes('Finished')));
console.info('[some - Filename]', data.entries.some((x: any) => x.textPayload.includes(fileName)));
console.info('[some - Finished - Filename]', data.entries.some((x: any) => x.textPayload.includes('Finished') && x.textPayload.includes(fileName)));
console.info('[filter - Filename]', data.entries.filter((x: any) => x.textPayload.includes(fileName)));
console.groupEnd();
}
if (data &&
data.entries &&
data.entries.some((x: any) => x.textPayload.includes('Finished') && x.textPayload.includes(fileName))) {
console.log('ENTERED!!!!!');
return of({});
}
return this.getLogChunk().pipe(delay(2000));
}),
map(res => res),
reduce((acc: object[], val: any) => acc.concat(val), new Array<object>())
)
}
return new Observable<object[]>();
} catch (e) {
if (!environment.production) {
console.groupCollapsed('GCP Service Error [getLog]');
console.error('[error]', e);
console.groupEnd();
}
throw new Error(e);
}
}
在我调用 getlog 的地方,(你可以看到我检查了快照更改以仅触发一次 getlog,上传完成后,我对其进行了调试,没问题) 因此[complete] ENTERED
从未触发,因为 getlog 中的无限递归循环。
...
const uploadTask: AngularFireUploadTask = this.storage.upload(filePath, fileToUpload);
...
uploadTask.snapshotChanges()
.pipe(
filter(task => {
if (task) {
return task.state === firebase.storage.TaskState.SUCCESS
}
return false;
})
)
.subscribe(val => {
this.gcpService.getLog(filePath)
.subscribe({
next: data => console.log('[next] ENTERED'),
complete: () => {
console.log('[complete] ENTERED');
}
});
})
...
我发现问题出在 return of({})
调用中 - 这也扩展了,因为它提供了一个值。如果您希望扩展结束,请改为调用 return EMPTY
(从 'Rxjs' 导入)。 EMTPY
没有 return 值并立即完成,因此 expand 不会继续监听。
我在将文件上传到 firebase 后创建了对侦听 GCP 触发器日志的递归调用,这基本上运行良好。
我的问题是,我的递归扩展函数永远不会退出。
正如您从代码中看到的那样,我不止一次检查了这些值,似乎一切都很好,我也在控制台日志中看到了 ENTERED
消息,但是递归调用从未完成,没有任何错误。
如果条件为真,我想中断或强制中断递归调用,我该怎么办?
public getLog(filePath: string): Observable<object[]> {
try {
...
return this.getLogChunk()
.pipe(
expand((data: any) => {
if (!environment.production && data && data.entries && data.entries.length > 0) {
console.groupCollapsed('GCP Service Info [getLog]');
console.info('[fileName]', fileName);
console.info('[some - Finished]', data.entries.some((x: any) => x.textPayload.includes('Finished')));
console.info('[some - Filename]', data.entries.some((x: any) => x.textPayload.includes(fileName)));
console.info('[some - Finished - Filename]', data.entries.some((x: any) => x.textPayload.includes('Finished') && x.textPayload.includes(fileName)));
console.info('[filter - Filename]', data.entries.filter((x: any) => x.textPayload.includes(fileName)));
console.groupEnd();
}
if (data &&
data.entries &&
data.entries.some((x: any) => x.textPayload.includes('Finished') && x.textPayload.includes(fileName))) {
console.log('ENTERED!!!!!');
return of({});
}
return this.getLogChunk().pipe(delay(2000));
}),
map(res => res),
reduce((acc: object[], val: any) => acc.concat(val), new Array<object>())
)
}
return new Observable<object[]>();
} catch (e) {
if (!environment.production) {
console.groupCollapsed('GCP Service Error [getLog]');
console.error('[error]', e);
console.groupEnd();
}
throw new Error(e);
}
}
在我调用 getlog 的地方,(你可以看到我检查了快照更改以仅触发一次 getlog,上传完成后,我对其进行了调试,没问题) 因此[complete] ENTERED
从未触发,因为 getlog 中的无限递归循环。
...
const uploadTask: AngularFireUploadTask = this.storage.upload(filePath, fileToUpload);
...
uploadTask.snapshotChanges()
.pipe(
filter(task => {
if (task) {
return task.state === firebase.storage.TaskState.SUCCESS
}
return false;
})
)
.subscribe(val => {
this.gcpService.getLog(filePath)
.subscribe({
next: data => console.log('[next] ENTERED'),
complete: () => {
console.log('[complete] ENTERED');
}
});
})
...
我发现问题出在 return of({})
调用中 - 这也扩展了,因为它提供了一个值。如果您希望扩展结束,请改为调用 return EMPTY
(从 'Rxjs' 导入)。 EMTPY
没有 return 值并立即完成,因此 expand 不会继续监听。