如何在 takeWhile 中使用 rxjs 缓冲区

How to use rxjs buffer with takeWhile

我正在研究 webrtc。应用程序将 icecandidates 发送到后端 firestore 服务器。 问题是对信令服务器的调用被多次触发,因为 onecandidate 被多次触发。我想收集所有的 icecandidates 并向信令服务器发出一次呼叫。 这个想法是缓冲所有事件,直到 iceGathering 完成。下面的尝试不起作用

this.pc = new RTCPeerConnection(iceServers);
const source: Observable<any> =  fromEvent(this.pc, 'icecandidate');
const takeWhile$ = source
        .pipe(
            takeWhile(val=> val.currentTarget.iceGatheringState === 'gathering'
    ))
const buff = source.pipe(buffer(takeWhile$));
    buff.subscribe(() => {
        // this.pc.onicecandidate = onicecandidateCallback;
    })

方法一:

你快到了。

takeWhile$ 获取值并在满足条件时发出它们。因此在 buff 中,每当 takeWhile$ 发出一个值时,buff 发出一个包含 icecandidate 个事件的缓冲区。

所以你只需要在 takeWhile$.

中发出一个值

所以你需要的是 takeLast() 运算符只发出最后一个值。

当您将 takeLast(1) 放入 takeWhile$ 时,它只会发出最后一个值,而在 buff 中,最后发出的值会导致创建 icecandidate 事件的缓冲区。

this.pc = new RTCPeerConnection(iceServers);

const source: Observable<any> = fromEvent(this.pc, "icecandidate");

const takeWhile$ = source.pipe(
  takeWhile(val => val.currentTarget.iceGatheringState === "gathering"),
  takeLast(1)
);

const buff = source.pipe(buffer(takeWhile$));

buff.subscribe((bufferValues) => {

   // bufferValues has a buffer of icecandidate events

  // this.pc.onicecandidate = onicecandidateCallback;
});

您将可以像上面代码中的 bufferValues 那样访问订阅中 icecandidate 事件的缓冲区。

方法二:

您也可以使用 reduce 运算符来实现相同的场景

this.pc = new RTCPeerConnection(iceServers);

const source: Observable<any> = fromEvent(this.pc, "icecandidate");

const takeWhile$ = source.pipe(
  takeWhile(val => val.currentTarget.iceGatheringState === "gathering"),
  reduce((acc, val) => [...acc,val], [])
);

takeWhile$.subscribe((bufferValues) => {

  // bufferValues has a buffer of icecandidate events

 // this.pc.onicecandidate = onicecandidateCallback;
})