Netty TCP 套接字输入流
Netty TCP Socket InputStream
Netty TCP 服务器 运行 在端口 8000
接收 NMEA
格式数据。它使用 Marine API 库将乱码转换为需要来自套接字的输入流的有意义的信息。
SentenceReader sentenceReader = new SentenceReader(socket.getInputStream());
sentenceReader.addSentenceListener(new MultiSentenceListener());
sentenceReader.start();
如何获取正在使用的 netty 服务器端口的输入流?
你做不到,因为 InputStream 是阻塞的,而 netty 是异步非阻塞的 API。
SentenceReader
没有任何方法接受 "streamed in" 数据,但是通过 subclassing 可以让它接受数据。
SentenceReader
的核心使用 DataReader
作为其数据,通常这个数据读取器是从一个单独的线程 SentenceReader
本身轮询的,我们可以修改这个结构来获得我们想要的需要。
首先,我们用我们自己的 class 子class SentenceReader
,给它适当的构造函数和我们想要的方法,并删除启动和停止方法的影响。我们现在提供 null
作为文件(并希望未来的版本提供一种直接传递数据读取器的方法)
public class NettySentenceReader extends SentenceReader {
public NettySentenceReader () {
super((InputStream)null);
}
@Override
public void start() {
}
@Override
public void stop() {
}
}
我们现在需要在我们自己的 Netty 处理程序中实现内部 class DataReader
的所有功能,以复制相同的行为
public class SentenceReaderHandler extends
SimpleChannelInboundHandler<String> {
private SentenceFactory factory;
private SentenceReader parent;
public SentenceReaderHandler (SentenceReader parent) {
this.parent = parent;
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
if(!ctx.channel().isActive())
return;
//ActivityMonitor monitor = new ActivityMonitor(parent);
this.factory = SentenceFactory.getInstance();
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
//ActivityMonitor monitor = new ActivityMonitor(parent);
this.factory = SentenceFactory.getInstance();
}
@Override
// This method will be renamed to `messageReceived` in Netty 5.0.0
protected void channelRead0(ChannelHandlerContext ctx, String data)
throws Exception {
if (SentenceValidator.isValid(data)) {
monitor.refresh();
Sentence s = factory.createParser(data);
parent.fireSentenceEvent(s);
} else if (!SentenceValidator.isSentence(data)) {
parent.fireDataEvent(data);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
//monitor.reset();
parent.fireReadingStopped();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) {
if(!ctx.channel().isActive())
return;
//monitor.reset();
parent.fireReadingStopped();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
parent.handleException("Data read failed", e);
}
}
最后,我们需要将其集成到 Netty 管道中:
SentenceReader reader = new NettySentenceReader();
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
private static final StringDecoder DECODER = new StringDecoder();
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast(DECODER);
pipeline.addLast(new SentenceReaderHandler(reader));
}
});
Netty TCP 服务器 运行 在端口 8000
接收 NMEA
格式数据。它使用 Marine API 库将乱码转换为需要来自套接字的输入流的有意义的信息。
SentenceReader sentenceReader = new SentenceReader(socket.getInputStream());
sentenceReader.addSentenceListener(new MultiSentenceListener());
sentenceReader.start();
如何获取正在使用的 netty 服务器端口的输入流?
你做不到,因为 InputStream 是阻塞的,而 netty 是异步非阻塞的 API。
SentenceReader
没有任何方法接受 "streamed in" 数据,但是通过 subclassing 可以让它接受数据。
SentenceReader
的核心使用 DataReader
作为其数据,通常这个数据读取器是从一个单独的线程 SentenceReader
本身轮询的,我们可以修改这个结构来获得我们想要的需要。
首先,我们用我们自己的 class 子class SentenceReader
,给它适当的构造函数和我们想要的方法,并删除启动和停止方法的影响。我们现在提供 null
作为文件(并希望未来的版本提供一种直接传递数据读取器的方法)
public class NettySentenceReader extends SentenceReader {
public NettySentenceReader () {
super((InputStream)null);
}
@Override
public void start() {
}
@Override
public void stop() {
}
}
我们现在需要在我们自己的 Netty 处理程序中实现内部 class DataReader
的所有功能,以复制相同的行为
public class SentenceReaderHandler extends
SimpleChannelInboundHandler<String> {
private SentenceFactory factory;
private SentenceReader parent;
public SentenceReaderHandler (SentenceReader parent) {
this.parent = parent;
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
if(!ctx.channel().isActive())
return;
//ActivityMonitor monitor = new ActivityMonitor(parent);
this.factory = SentenceFactory.getInstance();
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
//ActivityMonitor monitor = new ActivityMonitor(parent);
this.factory = SentenceFactory.getInstance();
}
@Override
// This method will be renamed to `messageReceived` in Netty 5.0.0
protected void channelRead0(ChannelHandlerContext ctx, String data)
throws Exception {
if (SentenceValidator.isValid(data)) {
monitor.refresh();
Sentence s = factory.createParser(data);
parent.fireSentenceEvent(s);
} else if (!SentenceValidator.isSentence(data)) {
parent.fireDataEvent(data);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
//monitor.reset();
parent.fireReadingStopped();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) {
if(!ctx.channel().isActive())
return;
//monitor.reset();
parent.fireReadingStopped();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
parent.handleException("Data read failed", e);
}
}
最后,我们需要将其集成到 Netty 管道中:
SentenceReader reader = new NettySentenceReader();
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
private static final StringDecoder DECODER = new StringDecoder();
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast(DECODER);
pipeline.addLast(new SentenceReaderHandler(reader));
}
});