Netty:向许多服务器发送消息的问题

Netty: issue with sending message to many servers

我试图通过 netty 向多台服务器发送一条消息,然后只丢弃服务器应答。我准备了一个实现的小例子,请看下面:

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.LinkedList;
import java.util.List;

public class MainTest {

    private final List<ConnectionInfo> HOSTS = new LinkedList<>();
    private final ByteBuf buf;

    public MainTest() {
        HOSTS.add(new ConnectionInfo("127.0.0.1", 10000));
        HOSTS.add(new ConnectionInfo("127.0.0.1", 20000));
        HOSTS.add(new ConnectionInfo("127.0.0.1", 30000));
        HOSTS.add(new ConnectionInfo("127.0.0.1", 40000));
        HOSTS.add(new ConnectionInfo("127.0.0.1", 50000));

        buf = Unpooled.buffer(100);
        for (int i = 0; i < buf.capacity(); i ++) {
            buf.writeByte((byte) i);
        }
    }

    public static void main(String[] args) throws InterruptedException {
         new MainTest().start();
    }

    public void start() throws InterruptedException {
        for (ConnectionInfo connectionInfo : HOSTS) {
            // Configure the client.
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline p = ch.pipeline();
                                //p.addLast(new LoggingHandler(LogLevel.INFO));
                                p.addLast(new EchoClientHandler(buf.copy()));
                            }
                        });
                // Start the client.
                ChannelFuture f = b.connect(connectionInfo.getHost(), connectionInfo.getPort()).sync();
                // Wait until the connection is closed.
                f.channel().closeFuture().sync();
            } finally {
                // Shut down the event loop to terminate all threads.
                group.shutdownGracefully();
            }
        }
    }

    private class ConnectionInfo {
        private final String host;
        private final int port;

        public ConnectionInfo(String host, int port) {
            this.host = host;
            this.port = port;
        }

        public String getHost() {
            return host;
        }

        public int getPort() {
            return port;
        }
    }
}

和 EchoClientHandler class:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    private final ByteBuf message;

    public EchoClientHandler(ByteBuf buf) {
        message = buf;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(message);
        ctx.close();
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

但是如果列表中的某些服务器不可用,循环将被中断。我想解决这个问题。你对此有什么想法吗?

我找到了 2 个解决方案:

  1. 将循环中的代码包装到线程 class。
  2. 连接到服务器时禁用同步

让我们在代码中考虑这 2 个解决方案。

1.换行:

for (ConnectionInfo connectionInfo : HOSTS) {
    new Thread() {
    public void run() {
        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
        Bootstrap b = new Bootstrap();
        // ....... etc
        } finally {
        // Shut down the event loop to terminate all threads.
        group.shutdownGracefully();
        }
    }
    }.start();
}

2。禁用同步(并添加一些日志记录):

// Start the client.
ChannelFuture f = b.connect(connectionInfo.getHost(), connectionInfo.getPort());
f.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) {
    if (future.isSuccess()) {
        // connection complete start to read first data
        LOG.log(Level.INFO, "Connected to {0}:{1} successfully.",
            new Object[]{connectionInfo.getHost(), connectionInfo.getPort().toString()});
    } else {
        // Close the connection if the connection attempt has failed.
        LOG.log(Level.WARNING, "Connection problem to {0}:{1}.",
            new Object[]{connectionInfo.getHost(), connectionInfo.getPort().toString()});
    }
    }
});
// Wait until the connection is closed.
f.channel().closeFuture().sync();

第二个解决方案看起来更简单,我会更喜欢它。