如何从 InputStream 读取使用 Chronicle Wire 序列化的数据?

How to read data serialized with Chronicle Wire from InputStream?

一些数据通过 Chronicle Wire 序列化到输出流。

Object m = ... ;
OutputStream out = ... ;

WireType.RAW                               //
        .apply(Bytes.elasticByteBuffer())  //
        .getValueOut().object(m)           //
        .bytes().copyTo(out)
;

我想从输入流中取回它们。

InputStream in = ... ;

WireType.RAW
        .apply(Bytes.elasticByteBuffer())
        .getValueIn()
        ???
;

Object m = ???; // How to initialize m ?

如何从 in 读取我的初始对象 m

假设您对数据的长度有所了解并一次性读取它。还假设您希望重用缓冲区以避免产生垃圾。为了最大限度地减少延迟数据,通常读取 to/from NIO 通道。

我提出了创建此示例的问题,改进对 Input/OutputStream 和非 Marshallable 对象 https://github.com/OpenHFT/Chronicle-Wire/issues/111

的支持

这应该可以有效地完成您想要的操作,而不会每次都产生垃圾。

package net.openhft.chronicle.wire;

import net.openhft.chronicle.bytes.Bytes;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

public class WireToOutputStream {
    private final Bytes<ByteBuffer> bytes = Bytes.elasticHeapByteBuffer(128);
    private final Wire wire;
    private final DataOutputStream dos;

    public WireToOutputStream(WireType wireType, OutputStream os) {
        wire = wireType.apply(bytes);
        dos = new DataOutputStream(os);
    }

    public Wire getWire() {
        wire.clear();
        return wire;
    }

    public void flush() throws IOException {
        int length = Math.toIntExact(bytes.readRemaining());
        dos.writeInt(length);
        dos.write(bytes.underlyingObject().array(), 0, length);
    }
}

package net.openhft.chronicle.wire;

import net.openhft.chronicle.bytes.Bytes;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.StreamCorruptedException;
import java.nio.ByteBuffer;

public class InputStreamToWire {
    private final Bytes<ByteBuffer> bytes = Bytes.elasticHeapByteBuffer(128);
    private final Wire wire;
    private final DataInputStream dis;

    public InputStreamToWire(WireType wireType, InputStream is) {
        wire = wireType.apply(bytes);
        dis = new DataInputStream(is);
    }

    public Wire readOne() throws IOException {
        wire.clear();
        int length = dis.readInt();
        if (length < 0) throw new StreamCorruptedException();
        bytes.ensureCapacity(length);
        byte[] array = bytes.underlyingObject().array();
        dis.readFully(array, 0, length);
        bytes.readPositionRemaining(0, length);
        return wire;
    }
}

然后您可以执行以下操作

package net.openhft.chronicle.wire;

import net.openhft.chronicle.core.util.ObjectUtils;
import org.junit.Test;

import java.io.IOException;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;

import static org.junit.Assert.assertEquals;

public class WireToOutputStreamTest {
    @Test
    public void testVisSocket() throws IOException {
        ServerSocket ss = new ServerSocket(0);
        Socket s = new Socket("localhost", ss.getLocalPort());
        Socket s2 = ss.accept();
        WireToOutputStream wtos = new WireToOutputStream(WireType.RAW, s.getOutputStream());

        Wire wire = wtos.getWire();
        AnObject ao = new AnObject();
        ao.value = 12345;
        ao.text = "Hello";
        // write the type is needed.
        wire.getValueOut().typeLiteral(AnObject.class);
        Wires.writeMarshallable(ao, wire);
        wtos.flush();

        InputStreamToWire istw = new InputStreamToWire(WireType.RAW, s2.getInputStream());
        Wire wire2 = istw.readOne();
        Class type = wire2.getValueIn().typeLiteral();
        Object ao2 = ObjectUtils.newInstance(type);
        Wires.readMarshallable(ao2, wire2, true);
        System.out.println(ao2);
        ss.close();
        s.close();
        s2.close();
        assertEquals(ao.toString(), ao2.toString());
    }

    public static class AnObject implements Serializable {
        long value;
        String text;

        @Override
        public String toString() {
            return "AnObject{" +
                    "value=" + value +
                    ", text='" + text + '\'' +
                    '}';
        }
    }
}

示例代码

 // On Sender side
 Object m = ... ;
 OutputStream out = ... ;

 WireToOutputStream wireToOutputStream = new 
 WireToOutputStream(WireType.TEXT, out);

 Wire wire = wireToOutputStream.getWire();
 wire.getValueOut().typeLiteral(m.getClass());
 Wires.writeMarshallable(m, wire);
 wireToOutputStream.flush();

 // On Receiver side
 InputStream in = ... ;

 InputStreamToWire inputStreamToWire = new InputStreamToWire(WireType.TEXT, in);

 Wire wire2 = inputStreamToWire.readOne();
 Class type = wire2.getValueIn().typeLiteral();
 Object m = ObjectUtils.newInstance(type);
 Wires.readMarshallable(m, wire2, true);

如果您的 DTO 扩展 Marshallable,此代码会简单得多,但无论您是否扩展接口,这都将有效。也就是说,您不需要扩展 Serializable。

此外,如果您知道类型是什么,则无需每次都写。

上面的助手类已添加到最新的SNAPSHOT