在 Angular 应用程序中动态添加运算符到 Rxjs observable

Dynamically add operator to Rxjs observable in Angular application

在我们的 Angular 应用程序中,我们有一个加载器组件,它通过输入 setter 属性.

接收一个 observable

在 setter 中,我们首先将布尔值 isLoading 设置为 true,它在模板中开始显示加载微调器。 然后我们订阅可观察对象,当接收到数据时,isLoading 布尔值再次设置为 false,使微调器消失:

// loading-component template:

<ng-container *ngIf="data; else errorOrLoading">
  ...
</ng-container>

<ng-template #errorOrLoading>
 ...
</ng-template>

// loading-component class (short version without unsubscribe):

  @Input() set data$(data$: Observable<any>) {
    if (data$) {
      this.data = null;
      data$
        .subscribe(data => {
          this.data = data;
        });
    }
  }

如果我们只有一个来自 Observable 的事件,这会很好用。 但是当 Observable 发出多个事件时,isLoading 不会再次设置为 true,因为 setter 没有被调用。

有没有办法动态添加一个额外的 tap 运算符,允许在给定的 Observable 链开始发出新事件之前设置 this.data = null?

所以如果 Observable 是:

myService.onChanges$.pipe(
  switchMap(() => this.getBackendData())
);

我们能否动态添加一个 tap 运算符,将管道更改为:

myService.onChanges$.pipe(
  tap(() => this.data = null),
  switchMap(_ => this.getBackendData())
);

更新:我选择简化加载程序控制并将所有可观察的相关逻辑移动到服务,感觉更具可扩展性和灵活性。

更新 #1

解决方案#1。使用 shareReplay()

根据

All you have to do is use a shareReplay() operator:

class MyService {
    public data$: Observable<any>;
    public loaded$: Observable<boolean>;

    constructor(private dataService: DataService) {
        this.data$ = this.dataService.loadData().pipe(
            startWith(null), // data$ emit a null before data is ready followed by the API data.
            shareReplay(1);
        );
        this.loaded$ = this.data$.pipe(
           mapTo(true),
           startWith(false)
        );
    }
}

You have to call myService.data$.subscribe() to trigger the first reading of the stream to make the data ready. You can do that in the constructor, but keep in mind that Angular doesn't create a service until it is first used. If you want the data to be eagerly loaded, then use a resolver in a route or inject the service into a NgModule constructor and subscribe there.


解决方案 #2。使用专用服务

更好的解决方案是引入一个 LoaderService 来处理 component/view 数据的加载。

根据您的项目需要,它可以是 singleton or shared

假设我们的服务将仅为当前视图(共享服务)处理加载状态

加载程序服务:

export class LoaderService {
  private  readonly  _loading: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false)

  // returns value of `_loading` on the moment of accessing `isLoading`
  get isLoading(): boolean {
    return this._loading.getValue();
  }
  
  // returns value of `_loading` as Observable
  get isLoading$(): Observable<boolean> {
    return this._loading.asObservable();
  }
  
  // pushes `true` as a new value of `_loading` subject and notifies subscribers
  start(): void {
    this._loading.next(true);
  }
  
  // pushes `true` as a new value of `_loading` subject and notifies subscribers
  stop(): void {
    this._loading.next(false);
  }
  
}

假设我们有两个服务:

  • API - 仅包含 return 纯 (non-modified) http 流
  • 的方法声明
  • ComponentService - 在将数据传递给表示组件之前准备数据的服务

我的水果服务


  constructor(
   ..., 
   private api: MyFruitAPIService,
   private loader: LoaderService
  ) { ... }

  getApples(): Observable<T> {
    this.loader.start();
    
    return this.api.getApples()
     .pipe(
      finalize(() => this.loader.stop()) // finalize - will call a function when observable completes or errors
     );
  }
   

我的苹果水果组件


 readonly loading$ = this.loader.isLoading$.pipe(tap((loading) => {
   if (loading) {
    this.data = null;
   }
 }));

 constructor(private loader: LoaderService) { ... }

 <ng-container *ngIf="loading$ | async; else errorOrLoading">

 ...

 </ng-container>

简而言之,没有。这是不可能的。这也可能不是一个好主意,因为这样做会破坏封装。

您也不能动态更改函数调用。如果 doADance() 为你跳舞,你不能真正动态地让它也添加一个数字列表。函数的实现应该与其调用保持分离。尽管就您的观点而言,Javascript 确实有人让函数通过绑定不同的上下文等动态地做奇怪的事情。

RxJS 也将实现与流的调用(订阅)分开。如果一个库进行了 20 次转换并且 returns 流向您,您并不是 真的 收到了转换列表,这只是库编写者可以更改的实现细节,而无需引入重大变化。

更新 1:

True, encapsulation is important and exists for a reason. However, we of course cán dynamically add list of numbers to doADance() by passing the list as a parameter. Maybe a controlled way of allowing sort of placeholders inside the pipe to be filled with dynamically given operators would kind of be the same?

虽然 pipe 中的占位符实际上没有意义,因为任何可管道运算符都可以很容易地变成静态运算符,并且任何一组运算符都可以很容易地变成单个运算符。

你能做的是非常接近的事情。例如,这不适用于从库返回的流,但您可以设计流以允许对其处理方式进行自定义。

为您服务:

function dataOnChange(): Observable<Data>{
  return myService.onChanges$.pipe(
    switchMap(() => this.getBackendData())
  );
}

function dataOnChangePlus(op): Observable<Data>{
  if(op == null) return dataOnChange();
  return myService.onChanges$.pipe(
    op,
    switchMap(() => this.getBackendData())
  );
}

别处:

this.service.dataOnChangePlus(
  tap(_ => this.data = null)
).subscribe(console.log);

在其他地方,做同样的事情但有点不同:

this.service.dataOnChangePlus(
  st => st.pipe(
    mapTo(null),
    tap(val => this.data = val)
  )
).subscribe(console.log);

现在,dataOnChangePlus 的使用者将返回一个流,并且还可以帮助定义该流的构造方式。它不是动态添加运算符,但它确实允许您推迟运算符的定义。

好处是每次调用都可以定义不同的东西。

如果需要,您可以通过只允许呼叫者访问特定类型的接线员来缩小呼叫者可以执行的操作。例如,只让他们为 tap 运算符定义 lambda:

function dataOnChangePlusTap(tapper): Observable<Data>{
  return myService.onChanges$.pipe(
    tap(tapper),
    switchMap(() => this.getBackendData())
  );
}

this.service.dataOnChangePlusTap(
  _ => this.data = null
).subscribe(console.log);