Netty TCP 套接字输入流

Netty TCP Socket InputStream

Netty TCP 服务器 运行 在端口 8000 接收 NMEA 格式数据。它使用 Marine API 库将乱码转换为需要来自套接字的输入流的有意义的信息。

SentenceReader sentenceReader = new SentenceReader(socket.getInputStream());
sentenceReader.addSentenceListener(new MultiSentenceListener());
sentenceReader.start();

如何获取正在使用的 netty 服务器端口的输入流?

你做不到,因为 InputStream 是阻塞的,而 netty 是异步非阻塞的 API。

SentenceReader 没有任何方法接受 "streamed in" 数据,但是通过 subclassing 可以让它接受数据。

SentenceReader 的核心使用 DataReader 作为其数据,通常这个数据读取器是从一个单独的线程 SentenceReader 本身轮询的,我们可以修改这个结构来获得我们想要的需要。

首先,我们用我们自己的 class 子class SentenceReader,给它适当的构造函数和我们想要的方法,并删除启动和停止方法的影响。我们现在提供 null 作为文件(并希望未来的版本提供一种直接传递数据读取器的方法)

public class NettySentenceReader extends SentenceReader {
    public NettySentenceReader () {
        super((InputStream)null);
    }

    @Override
    public void start() {
    }

    @Override
    public void stop() {
    }
}

我们现在需要在我们自己的 Netty 处理程序中实现内部 class DataReader 的所有功能,以复制相同的行为

public class SentenceReaderHandler extends
         SimpleChannelInboundHandler<String> {
    private SentenceFactory factory;
    private SentenceReader parent;

    public SentenceReaderHandler (SentenceReader parent) {
        this.parent = parent;
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {
        if(!ctx.channel().isActive())
            return;
        //ActivityMonitor monitor = new ActivityMonitor(parent);
        this.factory = SentenceFactory.getInstance();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        //ActivityMonitor monitor = new ActivityMonitor(parent);
        this.factory = SentenceFactory.getInstance();
    }

    @Override
    // This method will be renamed to `messageReceived` in Netty 5.0.0
    protected void channelRead0(ChannelHandlerContext ctx, String data)
             throws Exception {
        if (SentenceValidator.isValid(data)) {
            monitor.refresh();
            Sentence s = factory.createParser(data);
            parent.fireSentenceEvent(s);
        } else if (!SentenceValidator.isSentence(data)) {
            parent.fireDataEvent(data);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        //monitor.reset();
        parent.fireReadingStopped();
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) {
        if(!ctx.channel().isActive())
            return;
        //monitor.reset();
        parent.fireReadingStopped();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
        parent.handleException("Data read failed", e);
    }
}

最后,我们需要将其集成到 Netty 管道中:

SentenceReader reader = new NettySentenceReader();
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    private static final StringDecoder DECODER = new StringDecoder();
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast(DECODER);
        pipeline.addLast(new SentenceReaderHandler(reader)); 
    }
});