计数、间隔和事件的响应式扩展缓冲区

Reactive Extensions Buffer on count, interval and event

我想缓冲发送到我的服务器的事件。刷新缓冲区的触发器是已达到缓冲区大小、已达到缓冲期或已卸载 window。

我通过创建主题并使用 buffer 和关闭通知程序来缓冲发送到我的服务器的事件。我使用 race 作为关闭通知程序,并与 window.beforeunload 事件竞争缓冲期。

this.event$ = new Subject();
this.bufferedEvent$ = this.event$
    .buffer(
        Observable.race(
            Observable.interval(bufferPeriodMs),
            Observable.fromEvent(window, 'beforeunload')
        )
    )
    .filter(events => events.length > 0)
    .switchMap(events =>
        ajax.post(
            this.baseUrl + RESOURCE_URL,
            {
                entries: events,
            },
            {
                'Content-Type': 'application/json',
            }
       )
    );

问题是,我现在如何也限制缓冲区的大小。即,当缓冲区有 10 个项目时,我从不希望它被刷新。

这是我的有效解决方案。添加了额外的 console.log() 以显示事件的顺序。

唯一有点麻烦的是fullBufferTrigger中的.skip(1),但需要它,因为它会在缓冲区已满(natch)时触发,但bufferedEvent$中的缓冲区似乎没有在触发之前获得最新事件。

幸运的是,timeoutTrigger 到位后,最后一个事件被发出。没有超时,fullBufferTrigger 本身不会发出最终事件。

此外,将 buffer 更改为 bufferWhen,因为前者似乎没有使用两个触发器触发,尽管您希望它来自文档。
footnotebuffer(race()) 比赛只完成一次,因此无论哪个触发器先到达那里,都将被使用,其他触发器将被忽略。相比之下,bufferWhen(x => race()) 会在每次事件发生时进行评估。

const bufferPeriodMs = 1000

const event$ = new Subject()
event$.subscribe(event => console.log('event$ emit', event))

// Define triggers here for testing individually
const beforeunloadTrigger = Observable.fromEvent(window, 'beforeunload')
const fullBufferTrigger = event$.skip(1).bufferCount(2)
const timeoutTrigger = Observable.interval(bufferPeriodMs).take(10)

const bufferedEvent$ = event$
  .bufferWhen( x => 
    Observable.race(
      fullBufferTrigger,
      timeoutTrigger
    )
  )
  .filter(events => events.length > 0)

// output
fullBufferTrigger.subscribe(x => console.log('fullBufferTrigger', x))
timeoutTrigger.subscribe(x => console.log('timeoutTrigger', x))
bufferedEvent$.subscribe(events => {
  console.log('subscription', events)
})

// Test sequence
const delayBy = n => (bufferPeriodMs * n) + 500 
event$.next('event1')
event$.next('event2')
event$.next('event3')

setTimeout( () => {
  event$.next('event4')
}, delayBy(1))

setTimeout( () => {
  event$.next('event5')
}, delayBy(2))

setTimeout( () => {
  event$.next('event6')
  event$.next('event7')
}, delayBy(3))

工作示例:CodePen

编辑:触发缓冲区的替代方法

由于 bufferWhenrace 的组合可能有点低效(比赛在每次事件发射时重新开始),另一种方法是将触发器合并到一个流中并使用简单的 buffer

const bufferTrigger$ = timeoutTrigger
  .merge(fullBufferTrigger)
  .merge(beforeunloadTrigger)

const bufferedEvent$ = event$
  .buffer(bufferTrigger$)
  .filter(events => events.length > 0)

关于使用 独立触发器 的解决方案让我困扰的一件事是 fullBufferTrigger 不知道什么时候 timeoutTrigger 已经发出了它的缓冲值之一,所以给定正确的事件序列,fullBuffer 将在超时后提前触发。

理想情况下,希望 fullBufferTriggertimeoutTrigger 触发时重置,但事实证明这很难做到。

使用bufferTime()

在 RxJS v4 中有一个运算符 bufferWithTimeOrCount(timeSpan, count, [scheduler]),在 RxJS v5 中它被卷入了一个额外的签名 bufferTime()(从清晰的角度来看可以说是一个错误)。

bufferTime<T>(
  bufferTimeSpan: number, 
  bufferCreationInterval: number, 
  maxBufferSize: number, 
  scheduler?: IScheduler
): OperatorFunction<T, T[]>;

唯一剩下的问题是如何合并 window.beforeunload 触发器。查看 bufferTime 的源代码,它应该在接收 onComplete.
时刷新它的缓冲区 因此,我们可以通过向缓冲事件流发送 onComplete 来处理 window.beforeunload

bufferTime 的规范没有对 onComplete 的明确测试,但我想我已经设法将其放在一起。

备注:

  • 超时时间设置大,为了测试把它拿出来。
  • 源事件流不受影响,以说明 event8 已添加但从未发出,因为 window 在它发生之前已被销毁。
  • 查看输出流没有 beforeunloadTrigger,注释掉发出onComplete 的行。 Event7 在缓冲区中,但不会发出。

测试:

const bufferPeriodMs = 7000  // Set high for this test
const bufferSize = 2
const event$ = new Rx.Subject()

/*
  Create bufferedEvent$
*/
const bufferedEvent$ = event$
  .bufferTime(bufferPeriodMs, null, bufferSize)
  .filter(events => events.length > 0)
const subscription = bufferedEvent$.subscribe(console.log)  

/*
  Simulate window destroy
*/
const destroy = setTimeout( () => {
  subscription.unsubscribe()
}, 4500)

/*
  Simulate Observable.fromEvent(window, 'beforeunload')
*/
const beforeunloadTrigger = new Rx.Subject()
// Comment out the following line, observe that event7 does not emit
beforeunloadTrigger.subscribe(x=> event$.complete())
setTimeout( () => {
  beforeunloadTrigger.next('unload')
}, 4400)

/*
  Test sequence
  Event stream:        '(123)---(45)---6---7-----8--|'
  Destroy window:      '-----------------------x'
  window.beforeunload: '---------------------y'
  Buffered output:     '(12)---(34)---(56)---7'
*/
event$.next('event1')
event$.next('event2')
event$.next('event3')
setTimeout( () => { event$.next('event4'); event$.next('event5') }, 1000)
setTimeout( () => { event$.next('event6') }, 3000)
setTimeout( () => { event$.next('event7') }, 4000)
setTimeout( () => { event$.next('event8') }, 5000)

工作示例:CodePen