如何在 Kafka 中为字节创建通用反序列化器?
How to create a generic deserializer for bytes in Kafka?
我有一个包含多种消息类型的原型文件。我想为这些消息创建一个通用的反序列化器。
我是否需要发送带有 Kafka 消息头的这些消息类型,以便消费者可以使用此类型信息反序列化这些消息?这是最佳做法还是有其他解决方案?
反序列化方法示例;
public Object deserialize(String topic, Headers headers, byte[] data) {
if(headers[0].equals("Person")){
return Person.parseFrom(data);
} else if....
}
我的原型文件;
message Person {
uint64 number = 1;
string name = 2;
}
message Event {
string msg = 1;
code = 2;
}
message Data {
string inf = 1;
string desc = 2;
}
....
Do I need to send these message's types with Kafka message header so
Consumer can deserialize these messages with this type information?
如果您的 KafkaConsumer
只使用来自特定主题且只有特定类型 (class) 消息的消息,那么您可以在反序列化器配置中这样配置 class例如,您的配置中的 value.class
、key.class
等,您可以使用 configs.get("value.class")
或 configs.get("key.class")
在 Deserializer 中使用 configure(),然后将它们存储在成员变量。
void configure(java.util.Map<java.lang.String,?> configs,
boolean isKey)
如果您的主题包含不同类型的消息,或者您的消费者订阅了不同的主题,每个主题都包含不同类型的消息,那么将 class 存储在 Headers 中应该是合适的。
另一种方法是编写包装器 class。
class MessageWrapper {
private Class messageClass;
private byte[] messageData;
ProtobufSchema schema;
}
然后在数据中你可以 de-serialize MessageWrapper
。这里的 messageData
类型可以是 Person
、Data
或 Event
,而 messageClass
应该可以帮助您进行解析。
例如,
mapper.readerFor(messageWrapper.getMessageClass())
.with(messageWrapper.getSchema())
.readValue(messageWrapper.getMessageData());
一旦你得到 object,你可以检查它是 instanceof
Person
或 Event
或 Data
您还可以查看 Generating Protobuf schema from POJO definition 并省略 MessageWrapper
中的 schema
字段
片段
ProtobufMapper mapper = new ProtobufMapper()
ProtobufSchema schemaWrapper = mapper.generateSchemaFor(messageWrapper.getMessageClass())
NativeProtobufSchema nativeProtobufSchema = schemaWrapper.getSource();
String asProtofile = nativeProtobufSchema.toString();
我有一个包含多种消息类型的原型文件。我想为这些消息创建一个通用的反序列化器。
我是否需要发送带有 Kafka 消息头的这些消息类型,以便消费者可以使用此类型信息反序列化这些消息?这是最佳做法还是有其他解决方案?
反序列化方法示例;
public Object deserialize(String topic, Headers headers, byte[] data) {
if(headers[0].equals("Person")){
return Person.parseFrom(data);
} else if....
}
我的原型文件;
message Person {
uint64 number = 1;
string name = 2;
}
message Event {
string msg = 1;
code = 2;
}
message Data {
string inf = 1;
string desc = 2;
}
....
Do I need to send these message's types with Kafka message header so Consumer can deserialize these messages with this type information?
如果您的 KafkaConsumer
只使用来自特定主题且只有特定类型 (class) 消息的消息,那么您可以在反序列化器配置中这样配置 class例如,您的配置中的 value.class
、key.class
等,您可以使用 configs.get("value.class")
或 configs.get("key.class")
在 Deserializer 中使用 configure(),然后将它们存储在成员变量。
void configure(java.util.Map<java.lang.String,?> configs,
boolean isKey)
如果您的主题包含不同类型的消息,或者您的消费者订阅了不同的主题,每个主题都包含不同类型的消息,那么将 class 存储在 Headers 中应该是合适的。
另一种方法是编写包装器 class。
class MessageWrapper {
private Class messageClass;
private byte[] messageData;
ProtobufSchema schema;
}
然后在数据中你可以 de-serialize MessageWrapper
。这里的 messageData
类型可以是 Person
、Data
或 Event
,而 messageClass
应该可以帮助您进行解析。
例如,
mapper.readerFor(messageWrapper.getMessageClass())
.with(messageWrapper.getSchema())
.readValue(messageWrapper.getMessageData());
一旦你得到 object,你可以检查它是 instanceof
Person
或 Event
或 Data
您还可以查看 Generating Protobuf schema from POJO definition 并省略 MessageWrapper
schema
字段
片段
ProtobufMapper mapper = new ProtobufMapper()
ProtobufSchema schemaWrapper = mapper.generateSchemaFor(messageWrapper.getMessageClass())
NativeProtobufSchema nativeProtobufSchema = schemaWrapper.getSource();
String asProtofile = nativeProtobufSchema.toString();