Netty:向许多服务器发送消息的问题
Netty: issue with sending message to many servers
我试图通过 netty 向多台服务器发送一条消息,然后只丢弃服务器应答。我准备了一个实现的小例子,请看下面:
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.LinkedList;
import java.util.List;
public class MainTest {
private final List<ConnectionInfo> HOSTS = new LinkedList<>();
private final ByteBuf buf;
public MainTest() {
HOSTS.add(new ConnectionInfo("127.0.0.1", 10000));
HOSTS.add(new ConnectionInfo("127.0.0.1", 20000));
HOSTS.add(new ConnectionInfo("127.0.0.1", 30000));
HOSTS.add(new ConnectionInfo("127.0.0.1", 40000));
HOSTS.add(new ConnectionInfo("127.0.0.1", 50000));
buf = Unpooled.buffer(100);
for (int i = 0; i < buf.capacity(); i ++) {
buf.writeByte((byte) i);
}
}
public static void main(String[] args) throws InterruptedException {
new MainTest().start();
}
public void start() throws InterruptedException {
for (ConnectionInfo connectionInfo : HOSTS) {
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler(buf.copy()));
}
});
// Start the client.
ChannelFuture f = b.connect(connectionInfo.getHost(), connectionInfo.getPort()).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
}
private class ConnectionInfo {
private final String host;
private final int port;
public ConnectionInfo(String host, int port) {
this.host = host;
this.port = port;
}
public String getHost() {
return host;
}
public int getPort() {
return port;
}
}
}
和 EchoClientHandler class:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
private final ByteBuf message;
public EchoClientHandler(ByteBuf buf) {
message = buf;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(message);
ctx.close();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
但是如果列表中的某些服务器不可用,循环将被中断。我想解决这个问题。你对此有什么想法吗?
我找到了 2 个解决方案:
- 将循环中的代码包装到线程 class。
- 连接到服务器时禁用同步
让我们在代码中考虑这 2 个解决方案。
1.换行:
for (ConnectionInfo connectionInfo : HOSTS) {
new Thread() {
public void run() {
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
// ....... etc
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
}.start();
}
2。禁用同步(并添加一些日志记录):
// Start the client.
ChannelFuture f = b.connect(connectionInfo.getHost(), connectionInfo.getPort());
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// connection complete start to read first data
LOG.log(Level.INFO, "Connected to {0}:{1} successfully.",
new Object[]{connectionInfo.getHost(), connectionInfo.getPort().toString()});
} else {
// Close the connection if the connection attempt has failed.
LOG.log(Level.WARNING, "Connection problem to {0}:{1}.",
new Object[]{connectionInfo.getHost(), connectionInfo.getPort().toString()});
}
}
});
// Wait until the connection is closed.
f.channel().closeFuture().sync();
第二个解决方案看起来更简单,我会更喜欢它。
我试图通过 netty 向多台服务器发送一条消息,然后只丢弃服务器应答。我准备了一个实现的小例子,请看下面:
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.LinkedList;
import java.util.List;
public class MainTest {
private final List<ConnectionInfo> HOSTS = new LinkedList<>();
private final ByteBuf buf;
public MainTest() {
HOSTS.add(new ConnectionInfo("127.0.0.1", 10000));
HOSTS.add(new ConnectionInfo("127.0.0.1", 20000));
HOSTS.add(new ConnectionInfo("127.0.0.1", 30000));
HOSTS.add(new ConnectionInfo("127.0.0.1", 40000));
HOSTS.add(new ConnectionInfo("127.0.0.1", 50000));
buf = Unpooled.buffer(100);
for (int i = 0; i < buf.capacity(); i ++) {
buf.writeByte((byte) i);
}
}
public static void main(String[] args) throws InterruptedException {
new MainTest().start();
}
public void start() throws InterruptedException {
for (ConnectionInfo connectionInfo : HOSTS) {
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler(buf.copy()));
}
});
// Start the client.
ChannelFuture f = b.connect(connectionInfo.getHost(), connectionInfo.getPort()).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
}
private class ConnectionInfo {
private final String host;
private final int port;
public ConnectionInfo(String host, int port) {
this.host = host;
this.port = port;
}
public String getHost() {
return host;
}
public int getPort() {
return port;
}
}
}
和 EchoClientHandler class:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
private final ByteBuf message;
public EchoClientHandler(ByteBuf buf) {
message = buf;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(message);
ctx.close();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
但是如果列表中的某些服务器不可用,循环将被中断。我想解决这个问题。你对此有什么想法吗?
我找到了 2 个解决方案:
- 将循环中的代码包装到线程 class。
- 连接到服务器时禁用同步
让我们在代码中考虑这 2 个解决方案。
1.换行:
for (ConnectionInfo connectionInfo : HOSTS) {
new Thread() {
public void run() {
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
// ....... etc
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
}.start();
}
2。禁用同步(并添加一些日志记录):
// Start the client.
ChannelFuture f = b.connect(connectionInfo.getHost(), connectionInfo.getPort());
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// connection complete start to read first data
LOG.log(Level.INFO, "Connected to {0}:{1} successfully.",
new Object[]{connectionInfo.getHost(), connectionInfo.getPort().toString()});
} else {
// Close the connection if the connection attempt has failed.
LOG.log(Level.WARNING, "Connection problem to {0}:{1}.",
new Object[]{connectionInfo.getHost(), connectionInfo.getPort().toString()});
}
}
});
// Wait until the connection is closed.
f.channel().closeFuture().sync();
第二个解决方案看起来更简单,我会更喜欢它。