从不同 InboundChannelHandlerAdapter.channelRead 调用 writeAndFlush 时的 netty 问题
netty issue when writeAndFlush called from different InboundChannelHandlerAdapter.channelRead
我遇到了一个问题,出于安全原因,我无法 post 完整代码(抱歉)。我的问题的要点是我有一个 ServerBootstrap,创建如下:
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
final ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addFirst("idleStateHandler", new IdleStateHandler(0, 0, 3000));
//Adds the MQTT encoder and decoder
ch.pipeline().addLast("decoder", new MyMessageDecoder());
ch.pipeline().addLast("encoder", new MyMessageEncoder());
ch.pipeline().addLast(createMyHandler());
}
}).option(ChannelOption.SO_BACKLOG, 128).option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// Bind and start to accept incoming connections.
channelFuture = b.bind(listenAddress, listenPort);
使用 createMyHandlerMethod() 基本上 returns ChannelInboundHandlerAdapter
的扩展实现
我还有一个 "client" 侦听器,用于侦听传入的连接请求,加载方式如下:
final String host = getHost();
final int port = getPort();
nioEventLoopGroup = new NioEventLoopGroup();
bootStrap = new Bootstrap();
bootStrap.group(nioEventLoopGroup);
bootStrap.channel(NioSocketChannel.class);
bootStrap.option(ChannelOption.SO_KEEPALIVE, true);
bootStrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addFirst("idleStateHandler", new IdleStateHandler(0, 0, getKeepAliveInterval()));
ch.pipeline().addAfter("idleStateHandler", "idleEventHandler", new MoquetteIdleTimeoutHandler());
ch.pipeline().addLast("decoder", new MyMessageDecoder());
ch.pipeline().addLast("encoder", new MyMessageEncoder());
ch.pipeline().addLast(MyClientHandler.this);
}
})
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.TCP_NODELAY, true);
// Start the client.
try {
channelFuture = bootStrap.connect(host, port).sync();
} catch (InterruptedException e) {
throw new MyException(“Exception”, e);
}
其中 MyClientHandler 又是 ChannelInboundHandlerAdapter 的子类实例。一切正常,我收到来自 "server" 适配器的消息,我处理它们,然后在相同的上下文中将它们发回。 "client" 处理程序反之亦然。
当我必须(对于某些消息)将它们从服务器或客户端处理程序代理到其他连接时,就会出现问题。再次,我很抱歉不能 post 很多代码,但它的要点是我打电话给:
serverHandler.channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof myProxyingMessage) {
if (ctx.channel().isActive()) {
ctx.channel().writeAndFlush(someOtherMessage);
**getClientHandler().writeAndFlush(myProxyingMessage);**
}
}
}
现在问题来了:加粗的(客户端)writeAndFlush - 从未实际写入消息字节,它不会抛出任何错误。 ChannelFuture returns 全部为 false(成功、取消、完成)。如果我同步它,最终它会因其他原因超时(我的代码中设置了连接超时)。
我知道我还没有 post 编辑我的所有代码,但我希望有人有一些提示 and/or 如何隔离为什么它不写的问题到客户端上下文。无论如何,我都不是 Netty 专家,而且大部分代码都是别人写的。它们都是 ChannelInboundHandlerAdapter
的子类
如有任何问题,欢迎随时提出。
*****编辑*********
我尝试使用以下测试代码将请求代理回不同的 context/channel(即客户端通道):
public void proxyPubRec(int messageId) throws MQTTException {
logger.log(logLevel, "proxying PUBREC to context: " + debugContext());
PubRecMessage pubRecMessage = new PubRecMessage();
pubRecMessage.setMessageID(messageId);
pubRecMessage.setRemainingLength(2);
logger.log(logLevel, "pipeline writable flag: " + ctx.pipeline().channel().isWritable());
MyMQTTEncoder encoder = new MyMQTTEncoder();
ByteBuf buff = null;
try {
buff = encoder.encode(pubRecMessage);
ctx.channel().writeAndFlush(buff);
} catch (Throwable t) {
logger.log(Level.SEVERE, "unable to encode PUBREC");
} finally {
if (buff != null) {
buff.release();
}
}
}
public class MyMQTTEncoder extends MQTTEncoder {
public ByteBuf encode(AbstractMessage msg) {
PooledByteBufAllocator allocator = new PooledByteBufAllocator();
ByteBuf buf = allocator.buffer();
try {
super.encode(ctx, msg, buf);
} catch (Throwable t) {
logger.log(Level.SEVERE, "unable to encode PUBREC, " + t.getMessage());
}
return buf;
}
}
但上面的行:ctx.channel().writeAndFlush(buff)
没有写入其他通道 - 任何 tips/tricks 调试此类问题?
someOtherMessage
必须是 ByteBuf
.
所以,拿这个:
serverHandler.channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof myProxyingMessage) {
if (ctx.channel().isActive()) {
ctx.channel().writeAndFlush(someOtherMessage);
**getClientHandler().writeAndFlush(myProxyingMessage);**
}
}
}
... 并将其替换为:
serverHandler.channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof myProxyingMessage) {
if (ctx.channel().isActive()) {
ctx.channel().writeAndFlush(ByteBuf);
**getClientHandler().writeAndFlush(myProxyingMessage);**
}
}
}
实际上,这是一个线程问题。我的一个线程是 blocked/waiting,而其他线程正在写入上下文,因此,写入被缓冲而不发送,即使刷新也是如此。问题解决了!
本质上,我将第一个消息代码放在一个 Runnable/Executor 线程中,这允许它单独 运行 以便第二个 write/response 能够写入上下文。这仍然可能存在一些问题(在消息排序方面),但这不是原始问题的主题。感谢您的帮助!
我遇到了一个问题,出于安全原因,我无法 post 完整代码(抱歉)。我的问题的要点是我有一个 ServerBootstrap,创建如下:
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
final ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addFirst("idleStateHandler", new IdleStateHandler(0, 0, 3000));
//Adds the MQTT encoder and decoder
ch.pipeline().addLast("decoder", new MyMessageDecoder());
ch.pipeline().addLast("encoder", new MyMessageEncoder());
ch.pipeline().addLast(createMyHandler());
}
}).option(ChannelOption.SO_BACKLOG, 128).option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// Bind and start to accept incoming connections.
channelFuture = b.bind(listenAddress, listenPort);
使用 createMyHandlerMethod() 基本上 returns ChannelInboundHandlerAdapter
我还有一个 "client" 侦听器,用于侦听传入的连接请求,加载方式如下:
final String host = getHost();
final int port = getPort();
nioEventLoopGroup = new NioEventLoopGroup();
bootStrap = new Bootstrap();
bootStrap.group(nioEventLoopGroup);
bootStrap.channel(NioSocketChannel.class);
bootStrap.option(ChannelOption.SO_KEEPALIVE, true);
bootStrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addFirst("idleStateHandler", new IdleStateHandler(0, 0, getKeepAliveInterval()));
ch.pipeline().addAfter("idleStateHandler", "idleEventHandler", new MoquetteIdleTimeoutHandler());
ch.pipeline().addLast("decoder", new MyMessageDecoder());
ch.pipeline().addLast("encoder", new MyMessageEncoder());
ch.pipeline().addLast(MyClientHandler.this);
}
})
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.TCP_NODELAY, true);
// Start the client.
try {
channelFuture = bootStrap.connect(host, port).sync();
} catch (InterruptedException e) {
throw new MyException(“Exception”, e);
}
其中 MyClientHandler 又是 ChannelInboundHandlerAdapter 的子类实例。一切正常,我收到来自 "server" 适配器的消息,我处理它们,然后在相同的上下文中将它们发回。 "client" 处理程序反之亦然。
当我必须(对于某些消息)将它们从服务器或客户端处理程序代理到其他连接时,就会出现问题。再次,我很抱歉不能 post 很多代码,但它的要点是我打电话给:
serverHandler.channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof myProxyingMessage) {
if (ctx.channel().isActive()) {
ctx.channel().writeAndFlush(someOtherMessage);
**getClientHandler().writeAndFlush(myProxyingMessage);**
}
}
}
现在问题来了:加粗的(客户端)writeAndFlush - 从未实际写入消息字节,它不会抛出任何错误。 ChannelFuture returns 全部为 false(成功、取消、完成)。如果我同步它,最终它会因其他原因超时(我的代码中设置了连接超时)。
我知道我还没有 post 编辑我的所有代码,但我希望有人有一些提示 and/or 如何隔离为什么它不写的问题到客户端上下文。无论如何,我都不是 Netty 专家,而且大部分代码都是别人写的。它们都是 ChannelInboundHandlerAdapter
如有任何问题,欢迎随时提出。
*****编辑********* 我尝试使用以下测试代码将请求代理回不同的 context/channel(即客户端通道):
public void proxyPubRec(int messageId) throws MQTTException {
logger.log(logLevel, "proxying PUBREC to context: " + debugContext());
PubRecMessage pubRecMessage = new PubRecMessage();
pubRecMessage.setMessageID(messageId);
pubRecMessage.setRemainingLength(2);
logger.log(logLevel, "pipeline writable flag: " + ctx.pipeline().channel().isWritable());
MyMQTTEncoder encoder = new MyMQTTEncoder();
ByteBuf buff = null;
try {
buff = encoder.encode(pubRecMessage);
ctx.channel().writeAndFlush(buff);
} catch (Throwable t) {
logger.log(Level.SEVERE, "unable to encode PUBREC");
} finally {
if (buff != null) {
buff.release();
}
}
}
public class MyMQTTEncoder extends MQTTEncoder {
public ByteBuf encode(AbstractMessage msg) {
PooledByteBufAllocator allocator = new PooledByteBufAllocator();
ByteBuf buf = allocator.buffer();
try {
super.encode(ctx, msg, buf);
} catch (Throwable t) {
logger.log(Level.SEVERE, "unable to encode PUBREC, " + t.getMessage());
}
return buf;
}
}
但上面的行:ctx.channel().writeAndFlush(buff)
没有写入其他通道 - 任何 tips/tricks 调试此类问题?
someOtherMessage
必须是 ByteBuf
.
所以,拿这个:
serverHandler.channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof myProxyingMessage) {
if (ctx.channel().isActive()) {
ctx.channel().writeAndFlush(someOtherMessage);
**getClientHandler().writeAndFlush(myProxyingMessage);**
}
}
}
... 并将其替换为:
serverHandler.channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof myProxyingMessage) {
if (ctx.channel().isActive()) {
ctx.channel().writeAndFlush(ByteBuf);
**getClientHandler().writeAndFlush(myProxyingMessage);**
}
}
}
实际上,这是一个线程问题。我的一个线程是 blocked/waiting,而其他线程正在写入上下文,因此,写入被缓冲而不发送,即使刷新也是如此。问题解决了!
本质上,我将第一个消息代码放在一个 Runnable/Executor 线程中,这允许它单独 运行 以便第二个 write/response 能够写入上下文。这仍然可能存在一些问题(在消息排序方面),但这不是原始问题的主题。感谢您的帮助!