卡夫卡流:POJO serialization/deserialization

Kafka Streams: POJO serialization/deserialization

我们可以使用 Kafka Streams 中的什么 class/method 来 serialize/deserialize Java 对象到字节数组,反之亦然?以下 link 建议使用 ByteArrayOutputStream 和 ObjectOutputStream 但它们不是线程安全的。

还有另一个选项可以使用 ObjectMapper、ObjectReader(用于线程安全),但这是从 POJO -> JSON -> bytearray 进行转换。似乎这个选项是一个广泛的选项。想检查是否有直接的方法将对象转换为字节数组,反之亦然,这是线程安全的。请推荐

import org.apache.kafka.common.serialization.Serializer;
public class HouseSerializer<T> implements Serializer<T>{
    private Class<T> tClass;
    public HouseSerializer(){

    }

    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map configs, boolean isKey) {
        tClass = (Class<T>) configs.get("POJOClass");       
    }

    @Override
    public void close() {
    }

    @Override
    public byte[] serialize(String topic, T data) {
        //Object serialization to be performed here
        return null;
    }
}


注意:Kafka 版本 - 0.10.1

Wanted to check if there is a direct way to translate object into bytearray

我建议您考虑使用 Avro serialization with the Confluent Schema Registry, if possible, but not required. JSON is a good fall back, but takes more space "on the wire", and so MsgPack 作为替代方案。

See Avro code example here

以上示例使用 avro-maven-pluginsrc/main/resources/avro 模式文件生成 LogLine class。


否则how to serialize your object into a byte array就看你的了,比如一个String一般打包成

[(length of string) (UTF8 encoded bytes)]

而布尔值是单个 0 或 1 位

which is threadsafe

我理解这种担忧,但您通常不会在线程之间共享反序列化数据。你send/read/process每条消息独立一个。