RxJS - 只订阅一次但不完成 Observable

RxJS - subscribe only once but do not complete Observable

想象一下当您有一些 Observable 包含实时变化的数据时的情况,如下例...

interface User {
   name: string;
   projectId: string;
   dataThatChangesALotInRealTime: Object;
}

userData: Observable<User>

这个userData observable 用于组件中,用来显示一些实时变化的数据。例如

<p>
{{ (userData | async)?.dataThatChangesALotInRealTime }}
</p>

现在我想根据 userData observable 中的当前数据向数据库中插入一些数据。这是函数

addToDatabase() {
  let sub = this.userData.subscribe(data => {
     this.exampleDatabase.doc(`test/${data.dataThatChangesALotInRealTime.id}`)
          .add({ test: 'hello'})
     sub.unsubscribe() // <- This
  })
}

问题

这是取消订阅内部订阅以避免多次插入数据库的正确解决方案吗?有 different/better 的方法吗?

这只是一个简单的例子,如果您有任何问题或我的解释不力,请在评论中告诉我,我会更新我的问题。谢谢

您可以使用 first 运算符:

this.userData.pipe(first()).subscribe(...);

这将在发出第一个值后自动完成(并因此取消订阅)。

请注意,您应确保它在完成之前至少发出一次,否则将引发错误。如果不能保证这一点,可以用take(1)代替:

this.userData.pipe(take(1)).subscribe(...);

请注意,这实际上并没有直接修改 userData observable,因此其他对其的订阅将继续发出。这是因为 rxjs 中的运算符不会修改可观察对象,而是 return 一个新的可观察对象。