将 ByteBuffer 流转换为 Rx 中的行的有效方法
Efficient way to transform ByteBuffer stream into lines in Rx
我想通过按行结束字符拆分将 Observable<ByteBuffer>
转换为行 (Observable<String>
)。如果我们有像toString
、concat
和splitByLine
这样的函数,我们必须能够像下面那样做:
Observable<ByteBuffer> o = ...;
o.map(toString).reduce(concat).flatMap(splitByLine);
然而,该算法需要先扫描整个字节并将它们存储在内存中,然后才能实际发出反序列化字符串的第一行。每次行结束符以递增方式出现在字节中时,如何发出新行?
我终于找到了rxjava-string. It provides us with Rx operators to handle a stream of chunked byte arrays and strings. The API document is found here。
题目可以通过以下方式实现:
Observable<byte[]> o = ...;
Charset charset = Charset.forName("UTF-8");
// StringObservable have no operators for ByteBuffer yet
StringObservable.byLine(StringObservable.decode(o, charset));
我想通过按行结束字符拆分将 Observable<ByteBuffer>
转换为行 (Observable<String>
)。如果我们有像toString
、concat
和splitByLine
这样的函数,我们必须能够像下面那样做:
Observable<ByteBuffer> o = ...;
o.map(toString).reduce(concat).flatMap(splitByLine);
然而,该算法需要先扫描整个字节并将它们存储在内存中,然后才能实际发出反序列化字符串的第一行。每次行结束符以递增方式出现在字节中时,如何发出新行?
我终于找到了rxjava-string. It provides us with Rx operators to handle a stream of chunked byte arrays and strings. The API document is found here。
题目可以通过以下方式实现:
Observable<byte[]> o = ...;
Charset charset = Charset.forName("UTF-8");
// StringObservable have no operators for ByteBuffer yet
StringObservable.byLine(StringObservable.decode(o, charset));