当另一个 Observable(通知程序)发出时,从源 Observable 发出下一个值

Emit next value from the source Observable when another Observable, the notifier, emits

我希望我的情况很常见,但找不到合适的。我想在 Angular2 / RxJS 5 中实现的是:

source:   ---1--2--3--4---------5--------6-|-->
notifier: -o------------o-----o---o--o-o------>
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
output:   ---1----------2-----3---4--5---6-|-->

所以,我有一个发射值的源 Observable,我希望每个 Observable 仅在第二个 Observable(称为通知器)发射时才进入输出。就像来自通知者的一个事件意味着 "allow next to pass through".

我试过 delayWhen,但我的主要问题是所有源值都在等待来自通知程序的同一个事件,例如,如果 3 个源值是 "queued" 和通知程序发出一次,所有 3 个值都通过,这不是我想要的。

我认为 zip 运算符就是您要找的:

sourceSubject:Subject = new Subject();
notifierSubject:Subject = new Subject();

index = 1;

constructor() {
  Observable.zip(
    this.sourceSubject, this.notifierSubject
  )
  .map(data => data[0])
  .subscribe(data => {
    console.log('>> output = '+data.id);
  });
}

emit() {
  this.sourceSubject.next({id: this.index});
  this.index++;
}

notify() {
  this.notifierSubject.next();
}

看到这个 plunkr:https://plnkr.co/edit/MK30JR2qK8aJIGwNqMZ5?p=preview

另见这个问题:

答案是zip:

const valueStream = 
    Rx.Observable.from([0, 1, 2, 3, 4, 5, 6]);

const notificationStream = 
    Rx.Observable.interval(1000).take(7);


Rx.Observable
    .zip(valueStream, notificationStream, (val, notification) => val)
    .subscribe(val => console.log(val));

工作示例here

当从两个流中生成一对时,这会生成一个值。因此,当 notificationStream 产生一个值时,该示例将从 valueStream 打印一个值。