RxJS 等待订阅 Observable 完成
RxJS Waits until Subscribe Observable Finish
我正在尝试使用 NestJS 实现 gRPC 并使用 RxJS Observerable 来处理服务器流。
在下面的代码中,我尝试将值从 observable
放入 results
数组。函数 findAllRepos
return 空列表 而 console.log(value)
in subscribe
正确打印所有结果.
findAllRepos(): Repository[] {
const results: Repository[] = [];
const observable = this.mainService.findAllRepos({});
observable.subscribe({
next: (value) => {
console.log(value);
results.push(value);
},
error: (err) => console.log(err),
complete: () => console.log(results)
});
return results;
}
我认为问题在于函数 returns 在 subscribe
完成之前的值。有什么办法可以解决这个问题吗?谢谢!
使用带有订阅的方法不是很好的做法,您会使反应式代码完全不反应。为什么不使用像
这样的东西
findAllRepos(): Observable<Repository[]> {
return this.mainService.findAllRepos({}).pipe(catchError((e) => {
console.log('e ', e);
}), finalize(() => {
console.log('completed');
}))
}
然后在客户端中 findAllRepos().subscribe
?您将在此方法中拥有更大的灵活性,您可以从多个位置调用此方法并根据用例映射结果。
问题
I think the problem is that the function returns value before subscribe finish.
函数(next
、error
、和complete
)都被可观察对象(不是你)调用了在将来。当您深入了解 RxJS 的核心时,您可以控制这些函数的调用方式,但为了简单起见,最好想象这是由 RxJS 库不透明地处理的。
因此,无法如愿以偿。
问题比这更严重。您遇到的问题存在于同步和异步代码之间的任何交互中。您不能同步 运行 异步代码。在单线程环境下(比如JavaScript)试图让一段同步代码等待,会立即导致整个程序死锁。
考虑以下代码:您可能希望此代码输出 "a 等于 0" 1000 毫秒,然后开始输出 "a 等于 1"此后永远。然而,实际发生的是 setTimeout
中的代码永远不会有机会 运行 因为线程将陷入无限循环打印 "a equals 0"。 =41=]
// Start with a = 0
let a = 0;
// After 1 second, set a = 1
setTimeout(() => a = 1, 1000);
// loop forever and print something based on value of a
while(true){
if(a == 0) console.log("a equals 0");
else console.log("a equals 1");
}
解决方案
管理异步代码的两种最流行的方法是通过 promises 或 observables。如果你想要一个函数 return 一些异步的东西,那么让它 return 一个承诺或一个可观察的。
你的情况:
findAllRepos(): Observable<Repository[]> {
const observable = this.mainService.findAllRepos({});
return observable.pipe(
tap(value => console.log("Repository value being added to array: ", value)),
toArray(),
tap({
next: value => console.log("Result Array (Repository[]) : ", value),
error: console.log,
complete: () => console.log("findAllRepos observable complete")
})
);
}
然后在别处获取实际值:
findAllRepos().subscribe(repositories => {
/* Do something with your array of repositories */
});
我正在尝试使用 NestJS 实现 gRPC 并使用 RxJS Observerable 来处理服务器流。
在下面的代码中,我尝试将值从 observable
放入 results
数组。函数 findAllRepos
return 空列表 而 console.log(value)
in subscribe
正确打印所有结果.
findAllRepos(): Repository[] {
const results: Repository[] = [];
const observable = this.mainService.findAllRepos({});
observable.subscribe({
next: (value) => {
console.log(value);
results.push(value);
},
error: (err) => console.log(err),
complete: () => console.log(results)
});
return results;
}
我认为问题在于函数 returns 在 subscribe
完成之前的值。有什么办法可以解决这个问题吗?谢谢!
使用带有订阅的方法不是很好的做法,您会使反应式代码完全不反应。为什么不使用像
这样的东西findAllRepos(): Observable<Repository[]> {
return this.mainService.findAllRepos({}).pipe(catchError((e) => {
console.log('e ', e);
}), finalize(() => {
console.log('completed');
}))
}
然后在客户端中 findAllRepos().subscribe
?您将在此方法中拥有更大的灵活性,您可以从多个位置调用此方法并根据用例映射结果。
问题
I think the problem is that the function returns value before subscribe finish.
函数(next
、error
、和complete
)都被可观察对象(不是你)调用了在将来。当您深入了解 RxJS 的核心时,您可以控制这些函数的调用方式,但为了简单起见,最好想象这是由 RxJS 库不透明地处理的。
因此,无法如愿以偿。
问题比这更严重。您遇到的问题存在于同步和异步代码之间的任何交互中。您不能同步 运行 异步代码。在单线程环境下(比如JavaScript)试图让一段同步代码等待,会立即导致整个程序死锁。
考虑以下代码:您可能希望此代码输出 "a 等于 0" 1000 毫秒,然后开始输出 "a 等于 1"此后永远。然而,实际发生的是 setTimeout
中的代码永远不会有机会 运行 因为线程将陷入无限循环打印 "a equals 0"。 =41=]
// Start with a = 0
let a = 0;
// After 1 second, set a = 1
setTimeout(() => a = 1, 1000);
// loop forever and print something based on value of a
while(true){
if(a == 0) console.log("a equals 0");
else console.log("a equals 1");
}
解决方案
管理异步代码的两种最流行的方法是通过 promises 或 observables。如果你想要一个函数 return 一些异步的东西,那么让它 return 一个承诺或一个可观察的。
你的情况:
findAllRepos(): Observable<Repository[]> {
const observable = this.mainService.findAllRepos({});
return observable.pipe(
tap(value => console.log("Repository value being added to array: ", value)),
toArray(),
tap({
next: value => console.log("Result Array (Repository[]) : ", value),
error: console.log,
complete: () => console.log("findAllRepos observable complete")
})
);
}
然后在别处获取实际值:
findAllRepos().subscribe(repositories => {
/* Do something with your array of repositories */
});