Kafka Consumer 中反序列化 protostuff 对象的问题
Issue in deserialize protostuff object in Kafka Consumer
在 Kafka Consumer 中将 byte[] 反序列化为 protostuff 对象时出现以下异常
java.lang.NegativeArraySizeException
at com.dyuproject.protostuff.GraphIOUtil.mergeDelimitedFrom(GraphIOUtil.java:209)
at com.gme.protocols.protostuff.GmeTrade.readExternal(GmeTrade.java:2772)
at java.io.ObjectInputStream.readExternalData(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.readObject(Unknown Source)
使用以下代码将 protostuff 对象转换为 byte[]。
public static byte[] toBytes(Object o)
{
try
{
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(o);
oos.close();
byte[] b = baos.toByteArray();
return b;
}
catch (IOException e)
{
return new byte[0];
}
}
使用主题为 'XX' 的 Kafka 生产者发送那个 byte[],其中 byte[] 长度仅为 240。
使用 Kafka 消费者收到该记录。 record.value().length (byte[]) 长度与我从生产者端发送的相同 240。
使用以下代码将该字节[]反序列化为对象。
public static Object fromBytes(byte[] bytes)
{
try
{
return new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject();
}
catch (Exception e)
{
e.printStackTrace();
return null;
}
}
出现上述异常。我做错了什么?
使用 kafka_2.11-0.9.0.0 供您参考。还有其他需要的东西吗?
我找到了解决方案。 toBytes 和 fromBytes 中存在问题。
需要使用ProtostuffIOUtil.toByteArray方法将其转换为byte[]。
序列化
public static byte[] toBytes(Foo o)
{
LinkedBuffer BUFFER = LinkedBuffer.allocate(1024*1024);
Schema<Foo> SCHEMA = Foo.getSchema();
return ProtostuffIOUtil.toByteArray(o, SCHEMA, BUFFER);
}
再次需要使用 ProtostuffIOUtil.mergeFrom 方法将 byte[] 转换为对象。
反序列化
public static Foo fromBytes(byte[] bytes)
{
Foo tmp = Foo.getSchema().newMessage();
ProtostuffIOUtil.mergeFrom(bytes, tmp, Foo.getSchema());
return tmp;
}
注意:Serialization/Deserialization 和 ObjectOutputStream/ObjectInputStream 不适用于 protostuff 对象。
在 Kafka Consumer 中将 byte[] 反序列化为 protostuff 对象时出现以下异常
java.lang.NegativeArraySizeException
at com.dyuproject.protostuff.GraphIOUtil.mergeDelimitedFrom(GraphIOUtil.java:209)
at com.gme.protocols.protostuff.GmeTrade.readExternal(GmeTrade.java:2772)
at java.io.ObjectInputStream.readExternalData(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.readObject(Unknown Source)
使用以下代码将 protostuff 对象转换为 byte[]。
public static byte[] toBytes(Object o)
{
try
{
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(o);
oos.close();
byte[] b = baos.toByteArray();
return b;
}
catch (IOException e)
{
return new byte[0];
}
}
使用主题为 'XX' 的 Kafka 生产者发送那个 byte[],其中 byte[] 长度仅为 240。 使用 Kafka 消费者收到该记录。 record.value().length (byte[]) 长度与我从生产者端发送的相同 240。
使用以下代码将该字节[]反序列化为对象。
public static Object fromBytes(byte[] bytes)
{
try
{
return new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject();
}
catch (Exception e)
{
e.printStackTrace();
return null;
}
}
出现上述异常。我做错了什么? 使用 kafka_2.11-0.9.0.0 供您参考。还有其他需要的东西吗?
我找到了解决方案。 toBytes 和 fromBytes 中存在问题。 需要使用ProtostuffIOUtil.toByteArray方法将其转换为byte[]。
序列化
public static byte[] toBytes(Foo o)
{
LinkedBuffer BUFFER = LinkedBuffer.allocate(1024*1024);
Schema<Foo> SCHEMA = Foo.getSchema();
return ProtostuffIOUtil.toByteArray(o, SCHEMA, BUFFER);
}
再次需要使用 ProtostuffIOUtil.mergeFrom 方法将 byte[] 转换为对象。
反序列化
public static Foo fromBytes(byte[] bytes)
{
Foo tmp = Foo.getSchema().newMessage();
ProtostuffIOUtil.mergeFrom(bytes, tmp, Foo.getSchema());
return tmp;
}
注意:Serialization/Deserialization 和 ObjectOutputStream/ObjectInputStream 不适用于 protostuff 对象。