如何通过管道 rx 运算符组合片段数据?
How to pipe rx operators to combine fragment data?
我想用Rx来处理串口数据,数据包结构是这样的。
+-----------+--------+---------+
| Signature | Length | Payload |
+-----------+--------+---------+
| 2 byte | 1 byte | ... |
+-----------+--------+---------+
但是接收到的数据会有很多碎片。比如(Signature are 0xFC 0xFA)
Data 1: 0xFC 0xFA 0x02 0x01 0x01 0xFC 0xFA 0x03 0x01 //包含一个数据包和一个分片数据包
Data 2: 0x02 0x03 0xFC 0xFA 0x02 0x01 0x03 // 包含前一个数据包和一个新数据包的连续片段
如何通过管道将运算符输出为
数据包 1:0xFC 0xFA 0x02 0x01 0x01
数据包 2:0xFC 0xFA 0x03 0x01 0x02 0x03
...
你需要一个有状态的观察者。它会有这些状态:
- 侦听数据包的开始
- 第一个 byte-listening 收到第二个字节
- 收到第二个 byte-listening 长度
- 收到 header-listening body
在 RxJava 中,您将创建一个 class Packetizer
,它有两个感兴趣的方法:
public void nextByte(Char next);
public Observable<Packet> packetSource();
在内部,它会维护状态,包括 body 剩余部分的长度等。它还会有一个 PublishSubject<Packet>
,它会在构建时发出每个数据包。
您正在按定义的模式拆分字节流。我不确定您如何接收字节以及如何对可观察对象进行建模,Observable<byte>
或 Observable<byte[]>
!?
无论如何,这里我猜的是用字符串翻译的,但思路还是一样的。我选择 x
后跟 y
作为模式(在你的例子中是 0xFC 0xFA
)。
您会在代码中找到我的评论:
final ImmutableList<String> PATTERN = ImmutableList.of("x", "y");
Observable<String> source = Observable
.fromArray("x", "y", "1", "2", "3", "x", "y", "4", "5", "x", "y", "1", "x", "y", "x", "4", "6", "x")
.share();//publishing to hot observable (we are splitting this source by some of its elements)
//find the next pattern
Observable<List<String>> nextBoundary = source
.buffer(2, 1)
.filter(pairs -> CollectionUtils.isEqualCollection(PATTERN, pairs));
//https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer2.png
//start a buffer for each x found
//buffers (packets) may overlap
source.buffer(source.filter(e -> e.equals("x")),
x -> source
.take(1)//next emission after the x
.switchMap(y -> y.equals("y") ?
nextBoundary // if 'y' then find the next patter
: Observable.empty() //otherwise stop buffering
)
)
.filter(packet -> packet.size() > 2)//do not take the wrong buffers like ["x", "4"] (x not followed by y) but it is not lost
.map(packet -> {
//each packet is like the following :
//[x, y, 1, 2, 3, x, y]
//[x, y, 4, 5, x, y]
//[x, y, 1, x, y]
//[x, y, x, 4, 6, x]
//because of the closing boundary, the event comes too late
//then we have to handle the packet (it overlaps on the next one)
List<String> ending = packet.subList(packet.size() - 2, packet.size());
return CollectionUtils.isEqualCollection(PATTERN, ending) ? packet.subList(0, packet.size() - 2) : packet;
})
.blockingSubscribe(e -> System.out.println(e));
结果:
[x, y, 1, 2, 3]
[x, y, 4, 5]
[x, y, 1]
[x, y, x, 4, 6, x]
我想用Rx来处理串口数据,数据包结构是这样的。
+-----------+--------+---------+
| Signature | Length | Payload |
+-----------+--------+---------+
| 2 byte | 1 byte | ... |
+-----------+--------+---------+
但是接收到的数据会有很多碎片。比如(Signature are 0xFC 0xFA)
Data 1: 0xFC 0xFA 0x02 0x01 0x01 0xFC 0xFA 0x03 0x01 //包含一个数据包和一个分片数据包
Data 2: 0x02 0x03 0xFC 0xFA 0x02 0x01 0x03 // 包含前一个数据包和一个新数据包的连续片段
如何通过管道将运算符输出为
数据包 1:0xFC 0xFA 0x02 0x01 0x01
数据包 2:0xFC 0xFA 0x03 0x01 0x02 0x03
...
你需要一个有状态的观察者。它会有这些状态:
- 侦听数据包的开始
- 第一个 byte-listening 收到第二个字节
- 收到第二个 byte-listening 长度
- 收到 header-listening body
在 RxJava 中,您将创建一个 class Packetizer
,它有两个感兴趣的方法:
public void nextByte(Char next);
public Observable<Packet> packetSource();
在内部,它会维护状态,包括 body 剩余部分的长度等。它还会有一个 PublishSubject<Packet>
,它会在构建时发出每个数据包。
您正在按定义的模式拆分字节流。我不确定您如何接收字节以及如何对可观察对象进行建模,Observable<byte>
或 Observable<byte[]>
!?
无论如何,这里我猜的是用字符串翻译的,但思路还是一样的。我选择 x
后跟 y
作为模式(在你的例子中是 0xFC 0xFA
)。
您会在代码中找到我的评论:
final ImmutableList<String> PATTERN = ImmutableList.of("x", "y");
Observable<String> source = Observable
.fromArray("x", "y", "1", "2", "3", "x", "y", "4", "5", "x", "y", "1", "x", "y", "x", "4", "6", "x")
.share();//publishing to hot observable (we are splitting this source by some of its elements)
//find the next pattern
Observable<List<String>> nextBoundary = source
.buffer(2, 1)
.filter(pairs -> CollectionUtils.isEqualCollection(PATTERN, pairs));
//https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer2.png
//start a buffer for each x found
//buffers (packets) may overlap
source.buffer(source.filter(e -> e.equals("x")),
x -> source
.take(1)//next emission after the x
.switchMap(y -> y.equals("y") ?
nextBoundary // if 'y' then find the next patter
: Observable.empty() //otherwise stop buffering
)
)
.filter(packet -> packet.size() > 2)//do not take the wrong buffers like ["x", "4"] (x not followed by y) but it is not lost
.map(packet -> {
//each packet is like the following :
//[x, y, 1, 2, 3, x, y]
//[x, y, 4, 5, x, y]
//[x, y, 1, x, y]
//[x, y, x, 4, 6, x]
//because of the closing boundary, the event comes too late
//then we have to handle the packet (it overlaps on the next one)
List<String> ending = packet.subList(packet.size() - 2, packet.size());
return CollectionUtils.isEqualCollection(PATTERN, ending) ? packet.subList(0, packet.size() - 2) : packet;
})
.blockingSubscribe(e -> System.out.println(e));
结果:
[x, y, 1, 2, 3]
[x, y, 4, 5]
[x, y, 1]
[x, y, x, 4, 6, x]