Netty connection pool not sending messages to server
I have a simple Netty connection pool and a simple HTTP endpoint that uses the pool to send TCP messages to a ServerSocket. The relevant code is shown below. The client (NettyConnectionPoolClientApplication):
@SpringBootApplication
@RestController
public class NettyConnectionPoolClientApplication {

    private SimpleChannelPool simpleChannelPool;

    public static void main(String[] args) {
        SpringApplication.run(NettyConnectionPoolClientApplication.class, args);
    }

    @PostConstruct
    public void setup() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.remoteAddress(new InetSocketAddress("localhost", 9000));
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline pipeline = socketChannel.pipeline();
                pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
                pipeline.addLast(new StringDecoder());
                pipeline.addLast(new StringEncoder());
                pipeline.addLast(new DummyClientHandler());
            }
        });
        simpleChannelPool = new SimpleChannelPool(bootstrap, new DummyChannelPoolHandler());
    }

    @RequestMapping("/test/{msg}")
    public void test(@PathVariable String msg) throws Exception {
        Future<Channel> future = simpleChannelPool.acquire();
        future.addListener((FutureListener<Channel>) f -> {
            if (f.isSuccess()) {
                System.out.println("Connected");
                Channel ch = f.getNow();
                ch.writeAndFlush(msg + System.lineSeparator());
                // Release back to pool
                simpleChannelPool.release(ch);
            } else {
                System.out.println("not successful");
            }
        });
    }
}
And the server (ServerSocketRunner):
public class ServerSocketRunner {

    public static void main(String[] args) throws Exception {
        ServerSocket serverSocket = new ServerSocket(9000);
        while (true) {
            Socket socket = serverSocket.accept();
            new Thread(() -> {
                System.out.println("New client connected");
                try (PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
                     BufferedReader in = new BufferedReader(
                             new InputStreamReader(socket.getInputStream()));) {
                    String inputLine, outputLine;
                    out.println("Hello client!");
                    do {
                        inputLine = in.readLine();
                        System.out.println("Received: " + inputLine);
                    } while (!"bye".equals(inputLine));
                    System.out.println("Closing connection...");
                    socket.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}
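DummyChannelPoolHandler and DummyClientHandler only print the events that occur, so they are not relevant here. When the server and the client are running and I send a test message to the test endpoint, I can see the server print "New client connected", but the message sent by the client is not printed. None of the subsequent messages sent by the client are printed by the server either.
If I try telnet, everything works and the server prints the messages. It also works with a regular Netty client that has the same bootstrap configuration but no connection pool (SimpleNettyClientApplication).
Can anyone see what is wrong with my connection pool? I'm out of ideas.
Netty version: 4.1.39.Final
All the code is available here.
Update
Following Norman Maurer's advice, I added: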
ChannelFuture channelFuture = ch.writeAndFlush(msg + System.lineSeparator());
channelFuture.addListener(writeFuture -> {
    System.out.println("isSuccess(): " + channelFuture.isSuccess() + " : " + channelFuture.cause());
});
This prints:
isSuccess: false : java.lang.UnsupportedOperationException: unsupported message type: String (expected: ByteBuf, FileRegion)
To fix it, I simply converted the String to a ByteBuf:
ch.writeAndFlush(Unpooled.wrappedBuffer((msg + System.lineSeparator()).getBytes()));
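One detail worth noting about the fix above: getBytes() with no argument uses the platform default charset, so the bytes on the wire can differ between machines. A minimal sketch of doing the conversion with an explicit charset, assuming both sides agree on UTF-8 (the server in the question reads with the platform default, so this is an assumption) and that a single '\n' is an acceptable line terminator for BufferedReader.readLine(). LineEncoder and encodeLine are hypothetical names, not part of Netty or of the question's code:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not from the question's repository): turns one
// message into the bytes of a newline-terminated line.
final class LineEncoder {

    private LineEncoder() {
        // static helper only
    }

    // Appends '\n' and encodes the result as UTF-8, independent of the
    // platform default charset. Assumes the receiver decodes UTF-8.
    static byte[] encodeLine(String msg) {
        return (msg + "\n").getBytes(StandardCharsets.UTF_8);
    }
}
```

The resulting array could then be passed to the call from the question, for example ch.writeAndFlush(Unpooled.wrappedBuffer(LineEncoder.encodeLine(msg))).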
You should check the status of the ChannelFuture returned by writeAndFlush(...). I suspect it failed.
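A likely explanation for why a String write is rejected even though a StringEncoder is added in bootstrap.handler(...): in Netty 4.1, SimpleChannelPool clones the bootstrap it is given and installs its own ChannelInitializer, which only calls ChannelPoolHandler.channelCreated(ch). The initializer set on the original bootstrap therefore does not run for pooled channels, and the pipeline ends up without an encoder. A minimal sketch of one way around this, assuming the codecs are moved into the pool handler. LinePipelinePoolHandler is a hypothetical name, and DummyClientHandler is left out because its source is not shown in the question:

```java
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.pool.AbstractChannelPoolHandler;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

// Hypothetical pool handler (not from the question's repository).
// AbstractChannelPoolHandler supplies no-op channelAcquired/channelReleased,
// so only channelCreated has to be implemented.
class LinePipelinePoolHandler extends AbstractChannelPoolHandler {

    // Called by the pool for every new connection it opens; this is where
    // the pipeline for pooled channels has to be configured.
    @Override
    public void channelCreated(Channel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        // Same codecs as in the question's bootstrap.handler(...) initializer.
        pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast(new StringDecoder());
        pipeline.addLast(new StringEncoder());
    }
}
```

With the pool created as new SimpleChannelPool(bootstrap, new LinePipelinePoolHandler()), a call such as ch.writeAndFlush(msg + System.lineSeparator()) should pass through the StringEncoder, so the manual ByteBuf conversion would no longer be needed.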