Scala Netty ByteToMessageDecoder 子类从输入中删除字节并传递到下一个管道级别
Scala Netty ByteToMessageDecoder subclass remove bytes from input and pass to next pipeline level
我正在尝试为在 TCP 上接收到的字节流实现一个 netty 解码器。
这是当前的实现:
import io.netty.handler.codec.ByteToMessageDecoder
import io.netty.channel.ChannelHandlerContext
import io.netty.buffer.ByteBuf
import io.netty.handler.codec.compression.{ZlibCodecFactory,ZlibWrapper}
import java.util.List
object MyCustomDecoder extends ByteToMessageDecoder {
val GZIP_HEADER_1 = 0x1F
val GZIP_HEADER_2 = 0x8B
def decode(ctx: ChannelHandlerContext,in: ByteBuf,out: List[AnyRef]): Unit = {
// Read values in Little Endian
val blockTime = java.lang.Long.reverseBytes(in.readLong()) // little endian
val blockSeq = java.lang.Integer.reverseBytes(in.readInt()) // little endian
val blockSize = java.lang.Integer.reverseBytes(in.readInt()) // little endian
// Checks if compressed block size matches block size published value
if(in.readableBytes() - 16 == blockSize){
val h1 = in.getUnsignedByte(in.readerIndex())
val h2 = in.getUnsignedByte(in.readerIndex()+1)
//
if(isGzip(h1, h2))
enableGzip(ctx)
else
System.err.println("Not in GZip format")
}
}
// Check if header matches GZIP for decompression
private def isGzip(h1: Int, h2: Int):Boolean = h1 == GZIP_HEADER_1 && h2 == GZIP_HEADER_2
private def enableGzip(ctx: ChannelHandlerContext): Unit = {
val p = ctx.pipeline()
//p.addLast("gzipdeflater", ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP))
p.addLast("gzipinflater", ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP))
//p.remove(this)
}
}
解码器应该使用 netty 实现接收消息,验证消息中的数据是否与消息头中发布的实际块大小相匹配,然后发送到下一级管道膨胀使用 GZip 压缩的内容。
我当前的实现能够读取数据,但我不确定如何从传递到下一个管道级别(即:blockTime
、blockSeq
, blockSize
) 以便有一个可行的消息被解压缩然后传递给结束消息处理程序。
我的数据定义如下:
<blockTime><blockSequence><blockSize><block>
我基于 this example 为 netty 管道创建了这个解码器子类。
如有任何帮助,我们将不胜感激。
谢谢。
需要执行两个步骤:
- 从 ByteBuf 对象中删除字节 (in)
示例中已经通过执行以下操作实现了这一点(read[Type]()
实质上移动了 ByteBuf 索引指针):
val blockTime = java.lang.Long.reverseBytes(in.readLong()) // little endian
val blockSeq = java.lang.Integer.reverseBytes(in.readInt()) // little endian
val blockSize = java.lang.Integer.reverseBytes(in.readInt()) // little endian
此外,可以通过发出以下语句从 ByteBuf 对象中删除之前的 ("used") 个字节:in.discardReadBytes()
- 将消息传递到下一个级别
假设管道中的下一个decoder/handler将一个ByteBuf作为输入(例如另一个ByteToMessageDecoder子类),这可以简单地实现如下:
out.add(in)
鉴于头字节已经在(1)中被移除,传递给下一个管道级别的是ByteBuf对象表示的剩余字节。
然后处理将根据初始化程序中定义的管道结构继续进行。
我正在尝试为在 TCP 上接收到的字节流实现一个 netty 解码器。
这是当前的实现:
import io.netty.handler.codec.ByteToMessageDecoder
import io.netty.channel.ChannelHandlerContext
import io.netty.buffer.ByteBuf
import io.netty.handler.codec.compression.{ZlibCodecFactory,ZlibWrapper}
import java.util.List
object MyCustomDecoder extends ByteToMessageDecoder {
val GZIP_HEADER_1 = 0x1F
val GZIP_HEADER_2 = 0x8B
def decode(ctx: ChannelHandlerContext,in: ByteBuf,out: List[AnyRef]): Unit = {
// Read values in Little Endian
val blockTime = java.lang.Long.reverseBytes(in.readLong()) // little endian
val blockSeq = java.lang.Integer.reverseBytes(in.readInt()) // little endian
val blockSize = java.lang.Integer.reverseBytes(in.readInt()) // little endian
// Checks if compressed block size matches block size published value
if(in.readableBytes() - 16 == blockSize){
val h1 = in.getUnsignedByte(in.readerIndex())
val h2 = in.getUnsignedByte(in.readerIndex()+1)
//
if(isGzip(h1, h2))
enableGzip(ctx)
else
System.err.println("Not in GZip format")
}
}
// Check if header matches GZIP for decompression
private def isGzip(h1: Int, h2: Int):Boolean = h1 == GZIP_HEADER_1 && h2 == GZIP_HEADER_2
private def enableGzip(ctx: ChannelHandlerContext): Unit = {
val p = ctx.pipeline()
//p.addLast("gzipdeflater", ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP))
p.addLast("gzipinflater", ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP))
//p.remove(this)
}
}
解码器应该使用 netty 实现接收消息,验证消息中的数据是否与消息头中发布的实际块大小相匹配,然后发送到下一级管道膨胀使用 GZip 压缩的内容。
我当前的实现能够读取数据,但我不确定如何从传递到下一个管道级别(即:blockTime
、blockSeq
, blockSize
) 以便有一个可行的消息被解压缩然后传递给结束消息处理程序。
我的数据定义如下:
<blockTime><blockSequence><blockSize><block>
我基于 this example 为 netty 管道创建了这个解码器子类。
如有任何帮助,我们将不胜感激。
谢谢。
需要执行两个步骤:
- 从 ByteBuf 对象中删除字节 (in)
示例中已经通过执行以下操作实现了这一点(read[Type]()
实质上移动了 ByteBuf 索引指针):
val blockTime = java.lang.Long.reverseBytes(in.readLong()) // little endian
val blockSeq = java.lang.Integer.reverseBytes(in.readInt()) // little endian
val blockSize = java.lang.Integer.reverseBytes(in.readInt()) // little endian
此外,可以通过发出以下语句从 ByteBuf 对象中删除之前的 ("used") 个字节:in.discardReadBytes()
- 将消息传递到下一个级别
假设管道中的下一个decoder/handler将一个ByteBuf作为输入(例如另一个ByteToMessageDecoder子类),这可以简单地实现如下:
out.add(in)
鉴于头字节已经在(1)中被移除,传递给下一个管道级别的是ByteBuf对象表示的剩余字节。
然后处理将根据初始化程序中定义的管道结构继续进行。