RxJS - 对 maxWait 和 maxElements 使用 windowWhen() windows
RxJS - using windowWhen() for maxWait and maxElements windows
我正在尝试将对服务器的调用捆绑到最大 maxEntries,但不想等待超过 maxWait 毫秒。这曾经在 RxJS 4 中作为 windowWithTimeOrCount()
提供,但已从 RxJS 5 中删除。
一切正常,只是 window 的最后一个元素丢失了。说到 'lost' - 这就是我现在的感受。有哪位 RxJS 大师可以告诉我我做错了什么吗?
private chunk(queue: Observable<CacheEntry>, maxEntries: number, maxWait: number): Observable<Observable<CacheEntry>> {
// We have an incoming stream of CacheEntries to be retrieved. We want to bundle these in units of max maxEntries
// but wait no longer than max maxWait ms. We return an Observable, that emits Observables of CacheEntries that
// complete after maxEntries / maxWait (whatever comes first).
const toggleSubject = new Subject<void>();
return queue
// Start emitting a new Observable every time toggleSubject emits.
// (bufferWhen() wouldn't work as we have to count the elements as they come and buffer only gives us the
// complete collection)
.windowWhen(() => toggleSubject)
// map() is called once for every window (maxEntries/maxWait)
// the inner do() is called for every element in the window, allowing us to set up the timeout callback and to
// count all elements, then emitting on toggleSubject, triggering a new Observable.
// (We have to map() here - instead of an outer do() - because otherwise the original obs would be streamed
// and the hooked up version with the inner do() would never be called.)
.map((obs) => {
// counts the number of cacheEntries already in this stream
let count = 0;
// flag to kill the timeout callback
let done = false;
// we have to return an Observable
return obs.do(() => {
count++;
if (count === 1) {
// we start counting when the first element is streamed.
IntervalObservable.create(maxWait).first().subscribe(() => {
if (!done) {
//trigger due to maxWait
toggleSubject.next(null);
}
});
}
if (count > (maxEntries)) {
done = true;
// trigger due due to maxEntries(' + maxEntries + ')');
toggleSubject.next(null);
}
}
);
});
}
由于 if (count > (maxEntries))
而触发 toggleSubject.next(null)
的元素丢失了(不在任何 window 中)。
编辑:maxTime 在推送新 Observable 的第一个元素时开始计时。 if (count === 1 )
。这是 a) 我在 map()
中的 windowed Observables 内部工作的原因和 b) 重要的,因为这是必需的行为。
示例:maxElements:100,maxWait:100。101 个元素在 t=99 时被推送。预期行为:在 t=99 时,一个包含 100 个元素的 Observable 被推送。剩下 1 个元素。计数器 + 定时器复位。在 t=199,第二个 'chunk' 的计数器到期并推送一个具有 1 个元素的 Observable。
(在这个例子中,Brandons(见答案)代码会——如果我没看错的话——在 t=99 时推送一个包含 100 个元素的 Observable,然后 1 毫秒后,在 t =100,一个只有一个元素的 Observable。)
是的,您不想使用 map
来产生这样的副作用。正如您所注意到的,您最终会掉落物品。
这是一个通用方法,我认为它可以满足您的需求。
注意:RXJS 5 目前有一个 issue 具有此发布重载的类型定义。我添加了一些应该允许它在 TypeScript 中编译的类型转换。
chunk<T>(queue: Observable<T>, maxEntries: number, maxWait: number): Observable<Observable<T>> {
// use publish() so that we can subscribe multiple times to the same stream of data.
return queue.publish(entries => {
// observable which will trigger after maxWait
const timer = IntervalObservable.create(maxWait);
// observable which will trigger after maxEntries
const limit = entries.take(maxEntries).last();
// observable which will trigger on either condition
const endOfWindow = limit.takeUntil(timer);
// use endOfWindow to close each window.
return entries.windowWhen(() => endOfWindow) as Observable<T>;
}) as Observable<Observable<T>>;
}
编辑:
如果您不想在每个 window 中的第一个项目到达之后才开始计时,那么您可以这样做:
chunk<T>(queue: Observable<T>, maxEntries: number, maxWait: number): Observable<Observable<T>> {
// use publish() so that we can subscribe multiple times to the same stream of data.
return queue.publish(entries => {
// observable which will trigger after maxWait after the first
// item in this window arrives:
const timer = entries.take(1).delay(maxWait);
// observable which will trigger after maxEntries
const limit = entries.take(maxEntries).last();
// observable which will trigger on either condition
const endOfWindow = limit.takeUntil(timer);
// use endOfWindow to close each window.
return entries.windowWhen(() => endOfWindow) as Observable<T>;
}) as Observable<Observable<T>>;
}
我想出的解决方案是在异步调度程序上切换windowWhen()
。
if (count === (maxEntries)) {
done = true;
this.LOGGER.debug(' - trigger due due to maxEntries(' + maxEntries + ')');
Rx.Scheduler.async.schedule(()=>toggleSubject.next(null));
}
问题是 windowWhen()
立即完成了返回的 Observables - 阻止了任何下游操作员接收最后一个值。
很抱歉提出(和回答)这个问题。在发帖之前我尝试了 Rx.Scheduler.async
等,但不知何故它似乎不起作用。
我正在尝试将对服务器的调用捆绑到最大 maxEntries,但不想等待超过 maxWait 毫秒。这曾经在 RxJS 4 中作为 windowWithTimeOrCount()
提供,但已从 RxJS 5 中删除。
一切正常,只是 window 的最后一个元素丢失了。说到 'lost' - 这就是我现在的感受。有哪位 RxJS 大师可以告诉我我做错了什么吗?
private chunk(queue: Observable<CacheEntry>, maxEntries: number, maxWait: number): Observable<Observable<CacheEntry>> {
// We have an incoming stream of CacheEntries to be retrieved. We want to bundle these in units of max maxEntries
// but wait no longer than max maxWait ms. We return an Observable, that emits Observables of CacheEntries that
// complete after maxEntries / maxWait (whatever comes first).
const toggleSubject = new Subject<void>();
return queue
// Start emitting a new Observable every time toggleSubject emits.
// (bufferWhen() wouldn't work as we have to count the elements as they come and buffer only gives us the
// complete collection)
.windowWhen(() => toggleSubject)
// map() is called once for every window (maxEntries/maxWait)
// the inner do() is called for every element in the window, allowing us to set up the timeout callback and to
// count all elements, then emitting on toggleSubject, triggering a new Observable.
// (We have to map() here - instead of an outer do() - because otherwise the original obs would be streamed
// and the hooked up version with the inner do() would never be called.)
.map((obs) => {
// counts the number of cacheEntries already in this stream
let count = 0;
// flag to kill the timeout callback
let done = false;
// we have to return an Observable
return obs.do(() => {
count++;
if (count === 1) {
// we start counting when the first element is streamed.
IntervalObservable.create(maxWait).first().subscribe(() => {
if (!done) {
//trigger due to maxWait
toggleSubject.next(null);
}
});
}
if (count > (maxEntries)) {
done = true;
// trigger due due to maxEntries(' + maxEntries + ')');
toggleSubject.next(null);
}
}
);
});
}
由于 if (count > (maxEntries))
而触发 toggleSubject.next(null)
的元素丢失了(不在任何 window 中)。
编辑:maxTime 在推送新 Observable 的第一个元素时开始计时。 if (count === 1 )
。这是 a) 我在 map()
中的 windowed Observables 内部工作的原因和 b) 重要的,因为这是必需的行为。
示例:maxElements:100,maxWait:100。101 个元素在 t=99 时被推送。预期行为:在 t=99 时,一个包含 100 个元素的 Observable 被推送。剩下 1 个元素。计数器 + 定时器复位。在 t=199,第二个 'chunk' 的计数器到期并推送一个具有 1 个元素的 Observable。
(在这个例子中,Brandons(见答案)代码会——如果我没看错的话——在 t=99 时推送一个包含 100 个元素的 Observable,然后 1 毫秒后,在 t =100,一个只有一个元素的 Observable。)
是的,您不想使用 map
来产生这样的副作用。正如您所注意到的,您最终会掉落物品。
这是一个通用方法,我认为它可以满足您的需求。
注意:RXJS 5 目前有一个 issue 具有此发布重载的类型定义。我添加了一些应该允许它在 TypeScript 中编译的类型转换。
chunk<T>(queue: Observable<T>, maxEntries: number, maxWait: number): Observable<Observable<T>> {
// use publish() so that we can subscribe multiple times to the same stream of data.
return queue.publish(entries => {
// observable which will trigger after maxWait
const timer = IntervalObservable.create(maxWait);
// observable which will trigger after maxEntries
const limit = entries.take(maxEntries).last();
// observable which will trigger on either condition
const endOfWindow = limit.takeUntil(timer);
// use endOfWindow to close each window.
return entries.windowWhen(() => endOfWindow) as Observable<T>;
}) as Observable<Observable<T>>;
}
编辑:
如果您不想在每个 window 中的第一个项目到达之后才开始计时,那么您可以这样做:
chunk<T>(queue: Observable<T>, maxEntries: number, maxWait: number): Observable<Observable<T>> {
// use publish() so that we can subscribe multiple times to the same stream of data.
return queue.publish(entries => {
// observable which will trigger after maxWait after the first
// item in this window arrives:
const timer = entries.take(1).delay(maxWait);
// observable which will trigger after maxEntries
const limit = entries.take(maxEntries).last();
// observable which will trigger on either condition
const endOfWindow = limit.takeUntil(timer);
// use endOfWindow to close each window.
return entries.windowWhen(() => endOfWindow) as Observable<T>;
}) as Observable<Observable<T>>;
}
我想出的解决方案是在异步调度程序上切换windowWhen()
。
if (count === (maxEntries)) {
done = true;
this.LOGGER.debug(' - trigger due due to maxEntries(' + maxEntries + ')');
Rx.Scheduler.async.schedule(()=>toggleSubject.next(null));
}
问题是 windowWhen()
立即完成了返回的 Observables - 阻止了任何下游操作员接收最后一个值。
很抱歉提出(和回答)这个问题。在发帖之前我尝试了 Rx.Scheduler.async
等,但不知何故它似乎不起作用。