Netty 管道中的多个 ChannelInboundHandlerAdapter
Multiple ChannelInboundHandlerAdapter's in Netty pipleline
我对 netty 很陌生,我想创建一个 TCP 服务器,它在实例化连接时执行自定义应用程序层握手。握手后,我想将消息 (ByteBuf) 传递到队列,以便其他线程可以处理它们。
我的问题是,我可以在通道管道中有多个 ChannelInboundHandlerAdapter 吗?一个用于应用层握手协议,另一个用于将消息传递到队列。此外,我想知道消息如何流经管道。如果一个处理程序(或 decoder/encoder)收到一条消息,它是如何传递给另一个处理程序的。
具体来说,如果我从 here 更改 EchoServer 并添加另一个 ChannelInboundHandlerAdapter,回显服务器处理程序将停止接收任何消息。
ServerBootstrap b = new ServerBootstrap();
b.group(group)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx,
Object msg) {
}
});
ch.pipeline().addLast(
new EchoServerHandler());
}
});
我的逻辑是:有 2 个 ChannelInboundHandlerAdapter,然后与第一个处理程序进行握手,如果不符合握手条件则丢弃数据包,然后通过第二个 ChannelInboundHandlerAdapter 将消息传递到队列。我的逻辑正确吗?如果不是应该怎么办?
非常感谢。
ChannelInboundHandlerAdapter
是 class 到 ChannelInBoundHandler
接口的适配器。一开始你可以使用 SimpleChannelInboundHandler
(或者更复杂的你可以扩展适配器 class 编写你自己的扩展 ChannelInboundHandlerAdapter
的处理程序)。
SimpleCHannelInboundHandler
在 channelRead()
之后自动释放消息(从而将其传递给 ChannelPipeline 中的下一个处理程序)。
要使用更简单 SimpleChannelInboundHandler
请参阅此线程 Netty hello world example not working
所以不用这个 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {}
你必须写一个新的 class 扩展 SimpleChannelInboundHandler
就像
public class MyHandler extends SimpleChannelInboundHandler{
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII));
} finally {
in.release();
}
}
}
并像
一样调用它
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MyHandler());
}
如上所述,SimpleCHannelInboundHandler
在 channelRead() 之后自动释放消息(从而将其传递给 ChannelPipeline 中的下一个处理程序)。
如果您使用 ChannelInboundHandlerAdapter
,您必须自己实现将 message/event 传递给下一个处理程序
处理程序必须调用 ChannelHandlerContext ctx
中的事件传播方法以将事件转发到其下一个处理程序。(在 SimpleChannelInboundHandler class 中这是尚未实施)
public class MyInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("Connected!");
ctx.fireChannelActive();
}
}
看到这个http://netty.io/4.0/api/io/netty/channel/ChannelPipeline.html
我必须提醒:
只能将一个 SimpleChannelInboundHandler 扩展添加到管道链。
因为 SimpleChannelInboundHandler 有一个 finally 代码块将释放所有消息。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
channelRead0(ctx, imsg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (autoRelease && release) {
//release all handled messages,so the next handler won't be executed
ReferenceCountUtil.release(msg);**
}
}
}
改为使用 ChannelInboundHandlerAdapter:
public class CustomizeChannelInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("do something you like!")
super.channelRead(ctx, msg);
}
}
我对 netty 很陌生,我想创建一个 TCP 服务器,它在实例化连接时执行自定义应用程序层握手。握手后,我想将消息 (ByteBuf) 传递到队列,以便其他线程可以处理它们。
我的问题是,我可以在通道管道中有多个 ChannelInboundHandlerAdapter 吗?一个用于应用层握手协议,另一个用于将消息传递到队列。此外,我想知道消息如何流经管道。如果一个处理程序(或 decoder/encoder)收到一条消息,它是如何传递给另一个处理程序的。
具体来说,如果我从 here 更改 EchoServer 并添加另一个 ChannelInboundHandlerAdapter,回显服务器处理程序将停止接收任何消息。
ServerBootstrap b = new ServerBootstrap();
b.group(group)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx,
Object msg) {
}
});
ch.pipeline().addLast(
new EchoServerHandler());
}
});
我的逻辑是:有 2 个 ChannelInboundHandlerAdapter,然后与第一个处理程序进行握手,如果不符合握手条件则丢弃数据包,然后通过第二个 ChannelInboundHandlerAdapter 将消息传递到队列。我的逻辑正确吗?如果不是应该怎么办?
非常感谢。
ChannelInboundHandlerAdapter
是 class 到 ChannelInBoundHandler
接口的适配器。一开始你可以使用 SimpleChannelInboundHandler
(或者更复杂的你可以扩展适配器 class 编写你自己的扩展 ChannelInboundHandlerAdapter
的处理程序)。
SimpleCHannelInboundHandler
在 channelRead()
之后自动释放消息(从而将其传递给 ChannelPipeline 中的下一个处理程序)。
要使用更简单 SimpleChannelInboundHandler
请参阅此线程 Netty hello world example not working
所以不用这个 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {}
你必须写一个新的 class 扩展 SimpleChannelInboundHandler
就像
public class MyHandler extends SimpleChannelInboundHandler{
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII));
} finally {
in.release();
}
}
}
并像
一样调用它public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MyHandler());
}
如上所述,SimpleCHannelInboundHandler
在 channelRead() 之后自动释放消息(从而将其传递给 ChannelPipeline 中的下一个处理程序)。
如果您使用 ChannelInboundHandlerAdapter
,您必须自己实现将 message/event 传递给下一个处理程序
处理程序必须调用 ChannelHandlerContext ctx
中的事件传播方法以将事件转发到其下一个处理程序。(在 SimpleChannelInboundHandler class 中这是尚未实施)
public class MyInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("Connected!");
ctx.fireChannelActive();
}
}
看到这个http://netty.io/4.0/api/io/netty/channel/ChannelPipeline.html
我必须提醒:
只能将一个 SimpleChannelInboundHandler 扩展添加到管道链。 因为 SimpleChannelInboundHandler 有一个 finally 代码块将释放所有消息。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
channelRead0(ctx, imsg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (autoRelease && release) {
//release all handled messages,so the next handler won't be executed
ReferenceCountUtil.release(msg);**
}
}
}
改为使用 ChannelInboundHandlerAdapter:
public class CustomizeChannelInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("do something you like!")
super.channelRead(ctx, msg);
}
}