Netty4.1:SimpleChannelPool 重新连接调度事件
Netty4.1: SimpleChannelPool reconnection with schedule event
我正在尝试构建一个 Netty 客户端,它与各种地址(或只是不同的端口)建立多个 TCP 连接。并且如果任何连接的通道关闭,我希望它能够在一定的延迟时间后重新连接。这里有一些代码试图实现这一点:
private ChannelPoolMap<InetSocketAddress, SimpleChannelPool> poolMap
= new AbstractChannelPoolMap<InetSocketAddress, SimpleChannelPool>() {
@Override
protected SimpleChannelPool newPool(InetSocketAddress key) {
return new SimpleChannelPool(
clientBootstrap.remoteAddress(key),
simpleChannelPoolHandler);
}
};
private void doConnect(InetSocketAddress address) {
final SimpleChannelPool pool = poolMap.get(address);
Future<Channel> f = pool.acquire();
f.addListener(
(FutureListener<Channel>) (Future<Channel> future) -> {
if(future.isSuccess()){
Channel ch = f.getNow();
ch.closeFuture().addListener(closeFuture -> {
log.info("closed: reconnecting");
doConnect(address);
});
pool.release(ch);
}else{
log.info("failed to connect, retry in 5 second");
Thread.sleep(5000);
doConnect(address);
}
});
}
public void remoteConnect() throws IOException, InterruptedException {
for (int i = 0; i < 100; i++) {
InetSocketAddress address = new InetSocketAddress("localhost", 8300 + i);
doConnect(address);
}
}
clientBootstrap
和 simpleChannelPoolHandler
被创建为 Spring bean,因此除非必要,否则我不会在这里显示。
如上所示,我正在使用 Thread.sleep(5000)
来模拟重新连接之前的延迟。它不起作用,因为它会阻止同一线程中的其他连接通道接收新数据。
我也曾尝试使用 ch.eventLoop().schedule(..)
,但如果重新连接失败,这样做会抛出 NullPointerException(因为从 pool.acquire()
获取的通道将为空)。
这种情况应该如何获取eventLoop来做调度呢?或者我还需要处理哪些其他选项?
提前致谢!
创建您自己的 ScheduledExecutorService 并在那里安排工作。如果你只是想测试一下,那你为什么不尝试使用 netty 的 GlobalEventExecutor 来安排重新连接。
我正在尝试构建一个 Netty 客户端,它与各种地址(或只是不同的端口)建立多个 TCP 连接。并且如果任何连接的通道关闭,我希望它能够在一定的延迟时间后重新连接。这里有一些代码试图实现这一点:
private ChannelPoolMap<InetSocketAddress, SimpleChannelPool> poolMap
= new AbstractChannelPoolMap<InetSocketAddress, SimpleChannelPool>() {
@Override
protected SimpleChannelPool newPool(InetSocketAddress key) {
return new SimpleChannelPool(
clientBootstrap.remoteAddress(key),
simpleChannelPoolHandler);
}
};
private void doConnect(InetSocketAddress address) {
final SimpleChannelPool pool = poolMap.get(address);
Future<Channel> f = pool.acquire();
f.addListener(
(FutureListener<Channel>) (Future<Channel> future) -> {
if(future.isSuccess()){
Channel ch = f.getNow();
ch.closeFuture().addListener(closeFuture -> {
log.info("closed: reconnecting");
doConnect(address);
});
pool.release(ch);
}else{
log.info("failed to connect, retry in 5 second");
Thread.sleep(5000);
doConnect(address);
}
});
}
public void remoteConnect() throws IOException, InterruptedException {
for (int i = 0; i < 100; i++) {
InetSocketAddress address = new InetSocketAddress("localhost", 8300 + i);
doConnect(address);
}
}
clientBootstrap
和 simpleChannelPoolHandler
被创建为 Spring bean,因此除非必要,否则我不会在这里显示。
如上所示,我正在使用 Thread.sleep(5000)
来模拟重新连接之前的延迟。它不起作用,因为它会阻止同一线程中的其他连接通道接收新数据。
我也曾尝试使用 ch.eventLoop().schedule(..)
,但如果重新连接失败,这样做会抛出 NullPointerException(因为从 pool.acquire()
获取的通道将为空)。
这种情况应该如何获取eventLoop来做调度呢?或者我还需要处理哪些其他选项?
提前致谢!
创建您自己的 ScheduledExecutorService 并在那里安排工作。如果你只是想测试一下,那你为什么不尝试使用 netty 的 GlobalEventExecutor 来安排重新连接。