仅当 Rxjs 轮询上的某些属性发生变化时才调度 NgRx 操作

Dispatch NgRx actions only when some properties changes on Rxjs Polling

我有一个像这样的 Rxjs 轮询效果 :

updateProductData$ = createEffect(() =>
  this.actions$.pipe(
    ofType(fromActions.loadProduct),
    switchMap(_) =>
      this.http.get('endpoint').pipe(
        delay(1000),
        repeat(),
        switchMap((data) => [
          fromActions.updateFoo({foo: data.foo}),
          fromActions.updateBar({bar: data.bar}),
        ])
      )
    )
  );

如何仅在 data.foodata.bar 分别发生变化时调度 updateFooupdateBar

我可以通过使用 distinctUntilChanged 来改进这一点,这样做不会在 data.stuff 发生变化时触发操作,但是,当其中一个发生变化时,两个操作仍会分派。

...
     repeat(),
     distinctUntileChanged((prev, curr) => prev.foo === curr.foo && prev.bar === curr.bar) // works but it fires both actions when either one changes
     switchMap((data) => [
       fromActions.updateFoo({foo: data.foo}),
       fromActions.updateBar({bar: data.bar}),
     ])

我想在 data.foo 更改时调度 updateFoo,在 data.bar 更改时调度 updateBar,我知道 data 有很多其他属性可以随着时间的推移而改变。

您可以尝试使用 pairwise 运算符来获取先前的状态并显式检查先前的值。我使用 startWith(null) 来触发 HTTP 请求中第一个值的发射。没有它,它不会在第二次调用之前开始发射

stop$ = new Subject<any>();
poll$ = timer(0, 1000).pipe(
  startWith(null),          // <-- emit `[null, data]` for first emission
  switchMap(this.http.get('endpoint')),
  pairwise(),
  map((data) => ({
    foo: {prev: data[0].foo, current: data[1].foo},
    bar: {prev: data[0].bar, current: data[1].bar}
  })),
  takeUntil(stop$)
);

updateProductData$ = createEffect(() =>
  this.actions$.pipe(
    ofType(fromActions.loadProduct),
    switchMap(_ =>
      poll$.pipe(
        switchMap((data) => {
          let result = [];

          if (data.foo.prev && data.foo.prev !== data.foo.current) 
            result.push(fromActions.updateFoo({foo: data.foo}))
          if (data.bar.prev && data.bar.prev !== data.bar.current) 
            result.push(fromActions.updateBar({bar: data.bar}))

          return result;
        })
      )
  )
);

注意:我没有使用过 NgRX,这是未经测试的代码。

我认为这可能是一种方法:

updateProductData$ = createEffect(() =>
  this.actions$.pipe(
    ofType(fromActions.loadProduct),
    switchMap(_) =>
      this.http.get('endpoint').pipe(
        delay(1000),
        repeat(),
        
        multicast(
          new Subject(),
          source => merge(
            // concerning `foo`
            source.pipe(
              distinctUntilChanged((prev, crt) => prev.foo === crt.foo),
              map(data => fromActions.updateFoo({foo: data.foo})),
            ),

            // concerning `bar`
            source.pipe(
              distinctUntilChanged((prev, crt) => prev.bar === crt.bar),
              map(data => fromActions.updateBar({bar: data.bar})),
            ),
          )
        )
      )
    )
  );

multicast 的第二个参数中的 source 是已声明为第一个参数的 Subject 的实例。通过使用 multicast,我们可以将问题分成 2 个其他较小的问题,而无需冗余地订阅源(这就是使用 Subject 的原因)。