Scala Netty ByteToMessageDecoder 子类从输入中删除字节并传递到下一个管道级别

Scala Netty ByteToMessageDecoder subclass remove bytes from input and pass to next pipeline level

我正在尝试为在 TCP 上接收到的字节流实现一个 netty 解码器。

这是当前的实现:

import io.netty.handler.codec.ByteToMessageDecoder
import io.netty.channel.ChannelHandlerContext
import io.netty.buffer.ByteBuf
import io.netty.handler.codec.compression.{ZlibCodecFactory,ZlibWrapper}
import java.util.List

object MyCustomDecoder extends ByteToMessageDecoder {

  val GZIP_HEADER_1 = 0x1F
  val GZIP_HEADER_2 = 0x8B

  def decode(ctx: ChannelHandlerContext,in: ByteBuf,out: List[AnyRef]): Unit = {

    // Read values in Little Endian
    val blockTime = java.lang.Long.reverseBytes(in.readLong()) // little endian
    val blockSeq = java.lang.Integer.reverseBytes(in.readInt()) // little endian
    val blockSize = java.lang.Integer.reverseBytes(in.readInt()) // little endian

    // Checks if compressed block size matches block size published value
    if(in.readableBytes() - 16 == blockSize){

      val h1 = in.getUnsignedByte(in.readerIndex())
      val h2 = in.getUnsignedByte(in.readerIndex()+1)

      //
      if(isGzip(h1, h2))
        enableGzip(ctx)
      else 
        System.err.println("Not in GZip format")      
    }    

  }

  // Check if header matches GZIP for decompression
  private def isGzip(h1: Int, h2: Int):Boolean = h1 == GZIP_HEADER_1 && h2 == GZIP_HEADER_2

  private def enableGzip(ctx: ChannelHandlerContext): Unit = {
    val p = ctx.pipeline()
    //p.addLast("gzipdeflater", ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP))
    p.addLast("gzipinflater", ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP))
    //p.remove(this)
  }

}

解码器应该使用 netty 实现接收消息,验证消息中的数据是否与消息头中发布的实际块大小相匹配,然后发送到下一级管道膨胀使用 GZip 压缩的内容。

我当前的实现能够读取数据,但我不确定如何从传递到下一个管道级别(即:blockTimeblockSeq, blockSize ) 以便有一个可行的消息被解压缩然后传递给结束消息处理程序。

我的数据定义如下: <blockTime><blockSequence><blockSize><block>

我基于 this example 为 netty 管道创建了这个解码器子类。

如有任何帮助,我们将不胜感激。

谢谢。

需要执行两个步骤:

  1. ByteBuf 对象中删除字节 (in)

示例中已经通过执行以下操作实现了这一点(read[Type]() 实质上移动了 ByteBuf 索引指针):

val blockTime = java.lang.Long.reverseBytes(in.readLong()) // little endian
val blockSeq = java.lang.Integer.reverseBytes(in.readInt()) // little endian
val blockSize = java.lang.Integer.reverseBytes(in.readInt()) // little endian

此外,可以通过发出以下语句从 ByteBuf 对象中删除之前的 ("used") 个字节:in.discardReadBytes()

  1. 将消息传递到下一个级别

假设管道中的下一个decoder/handler将一个ByteBuf作为输入(例如另一个ByteToMessageDecoder子类),这可以简单地实现如下:

out.add(in)

鉴于头字节已经在(1)中被移除,传递给下一个管道级别的是ByteBuf对象表示的剩余字节。

然后处理将根据初始化程序中定义的管道结构继续进行。