如何从一个流中获取在另一个流的最后一个事件之后发生的事件

How to get events from one stream which happened after the last event from other stream

有两个流,永远不会完成:

--a---b-c-d---e--f---->

-----1-------2------3-->

我想从第一个流中获取在第二个流中的最后一个事件之后发生的事件。

一开始是这样的:

--a->
 (?)
---->
 (=)
--a->

第二个流发出第一个事件后,它看起来像这样:

--a---b-c-d->
    (?)
-----1------>
    (=)
------b-c-d->

在第二个流中的新事件之后:

--a---b-c-d---e--f->
        (?)
-----1-------2----->
        (=)
--------------e--f->

等等...需要哪一组运算符来执行此操作?

您可以使用 CombineLatest 来 return 事件,例如:

/* Have staggering intervals */
var source1 = Rx.Observable.interval(1000)
    .map(function(i) {

        return {
            data: (i + 1),

            time: new Date()
        };
    });

var source2 = Rx.Observable.interval(3000).startWith(-1)
    .map(function(i) {

        return {
            data: (i + 1),
            time: new Date()
        };
    });

// Combine latest of source1 and source2 whenever either gives a value with a selector
var source = Rx.Observable.combineLatest(
    source1,
    source2,
    function(s1, s2) {

        if (s1.time > s2.time) {
            return s1.data + ', ' + s2.data;
        }

        return undefined;
    }
).filter(x => x !== undefined).take(10);

var subscription = source.subscribe(
    function(x) {
        console.log('Next: %s', x);
    },
    function(err) {
        console.log('Error: %s', err);
    },
    function() {
        console.log('Completed');
    });

我不是 100% 相信你的说法,因为它们看起来有些矛盾,但我建议使用 window or buffer 的替代方法,如果你需要的只是将发生在第二个来源。

var count = 0;
//Converts the source into an Observable of Observable windows
source1.window(source2).subscribe(w => {
  var local = count++;
  //Just to show that this is indeed an Observable
  //Subscribe to the inner Observable
  w.subscribe(x => console.log(local + ': ' + x));
});

请注意,这只会发出来自第一个来源的值,如果您希望它们与其他来源的值结合使用,请务必按照其他答案的建议使用 combineLatestwithLatestFrom