将 pipable 运算符与 RxJS / Angular 组合

Combined pipable operators with RxJS / Angular

是否有可能(我找不到任何全面的内容)在一个函数中组合多个 pipable 运算符,这样我就可以通过管道传输它并以不同的方法重新使用它?

事情是这样的:

public ngOnInit(): void {
      this.route.url
            .pipe(
                switchMap( (ids: UrlSegment[]) => /* http call */),
                tap(result => /* setting a variable within a service */),
                tap(result => /* some more manipulating and logic */),
                map(result => /* setting a _this_ variable */)
            )
            .subscribe( () => {
                /* some other async tasks, which depend on _this_ variables */
                this.cdr.detectChanges();
            });
      }

如何提取 pipe() 中的所有内容,以便我可以从不同的方法调用相同的运算符链,这需要执行相同的 http 调用和后续逻辑和操作?

我努力实现的是:

 this.route.url
   .pipe(
       this.combinedPipableMethod(url: UrlSegment[])
   )
   .subscribe()

您可以提取一个方法:

getData(ids: UrlSegment[]) {
   return this.http.get(/* url construction logic */)
      .pipe(
         tap(result => /* setting a variable within a service */),
         tap(result => /* some more manipulating and logic */),
         map(result => /* setting a _this_ variable */)
      );
}

然后switchMap到它:

public ngOnInit(): void {
   this.route.url
      .pipe(
          switchMap(this.getData),
       )
       .subscribe( () => {
          /* some other async tasks, which depend on _this_ variables */
          this.cdr.detectChanges();
       });
}

否则,您可以创建一个自定义运算符,但为此目的似乎有些矫枉过正:

const combinedPipableMethod = () => {
  return source => defer(() => {
    return source.pipe(
       switchMap((ids: UrlSegment[]) => /* http call */),
       tap(result => /* setting a variable within a service */),
       tap(result => /* some more manipulating and logic */),
       map(result => /* setting a _this_ variable */)
    )
  })
}
public ngOnInit(): void {
   this.route.url
      .pipe(
          combinedPipableMethod(),
       )
       .subscribe( () => {
          /* some other async tasks, which depend on _this_ variables */
          this.cdr.detectChanges();
       });
}

您可以将链存储在 Subject 中,然后只需对该主题调用 next() 即可强制管道 运行

//Subject for storing pipeline
loadDataSubject = new Subject();

ngOnInit() {
  loadDataPipe(this.loadDataSubject).subscribe(
     /* some other async tasks, which depend on _this_ variables */
     this.cdr.detectChanges();
  )
}

loadDataPipe(source) {
  return source.pipe(
       switchMap( (ids: UrlSegment[]) => /* http call */),
       tap(result => /* setting a variable within a service */),
       tap(result => /* some more manipulating and logic */),
       map(result => /* setting a _this_ variable */)
  )
}

现在,您可以随时使用 next():

再次调用管道 运行
 ....
 this.loadDataSubject.next();
 ....

您可以使用 rxjs pipe(注意,它是一个独立的函数,而不是 Observable 的方法)函数将一系列运算符组合成一个可重用的运算符。

import { pipe } from "rxjs";
 const customPipable = pipe(
     switchMap( (ids: UrlSegment[]) => /* http call */),
     tap(result => /* setting a variable within a service */),
     tap(result => /* some more manipulating and logic */),
     map(result => /* setting a _this_ variable */)
 )
  this.route.url
  .pipe(customPipable)
  .subscribe()

Here is an article about it