如何在 Kafka 中为字节创建通用反序列化器?

How to create a generic deserializer for bytes in Kafka?

我有一个包含多种消息类型的原型文件。我想为这些消息创建一个通用的反序列化器。

我是否需要发送带有 Kafka 消息头的这些消息类型,以便消费者可以使用此类型信息反序列化这些消息?这是最佳做法还是有其他解决方案?

反序列化方法示例;

public Object deserialize(String topic, Headers headers, byte[] data) {
    if(headers[0].equals("Person")){
        return Person.parseFrom(data);
    } else if....
}

我的原型文件;

message Person {
    uint64 number = 1;
    string name = 2;
}

message Event {
    string msg = 1;
    code = 2;
}

message Data {
    string inf = 1;
    string desc = 2;
}

....

Do I need to send these message's types with Kafka message header so Consumer can deserialize these messages with this type information?

如果您的 KafkaConsumer 只使用来自特定主题且只有特定类型 (class) 消息的消息,那么您可以在反序列化器配置中这样配置 class例如,您的配置中的 value.classkey.class 等,您可以使用 configs.get("value.class")configs.get("key.class") 在 Deserializer 中使用 configure(),然后将它们存储在成员变量。

void configure(java.util.Map<java.lang.String,?> configs,
               boolean isKey)

如果您的主题包含不同类型的消息,或者您的消费者订阅了不同的主题,每个主题都包含不同类型的消息,那么将 class 存储在 Headers 中应该是合适的。

另一种方法是编写包装器 class。

class MessageWrapper {
   private Class messageClass;
   private byte[] messageData;
   ProtobufSchema schema;
}

然后在数据中你可以 de-serialize MessageWrapper。这里的 messageData 类型可以是 PersonDataEvent,而 messageClass 应该可以帮助您进行解析。 例如,

mapper.readerFor(messageWrapper.getMessageClass())
   .with(messageWrapper.getSchema())
   .readValue(messageWrapper.getMessageData());

一旦你得到 object,你可以检查它是 instanceof PersonEventData

您还可以查看 Generating Protobuf schema from POJO definition 并省略 MessageWrapper

中的 schema 字段

片段

ProtobufMapper mapper = new ProtobufMapper()
ProtobufSchema schemaWrapper = mapper.generateSchemaFor(messageWrapper.getMessageClass())
NativeProtobufSchema nativeProtobufSchema = schemaWrapper.getSource();

String asProtofile = nativeProtobufSchema.toString();