使用 Camel-Netty4 反序列化 Object

Deserialize an Object with Camel-Netty4

在客户端,我有一个 working-fine 套接字发送 Java Object:

Detail detail = new Detail(); //client object
Socket client = new Socket(host, port);
OutputStream out = client.getOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(out);
oos.writeObject(detail);
oos.flush();

注意:这是客户端套接字,因此无法更改。

在服务器端我有 camel-2.14.1 + Spring + netty4, 运行 in Jboss AS7,它有这个简单的消费规则:

from("netty4:tcp://0.0.0.0:6549?"+ 
     "keepAlive=true&sync=true&decoder=#detailDecoder&encoder=#detailEncoder")
  .log("server recive: ${body.getArea}")
  .processRef("someDetailProcessor") //loaded with Spring
  .log("response [${body.getArea}]");

我已经意识到我不能使用 StringDecoder/StringEncoder 到 deserialize/serialize Object,因为这个编码器正在等待更多 plain-text 之类的消息。

由于这个原因,我最终使用 ObjectDecoder/ObjectEncoder 像这样注入它们:

<bean id="objDecoder" class="org.apache.camel.component.netty4.ChannelHandlerFactories" factory-method="newObjectDecoder">
    <constructor-arg name="protocol" value="tcp"/>
</bean>
<bean id="objEncoder" class="org.apache.camel.component.netty4.ChannelHandlerFactories" factory-method="newObjectEncoder">
    <constructor-arg name="protocol" value="tcp"/>
</bean>

但是我的 object 超出了最大帧长度,抛出了异常 -

Closing channel as an exception was thrown from Netty. Caused by: [io.netty.handler.codec.TooLongFrameException - Adjusted frame length exceeds 1048576: 2901213193 - discarded]

我已经尝试设置 LengthFieldBasedFrameDecoder(它是 ObjectDecoder 的超级 class 并且还需要一个表示消息长度的整数 header 字段 body,所以没有用)。我也以不同的方式使用了 ByteToMessageDocoder(通过创建我自己的 class 并尝试将 ByteBuf 解码为细节)但一点也不幸运。

有人知道如何实现吗?我只需要收到一个简单的 object,应该不会那么难吧?

好吧,在为此苦苦挣扎之后,我想出了一个解决方法:

我决定使用 ReplayingDecoder 而不是 ObjectDecoderByteToMessageDecoder 因为:

  1. ObjectDecoderObjectOutputStream (quote)
  2. 不兼容
  3. ByteToMessaDecoder
    • 在执行实际的读取操作之前,您需要检查输入 ByteBuf 上是否有足够的字节可供读取。 (引用自 Netty in Action)
    • 应通过添加另一个解码器在管道中更早地处理帧检测。 (quote)
    • 一些方法如ByteBuf.readBytes(int)如果returned缓冲区没有释放或添加到out列表中会导致内存泄漏。 (quote)
    • 最后是因为与 ReplayingDecoder 相比,它在测试期间给了我更差的平均时间。可能是因为它使用了 ByteBuf.

ReplayingDecoder 也有一些限制(均引用自 Netty in Action):

  1. 并非支持 ByteBuf 上的所有操作,如果调用不支持的操作,它将抛出 UnreplayableOperationException
  2. ByteBuf.readableBytes() 不会 return 大多数时候你所期望的。

所以我不知道消息的结尾是什么样子的(如果我愿意的话DelimiterBasedFrameDecoder), neither the length of every arrival package (as it's said in this response),然后我结束了对字节进行分组,做这样的事情:

// to collect the object bytes
private ByteArrayOutputStream baos = new ByteArrayOutputStream(); 

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

    if(in.isReadable()){

        // wrIdx - rdIdx instead of .readableBytes()
        byte[] frame = new byte[in.writerIndex()-in.readerIndex()];

        // I read just what I get, so the signal is never thrown
        in.readBytes(frame);

        // collecting bytes
        baos.write(frame);

        // it'll achieve this only when all the bytes from
        // the incoming object have been collected
        try{
            out.add(SerializationUtils.deserialize(baos.toByteArray()));
        }
        catch(Exception e){}
    }
}

待办事项

  • 我可能需要比 org.apache.commons.lang.SerializationUtils 更好的东西来反序列化
  • 如果反序列化后的对象不是我需要的类型怎么办。我怎样才能丢弃它?

希望这可以帮助到别人!