如何将 mergeMap 内部订阅限制为最新的 N 个或滑动 window 队列

How to limit mergeMap inner subscriptions to the N latest or a sliding window queue

我有一个从两个流合并而来的源流。当源流发出事件时,我想调用一个订阅函数 Meteor.subscribe 并保持它打开,所以我使用 mergeMap。订阅准备就绪后,我通过管道传输到另一个 mergeMap 来填充数据。它运行良好,直到我点击 100 次并且内存消耗猛增。问题是,如何限制 mergeMap,不是 concurrent: Number 的前 N ​​个订阅,而是最近的 N 个订阅,如滑动 window?

function paginationCache$(): Observable<any> {

    return merge(this.pageParamsChanged$, this.routerParamsChanged$)
        .pipe(
            mergeMap((newParams) => {
                // First merge map subscribes to data and un subscribes when second merge map unsubscribes
                return Observable.create((observer: Subscriber<any>) => {

                    let subscription = Meteor.subscribe('my/best/data/sub', newParams,
                        () => {
                            observer.next(subscription);
                            observer.complete();
                        });
                });
            }),
            mergeMap((subscription: any) => {
                // second subscription is just populating the data

                return Observable.create((observer: Subscriber<Meteor.Error | any>) => {

                    const collection = new Mongo.Collection(subscription.collectionName);

                    const { selector, options } = this.mongoParams();

                    collection.find(selector, options).dataChanges((data) => {
                        observer.next({ data });
                    });

                    return () => {
                        subscription.stop();
                    };
                });

            })
        );
}

我想更详细地解释该代码中发生的事情。

在我的示例中,source 流(管道前的 merge)只要我在 Web 界面中单击按钮,它就永远不会完成,因此它会发出更改我单击界面中的下一个或上一个按钮。 First mergeMap 从源流中获取更改并将它们发送到后端 API(也有命名冲突 publication/subscription)。因此,当客户端上的数据可用时,我调用 observer.next(subscription) 移动到第二个 mergeMap,但我无法破坏或停止 meteor 的订阅。两个原因:1.我想实时更改所选数据,2.如果我停止流星的订阅,客户端的数据将被删除。所以,现在 second mergeMap 如果它在服务器上更新,它会不断更新选定的数据。

所以在每个 UI 按钮点击(下一个,上一个)后我有新的订阅链。如果原始数据 table 不大(1000 条记录)就可以了,我只是点击了几次。但是,我可以拥有超过 30000 个,并且可以多次单击我的按钮。

所以,我们的想法是让 mergeMap 像一个有限大小的队列,只包含最后 N 个订阅,但队列在我单击按钮时一直在变化。

最后编辑:工作代码:

function paginationCache$(): Observable<any> {
    const N = 3;
    const subscriptionsSubject = new Subject();
    return merge(this.pageParamsChanged$, this.routerParamsChanged$)
        .pipe(
            mergeMap((newParams) => {
                // First merge map subscribes to data and un subscribes when second merge map unsubscribes

                subscriptionsSubject.next();

                return Observable.create((observer: Subscriber<any>) => {

                    let subscription = Meteor.subscribe('mu/best/data/sub', newParams,
                        () => {
                            observer.next(subscription);
                            observer.complete();
                        });
                });
            }),
            mergeMap((subscription: any) => {
                // second subscription is just populating the data

                return Observable.create((observer: Subscriber<Meteor.Error | any>) => {

                    const collection = new Mongo.Collection(subscription.collectionName);
                    const { selector, options } = this.mongoParams();

                    collection.find(selector, options).dataChanges((data) => {
                        observer.next({ data });
                    });

                    return () => {
                        subscription.stop();
                    };
                }).pipe(
                    takeUntil(subscriptionsSubject
                        .pipe(
                            take(N),
                            filter((_, idx) => idx === N - 1)
                        )
                    )
                );
            })
        );
}

在不考虑您的代码段的情况下,我将采用以下方法:

not to the first N subscriptions by concurrent: Number, but to the N recent ones, like a sliding window

如果我没理解错的话,你会想要这样的东西(假设 N = 3):

N = 3

Crt             |  1 |  2 |  3 |
Subscriptions   | S1 | S2 | S3 |


When Crt = 4

Crt           | 2  | 3  |  4 |
Subscriptions | S2 | S3 | S4 |

如果是这样的话,我会这样解决:

const subscriptionsSubject = new Subject();

src$.pipe(
  mergeMap(
    data => (new Observable(s => {/* ... */ subscriptionsSubject.next(null) /* Notify about a new subscription when it's the case */ }))
      .pipe(
        takeUntil(
          subscriptionsSubject.pipe(
            take(N), // After `N` subscriptions, it will complete
            filter((_, idx) => idx === N - 1) // Do not want to complete immediately, but only when exactly `N` subscriptions have been reached
          )
        )
      )
  )
)

我这里有两个想法:

  1. 您没有完成第二个内部 Observable。我想这不应该是你问题的根源,但如果可以的话最好完成观察者:

    return () => {
      subscription.stop();
      observer.complete();
    };
    
  2. 您可以使用bufferCount 滑动window Observables 然后使用switchMap() 订阅它们。沿着这些线的东西:

    import { of, range } from 'rxjs'; 
    import { map, bufferCount, switchMap, shareReplay, tap } from 'rxjs/operators';
    
    range(10)
      .pipe(
        // turn each value to an Observable
        // `refCount` is used so that when `switchMap` switches to a new window
        // it won't trigger resubscribe to its sources and make more requests.
        map(v => of(v).pipe(shareReplay({ refCount: false, bufferSize: 1 }))),
        bufferCount(3, 1),
        tap(console.log), // for debugging purposes only
        switchMap(sourcesArray => merge(...sourcesArray)),
      )
      .subscribe(console.log);
    

    现场演示:https://stackblitz.com/edit/rxjs-kuybbs?devtoolsheight=60

    我不完全确定这是否模拟了您的用例,但我尝试也包含 shareReplay,这样它就不会触发对同一个 Observable 的多个 Meteor.subscribe 调用。我必须有一个您的代码的工作演示才能自己测试它。