Scala Netty 如何为基于字节数据的协议创建一个简单的客户端?
Scala Netty How to create a simple client for byte data based protocol?
这个问题旨在介绍我在处理当前项目时遇到的问题。我将在下面回答并提出我的解决方案。
我正在从事一个项目,该项目要求我连接到一个数据馈送服务器,该服务器具有传输数据的专有协议,基本上以 GZIP 格式编码在 TCP 协议的数据部分中,需要提取。
数据提供者的数据协议 sample application 使用 Java 中的简单套接字。我想让它适应 scala/netty。此外,值得注意的是,所提供的数据可能分布在多个数据包中。
我一直在寻找关于如何使用 Netty.io 创建简单客户端应用程序的简单明了的示例,但所有示例似乎都过于复杂并且缺乏足够的解释来简单地实现此目的。
更重要的是,许多 netty/scala 示例都面向服务器应用程序。
“Getting Started”netty 教程也缺乏足够的解释,使实际入门时易于导航。
问题是,如何实现一个连接服务器、接收数据并解析结果的简单netty应用程序?
为了尝试理解这个概念,我查看了一些示例:
我 运行 在尝试将使用套接字的 java 应用程序复制到更复杂的使用 Netty 的方法时遇到了这个问题。
我解决问题的方法是了解建立连接所需的 netty 库的各种元素:
这 3 个元素确保连接的创建和管理以供进一步处理。
此外,使用 Netty 时还需要一些其他元素:
- 一个通道初始值设定项,通常是从 ChannelInitializer
继承而来的自定义对象
- 一个解码器,它可以是基于期望接收的消息类型的任何类型,这些通常是 ChannelInboundHandlerAdapter
的子类
- 编码器,类似于解码器,但用于传出消息,通常是 ChannelOutboundHandlerAdapter
的子类
- 一个Handler,本质上是告诉netty如何处理接收到的数据。
通道初始化器负责准备Pipeline,它本质上是将入站和出站数据通过一系列"filters"来处理不同级别的数据,每个级别接收前面encoder/decoder.
处理的数据
以下是 netty 文档中介绍的管道工作方式:
I/O Request
via Channel or
ChannelHandlerContext
|
+---------------------------------------------------+---------------+
| ChannelPipeline | |
| \|/ |
| +---------------------+ +-----------+----------+ |
| | Inbound Handler N | | Outbound Handler 1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler N-1 | | Outbound Handler 2 | |
| +----------+----------+ +-----------+----------+ |
| /|\ . |
| . . |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
| [ method call] [method call] |
| . . |
| . \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 2 | | Outbound Handler M-1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 1 | | Outbound Handler M | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
+---------------+-----------------------------------+---------------+
| \|/
+---------------+-----------------------------------+---------------+
| | | |
| [ Socket.read() ] [ Socket.write() ] |
| |
| Netty Internal I/O Threads (Transport Implementation) |
+-------------------------------------------------------------------+
就问题的原始上下文而言,没有允许使用预定字节解析自定义数据的预设解码器。从本质上讲,这意味着必须为入站数据创建自定义解码器。
让我们首先回顾一下作为客户端应用程序启动的连接的基础知识:
import io.netty.bootstrap.Bootstrap
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.channel.socket.SocketChannel
object App {
def main(args: Array[String]){
connect()
}
def connect() {
val host = "host.example.com"
val port = 9999
val group = new NioEventLoopGroup() // starts the event loop group
try {
var b = new Bootstrap() // creates the netty bootstrap
.group(group) // associates the NioEventLoopGroup to the bootstrap
.channel(classOf[NioSocketChannel]) // associates the channel to the bootstrap
.handler(MyChannelInitializer) // provides the handler for dealing with the incoming/outgoing data on the channel
var ch = b.connect(host, port).sync().channel() //initiates the connection to the server and links it to the netty channel
ch.writeAndFlush("SERVICE_REQUEST") // sends the request to the server
ch.closeFuture().sync() // keeps the connection alive instead of shutting down the channel after receiving the first packet
}
catch {
case t: Throwable => t.printStackTrace(); group.shutdownGracefully()
}
finally {
group.shutdownGracefully() // Shutdown the event group
}
}
}
MyChannelInitializer
在启动 bootstrap 时调用是负责告诉程序如何处理传入和传出数据消息的部分:
import io.netty.channel.ChannelInitializer
import io.netty.channel.socket.SocketChannel
import io.netty.handler.codec.string.StringEncoder
object MyChannelInitializer extends ChannelInitializer[SocketChannel] {
val STR_ENCODER = new StringEncoder // Generic StringEecoder from netty to simply allow a string to be prepared and sent out to the server
def initChannel(ch: SocketChannel) {
val pipeline = ch.pipeline() // loads the pipeline associated with the channel
// Decode Message
pipeline.addLast("packet-decoder",MyPacketDecoder) // first data "filter" to extract the necessary bytes for the second filter
pipeline.addLast("gzip-inflater", MyGZipDecoder) // second "filter" to unzip the contents
// Encode String to send
pipeline.addLast("command-encoder",STR_ENCODER) // String encoder for outgoing data
// Handler
pipeline.addLast("message-handler", MyMessageHandler) // Handles the end data after all "filters" have been applied
}
}
在这种情况下,第一个管道项 MyPacketDecoder
已创建为 ReplayingDecoder 的子类,它提供了一种执行数据包重建的简单方法,以便获得所有必要的字节要使用的消息。 (简单的说,等待所有字节收集到ByteBuf变量中再继续)
了解 ByteBuf 的工作原理对于此类应用程序非常重要,尤其是 read 和 get 方法之间的区别,它们分别允许读取和移动读取索引或仅读取数据而不影响reader 索引。
下面提供了 MyPacketDecoder
的示例
import io.netty.handler.codec.ReplayingDecoder
import io.netty.channel.ChannelHandlerContext
import io.netty.buffer.ByteBuf
import java.util.List
object MyPacketDecoder extends ReplayingDecoder[Int] {
val READ_HEADER = 0
val READ_CONTENT = 1
super.state(READ_HEADER) // sets the initial state of the Decoder by calling the superclass constructor
var blockSize:Int = 0 // size of the data expected, published by the received data from the server, will vary according to your case, there may be additional header bytes before the actual data to be processed
def decode(ctx: ChannelHandlerContext,in: ByteBuf,out: List[AnyRef]): Unit = {
var received_size = in.readableBytes()
if(state() == READ_HEADER){
blockSize = in.readInt() // header data with the size of the expected data to be received in the current and following packets if segmented
checkpoint(READ_CONTENT) // change the state of the object in order to proceed to obtaining all the required bytes necessary for the message to be valid
}
else if(state() == READ_CONTENT){
var bytes = new Array[Byte](blockSize)
in.getBytes(0,bytes,0,blockSize) // adds collected bytes to the by array for the expected size as defined by the blockSize variable
var frame = in.readBytes(blockSize) // creates the bytebuf to be passed to the next "filter"
checkpoint(READ_HEADER) // changes the state preparing for the next message
out.add(frame) // passes the data to the next "filter"
}
else {
throw new Error("Case not covered Exception")
}
}
}
前面的代码从所有数据包中获取接收到的字节,达到预期的字节大小,并将其传递到以下管道级别。
下一个管道级别处理接收到的数据的 GZIP 解压缩。这是由 MyGZipDecoder
对象确保的,该对象被定义为 ByteToMessageDecoder 抽象对象的子类,以便将字节信息作为接收到的数据进行处理:
import io.netty.handler.codec.ByteToMessageDecoder
import io.netty.channel.ChannelHandlerContext
import io.netty.buffer.ByteBuf
import java.net._
import java.io._
import java.util._
import java.util.zip._
import java.text._
object MyGZipDecoder extends ByteToMessageDecoder {
val MAX_DATA_SIZE = 100000
var inflater = new Inflater(true)
var compressedData = new Array[Byte](MAX_DATA_SIZE)
var uncompressedData = new Array[Byte](MAX_DATA_SIZE)
def decode(ctx: ChannelHandlerContext,in: ByteBuf,out: List[AnyRef]): Unit = {
var received_size = in.readableBytes() // reads the number of available bytes
in.readBytes(compressedData, 0, received_size) // puts the bytes into a Byte array
inflater.reset();
inflater.setInput(compressedData, 0, received_size) // prepares the inflater for decompression of the data
var resultLength = inflater.inflate(uncompressedData) // decompresses the data into the uncompressedData Byte array
var message = new String(uncompressedData) // generates a string from the uncompressed data
out.add(message) // passes the data to the next pipeline level
}
}
此解码器解压缩数据包中收到的压缩数据,并将数据作为从该级别收到的解码字节中获得的字符串发送到下一级。
拼图的最后一块是 MyMessageHandler
对象,它本质上对数据进行最终处理以达到应用程序所需的目的。这是 SimpleChannelInboundHandler 的子类,其字符串参数预期作为通道数据的消息:
import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
import io.netty.channel.ChannelHandler.Sharable
@Sharable
object QMMessageHandler extends SimpleChannelInboundHandler[String] {
def channelRead0(ctx: ChannelHandlerContext, msg: String) {
println("Handler => Received message: "+msg)
// Do your data processing here however you need for the application purposes
}
}
这基本上完成了连接到服务器的特定示例的要求,该服务器使用对基本数据包数据进行 GZip 压缩以专有数据协议提供数据。
希望这可以作为那些试图实现类似场景的人的良好基础,但主要思想是需要进行一些定制才能为专有协议创建适应性处理。
此外,值得注意的是,这种类型的实现并不是真正用于简单的客户端-服务器连接,而是用于需要 distributability/scalability 数据的应用程序,由 netty 库提供(即许多并发连接同时广播数据)。
对于在撰写此答案时可能遗漏的任何错误,我提前表示歉意。
我希望这个简短的教程可以帮助其他人,因为我个人不得不花费一些令人沮丧的时间从网上的点点滴滴中弄清楚。
这个问题旨在介绍我在处理当前项目时遇到的问题。我将在下面回答并提出我的解决方案。
我正在从事一个项目,该项目要求我连接到一个数据馈送服务器,该服务器具有传输数据的专有协议,基本上以 GZIP 格式编码在 TCP 协议的数据部分中,需要提取。
数据提供者的数据协议 sample application 使用 Java 中的简单套接字。我想让它适应 scala/netty。此外,值得注意的是,所提供的数据可能分布在多个数据包中。
我一直在寻找关于如何使用 Netty.io 创建简单客户端应用程序的简单明了的示例,但所有示例似乎都过于复杂并且缺乏足够的解释来简单地实现此目的。 更重要的是,许多 netty/scala 示例都面向服务器应用程序。
“Getting Started”netty 教程也缺乏足够的解释,使实际入门时易于导航。
问题是,如何实现一个连接服务器、接收数据并解析结果的简单netty应用程序?
为了尝试理解这个概念,我查看了一些示例:
我 运行 在尝试将使用套接字的 java 应用程序复制到更复杂的使用 Netty 的方法时遇到了这个问题。
我解决问题的方法是了解建立连接所需的 netty 库的各种元素:
这 3 个元素确保连接的创建和管理以供进一步处理。
此外,使用 Netty 时还需要一些其他元素:
- 一个通道初始值设定项,通常是从 ChannelInitializer 继承而来的自定义对象
- 一个解码器,它可以是基于期望接收的消息类型的任何类型,这些通常是 ChannelInboundHandlerAdapter 的子类
- 编码器,类似于解码器,但用于传出消息,通常是 ChannelOutboundHandlerAdapter 的子类
- 一个Handler,本质上是告诉netty如何处理接收到的数据。
通道初始化器负责准备Pipeline,它本质上是将入站和出站数据通过一系列"filters"来处理不同级别的数据,每个级别接收前面encoder/decoder.
处理的数据以下是 netty 文档中介绍的管道工作方式:
I/O Request via Channel or ChannelHandlerContext | +---------------------------------------------------+---------------+ | ChannelPipeline | | | \|/ | | +---------------------+ +-----------+----------+ | | | Inbound Handler N | | Outbound Handler 1 | | | +----------+----------+ +-----------+----------+ | | /|\ | | | | \|/ | | +----------+----------+ +-----------+----------+ | | | Inbound Handler N-1 | | Outbound Handler 2 | | | +----------+----------+ +-----------+----------+ | | /|\ . | | . . | | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()| | [ method call] [method call] | | . . | | . \|/ | | +----------+----------+ +-----------+----------+ | | | Inbound Handler 2 | | Outbound Handler M-1 | | | +----------+----------+ +-----------+----------+ | | /|\ | | | | \|/ | | +----------+----------+ +-----------+----------+ | | | Inbound Handler 1 | | Outbound Handler M | | | +----------+----------+ +-----------+----------+ | | /|\ | | +---------------+-----------------------------------+---------------+ | \|/ +---------------+-----------------------------------+---------------+ | | | | | [ Socket.read() ] [ Socket.write() ] | | | | Netty Internal I/O Threads (Transport Implementation) | +-------------------------------------------------------------------+
就问题的原始上下文而言,没有允许使用预定字节解析自定义数据的预设解码器。从本质上讲,这意味着必须为入站数据创建自定义解码器。
让我们首先回顾一下作为客户端应用程序启动的连接的基础知识:
import io.netty.bootstrap.Bootstrap import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.nio.NioSocketChannel import io.netty.channel.socket.SocketChannel object App { def main(args: Array[String]){ connect() } def connect() { val host = "host.example.com" val port = 9999 val group = new NioEventLoopGroup() // starts the event loop group try { var b = new Bootstrap() // creates the netty bootstrap .group(group) // associates the NioEventLoopGroup to the bootstrap .channel(classOf[NioSocketChannel]) // associates the channel to the bootstrap .handler(MyChannelInitializer) // provides the handler for dealing with the incoming/outgoing data on the channel var ch = b.connect(host, port).sync().channel() //initiates the connection to the server and links it to the netty channel ch.writeAndFlush("SERVICE_REQUEST") // sends the request to the server ch.closeFuture().sync() // keeps the connection alive instead of shutting down the channel after receiving the first packet } catch { case t: Throwable => t.printStackTrace(); group.shutdownGracefully() } finally { group.shutdownGracefully() // Shutdown the event group } } }
MyChannelInitializer
在启动 bootstrap 时调用是负责告诉程序如何处理传入和传出数据消息的部分:
import io.netty.channel.ChannelInitializer import io.netty.channel.socket.SocketChannel import io.netty.handler.codec.string.StringEncoder object MyChannelInitializer extends ChannelInitializer[SocketChannel] { val STR_ENCODER = new StringEncoder // Generic StringEecoder from netty to simply allow a string to be prepared and sent out to the server def initChannel(ch: SocketChannel) { val pipeline = ch.pipeline() // loads the pipeline associated with the channel // Decode Message pipeline.addLast("packet-decoder",MyPacketDecoder) // first data "filter" to extract the necessary bytes for the second filter pipeline.addLast("gzip-inflater", MyGZipDecoder) // second "filter" to unzip the contents // Encode String to send pipeline.addLast("command-encoder",STR_ENCODER) // String encoder for outgoing data // Handler pipeline.addLast("message-handler", MyMessageHandler) // Handles the end data after all "filters" have been applied } }
在这种情况下,第一个管道项 MyPacketDecoder
已创建为 ReplayingDecoder 的子类,它提供了一种执行数据包重建的简单方法,以便获得所有必要的字节要使用的消息。 (简单的说,等待所有字节收集到ByteBuf变量中再继续)
了解 ByteBuf 的工作原理对于此类应用程序非常重要,尤其是 read 和 get 方法之间的区别,它们分别允许读取和移动读取索引或仅读取数据而不影响reader 索引。
下面提供了 MyPacketDecoder
的示例
import io.netty.handler.codec.ReplayingDecoder import io.netty.channel.ChannelHandlerContext import io.netty.buffer.ByteBuf import java.util.List object MyPacketDecoder extends ReplayingDecoder[Int] { val READ_HEADER = 0 val READ_CONTENT = 1 super.state(READ_HEADER) // sets the initial state of the Decoder by calling the superclass constructor var blockSize:Int = 0 // size of the data expected, published by the received data from the server, will vary according to your case, there may be additional header bytes before the actual data to be processed def decode(ctx: ChannelHandlerContext,in: ByteBuf,out: List[AnyRef]): Unit = { var received_size = in.readableBytes() if(state() == READ_HEADER){ blockSize = in.readInt() // header data with the size of the expected data to be received in the current and following packets if segmented checkpoint(READ_CONTENT) // change the state of the object in order to proceed to obtaining all the required bytes necessary for the message to be valid } else if(state() == READ_CONTENT){ var bytes = new Array[Byte](blockSize) in.getBytes(0,bytes,0,blockSize) // adds collected bytes to the by array for the expected size as defined by the blockSize variable var frame = in.readBytes(blockSize) // creates the bytebuf to be passed to the next "filter" checkpoint(READ_HEADER) // changes the state preparing for the next message out.add(frame) // passes the data to the next "filter" } else { throw new Error("Case not covered Exception") } } }
前面的代码从所有数据包中获取接收到的字节,达到预期的字节大小,并将其传递到以下管道级别。
下一个管道级别处理接收到的数据的 GZIP 解压缩。这是由 MyGZipDecoder
对象确保的,该对象被定义为 ByteToMessageDecoder 抽象对象的子类,以便将字节信息作为接收到的数据进行处理:
import io.netty.handler.codec.ByteToMessageDecoder import io.netty.channel.ChannelHandlerContext import io.netty.buffer.ByteBuf import java.net._ import java.io._ import java.util._ import java.util.zip._ import java.text._ object MyGZipDecoder extends ByteToMessageDecoder { val MAX_DATA_SIZE = 100000 var inflater = new Inflater(true) var compressedData = new Array[Byte](MAX_DATA_SIZE) var uncompressedData = new Array[Byte](MAX_DATA_SIZE) def decode(ctx: ChannelHandlerContext,in: ByteBuf,out: List[AnyRef]): Unit = { var received_size = in.readableBytes() // reads the number of available bytes in.readBytes(compressedData, 0, received_size) // puts the bytes into a Byte array inflater.reset(); inflater.setInput(compressedData, 0, received_size) // prepares the inflater for decompression of the data var resultLength = inflater.inflate(uncompressedData) // decompresses the data into the uncompressedData Byte array var message = new String(uncompressedData) // generates a string from the uncompressed data out.add(message) // passes the data to the next pipeline level } }
此解码器解压缩数据包中收到的压缩数据,并将数据作为从该级别收到的解码字节中获得的字符串发送到下一级。
拼图的最后一块是 MyMessageHandler
对象,它本质上对数据进行最终处理以达到应用程序所需的目的。这是 SimpleChannelInboundHandler 的子类,其字符串参数预期作为通道数据的消息:
import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler} import io.netty.channel.ChannelHandler.Sharable @Sharable object QMMessageHandler extends SimpleChannelInboundHandler[String] { def channelRead0(ctx: ChannelHandlerContext, msg: String) { println("Handler => Received message: "+msg) // Do your data processing here however you need for the application purposes } }
这基本上完成了连接到服务器的特定示例的要求,该服务器使用对基本数据包数据进行 GZip 压缩以专有数据协议提供数据。
希望这可以作为那些试图实现类似场景的人的良好基础,但主要思想是需要进行一些定制才能为专有协议创建适应性处理。
此外,值得注意的是,这种类型的实现并不是真正用于简单的客户端-服务器连接,而是用于需要 distributability/scalability 数据的应用程序,由 netty 库提供(即许多并发连接同时广播数据)。
对于在撰写此答案时可能遗漏的任何错误,我提前表示歉意。
我希望这个简短的教程可以帮助其他人,因为我个人不得不花费一些令人沮丧的时间从网上的点点滴滴中弄清楚。