Netty - Client/Server 聊天
Netty - Client/Server chat
出于项目需要,我要用 Netty 实现 client/server 通信,所以我刚开始动手实践来提高这方面的能力。我正在学习 Netty,在这方面还是初学者。
我尝试用 Netty 写了一个简单的客户端/服务器聊天程序。
客户端和服务器正在初始化,我可以看到服务器能够获取客户端管道以建立连接,但是当客户端发送消息时,它没有进入 ServerAdapterHandler 的 messageReceived 部分。下面是我的源代码,
客户端:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
 * Minimal Netty client: connects to a server and writes three "Hi" lines to the channel.
 */
public class ContainerClient {
    String server;
    int port;
    int containerPort;

    /**
     * @param server        host name handed to the bootstrap's connect call
     * @param port          port handed to the bootstrap's connect call
     * @param containerPort stored on the instance; not read anywhere else in this class
     */
    public ContainerClient(String server, int port, int containerPort) {
        this.server = server;
        this.port = port;
        this.containerPort = containerPort;
    }

    /** Starts a client against localhost:5252 with container port 8094. */
    public static void main(String[] args) {
        ContainerClient client = new ContainerClient("localhost", 5252, 8094);
        client.start();
    }

    /**
     * Connects to {@code server:port}, writes "Hi\n" three times, then shuts the
     * event loop group down. Any exception is printed and otherwise ignored.
     */
    public void start() {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ClientAdapterInitializer());

            // Block until the connection attempt has completed.
            Channel channel = bootstrap.connect(server, port).sync().channel();

            // Note: only write() is called here; this method never calls flush().
            for (int sent = 0; sent < 3; sent++) {
                channel.write("Hi\n");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}
客户端通道初始化程序:
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
 * Populates the pipeline of each new client channel with a string decoder,
 * a string encoder and the client handler.
 */
public class ClientAdapterInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        // Appended in this order: decoder, encoder, application handler.
        channel.pipeline()
                .addLast("decoder", new StringDecoder())
                .addLast("encoder", new StringEncoder())
                .addLast("handler", new ClientAdapterHandler());
    }
}
客户端消息处理程序:
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
/**
 * Client-side handler: prints each received string and raises
 * {@code ServerEndedException} when the message is exactly "quit".
 */
public class ClientAdapterHandler extends
        ChannelInboundMessageHandlerAdapter<String> {

    /**
     * Prints the message, then throws if the server announced that it is closing.
     *
     * @throws ServerEndedException when {@code message} equals "quit"
     */
    @Override
    public void messageReceived(ChannelHandlerContext context, String message)
            throws Exception {
        System.out.println(message);
        boolean serverQuit = message.equals("quit");
        if (serverQuit) {
            throw new ServerEndedException("Server is closed");
        }
    }

    /** No-op: auto-generated stub that was never filled in. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
    }

    /** No-op: auto-generated stub that was never filled in. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx)
            throws Exception {
    }

    /** No-op: auto-generated stub that was never filled in. */
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx)
            throws Exception {
    }
}
服务器:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Minimal Netty server: binds to port 5252 and blocks until the server channel closes.
 */
public class ContainerServer {
    int port;

    public static void main(String[] args) {
        ContainerServer server = new ContainerServer();
        server.start();
    }

    /**
     * Sets the port to 5252, binds the server bootstrap and waits on the server
     * channel's close future. Both event loop groups are shut down on exit;
     * any exception is printed and otherwise ignored.
     */
    public void start() {
        port = 5252;
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(parentGroup, childGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(new ServerAdapterInitializer());

            // Printed before bind() is attempted.
            System.out.println("Server started");

            // Wait for the bind to finish, then block until the server channel is closed.
            bootstrap.bind(port).sync().channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}
服务器通道初始化程序:
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
 * Populates the pipeline of each accepted channel with a string decoder,
 * a string encoder and the server handler.
 */
public class ServerAdapterInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        // Appended in this order: decoder, encoder, application handler.
        channel.pipeline()
                .addLast("decoder", new StringDecoder())
                .addLast("encoder", new StringEncoder())
                .addLast("handler", new ServerAdapterHandler());
    }
}
服务器消息处理程序:
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
/**
 * Server-side handler: tracks connected channels in a shared group, logs each
 * received string and writes a fixed acknowledgement back to the sender.
 */
public class ServerAdapterHandler extends
        ChannelInboundMessageHandlerAdapter<String> {

    /** Every channel this handler has been added to and not yet removed from. */
    private static final ChannelGroup channels = new DefaultChannelGroup(
            "containers", GlobalEventExecutor.INSTANCE);

    /** Logs the new connection and registers its channel in the group. */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("[START] New Container has been initialzed");
        Channel joined = ctx.channel();
        channels.add(joined);
        super.handlerAdded(ctx);
    }

    /** Logs the disconnect and removes the channel from the group. */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("[END] A Container has been removed");
        Channel left = ctx.channel();
        channels.remove(left);
        super.handlerRemoved(ctx);
    }

    /** Logs the sender's remote address with the message, then writes the reply. */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, String arg1)
            throws Exception {
        Channel peer = ctx.channel();
        String logLine = "[INFO] - " + peer.remoteAddress() + " - " + arg1;
        System.out.println(logLine);
        // Note: only write() is called here; no flush() follows it.
        peer.write("[Server] - Success");
    }

    /** Traces the callback and returns whatever the superclass returns. */
    @Override
    public boolean beginMessageReceived(ChannelHandlerContext ctx)
            throws Exception {
        System.out.println("Message received");
        return super.beginMessageReceived(ctx);
    }

    /** Only traces the callback; the message itself is not used. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        System.out.println("channelRead");
    }

    /** Only traces the callback. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx)
            throws Exception {
        System.out.println("channelReadComplete");
    }

    /** Only traces the callback. */
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx)
            throws Exception {
        System.out.println("channelWritabilityChanged");
    }
}
以下是我在服务器中获得的输出,而在客户端中没有任何输出:
Server started
[START] New Container has been initialzed
channelReadComplete
[END] A Container has been removed
但预期应该是,
Server started
[START] New Container has been initialzed
channelReadComplete
[INFO] - localhost - Hi
[INFO] - localhost - Hi
[INFO] - localhost - Hi
[END] A Container has been removed
我应该在客户端得到响应,
[Server] - Success
[Server] - Success
[Server] - Success
我也尝试过在 framer 中使用行分隔符(line delimiter),但结果相同。
有人可以帮忙吗?
提前致谢!!
我对 ContainerClient 中的 start 方法做了如下修改后就可以正常工作了,只需添加一个 channel.flush()。
/**
 * Connects to {@code server:port}, writes "Hi\n" three times, flushes the
 * channel once, then shuts the event loop group down. Any exception is
 * printed and otherwise ignored.
 */
public void start() {
    EventLoopGroup group = new NioEventLoopGroup();
    try {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ClientAdapterInitializer());

        // Block until the connection attempt has completed.
        Channel channel = bootstrap.connect(server, port).sync().channel();

        for (int sent = 0; sent < 3; sent++) {
            channel.write("Hi\n");
        }
        // A single flush after all three writes.
        channel.flush();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        group.shutdownGracefully();
    }
}
ChannelInboundMessageHandlerAdapter 在 4.0 的较新版本中已不存在,我改用了 SimpleChannelInboundHandler。
/**
 * Server-side handler that logs each received string and replies to the
 * sender with a fixed acknowledgement.
 */
public class ServerAdapterHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Logs the sender's remote address together with the message and sends
     * the acknowledgement back on the same channel.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg decoded string message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel currentChannel = ctx.channel();
        System.out.println("[INFO] - " + currentChannel.remoteAddress() + " - " + msg);
        // write() alone only queues the reply; nothing else in this handler
        // flushes the channel, so use writeAndFlush() to actually send it.
        currentChannel.writeAndFlush("[Server] - Success");
    }
}
出于项目需要,我要用 Netty 实现 client/server 通信,所以我刚开始动手实践来提高这方面的能力。我正在学习 Netty,在这方面还是初学者。
我尝试用 Netty 写了一个简单的客户端/服务器聊天程序。
客户端和服务器正在初始化,我可以看到服务器能够获取客户端管道以建立连接,但是当客户端发送消息时,它没有进入 ServerAdapterHandler 的 messageReceived 部分。下面是我的源代码,
客户端:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
 * Minimal Netty client: connects to a server and writes three "Hi" lines to the channel.
 */
public class ContainerClient {
    String server;
    int port;
    int containerPort;

    /**
     * @param server        host name handed to the bootstrap's connect call
     * @param port          port handed to the bootstrap's connect call
     * @param containerPort stored on the instance; not read anywhere else in this class
     */
    public ContainerClient(String server, int port, int containerPort) {
        this.server = server;
        this.port = port;
        this.containerPort = containerPort;
    }

    /** Starts a client against localhost:5252 with container port 8094. */
    public static void main(String[] args) {
        ContainerClient client = new ContainerClient("localhost", 5252, 8094);
        client.start();
    }

    /**
     * Connects to {@code server:port}, writes "Hi\n" three times, then shuts the
     * event loop group down. Any exception is printed and otherwise ignored.
     */
    public void start() {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ClientAdapterInitializer());

            // Block until the connection attempt has completed.
            Channel channel = bootstrap.connect(server, port).sync().channel();

            // Note: only write() is called here; this method never calls flush().
            for (int sent = 0; sent < 3; sent++) {
                channel.write("Hi\n");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}
客户端通道初始化程序:
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
 * Populates the pipeline of each new client channel with a string decoder,
 * a string encoder and the client handler.
 */
public class ClientAdapterInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        // Appended in this order: decoder, encoder, application handler.
        channel.pipeline()
                .addLast("decoder", new StringDecoder())
                .addLast("encoder", new StringEncoder())
                .addLast("handler", new ClientAdapterHandler());
    }
}
客户端消息处理程序:
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
/**
 * Client-side handler: prints each received string and raises
 * {@code ServerEndedException} when the message is exactly "quit".
 */
public class ClientAdapterHandler extends
        ChannelInboundMessageHandlerAdapter<String> {

    /**
     * Prints the message, then throws if the server announced that it is closing.
     *
     * @throws ServerEndedException when {@code message} equals "quit"
     */
    @Override
    public void messageReceived(ChannelHandlerContext context, String message)
            throws Exception {
        System.out.println(message);
        boolean serverQuit = message.equals("quit");
        if (serverQuit) {
            throw new ServerEndedException("Server is closed");
        }
    }

    /** No-op: auto-generated stub that was never filled in. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
    }

    /** No-op: auto-generated stub that was never filled in. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx)
            throws Exception {
    }

    /** No-op: auto-generated stub that was never filled in. */
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx)
            throws Exception {
    }
}
服务器:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Minimal Netty server: binds to port 5252 and blocks until the server channel closes.
 */
public class ContainerServer {
    int port;

    public static void main(String[] args) {
        ContainerServer server = new ContainerServer();
        server.start();
    }

    /**
     * Sets the port to 5252, binds the server bootstrap and waits on the server
     * channel's close future. Both event loop groups are shut down on exit;
     * any exception is printed and otherwise ignored.
     */
    public void start() {
        port = 5252;
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(parentGroup, childGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(new ServerAdapterInitializer());

            // Printed before bind() is attempted.
            System.out.println("Server started");

            // Wait for the bind to finish, then block until the server channel is closed.
            bootstrap.bind(port).sync().channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}
服务器通道初始化程序:
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
 * Populates the pipeline of each accepted channel with a string decoder,
 * a string encoder and the server handler.
 */
public class ServerAdapterInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        // Appended in this order: decoder, encoder, application handler.
        channel.pipeline()
                .addLast("decoder", new StringDecoder())
                .addLast("encoder", new StringEncoder())
                .addLast("handler", new ServerAdapterHandler());
    }
}
服务器消息处理程序:
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
/**
 * Server-side handler: tracks connected channels in a shared group, logs each
 * received string and writes a fixed acknowledgement back to the sender.
 */
public class ServerAdapterHandler extends
        ChannelInboundMessageHandlerAdapter<String> {

    /** Every channel this handler has been added to and not yet removed from. */
    private static final ChannelGroup channels = new DefaultChannelGroup(
            "containers", GlobalEventExecutor.INSTANCE);

    /** Logs the new connection and registers its channel in the group. */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("[START] New Container has been initialzed");
        Channel joined = ctx.channel();
        channels.add(joined);
        super.handlerAdded(ctx);
    }

    /** Logs the disconnect and removes the channel from the group. */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("[END] A Container has been removed");
        Channel left = ctx.channel();
        channels.remove(left);
        super.handlerRemoved(ctx);
    }

    /** Logs the sender's remote address with the message, then writes the reply. */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, String arg1)
            throws Exception {
        Channel peer = ctx.channel();
        String logLine = "[INFO] - " + peer.remoteAddress() + " - " + arg1;
        System.out.println(logLine);
        // Note: only write() is called here; no flush() follows it.
        peer.write("[Server] - Success");
    }

    /** Traces the callback and returns whatever the superclass returns. */
    @Override
    public boolean beginMessageReceived(ChannelHandlerContext ctx)
            throws Exception {
        System.out.println("Message received");
        return super.beginMessageReceived(ctx);
    }

    /** Only traces the callback; the message itself is not used. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        System.out.println("channelRead");
    }

    /** Only traces the callback. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx)
            throws Exception {
        System.out.println("channelReadComplete");
    }

    /** Only traces the callback. */
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx)
            throws Exception {
        System.out.println("channelWritabilityChanged");
    }
}
以下是我在服务器中获得的输出,而在客户端中没有任何输出:
Server started
[START] New Container has been initialzed
channelReadComplete
[END] A Container has been removed
但预期应该是,
Server started
[START] New Container has been initialzed
channelReadComplete
[INFO] - localhost - Hi
[INFO] - localhost - Hi
[INFO] - localhost - Hi
[END] A Container has been removed
我应该在客户端得到响应,
[Server] - Success
[Server] - Success
[Server] - Success
我也尝试过在 framer 中使用行分隔符(line delimiter),但结果相同。
有人可以帮忙吗?
提前致谢!!
我对 ContainerClient 中的 start 方法做了如下修改后就可以正常工作了,只需添加一个 channel.flush()。
/**
 * Connects to {@code server:port}, writes "Hi\n" three times, flushes the
 * channel once, then shuts the event loop group down. Any exception is
 * printed and otherwise ignored.
 */
public void start() {
    EventLoopGroup group = new NioEventLoopGroup();
    try {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ClientAdapterInitializer());

        // Block until the connection attempt has completed.
        Channel channel = bootstrap.connect(server, port).sync().channel();

        for (int sent = 0; sent < 3; sent++) {
            channel.write("Hi\n");
        }
        // A single flush after all three writes.
        channel.flush();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        group.shutdownGracefully();
    }
}
ChannelInboundMessageHandlerAdapter 在 4.0 的较新版本中已不存在,我改用了 SimpleChannelInboundHandler。
/**
 * Server-side handler that logs each received string and replies to the
 * sender with a fixed acknowledgement.
 */
public class ServerAdapterHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Logs the sender's remote address together with the message and sends
     * the acknowledgement back on the same channel.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg decoded string message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel currentChannel = ctx.channel();
        System.out.println("[INFO] - " + currentChannel.remoteAddress() + " - " + msg);
        // write() alone only queues the reply; nothing else in this handler
        // flushes the channel, so use writeAndFlush() to actually send it.
        currentChannel.writeAndFlush("[Server] - Success");
    }
}