是否可能使用 Netty 接收到乱序或受限的消息?

Its possible to message received being out of order or limited using Netty?

嗯,我使用netty框架java,通过concox设备协议来处理消息,我的应用程序收到的消息似乎与预期的不同。

收到的消息是这样的

EF BF BD EF BF BD EF BF BD 0D 0A 78 78 1F 12 13 0A 1C 0F 12 1D EF BF BD

预期应该是

78 78 23 12 10 03 1D 0F 17 12 C7 02 6B 6E 38 0C 39 71 00 0B 15 0E 01 CC 00 24 95 00 13 93 00 02 3D 7C 01 09 27 35 0D 0A

78 78为起始位 0D 0A是停止位

这可能是什么?我们将此应用程序基础用于许多协议并且它们有效 客服说可能是buffer连接的问题,不知道怎么回事

我可以处理起停位错位的问题。 但是消息预期的剧照要大得多。

文档 link 是 http://www.iconcox.in/images/tr-06-protocol.pdf

我们的代码

public abstract class ExtendedObjectDecoder implements ChannelUpstreamHandler {

    @SuppressWarnings("rawtypes")
    public void handleUpstream(
            ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
        if (!(evt instanceof MessageEvent)) {
            ctx.sendUpstream(evt);
            return;
        }

        MessageEvent e = (MessageEvent) evt;
        Object originalMessage = e.getMessage();

        System.out.println((String) originalMessage);
    }
}

流水线

@Override
    public void initTrackerServers(List<TrackerServer> serverList) {
        serverList.add(new TrackerServer(new ServerBootstrap()) {
            @Override
            protected void addSpecificHandlers(ChannelPipeline pipeline) {
                pipeline.addLast("frameDecoder", new CharacterDelimiterFrameDecoder(4096, "$", "[=11=]"));
                pipeline.addLast("stringEncoder", new StringEncoder());
                pipeline.addLast("stringDecoder", new StringDecoder());
                pipeline.addLast("objectDecoder", new EquipProtocolDecoder(EquipProtocol.this));
            }
        });
        serverList.add(new TrackerServer(new ConnectionlessBootstrap()) {
            @Override
            protected void addSpecificHandlers(ChannelPipeline pipeline) {
                pipeline.addLast("stringEncoder", new StringEncoder());
                pipeline.addLast("stringDecoder", new StringDecoder());
                pipeline.addLast("objectDecoder", new EquipProtocolDecoder(EquipProtocol.this));
            }
        });
    }

Netty 被设计为实际协议之上的简化层,因此从中继承了许多语义。

一个重要的语义是 TCP 是基于 stream 的协议,这意味着当您读取任何数据时,您会按顺序获取所有数据,但数据可能会分散在多个数据包上,或者所有数据可能在同一个数据包中。

您的代码没有正确处理这个问题,这导致了您的问题。

你的情况是远程发送 2 "protocol specific packets",读取后,这些数据包在 "tcp packets"

上混淆了
  • 之前:Aaaaa Bbbbb Cccc Ddddd
  • 之后:Aaa aaaBbbbCccccDdddd

您需要一些东西来重新构造原始消息。

目前,您的管道存在一个 StringDecoder,然后是您的业务处理程序。不幸的是,StringDecoder 不适合处理原始传入数据的任务,并且可以 actually cause corruption 处理原始传入数据。

幸运的是,"tr-06" 在每个数据包中都有一个长度字段。 (检测开始位和停止位的存在是不可靠的,因为它们也可能在数据包内)我们可以使用这个长度字段将 "stream of bytes" 再次解码为“数据包。让我们收集一些信息,看看文档协议:

iv.Data Packet Format

The communication is transferred asynchronously in bytes.The total length of packets is (10+N) Bytes

...

4.2.Packet Length

Length = Protocol Number + Information Content + Information Serial Number + Error Check, totally (5+N)Bytes, because the Information Content is a variable length field.

我们最终想对 "framing" 个字节使用 pre-build 解决方案,所以让我们阅读 DelimiterBasedFrameDecoder 的文档。我们可以看到“3 字节长度字段位于 5 个字节 header 末尾,不要剥离 header” 的示例非常符合我们的需求,但并不完全符合我们的需求。让我们根据这些例子计算出正确的数据:

我们在协议示例中看到,长度字段在起始位之后,起始位是2个字节长,所以偏移量变为2

lengthFieldOffset   = 2

根据文档,数据包长度字段为1,这意味着它支持长度最大为255字节的数据包

lengthFieldLength   = 1

这个比较难计算,Netty假设数据包长度是数据包中包含长度字段后的所有字节,所以我们从netties的角度计算一下,看看它是如何排列的。

网络:1 + N + 2 + 2 + 2 协议:1 N + 2 + 2

lengthAdjustment    = 2

我们需要将长度字段调整 2

我们不想剥离任何字节(如果您对起始位和停止位不感兴趣,您可能希望稍后启用它)

initialBytesToStrip = 0

让我们把这些放在一起:

        @Override
        protected void addSpecificHandlers(ChannelPipeline pipeline) {
            pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(255, 2, 1, 2, 0));
            pipeline.addLast("stringEncoder", new StringEncoder());
            pipeline.addLast("stringDecoder", new StringDecoder());
            pipeline.addLast("objectDecoder", new EquipProtocolDecoder(EquipProtocol.this));
        }