如何将 mergeMap 内部订阅限制为最新的 N 个或滑动 window 队列
How to limit mergeMap inner subscriptions to the N latest or a sliding window queue
我有一个从两个流合并而来的源流。当源流发出事件时,我想调用一个订阅函数 Meteor.subscribe
并保持它打开,所以我使用 mergeMap
。订阅准备就绪后,我通过管道传输到另一个 mergeMap
来填充数据。它运行良好,直到我点击 100 次并且内存消耗猛增。问题是,如何限制 mergeMap,不是 concurrent: Number
的前 N 个订阅,而是最近的 N 个订阅,如滑动 window?
function paginationCache$(): Observable<any> {
return merge(this.pageParamsChanged$, this.routerParamsChanged$)
.pipe(
mergeMap((newParams) => {
// First merge map subscribes to data and un subscribes when second merge map unsubscribes
return Observable.create((observer: Subscriber<any>) => {
let subscription = Meteor.subscribe('my/best/data/sub', newParams,
() => {
observer.next(subscription);
observer.complete();
});
});
}),
mergeMap((subscription: any) => {
// second subscription is just populating the data
return Observable.create((observer: Subscriber<Meteor.Error | any>) => {
const collection = new Mongo.Collection(subscription.collectionName);
const { selector, options } = this.mongoParams();
collection.find(selector, options).dataChanges((data) => {
observer.next({ data });
});
return () => {
subscription.stop();
};
});
})
);
}
我想更详细地解释该代码中发生的事情。
在我的示例中,source 流(管道前的 merge
)只要我在 Web 界面中单击按钮,它就永远不会完成,因此它会发出更改我单击界面中的下一个或上一个按钮。 First mergeMap
从源流中获取更改并将它们发送到后端 API(也有命名冲突 publication/subscription)。因此,当客户端上的数据可用时,我调用 observer.next(subscription)
移动到第二个 mergeMap
,但我无法破坏或停止 meteor 的订阅。两个原因:1.我想实时更改所选数据,2.如果我停止流星的订阅,客户端的数据将被删除。所以,现在 second mergeMap
如果它在服务器上更新,它会不断更新选定的数据。
所以在每个 UI 按钮点击(下一个,上一个)后我有新的订阅链。如果原始数据 table 不大(1000 条记录)就可以了,我只是点击了几次。但是,我可以拥有超过 30000 个,并且可以多次单击我的按钮。
所以,我们的想法是让 mergeMap 像一个有限大小的队列,只包含最后 N 个订阅,但队列在我单击按钮时一直在变化。
最后编辑:工作代码:
function paginationCache$(): Observable<any> {
const N = 3;
const subscriptionsSubject = new Subject();
return merge(this.pageParamsChanged$, this.routerParamsChanged$)
.pipe(
mergeMap((newParams) => {
// First merge map subscribes to data and un subscribes when second merge map unsubscribes
subscriptionsSubject.next();
return Observable.create((observer: Subscriber<any>) => {
let subscription = Meteor.subscribe('mu/best/data/sub', newParams,
() => {
observer.next(subscription);
observer.complete();
});
});
}),
mergeMap((subscription: any) => {
// second subscription is just populating the data
return Observable.create((observer: Subscriber<Meteor.Error | any>) => {
const collection = new Mongo.Collection(subscription.collectionName);
const { selector, options } = this.mongoParams();
collection.find(selector, options).dataChanges((data) => {
observer.next({ data });
});
return () => {
subscription.stop();
};
}).pipe(
takeUntil(subscriptionsSubject
.pipe(
take(N),
filter((_, idx) => idx === N - 1)
)
)
);
})
);
}
在不考虑您的代码段的情况下,我将采用以下方法:
not to the first N subscriptions by concurrent: Number, but to the N recent ones, like a sliding window
如果我没理解错的话,你会想要这样的东西(假设 N = 3
):
N = 3
Crt | 1 | 2 | 3 |
Subscriptions | S1 | S2 | S3 |
When Crt = 4
Crt | 2 | 3 | 4 |
Subscriptions | S2 | S3 | S4 |
如果是这样的话,我会这样解决:
const subscriptionsSubject = new Subject();
src$.pipe(
mergeMap(
data => (new Observable(s => {/* ... */ subscriptionsSubject.next(null) /* Notify about a new subscription when it's the case */ }))
.pipe(
takeUntil(
subscriptionsSubject.pipe(
take(N), // After `N` subscriptions, it will complete
filter((_, idx) => idx === N - 1) // Do not want to complete immediately, but only when exactly `N` subscriptions have been reached
)
)
)
)
)
我这里有两个想法:
您没有完成第二个内部 Observable。我想这不应该是你问题的根源,但如果可以的话最好完成观察者:
return () => {
subscription.stop();
observer.complete();
};
您可以使用bufferCount
滑动window Observables 然后使用switchMap()
订阅它们。沿着这些线的东西:
import { of, range } from 'rxjs';
import { map, bufferCount, switchMap, shareReplay, tap } from 'rxjs/operators';
range(10)
.pipe(
// turn each value to an Observable
// `refCount` is used so that when `switchMap` switches to a new window
// it won't trigger resubscribe to its sources and make more requests.
map(v => of(v).pipe(shareReplay({ refCount: false, bufferSize: 1 }))),
bufferCount(3, 1),
tap(console.log), // for debugging purposes only
switchMap(sourcesArray => merge(...sourcesArray)),
)
.subscribe(console.log);
现场演示:https://stackblitz.com/edit/rxjs-kuybbs?devtoolsheight=60
我不完全确定这是否模拟了您的用例,但我尝试也包含 shareReplay
,这样它就不会触发对同一个 Observable 的多个 Meteor.subscribe
调用。我必须有一个您的代码的工作演示才能自己测试它。
我有一个从两个流合并而来的源流。当源流发出事件时,我想调用一个订阅函数 Meteor.subscribe
并保持它打开,所以我使用 mergeMap
。订阅准备就绪后,我通过管道传输到另一个 mergeMap
来填充数据。它运行良好,直到我点击 100 次并且内存消耗猛增。问题是,如何限制 mergeMap,不是 concurrent: Number
的前 N 个订阅,而是最近的 N 个订阅,如滑动 window?
function paginationCache$(): Observable<any> {
return merge(this.pageParamsChanged$, this.routerParamsChanged$)
.pipe(
mergeMap((newParams) => {
// First merge map subscribes to data and un subscribes when second merge map unsubscribes
return Observable.create((observer: Subscriber<any>) => {
let subscription = Meteor.subscribe('my/best/data/sub', newParams,
() => {
observer.next(subscription);
observer.complete();
});
});
}),
mergeMap((subscription: any) => {
// second subscription is just populating the data
return Observable.create((observer: Subscriber<Meteor.Error | any>) => {
const collection = new Mongo.Collection(subscription.collectionName);
const { selector, options } = this.mongoParams();
collection.find(selector, options).dataChanges((data) => {
observer.next({ data });
});
return () => {
subscription.stop();
};
});
})
);
}
我想更详细地解释该代码中发生的事情。
在我的示例中,source 流(管道前的 merge
)只要我在 Web 界面中单击按钮,它就永远不会完成,因此它会发出更改我单击界面中的下一个或上一个按钮。 First mergeMap
从源流中获取更改并将它们发送到后端 API(也有命名冲突 publication/subscription)。因此,当客户端上的数据可用时,我调用 observer.next(subscription)
移动到第二个 mergeMap
,但我无法破坏或停止 meteor 的订阅。两个原因:1.我想实时更改所选数据,2.如果我停止流星的订阅,客户端的数据将被删除。所以,现在 second mergeMap
如果它在服务器上更新,它会不断更新选定的数据。
所以在每个 UI 按钮点击(下一个,上一个)后我有新的订阅链。如果原始数据 table 不大(1000 条记录)就可以了,我只是点击了几次。但是,我可以拥有超过 30000 个,并且可以多次单击我的按钮。
所以,我们的想法是让 mergeMap 像一个有限大小的队列,只包含最后 N 个订阅,但队列在我单击按钮时一直在变化。
最后编辑:工作代码:
function paginationCache$(): Observable<any> {
const N = 3;
const subscriptionsSubject = new Subject();
return merge(this.pageParamsChanged$, this.routerParamsChanged$)
.pipe(
mergeMap((newParams) => {
// First merge map subscribes to data and un subscribes when second merge map unsubscribes
subscriptionsSubject.next();
return Observable.create((observer: Subscriber<any>) => {
let subscription = Meteor.subscribe('mu/best/data/sub', newParams,
() => {
observer.next(subscription);
observer.complete();
});
});
}),
mergeMap((subscription: any) => {
// second subscription is just populating the data
return Observable.create((observer: Subscriber<Meteor.Error | any>) => {
const collection = new Mongo.Collection(subscription.collectionName);
const { selector, options } = this.mongoParams();
collection.find(selector, options).dataChanges((data) => {
observer.next({ data });
});
return () => {
subscription.stop();
};
}).pipe(
takeUntil(subscriptionsSubject
.pipe(
take(N),
filter((_, idx) => idx === N - 1)
)
)
);
})
);
}
在不考虑您的代码段的情况下,我将采用以下方法:
not to the first N subscriptions by concurrent: Number, but to the N recent ones, like a sliding window
如果我没理解错的话,你会想要这样的东西(假设 N = 3
):
N = 3
Crt | 1 | 2 | 3 |
Subscriptions | S1 | S2 | S3 |
When Crt = 4
Crt | 2 | 3 | 4 |
Subscriptions | S2 | S3 | S4 |
如果是这样的话,我会这样解决:
const subscriptionsSubject = new Subject();
src$.pipe(
mergeMap(
data => (new Observable(s => {/* ... */ subscriptionsSubject.next(null) /* Notify about a new subscription when it's the case */ }))
.pipe(
takeUntil(
subscriptionsSubject.pipe(
take(N), // After `N` subscriptions, it will complete
filter((_, idx) => idx === N - 1) // Do not want to complete immediately, but only when exactly `N` subscriptions have been reached
)
)
)
)
)
我这里有两个想法:
您没有完成第二个内部 Observable。我想这不应该是你问题的根源,但如果可以的话最好完成观察者:
return () => { subscription.stop(); observer.complete(); };
您可以使用
bufferCount
滑动window Observables 然后使用switchMap()
订阅它们。沿着这些线的东西:import { of, range } from 'rxjs'; import { map, bufferCount, switchMap, shareReplay, tap } from 'rxjs/operators'; range(10) .pipe( // turn each value to an Observable // `refCount` is used so that when `switchMap` switches to a new window // it won't trigger resubscribe to its sources and make more requests. map(v => of(v).pipe(shareReplay({ refCount: false, bufferSize: 1 }))), bufferCount(3, 1), tap(console.log), // for debugging purposes only switchMap(sourcesArray => merge(...sourcesArray)), ) .subscribe(console.log);
现场演示:https://stackblitz.com/edit/rxjs-kuybbs?devtoolsheight=60
我不完全确定这是否模拟了您的用例,但我尝试也包含
shareReplay
,这样它就不会触发对同一个 Observable 的多个Meteor.subscribe
调用。我必须有一个您的代码的工作演示才能自己测试它。