如何将 FixedChannelPool 与 WebSockets 一起使用?
How to use FixedChannelPool with WebSockets?
各位开发者大家好...
我正在尝试实现 WebSocket 固定连接池,不幸的是 netty 为 FixedChannelPool
提供了糟糕的指导,最大 "what you can get" 是他们的 Unit Tests
我正在使用的客户端代码示例Source Code
并从 中发现 bootstrap.handler(new ChannelInitializer<>())
被 ChannelPool
覆盖
然后我尝试将 ChannelInitializer
块移动到:
public class SomeConnectionPoolHandler implements ChannelPoolHandler {
private final URI uri = URI.create("ws://some.url:80");
@Override
public void channelCreated(Channel ch) {
String protocol = uri.getScheme();
if (!"ws".equals(protocol)) {
throw new IllegalArgumentException("Unsupported protocol: " + protocol);
}
DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, false, httpHeaders, 1280000);
final WebSocketClientHandler handler = new WebSocketClientHandler(handshaker, connection);
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpClientCodec(), new HttpObjectAggregator(65536), handler);
}
@Override
public void channelReleased(Channel ch) {
//TODO
}
@Override
public void channelAcquired(Channel ch) {
//TODO
}
}
现在我的客户端代码如下所示:
public class Application {
private final URI uri = URI.create("ws://some.url:80");
public static void main(String[] args) {
Channel ch = null;
try {
String protocol = uri.getScheme();
if (!"ws".equals(protocol)) {
throw new IllegalArgumentException("Unsupported protocol: " + protocol);
}
Bootstrap b = new Bootstrap();
b.option(ChannelOption.SO_KEEPALIVE, true);
b.remoteAddress(uri.getHost(), uri.getPort());
b.group(group);
b.channel(NioSocketChannel.class);
SomeConnectionPoolHandler connectionPoolHandler = new SomeConnectionPoolHandler ();
channelPool = new FixedChannelPool(b, connectionPoolHandler, 8);
ch = channelPool.acquire().sync().get();
ch.writeAndFlush(new TextWebSocketFrame("test")).sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
if(ch != null) {
channelPool.release(ch);
}
}
}
}
现在我收到错误:
java.lang.UnsupportedOperationException: unsupported message type: TextWebSocketFrame (expected: ByteBuf, FileRegion)
at io.netty.channel.nio.AbstractNioByteChannel.filterOutboundMessage(AbstractNioByteChannel.java:266)
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:799)
at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1291)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.write(CombinedChannelDuplexHandler.java:528)
at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:101)
at io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:348)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
at io.netty.channel.AbstractChannelHandlerContext.access00(AbstractChannelHandlerContext.java:38)
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1089)
at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1136)
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1078)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)
好的,我找到了解决问题的方法。
问题是我试图在 WebSocket 握手完成之前将数据写入 Channel
。
在handler source code中你可以看到它有获取WebSocket握手ChannelFuture
的方法。
public ChannelFuture handshakeFuture() {
return handshakeFuture;
}
现在,在从FixedChannelPool
获取Channel
之后,我正在调用方法(returns无论握手是否完成):
private boolean isConnectionReady(Channel ch) {
try {
WebSocketClientHandler handler = (WebSocketClientHandler) ch.pipeline().get("handler");
return handler.handshakeFuture().sync().isSuccess();
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
还在 channelCreated(Channel ch)
中添加了 WebSocketClientHandler
的名称
DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, false, httpHeaders, 1280000);
final WebSocketClientHandler handler = new WebSocketClientHandler(handshaker, connection);
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpClientCodec(), new HttpObjectAggregator(65536));
pipeline.addLast("handler", handler);
我知道这段代码现在看起来很糟糕,应该重构。
各位开发者大家好...
我正在尝试实现 WebSocket 固定连接池,不幸的是 netty 为 FixedChannelPool
提供了糟糕的指导,最大 "what you can get" 是他们的 Unit Tests
我正在使用的客户端代码示例Source Code
并从 bootstrap.handler(new ChannelInitializer<>())
被 ChannelPool
然后我尝试将 ChannelInitializer
块移动到:
public class SomeConnectionPoolHandler implements ChannelPoolHandler {
private final URI uri = URI.create("ws://some.url:80");
@Override
public void channelCreated(Channel ch) {
String protocol = uri.getScheme();
if (!"ws".equals(protocol)) {
throw new IllegalArgumentException("Unsupported protocol: " + protocol);
}
DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, false, httpHeaders, 1280000);
final WebSocketClientHandler handler = new WebSocketClientHandler(handshaker, connection);
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpClientCodec(), new HttpObjectAggregator(65536), handler);
}
@Override
public void channelReleased(Channel ch) {
//TODO
}
@Override
public void channelAcquired(Channel ch) {
//TODO
}
}
现在我的客户端代码如下所示:
public class Application {
private final URI uri = URI.create("ws://some.url:80");
public static void main(String[] args) {
Channel ch = null;
try {
String protocol = uri.getScheme();
if (!"ws".equals(protocol)) {
throw new IllegalArgumentException("Unsupported protocol: " + protocol);
}
Bootstrap b = new Bootstrap();
b.option(ChannelOption.SO_KEEPALIVE, true);
b.remoteAddress(uri.getHost(), uri.getPort());
b.group(group);
b.channel(NioSocketChannel.class);
SomeConnectionPoolHandler connectionPoolHandler = new SomeConnectionPoolHandler ();
channelPool = new FixedChannelPool(b, connectionPoolHandler, 8);
ch = channelPool.acquire().sync().get();
ch.writeAndFlush(new TextWebSocketFrame("test")).sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
if(ch != null) {
channelPool.release(ch);
}
}
}
}
现在我收到错误:
java.lang.UnsupportedOperationException: unsupported message type: TextWebSocketFrame (expected: ByteBuf, FileRegion)
at io.netty.channel.nio.AbstractNioByteChannel.filterOutboundMessage(AbstractNioByteChannel.java:266)
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:799)
at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1291)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.write(CombinedChannelDuplexHandler.java:528)
at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:101)
at io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:348)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
at io.netty.channel.AbstractChannelHandlerContext.access00(AbstractChannelHandlerContext.java:38)
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1089)
at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1136)
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1078)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)
好的,我找到了解决问题的方法。
问题是我试图在 WebSocket 握手完成之前将数据写入 Channel
。
在handler source code中你可以看到它有获取WebSocket握手ChannelFuture
的方法。
public ChannelFuture handshakeFuture() {
return handshakeFuture;
}
现在,在从FixedChannelPool
获取Channel
之后,我正在调用方法(returns无论握手是否完成):
private boolean isConnectionReady(Channel ch) {
try {
WebSocketClientHandler handler = (WebSocketClientHandler) ch.pipeline().get("handler");
return handler.handshakeFuture().sync().isSuccess();
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
还在 channelCreated(Channel ch)
WebSocketClientHandler
的名称
DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, false, httpHeaders, 1280000);
final WebSocketClientHandler handler = new WebSocketClientHandler(handshaker, connection);
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpClientCodec(), new HttpObjectAggregator(65536));
pipeline.addLast("handler", handler);
我知道这段代码现在看起来很糟糕,应该重构。