Netty 可以自动处理请求排队吗?
Can Netty automatically handle queueing of requests?
在 Apache PLC4X 项目 (https://plc4x.apache.org) 中,我们正在使用 Netty 实现工业 PLC 的驱动程序。这里通常是各种协议分层的。有时一层需要我们将一条消息拆分为底层的多条消息。现在我们面临一个大问题:一种协议协商每个连接的最大未确认消息数。所以我们不能向它发送超过这个最大值的消息,否则接收方只会发送一个错误响应。
现在我们不需要在编码方法中向 "out" 添加内容,而是将它们添加到某种队列并让某种 Netty 机制负责清空该队列……有这样的吗Netty 中的一种机制?如果没有,实现它的最佳方法是什么?
如果对 Netty 有深入了解的人可以加入我们的项目邮件列表 (dev@plc4x.apache.org) 也会很棒,因为我们还在为 Netty 开发一些非常酷的附加功能(以太网框架上的原始套接字传输)和一个基于 IP 数据包的)...我敢打赌这两个项目可以从彼此中受益匪浅。
虽然 Netty 没有提供这样的开箱即用的处理程序,但是由于内部设计,开箱即用地制作这样的最大并发挂起请求确实很容易。
可以使用来自 Netty 框架的 PendingWriteQueue
class 结合通用处理程序来制作这样的处理程序:
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.PendingWriteQueue;
public class MaxPendingRequestHandler extends ChannelHandlerAdapter {
private PendingWriteQueue queue;
private int freeSlots;
public MaxPendingRequestHandler(int maxRequests) {
this.freeSlots = maxRequests;
}
private synchronized void trySendMessages(ChannelHandlerContext ctx) {
if(this.freeSlots > 0) {
while(this.freeSlots > 0) {
if(this.queue.removeAndWrite() == null) {
ctx.flush();
return;
}
this.freeSlots--;
}
ctx.flush();
}
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
this.queue = new PendingWriteQueue(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// Send everything so we get a proper failurefor those pending writes
this.queue.removeAndWriteAll();
super.channelInactive(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
this.queue.removeAndWriteAll();
super.channelUnregistered(ctx);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
this.queue.add(msg, promise);
trySendMessages(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
synchronized(this) {
this.freeSlots++;
trySendMessages(ctx);
}
super.channelRead(ctx, msg);
}
}
此处理程序的工作原理是将每条新消息保存在队列中,并在每个 write/read.
上检查线路上的空闲插槽
请注意,处理程序应放在数据包 decoders/encoders 之后的管道中,否则在将传入数据包计为潜在的多个数据包时会出现问题,示例:
pipeline.addLast(new PacketCodex()); // A codex exists of an encoder and decoder, you can also ass them seperately
// pipeline.addLast(new TrafficShapingHandler()) // Optional, depending on your required protocols
// pipeline.addLast(new IdleStateHandler()) // Optional, depending on your required protocols
pipeline.addLast(new MaxPendingRequestHandler())
pipeline.addLast(new Businesshandler())
当然,您还想验证我们的处理程序是否有效,这可以使用包含 EmbeddedChannel
& JUnit 的单元测试来完成:
public class MaxPendingRequestHandlerTest {
@Test
public void testMaxPending() {
EmbeddedChannel channel = new EmbeddedChannel(new MaxPendingRequestHandler(3));
// channel.writeOutbound("1", "2", "3"); // This doesn't work as it expects operations to complete directly
channel.write("1");
channel.write("2");
channel.write("3");
channel.write("4");
channel.write("5");
channel.write("6");
Assert.assertEquals(channel.readOutbound(), "1");
Assert.assertEquals(channel.readOutbound(), "2");
Assert.assertEquals(channel.readOutbound(), "3");
Assert.assertEquals(channel.readOutbound(), (Object)null);
}
@Test
public void testMaxPendingWhenAResponseHasReceived() {
EmbeddedChannel channel = new EmbeddedChannel(new MaxPendingRequestHandler(3));
// channel.writeOutbound("1", "2", "3"); // This doesn't work as it expects operations to complete directly
channel.write("1");
channel.write("2");
channel.write("3");
channel.write("4");
channel.write("5");
channel.write("6");
channel.writeInbound("RE: 1");
Assert.assertEquals(channel.readOutbound(), "1");
Assert.assertEquals(channel.readOutbound(), "2");
Assert.assertEquals(channel.readOutbound(), "3");
Assert.assertEquals(channel.readOutbound(), "4");
Assert.assertEquals(channel.readOutbound(), (Object)null);
}
@Test
public void testMaxPendingWhenAllResponseHasReceived() {
EmbeddedChannel channel = new EmbeddedChannel(new MaxPendingRequestHandler(3));
// channel.writeOutbound("1", "2", "3"); // This doesn't work as it expects operations to complete directly
channel.write("1");
channel.write("2");
channel.write("3");
channel.write("4");
channel.write("5");
channel.write("6");
channel.writeInbound("RE: 1");
channel.writeInbound("RE: 2");
channel.writeInbound("RE: 3");
channel.writeInbound("RE: 4");
channel.writeInbound("RE: 5");
channel.writeInbound("RE: 6");
Assert.assertEquals(channel.readOutbound(), "1");
Assert.assertEquals(channel.readOutbound(), "2");
Assert.assertEquals(channel.readOutbound(), "3");
Assert.assertEquals(channel.readOutbound(), "4");
Assert.assertEquals(channel.readOutbound(), "5");
Assert.assertEquals(channel.readOutbound(), "6");
Assert.assertEquals(channel.readOutbound(), (Object)null);
}
@Test
public void testMaxPendingWhenAllResponseHasReceivedAndNewMessagesAreSend() {
EmbeddedChannel channel = new EmbeddedChannel(new MaxPendingRequestHandler(3));
// channel.writeOutbound("1", "2", "3"); // This doesn't work as it expects operations to complete directly
channel.write("1");
channel.write("2");
channel.write("3");
channel.write("4");
channel.write("5");
channel.write("6");
channel.writeInbound("RE: 1");
channel.writeInbound("RE: 2");
channel.writeInbound("RE: 3");
channel.writeInbound("RE: 4");
channel.writeInbound("RE: 5");
channel.writeInbound("RE: 6");
channel.write("7");
channel.write("8");
channel.write("9");
channel.write("10");
Assert.assertEquals(channel.readOutbound(), "1");
Assert.assertEquals(channel.readOutbound(), "2");
Assert.assertEquals(channel.readOutbound(), "3");
Assert.assertEquals(channel.readOutbound(), "4");
Assert.assertEquals(channel.readOutbound(), "5");
Assert.assertEquals(channel.readOutbound(), "6");
Assert.assertEquals(channel.readOutbound(), "7");
Assert.assertEquals(channel.readOutbound(), "8");
Assert.assertEquals(channel.readOutbound(), "9");
Assert.assertEquals(channel.readOutbound(), (Object)null);
}
}
在 Apache PLC4X 项目 (https://plc4x.apache.org) 中,我们正在使用 Netty 实现工业 PLC 的驱动程序。这里通常是各种协议分层的。有时一层需要我们将一条消息拆分为底层的多条消息。现在我们面临一个大问题:一种协议协商每个连接的最大未确认消息数。所以我们不能向它发送超过这个最大值的消息,否则接收方只会发送一个错误响应。
现在我们不需要在编码方法中向 "out" 添加内容,而是将它们添加到某种队列并让某种 Netty 机制负责清空该队列……有这样的吗Netty 中的一种机制?如果没有,实现它的最佳方法是什么?
如果对 Netty 有深入了解的人可以加入我们的项目邮件列表 (dev@plc4x.apache.org) 也会很棒,因为我们还在为 Netty 开发一些非常酷的附加功能(以太网框架上的原始套接字传输)和一个基于 IP 数据包的)...我敢打赌这两个项目可以从彼此中受益匪浅。
虽然 Netty 没有提供这样的开箱即用的处理程序,但是由于内部设计,开箱即用地制作这样的最大并发挂起请求确实很容易。
可以使用来自 Netty 框架的 PendingWriteQueue
class 结合通用处理程序来制作这样的处理程序:
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.PendingWriteQueue;
public class MaxPendingRequestHandler extends ChannelHandlerAdapter {
private PendingWriteQueue queue;
private int freeSlots;
public MaxPendingRequestHandler(int maxRequests) {
this.freeSlots = maxRequests;
}
private synchronized void trySendMessages(ChannelHandlerContext ctx) {
if(this.freeSlots > 0) {
while(this.freeSlots > 0) {
if(this.queue.removeAndWrite() == null) {
ctx.flush();
return;
}
this.freeSlots--;
}
ctx.flush();
}
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
this.queue = new PendingWriteQueue(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// Send everything so we get a proper failurefor those pending writes
this.queue.removeAndWriteAll();
super.channelInactive(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
this.queue.removeAndWriteAll();
super.channelUnregistered(ctx);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
this.queue.add(msg, promise);
trySendMessages(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
synchronized(this) {
this.freeSlots++;
trySendMessages(ctx);
}
super.channelRead(ctx, msg);
}
}
此处理程序的工作原理是将每条新消息保存在队列中,并在每个 write/read.
上检查线路上的空闲插槽请注意,处理程序应放在数据包 decoders/encoders 之后的管道中,否则在将传入数据包计为潜在的多个数据包时会出现问题,示例:
pipeline.addLast(new PacketCodex()); // A codex exists of an encoder and decoder, you can also ass them seperately
// pipeline.addLast(new TrafficShapingHandler()) // Optional, depending on your required protocols
// pipeline.addLast(new IdleStateHandler()) // Optional, depending on your required protocols
pipeline.addLast(new MaxPendingRequestHandler())
pipeline.addLast(new Businesshandler())
当然,您还想验证我们的处理程序是否有效,这可以使用包含 EmbeddedChannel
& JUnit 的单元测试来完成:
public class MaxPendingRequestHandlerTest {
@Test
public void testMaxPending() {
EmbeddedChannel channel = new EmbeddedChannel(new MaxPendingRequestHandler(3));
// channel.writeOutbound("1", "2", "3"); // This doesn't work as it expects operations to complete directly
channel.write("1");
channel.write("2");
channel.write("3");
channel.write("4");
channel.write("5");
channel.write("6");
Assert.assertEquals(channel.readOutbound(), "1");
Assert.assertEquals(channel.readOutbound(), "2");
Assert.assertEquals(channel.readOutbound(), "3");
Assert.assertEquals(channel.readOutbound(), (Object)null);
}
@Test
public void testMaxPendingWhenAResponseHasReceived() {
EmbeddedChannel channel = new EmbeddedChannel(new MaxPendingRequestHandler(3));
// channel.writeOutbound("1", "2", "3"); // This doesn't work as it expects operations to complete directly
channel.write("1");
channel.write("2");
channel.write("3");
channel.write("4");
channel.write("5");
channel.write("6");
channel.writeInbound("RE: 1");
Assert.assertEquals(channel.readOutbound(), "1");
Assert.assertEquals(channel.readOutbound(), "2");
Assert.assertEquals(channel.readOutbound(), "3");
Assert.assertEquals(channel.readOutbound(), "4");
Assert.assertEquals(channel.readOutbound(), (Object)null);
}
@Test
public void testMaxPendingWhenAllResponseHasReceived() {
EmbeddedChannel channel = new EmbeddedChannel(new MaxPendingRequestHandler(3));
// channel.writeOutbound("1", "2", "3"); // This doesn't work as it expects operations to complete directly
channel.write("1");
channel.write("2");
channel.write("3");
channel.write("4");
channel.write("5");
channel.write("6");
channel.writeInbound("RE: 1");
channel.writeInbound("RE: 2");
channel.writeInbound("RE: 3");
channel.writeInbound("RE: 4");
channel.writeInbound("RE: 5");
channel.writeInbound("RE: 6");
Assert.assertEquals(channel.readOutbound(), "1");
Assert.assertEquals(channel.readOutbound(), "2");
Assert.assertEquals(channel.readOutbound(), "3");
Assert.assertEquals(channel.readOutbound(), "4");
Assert.assertEquals(channel.readOutbound(), "5");
Assert.assertEquals(channel.readOutbound(), "6");
Assert.assertEquals(channel.readOutbound(), (Object)null);
}
@Test
public void testMaxPendingWhenAllResponseHasReceivedAndNewMessagesAreSend() {
EmbeddedChannel channel = new EmbeddedChannel(new MaxPendingRequestHandler(3));
// channel.writeOutbound("1", "2", "3"); // This doesn't work as it expects operations to complete directly
channel.write("1");
channel.write("2");
channel.write("3");
channel.write("4");
channel.write("5");
channel.write("6");
channel.writeInbound("RE: 1");
channel.writeInbound("RE: 2");
channel.writeInbound("RE: 3");
channel.writeInbound("RE: 4");
channel.writeInbound("RE: 5");
channel.writeInbound("RE: 6");
channel.write("7");
channel.write("8");
channel.write("9");
channel.write("10");
Assert.assertEquals(channel.readOutbound(), "1");
Assert.assertEquals(channel.readOutbound(), "2");
Assert.assertEquals(channel.readOutbound(), "3");
Assert.assertEquals(channel.readOutbound(), "4");
Assert.assertEquals(channel.readOutbound(), "5");
Assert.assertEquals(channel.readOutbound(), "6");
Assert.assertEquals(channel.readOutbound(), "7");
Assert.assertEquals(channel.readOutbound(), "8");
Assert.assertEquals(channel.readOutbound(), "9");
Assert.assertEquals(channel.readOutbound(), (Object)null);
}
}