如何使用 RxJS 从 arrayBuffer 中拆分数据帧?
How to split a data frame from an arrayBuffer with RxJS?
我正在使用 websocket 从硬件接收数据帧。
数据框定义如下:
0xbb(head) ---data--- 0xee(tail)
接收到的数据存储在Uint8Array中,可能有多个帧:
var buffer = new Uint8Array([0xbb,0,0,0,0xee,0xbb,1,1,1,0xee,0xbb,3,3,3,0xee]);
我可以将数组转换为可观察的:
var obs= Rx.Observable.from(buffer);
Rx大理石:
--0xbb--0--0--0--0xee--0xbb--1--1--1--0xee--0xbb--2--2--2--0xee------
------------------000------------------111------------------222------
如何使用RxJS拆分observable?
使用哪些运营商?
RxJS 是这种情况的最佳实践吗?
const source = Rx.Observable
.from(['0xbb','0','0','0','0xee','0xbb','1','1','1','0xee','0xbb','3','3','3','0xee'])
.concatMap(i => Rx.Observable.of(i).delay(1));
source
.filter(i => i != '0xee' && i != '0xbb')
.buffer(source.filter(i => i === '0xee'))
.subscribe(val => console.log(val));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.2/Rx.js"></script>
我们需要使用 concatMap(.. delay(1))
将值转换为异步,否则缓冲区的 closingNotifier
比值的摄取快 运行,最终得到三个空数组.但是由于您从 websocket 接收到这些数据包,您已经是异步的了。
此代码并非 100% 万无一失,例如当外部设备不发射时会发生什么 0xee
?我们最终将下一条消息连接到上一条消息。
我想你可以用 scan()
实现你想要的:
const buffer = new Uint8Array([0xbb,0,0,0,0xee,0xbb,1,1,1,0xee,0xbb,3,3,3,0xee]);
const obs = Observable.from(buffer);
obs.scan((acc, v) => {
if (v === 0xbb) {
return [v];
} else {
acc.push(v);
return acc;
}
}, [])
.filter(acc => acc[acc.length - 1] === 0xee)
.subscribe(console.log);
这将打印以下数组:
[ 187, 0, 0, 0, 238 ]
[ 187, 1, 1, 1, 238 ]
[ 187, 3, 3, 3, 238 ]
但是,如果您需要为每个数据帧发出 Observables,那么按照 Mark 的回答,只需使用 window()
而不是 buffer()
。
我正在使用 websocket 从硬件接收数据帧。 数据框定义如下:
0xbb(head) ---data--- 0xee(tail)
接收到的数据存储在Uint8Array中,可能有多个帧:
var buffer = new Uint8Array([0xbb,0,0,0,0xee,0xbb,1,1,1,0xee,0xbb,3,3,3,0xee]);
我可以将数组转换为可观察的:
var obs= Rx.Observable.from(buffer);
Rx大理石:
--0xbb--0--0--0--0xee--0xbb--1--1--1--0xee--0xbb--2--2--2--0xee------
------------------000------------------111------------------222------
如何使用RxJS拆分observable? 使用哪些运营商? RxJS 是这种情况的最佳实践吗?
const source = Rx.Observable
.from(['0xbb','0','0','0','0xee','0xbb','1','1','1','0xee','0xbb','3','3','3','0xee'])
.concatMap(i => Rx.Observable.of(i).delay(1));
source
.filter(i => i != '0xee' && i != '0xbb')
.buffer(source.filter(i => i === '0xee'))
.subscribe(val => console.log(val));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.2/Rx.js"></script>
我们需要使用 concatMap(.. delay(1))
将值转换为异步,否则缓冲区的 closingNotifier
比值的摄取快 运行,最终得到三个空数组.但是由于您从 websocket 接收到这些数据包,您已经是异步的了。
此代码并非 100% 万无一失,例如当外部设备不发射时会发生什么 0xee
?我们最终将下一条消息连接到上一条消息。
我想你可以用 scan()
实现你想要的:
const buffer = new Uint8Array([0xbb,0,0,0,0xee,0xbb,1,1,1,0xee,0xbb,3,3,3,0xee]);
const obs = Observable.from(buffer);
obs.scan((acc, v) => {
if (v === 0xbb) {
return [v];
} else {
acc.push(v);
return acc;
}
}, [])
.filter(acc => acc[acc.length - 1] === 0xee)
.subscribe(console.log);
这将打印以下数组:
[ 187, 0, 0, 0, 238 ]
[ 187, 1, 1, 1, 238 ]
[ 187, 3, 3, 3, 238 ]
但是,如果您需要为每个数据帧发出 Observables,那么按照 Mark 的回答,只需使用 window()
而不是 buffer()
。