Netty 4 替代 LineBasedFrameDecoder
Netty 4 alternative to LineBasedFrameDecoder
是否有更有效的方法来搜索 ByteBuf
上的行分隔符序列?特别是我正在寻找一种方法来找到 \r\n
序列。
由于我想使用 bytebuf.forEachByte(ByteBufProcessor)
,而它每次只处理一个字节,直接搜索 2 个字节的序列似乎行不通。
到目前为止我能找到的最简单的方法是使用 LineBasedFrameDecoder
。问题是我收到的一些消息可能非常大(在 MB 范围内),而正如 Norman Maurer 在这里(here)所提到的,在处理这么大的消息时逐字节循环遍历 ByteBuf
非常低效(可能需要几分钟才能找到分隔符)。
您仍然可以搜索两个字节:
/**
 * {@link ByteBufProcessor} that looks for a {@code \r\n} pair while being fed one byte at a time.
 *
 * <p>The previously seen byte is kept in an instance field, so the pair is recognised even
 * though {@link #process(byte)} only ever receives a single byte. The field is plain mutable
 * state without any synchronisation.
 */
public class CrLfProcessor implements ByteBufProcessor {

    /** The byte passed to the most recent {@link #process(byte)} call; {@code 0} before the first call. */
    private byte previousByte;

    /**
     * Examines the next byte of the buffer.
     *
     * @param value the current byte
     * @return {@code false} when {@code value} is {@code '\n'} and the byte before it was
     *     {@code '\r'} (the value this interface uses to stop the iteration); {@code true}
     *     otherwise
     */
    @Override
    public boolean process(byte value) {
        boolean crLfFound = previousByte == '\r' && value == '\n';
        // Always record the current byte, including on a match. Leaving '\r' behind after a
        // match would make a reused instance report a bogus match if the very next byte it
        // sees is another '\n' (e.g. "\r\n\n").
        previousByte = value;
        return !crLfFound;
    }
}
这是测试各种优化的 JMH 基准测试:
Fork(1)
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Measurement(iterations = 10)
@Warmup(iterations = 10)
@BenchmarkMode(Mode.AverageTime)
public class ByteBufProcessorBenchmark {
private static interface ByteProcessor {
boolean process(byte value);
}
private static final int DATA_SIZE = 1024 * 1024;
private byte[] data;
@Setup(Level.Trial)
public void setUp() {
data = new byte[DATA_SIZE];
Random random = new Random();
random.nextBytes(data);
}
@Benchmark
public void crFirst(Blackhole blackhole) {
ByteProcessor byteProcessor = new ByteProcessor() {
private byte previousByte;
@Override
public boolean process(byte value) {
if(previousByte == '\r'){
if(value == '\n'){
return false;
}
}
previousByte = value;
return true;
}
};
doProcess(byteProcessor, blackhole);
}
@Benchmark
public void lfFirst(Blackhole blackhole) {
ByteProcessor byteProcessor = new ByteProcessor() {
private byte previousByte;
@Override
public boolean process(byte value) {
if (value == '\n') {
if(previousByte == '\r'){
return false;
}
}
previousByte = value;
return true;
}
};
doProcess(byteProcessor, blackhole);
}
@Benchmark
public void crFirstUpdateCacheOnDemand(Blackhole blackhole) {
ByteProcessor byteProcessor = new ByteProcessor() {
private byte previousByte;
@Override
public boolean process(byte value) {
if(previousByte == '\r'){
if(value == '\n'){
return false;
}
previousByte = 0;
}else if(value == '\r'){
previousByte = value;
}
return true;
}
};
doProcess(byteProcessor, blackhole);
}
@Benchmark
public void lfFirstUpdateCacheOnDemand(Blackhole blackhole) {
ByteProcessor byteProcessor = new ByteProcessor() {
private byte previousByte;
@Override
public boolean process(byte value) {
if (value == '\n') {
if(previousByte == '\r'){
return false;
}
previousByte = 0;
}else if(value == '\r'){
previousByte = value;
}
return true;
}
};
doProcess(byteProcessor, blackhole);
}
@Benchmark
public void consume(Blackhole blackhole){
for(int i = 0; i < data.length; i++){
blackhole.consume(data[i]);
}
}
private void doProcess(ByteProcessor byteProcessor, Blackhole blackhole){
for(int i = 0; i < data.length; i++){
blackhole.consume(byteProcessor.process(data[i]));
}
}
}
结果如下:
# Run complete. Total time: 00:01:21
Benchmark Mode Cnt Score Error Units
ByteBufProcessorBenchmark.crFirst avgt 10 4,211 ± 0,061 ms/op
ByteBufProcessorBenchmark.crFirstUpdateCacheOnDemand avgt 10 4,285 ± 0,336 ms/op
ByteBufProcessorBenchmark.lfFirst avgt 10 4,375 ± 0,289 ms/op
ByteBufProcessorBenchmark.lfFirstUpdateCacheOnDemand avgt 10 4,129 ± 0,075 ms/op
ByteBufProcessorBenchmark.consume avgt 10 3,126 ± 0,152 ms/op
如您所见,最快的选项是 ByteBufProcessorBenchmark.lfFirstUpdateCacheOnDemand
,但它与 ByteBufProcessorBenchmark.crFirst
的差距很小,不足以抵消其增加的复杂性。
另外,你的性能要求是什么?恕我直言,每 MB 4 毫秒(从 consume 的结果可以看出,其中黑洞本身就占了约 3 毫秒)一点也不慢;扣除这部分之后,相当于每毫秒处理约 1MB,这一点也不差。
是否有更有效的方法来搜索 ByteBuf
上的行分隔符序列?特别是我正在寻找一种方法来找到 \r\n
序列。
由于我想使用 bytebuf.forEachByte(ByteBufProcessor)
,而它每次只处理一个字节,直接搜索 2 个字节的序列似乎行不通。
到目前为止我能找到的最简单的方法是使用 LineBasedFrameDecoder
。问题是我收到的一些消息可能非常大(在 MB 范围内),而正如 Norman Maurer 在这里(here)所提到的,在处理这么大的消息时逐字节循环遍历 ByteBuf
非常低效(可能需要几分钟才能找到分隔符)。
您仍然可以搜索两个字节:
/**
 * {@link ByteBufProcessor} that looks for a {@code \r\n} pair while being fed one byte at a time.
 *
 * <p>The previously seen byte is kept in an instance field, so the pair is recognised even
 * though {@link #process(byte)} only ever receives a single byte. The field is plain mutable
 * state without any synchronisation.
 */
public class CrLfProcessor implements ByteBufProcessor {

    /** The byte passed to the most recent {@link #process(byte)} call; {@code 0} before the first call. */
    private byte previousByte;

    /**
     * Examines the next byte of the buffer.
     *
     * @param value the current byte
     * @return {@code false} when {@code value} is {@code '\n'} and the byte before it was
     *     {@code '\r'} (the value this interface uses to stop the iteration); {@code true}
     *     otherwise
     */
    @Override
    public boolean process(byte value) {
        boolean crLfFound = previousByte == '\r' && value == '\n';
        // Always record the current byte, including on a match. Leaving '\r' behind after a
        // match would make a reused instance report a bogus match if the very next byte it
        // sees is another '\n' (e.g. "\r\n\n").
        previousByte = value;
        return !crLfFound;
    }
}
这是测试各种优化的 JMH 基准测试:
Fork(1)
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Measurement(iterations = 10)
@Warmup(iterations = 10)
@BenchmarkMode(Mode.AverageTime)
public class ByteBufProcessorBenchmark {
private static interface ByteProcessor {
boolean process(byte value);
}
private static final int DATA_SIZE = 1024 * 1024;
private byte[] data;
@Setup(Level.Trial)
public void setUp() {
data = new byte[DATA_SIZE];
Random random = new Random();
random.nextBytes(data);
}
@Benchmark
public void crFirst(Blackhole blackhole) {
ByteProcessor byteProcessor = new ByteProcessor() {
private byte previousByte;
@Override
public boolean process(byte value) {
if(previousByte == '\r'){
if(value == '\n'){
return false;
}
}
previousByte = value;
return true;
}
};
doProcess(byteProcessor, blackhole);
}
@Benchmark
public void lfFirst(Blackhole blackhole) {
ByteProcessor byteProcessor = new ByteProcessor() {
private byte previousByte;
@Override
public boolean process(byte value) {
if (value == '\n') {
if(previousByte == '\r'){
return false;
}
}
previousByte = value;
return true;
}
};
doProcess(byteProcessor, blackhole);
}
@Benchmark
public void crFirstUpdateCacheOnDemand(Blackhole blackhole) {
ByteProcessor byteProcessor = new ByteProcessor() {
private byte previousByte;
@Override
public boolean process(byte value) {
if(previousByte == '\r'){
if(value == '\n'){
return false;
}
previousByte = 0;
}else if(value == '\r'){
previousByte = value;
}
return true;
}
};
doProcess(byteProcessor, blackhole);
}
@Benchmark
public void lfFirstUpdateCacheOnDemand(Blackhole blackhole) {
ByteProcessor byteProcessor = new ByteProcessor() {
private byte previousByte;
@Override
public boolean process(byte value) {
if (value == '\n') {
if(previousByte == '\r'){
return false;
}
previousByte = 0;
}else if(value == '\r'){
previousByte = value;
}
return true;
}
};
doProcess(byteProcessor, blackhole);
}
@Benchmark
public void consume(Blackhole blackhole){
for(int i = 0; i < data.length; i++){
blackhole.consume(data[i]);
}
}
private void doProcess(ByteProcessor byteProcessor, Blackhole blackhole){
for(int i = 0; i < data.length; i++){
blackhole.consume(byteProcessor.process(data[i]));
}
}
}
结果如下:
# Run complete. Total time: 00:01:21
Benchmark Mode Cnt Score Error Units
ByteBufProcessorBenchmark.crFirst avgt 10 4,211 ± 0,061 ms/op
ByteBufProcessorBenchmark.crFirstUpdateCacheOnDemand avgt 10 4,285 ± 0,336 ms/op
ByteBufProcessorBenchmark.lfFirst avgt 10 4,375 ± 0,289 ms/op
ByteBufProcessorBenchmark.lfFirstUpdateCacheOnDemand avgt 10 4,129 ± 0,075 ms/op
ByteBufProcessorBenchmark.consume avgt 10 3,126 ± 0,152 ms/op
如您所见,最快的选项是 ByteBufProcessorBenchmark.lfFirstUpdateCacheOnDemand
,但它与 ByteBufProcessorBenchmark.crFirst
的差距很小,不足以抵消其增加的复杂性。
另外,你的性能要求是什么?恕我直言,每 MB 4 毫秒(从 consume 的结果可以看出,其中黑洞本身就占了约 3 毫秒)一点也不慢;扣除这部分之后,相当于每毫秒处理约 1MB,这一点也不差。