rxjs/Observable: 运行 在获取第一个流后执行一次函数(连续可观察)

rxjs/Observable: Running a function once after getting first stream (continuous observable)

首先,抱歉标题太长了。

我正在尝试使用 forEach from angularfire2 订阅一个连续流数组,但我还想 运行 在确认第一组数据已经进来之后的一个函数:

this.people.forEach((person) => {
    person.items = this.database.list('/items' + person.key);
    person.items.subscribe((data) => {person.itemsList = data});
});

myIntendedFunction();

有没有办法放置 myIntendedFunction() 这样:

  1. 在每个 person
  2. 收到第一个 data 流后 运行s
  3. 它运行只有一次?

这不完全是您要实现的目标。 据我了解,您正试图在收到第一人称后致电 "myIntendedFunction"。 您可以多次订阅您的可观察对象(订阅顺序很重要)并使用 first() 运算符仅获取第一个值然后取消订阅。

this.people.subscribe((person) => {
    person.items= this.database.list('/items');
    person.items.subscribe((data) => {person.itemsList = data});
});

this.people.first().subscribe(myIntendedFunction);

如果您不希望第一个请求发生两次,那么完成起来会稍微复杂一些:

const connectables: ConnectableObservable<any>[] = [];

this.people.forEach(person => {
    person.items = this.database.list('/items' + person.key);
    const connectable = person.items.publish();
    connectables.push(connectable);
    connectable.subscribe((data) => {person.itemsList = data});
});

Observable.zip(...connectables).take(1).subscribe(myIntendedFunction);

connectables.forEach(c => c.connect());

这里发生的事情是这样的:publish()的效果基本上就是可以多次订阅同一个数据流。此外,在您调用 connect() 之前,订阅功能不会被调用。 如果我们使用 person.items.share(),它是 person.items.publish().connect() 的糖,它会立即发出请求,并且我们的应用程序可能会由于竞争条件而出现错误。

zip() 等待每个传递的 observable 发出一个项目并将这些项目作为数组一次发出。我们只希望这发生在第一组项目上,所以我们只是 take(1).

在 firebase 列表上使用 import 'rxjs/add/operator/first'.first() 运算符将获取列表中的初始值。如果没有此运算符,您的函数将被多次调用并可能导致浏览器崩溃。 .toPromise() 运算符 import 'rxjs/add/operator/toPromise' 会将获取的列表observable 转换为promise。您可以使用 .then() 来使用对 运行 所需功能的承诺。

    this.forEach.person((people) => {
        person.items = this.database.list('/items' + person.key);
        person.items
            .first()
            .toPromise()
            .then(() => {
                myIntendedFunction();
         });
    })