我如何在 Rx Observable 上 `await`?

How can I `await` on an Rx Observable?

我希望能够等待一个可观察对象,例如

const source = Rx.Observable.create(/* ... */)
//...
await source;

天真的尝试会导致 await 立即解析而不阻止执行

编辑: 我的完整预期用例的伪代码是:

if (condition) {
  await observable;
}
// a bunch of other code

我知道我可以将其他代码移到另一个单独的函数中并将其传递给订阅回调,但我希望能够避免这种情况。

您必须向 await 传递承诺。将可观察对象的下一个事件转换为承诺并等待它。

if (condition) {
  await observable.first().toPromise();
}

编辑说明:该答案最初使用 .take(1) 但已更改为使用 .first() ,这避免了如果流在值通过之前结束则 Promise 永远不会解决的问题。

从 RxJS v8 开始,toPromise 将被删除。相反,上面可以替换为 await firstValueFrom(observable)

您需要 await 一个承诺,因此您需要使用 toPromise()。有关 toPromise() 的更多详细信息,请参阅 this

可能必须

await observable.first().toPromise();

正如之前在评论中指出的那样,当存在空的已完成可观察对象时,take(1)first() 运算符之间存在实质性差异。

Observable.empty().first().toPromise() 将导致拒绝 EmptyError 可以相应地处理,因为确实没有价值。

并且 Observable.empty().take(1).toPromise() 将导致具有 undefined 值的分辨率。

编辑:

.toPromise() 现在已在 RxJS 7 中弃用(来源:https://rxjs.dev/deprecations/to-promise

新答案:

As a replacement to the deprecated toPromise() method, you should use one of the two built in static conversion functions firstValueFrom or lastValueFrom.

示例:

import { interval, lastValueFrom } from 'rxjs';
import { take } from 'rxjs/operators';
 
async function execute() {
  const source$ = interval(2000).pipe(take(10));
  const finalNumber = await lastValueFrom(source$);
  console.log(`The final number is ${finalNumber}`);
}
 
execute();
 
// Expected output:
// "The final number is 9"

旧答案:

如果 toPromise 已被弃用,您可以使用 .pipe(take(1)).toPromise,但如您所见 here 它并未被弃用。

所以请按照说明使用 toPromise (RxJs 6):

//return basic observable
const sample = val => Rx.Observable.of(val).delay(5000);
//convert basic observable to promise
const example = sample('First Example')
  .toPromise()
  //output: 'First Example'
  .then(result => {
    console.log('From Promise:', result);
  });

async/await 示例:

//return basic observable
const sample = val => Rx.Observable.of(val).delay(5000);
//convert basic observable to promise
const example = await sample('First Example').toPromise()
// output: 'First Example'
console.log('From Promise:', result);

阅读更多here

使用新的 firstValueFrom()lastValueFrom() 而不是 toPromise(),正如此处指出的那样,从 RxJS 7 开始不推荐使用,并将在 RxJS 8 中删除。

import { firstValueFrom} from 'rxjs';
import { lastValueFrom } from 'rxjs';

this.myProp = await firstValueFrom(myObservable$);
this.myProp = await lastValueFrom(myObservable$);

这在 RxJS 7+ 中可用

参见:https://indepth.dev/rxjs-heads-up-topromise-is-being-deprecated/

不推荐使用 toPromise(),因为它在 RxJs 7 之后的版本中会贬值。您可以使用 RxJs 7 lastValueFrom()firstValueFrom() 中的两个新运算符。可以找到更多详细信息here

const result = await lastValueFrom(myObservable$);

Beta 版的实现可在此处获得:

我正在使用 RxJS V 6.4.0,因此我应该在 V 7.x.x toPromise() 中使用已弃用的版本。受其他答案的启发,这是我对 toPromise()

所做的
import { first, ... } from 'rxjs/operators';

...

if (condition) {
  await observable$.pipe(first()).toPromise();
}

...

请注意我是如何在 pipe() 中使用 last() 的。因为在我身上 observable.first() 不像 macil

提到的那样工作

希望这对像我一样使用 RxJS V 6.x.x 的其他人有所帮助:)。

谢谢。