如何在 takeWhile 中使用 rxjs 缓冲区
How to use rxjs buffer with takeWhile
我正在研究 webrtc。应用程序将 icecandidates 发送到后端 firestore 服务器。
问题是对信令服务器的调用被多次触发,因为 onecandidate 被多次触发。我想收集所有的 icecandidates 并向信令服务器发出一次呼叫。
这个想法是缓冲所有事件,直到 iceGathering 完成。下面的尝试不起作用
this.pc = new RTCPeerConnection(iceServers);
const source: Observable<any> = fromEvent(this.pc, 'icecandidate');
const takeWhile$ = source
.pipe(
takeWhile(val=> val.currentTarget.iceGatheringState === 'gathering'
))
const buff = source.pipe(buffer(takeWhile$));
buff.subscribe(() => {
// this.pc.onicecandidate = onicecandidateCallback;
})
方法一:
你快到了。
takeWhile$
获取值并在满足条件时发出它们。因此在 buff
中,每当 takeWhile$
发出一个值时,buff
发出一个包含 icecandidate
个事件的缓冲区。
所以你只需要在 takeWhile$
.
中发出一个值
所以你需要的是 takeLast()
运算符只发出最后一个值。
当您将 takeLast(1)
放入 takeWhile$
时,它只会发出最后一个值,而在 buff
中,最后发出的值会导致创建 icecandidate
事件的缓冲区。
this.pc = new RTCPeerConnection(iceServers);
const source: Observable<any> = fromEvent(this.pc, "icecandidate");
const takeWhile$ = source.pipe(
takeWhile(val => val.currentTarget.iceGatheringState === "gathering"),
takeLast(1)
);
const buff = source.pipe(buffer(takeWhile$));
buff.subscribe((bufferValues) => {
// bufferValues has a buffer of icecandidate events
// this.pc.onicecandidate = onicecandidateCallback;
});
您将可以像上面代码中的 bufferValues
那样访问订阅中 icecandidate
事件的缓冲区。
方法二:
您也可以使用 reduce
运算符来实现相同的场景
this.pc = new RTCPeerConnection(iceServers);
const source: Observable<any> = fromEvent(this.pc, "icecandidate");
const takeWhile$ = source.pipe(
takeWhile(val => val.currentTarget.iceGatheringState === "gathering"),
reduce((acc, val) => [...acc,val], [])
);
takeWhile$.subscribe((bufferValues) => {
// bufferValues has a buffer of icecandidate events
// this.pc.onicecandidate = onicecandidateCallback;
})
我正在研究 webrtc。应用程序将 icecandidates 发送到后端 firestore 服务器。 问题是对信令服务器的调用被多次触发,因为 onecandidate 被多次触发。我想收集所有的 icecandidates 并向信令服务器发出一次呼叫。 这个想法是缓冲所有事件,直到 iceGathering 完成。下面的尝试不起作用
this.pc = new RTCPeerConnection(iceServers);
const source: Observable<any> = fromEvent(this.pc, 'icecandidate');
const takeWhile$ = source
.pipe(
takeWhile(val=> val.currentTarget.iceGatheringState === 'gathering'
))
const buff = source.pipe(buffer(takeWhile$));
buff.subscribe(() => {
// this.pc.onicecandidate = onicecandidateCallback;
})
方法一:
你快到了。
takeWhile$
获取值并在满足条件时发出它们。因此在 buff
中,每当 takeWhile$
发出一个值时,buff
发出一个包含 icecandidate
个事件的缓冲区。
所以你只需要在 takeWhile$
.
所以你需要的是 takeLast()
运算符只发出最后一个值。
当您将 takeLast(1)
放入 takeWhile$
时,它只会发出最后一个值,而在 buff
中,最后发出的值会导致创建 icecandidate
事件的缓冲区。
this.pc = new RTCPeerConnection(iceServers);
const source: Observable<any> = fromEvent(this.pc, "icecandidate");
const takeWhile$ = source.pipe(
takeWhile(val => val.currentTarget.iceGatheringState === "gathering"),
takeLast(1)
);
const buff = source.pipe(buffer(takeWhile$));
buff.subscribe((bufferValues) => {
// bufferValues has a buffer of icecandidate events
// this.pc.onicecandidate = onicecandidateCallback;
});
您将可以像上面代码中的 bufferValues
那样访问订阅中 icecandidate
事件的缓冲区。
方法二:
您也可以使用 reduce
运算符来实现相同的场景
this.pc = new RTCPeerConnection(iceServers);
const source: Observable<any> = fromEvent(this.pc, "icecandidate");
const takeWhile$ = source.pipe(
takeWhile(val => val.currentTarget.iceGatheringState === "gathering"),
reduce((acc, val) => [...acc,val], [])
);
takeWhile$.subscribe((bufferValues) => {
// bufferValues has a buffer of icecandidate events
// this.pc.onicecandidate = onicecandidateCallback;
})