结合 forkJoin 和 map 的 RxJs V4 Operator

RxJs V4 Operator that combines forkJoin and map

我正在尝试找到一种运算符,让我一步而不是两步处理 forkJoinmap 执行的操作。

我的输入是一个长度可变的可观察数组,我想等到所有这些都完成,然后根据我可以订阅的可观察输出创建一个计算结果。

我还没有遇到一个允许这些要求并以数组作为输入的。

下面是一个用例示例:

const observable1 = Rx.Observable.create(
  (observer) => {
    observer.onNext([1, 2, 3, 4]);
    observer.complete();
  }
);

const observable2 = Rx.Observable.create(
  (observer) => {
    observer.onNext([5, 6, 7, 8]);
    observer.complete();
  }
);

Rx.Observable.magigOperator([observable1, observable2])
   .subscribe(
     (result) => console.log
   );

现在我想要得到的结果是 [1,2,3,4,5,6,7,8] 的输出; 我可以通过使用 forkJoinmap 和 运行 一个展平函数来实现它,但我想知道是否有一个运算符可以让我一次性完成这项工作。

谢谢。

您不必使用 map,您可以将结果选择器函数作为 forkJoin 的最后一个参数传递,例如:

Rx.Observable.forkJoin([observable1, observable2], _.concat)

另请注意,您的示例中的 observable1observable2 未完成,这会阻止 forkJoin 发出结果。

forkJoin 的最后一个参数是一个可以操纵可观察结果的函数。尝试这样的事情:

const observable1 = Rx.Observable.create(
    (observer) => {
        observer.next([1, 2, 3, 4]);
        observer.complete();
    }
);

const observable2 = Rx.Observable.create(
    (observer) => {
        observer.next([5, 6, 7, 8]);
        observer.complete();
    }
);

Rx.Observable.forkJoin([observable1, observable2], (res1, res2) => [...res1, ...res2]).subscribe((res) => {
    console.log(res);
});

请注意观察者没有 next 方法。另外,如果你想 forkJoin 观察者必须完成。

顺便说一句,您可以像这样创建您的可观察对象(冷可观察对象在发出值时自动完成):

const observable1 = Rx.Observable.of([1,2,3,4]);
const observable2 = Rx.Observable.of([5,6,7,8]);

Observable.forkJoin(observable1, observable2, (...args)=> [].concat(...args));

谢谢kit的回答。