RxJs:你能把运算符作为参数传播到管道运算符中吗

RxJs: Can you spread operators as arguments into pipe operator

我有两个可观察流,它们执行非常独立的映射逻辑,但最终以以下 3 个运算符结束:

  this.selection
    .pipe(
      ..Custom mapping operators
      tap(_ => this.devicesLoading = true),
      switchMap(d => this.mapService.findLocationForDevices(d)),
      map(loc => marker([loc.latitude, loc.longitude])
    )
    .subscribe(markers => this.plotMarkers(markers));

我想将最后的 tap, switchMap, map 运算符移动到一个通用函数中,这样我就可以将它们应用到我的两个可观察流中。

我想这样做:

  private resolveLocationsAndConvertToMarkers = (devices: String[]) => [
    tap(_ => this.devicesLoading = true),
    switchMap((devices: string[]) => this.mapService.findLocationForDevices(devices)),
    map(loc => marker([loc.latitude, loc.longitude])
  ];

但我不确定如何将这些运算符散布到管道参数中,例如:#

      this.selection
        .pipe(
          // Custom mapping operators
          ... this.resolveLocationsAndConvertToMarkers
        )
        .subscribe(markers => this.plotMarkers(markers));

这个错误 there are no overloads that expect 3 or 5 arguments..

您可以尝试使用原生 .apply()

this.selection
    .pipe.apply(null,this.resolveLocationsAndConvertToMarkers)

或将运算符列表包装在pipe()

  private resolveLocationsAndConvertToMarkers = (devices: String[]) => pipe(
    tap(_ => this.devicesLoading = true),
    switchMap((devices: string[]) => this.mapService.findLocationForDevices(devices)),
    map(loc => marker([loc.latitude, loc.longitude])
  );

或return高阶函数

private resolveLocationsAndConvertToMarkers = (devices: String[]) => source=>source.pipe(
        tap(_ => this.devicesLoading = true),
        switchMap((devices: string[]) => this.mapService.findLocationForDevices(devices)),
        map(loc => marker([loc.latitude, loc.longitude])
      );

您可以尝试一种反应式方法(除非真正隔离,否则没有副作用):

const preSelection$ = this.selection
  .pipe
  //..Custom mapping operators
  ();

const selection$: Observable<Marker> = preSelection$.pipe(
  switchMap(preSelection =>
    concat(
      of(null),
      of(preSelection).pipe(
        switchMap(d => this.mapService.findLocationForDevices(d)),
        map(loc => marker([loc.latitude, loc.longitude]))
      )
    )
  ),
  shareReplay({ bufferSize: 1, refCount: true })
);

const isLoading$: Observable<boolean> = selection$.pipe(map(x => !!x));

const sideEffectUpdatePlotMarkers$ = selection$.pipe(
  tap(markers => this.plotMarkers(markers))
);

// isolate `subscribe` calls and side effects as much as possible
sideEffectUpdatePlotMarkers$.subscribe();

我希望这个答案能帮助其他遇到这个问题的人。接受的答案对我来说并不完全有效,主要原因是 null 作为第一个参数传递给 .apply() 而不是我的可观察函数。这是一个类似于我在我的项目中成功实现的示例。

private pipeActions = [
   filter(...),
   map(...),
];

private myObservable = combineLatest(...);

doThing(): Observable<any> {
   return this.myObservable
      .pipe.apply(this.myObservable, [...this.pipeActions]);
}

doOtherThing(): Observable<any> {
   return this.myObservable
      .pipe.apply(
         this.myObservable,
         [...this.pipeActions, map(...)], // Do something additionally after my list of pipe actions
      );
}