Rxjs 仅使用 concatMap 执行队列中的最后一项

Rxjs only execute last item in queue with concatMap

我正在将 NGRX 与 RXJS 结合使用。在效果层中,我使用 concatMap 对我的请求进行排队,但是,一旦最新的请求完成,我想执行 添加到队列中的最后一项 而不是所有剩余的那些。这可能吗?

我尝试过使用 mergeMap、switchMap 等,但我需要 运行 同步请求而不是并发请求。这就是为什么我需要使用 concatMap(或类似的东西)。

    updateThing$ = createEffect(() =>
    this.actions$.pipe(
        ofType(updateThingRequest),
        concatMap((action) => {

            return this.myService
            //Once this request is complete, Instead of executing all of the pending requests from the concatMap queue,
            //I only want to execute the last request pending in the concatMap queue, if there are any, and remove all the other pending ones.
                .update(action)
                .pipe(
                    map(() => {

                        return someActionComplete();
                    }),
                    catchError((err) => {
                        return of(
                            someActionError()
                        );
                    })
                );
        })
    )
);

您可以使用 exhoustMap 它会等到内部 observable 完成并忽略将要出现的其他值。

https://www.learnrxjs.io/learn-rxjs/operators/transformation/exhaustmap

我不确定它是否是正确的解决方案,但也许你可以使用 finalize operator? (or try to figure out proper operator by rxjs decision tree)

您遇到的问题是背压,您想使用“最新的”背压策略。

问题是 RXJS 不是一个很好的背压实现,在 RxJava 中这将是一行,在 RXJS 中则不会。 RXJS4.x之后的版本,根本就没有背压处理,你可以用一些算子模拟一下,但差不多就是这样。

更多相关信息:https://reactivex.io/documentation/operators/backpressure.html#collapseRxJS

因此,对于您的情况,说实话,自定义运算符似乎是个好主意...

P.S。我设法设计了一个没有自定义运算符的解决方案,这在理论上应该可行,但复杂度相当高。如果您有兴趣,我会在此处添加代码。

编辑 - 在没有自定义运算符的情况下添加 RXJS 解决方案

该代码使用两个流,一个用于请求,一个用于发送新请求的可能性:

const requests = interval(200).pipe(take(20));
const requestCompletedStream = new BehaviorSubject(true);

combineLatest(requests, requestCompletedStream)
  .pipe(
    tap(([requestNumber]) =>
      // this will print twice for the requests that are actually executed
      console.log(`Request with number ${requestNumber} was requested`)
    ),
    filter(([_, requestComplete]) => requestComplete), // continue only when the last request has completed
    map(([requestNumber]) => requestNumber),
    distinctUntilChanged(), // do not handle the same request twice
    concatMap((requestNumber) => {
      console.log(`Async request starts handling for: ${requestNumber}`);
      requestCompletedStream.next(false);

      return of(1).pipe(
        delay(2000), // Simulates async request to the backend
        tap(() => {
          requestCompletedStream.next(true);
        }),
        map(() => requestNumber)
      );
    })
  )
  .subscribe((val) =>
    console.log(`Client received completed request with number ${val}`)
  );

你可以在这里测试:https://stackblitz.com/edit/rxjs-backpressure-last-strategy?devtoolsheight=100&file=index.ts

示例请求 20 个请求,请求之间有 200 毫秒的延迟,处理一个请求是 2000 毫秒,因此请求 0、10 和 19 是唯一处理并传递给订阅者的请求。

这在控制台中是可见的,如下所示:

Request with number 0 was requested
Async request starts handling for: 0
Request with number 0 was requested
Request with number 1 was requested
Request with number 2 was requested
Request with number 3 was requested
Request with number 4 was requested
Request with number 5 was requested
Request with number 6 was requested
Request with number 7 was requested
Request with number 8 was requested
Request with number 9 was requested
Request with number 10 was requested
Request with number 10 was requested
Client received completed request with number 0
Async request starts handling for: 10
Request with number 10 was requested
Request with number 11 was requested
Request with number 12 was requested
Request with number 13 was requested
Request with number 14 was requested
Request with number 15 was requested
Request with number 16 was requested
Request with number 17 was requested
Request with number 18 was requested
Request with number 19 was requested
Request with number 19 was requested
Client received completed request with number 10
Async request starts handling for: 19
Request with number 19 was requested
Request with number 19 was requested
Client received completed request with number 19