反序列化来自 Kafka 消费者的 Java 个对象
Deserializing Java objects from Kafka consumer
我有一个 Kafka 消费者,当前配置为:
kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
但我真正想要的是能够改用 Kryo 解串器:
public class KryoPOJODeserializer<T> implements Deserializer<T> {
private Kryo kryo = new Kryo();
@Override
public void configure(Map props, boolean isKey) {
kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
kryo.register( Arrays.asList( "" ).getClass(), new ArraysAsListSerializer() );
}
@Override
public T deserialize(String topic, byte[] data) {
// Deserialize the serialized object.
return kryo.readObject(new Input(data), T.class);
}
@Override
public void close() {
}
}
我想不通的是,是否可以为不同的主题重用同一个消费者(每个主题都有不同类型的 POJO)?如果我的消费者配置是:
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KryoPOJODeserializer.class.getName());
或者,我是否必须为每个主题设置一个单独的消费者?
或者,我是否必须删除我的反序列化器的泛型部分,总是 return 一个对象,并将该对象转换为客户端代码中的相关 POJO?类似于:
public class KryoPOJODeserializer implements Deserializer {
private Kryo kryo = new Kryo();
@Override
public void configure(Map props, boolean isKey) {
kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
kryo.register( Arrays.asList( "" ).getClass(), new ArraysAsListSerializer() );
}
@Override
public Object deserialize(String topic, byte[] data) {
// Deserialize the serialized object.
return kryo.readClassAndObject(new Input(new ByteArrayInputStream(data)));
}
@Override
public void close() {
}
}
后者可以,但感觉有点脏。
感谢任何建议!
您可以通过将 Deserializer
个实例直接传递给消费者来使用您的原始方法:
KafkaConsumer<String, Foo> consumer = new KafkaConsumer<>(properties,
new StringDeserializer(), new KryoPOJODeserializer(Foo.class));
如果您想为多个主题重复使用相同的传入数据类型,则可以使用单个使用者设置对这些主题的订阅。如果您想要不同的对象类型作为值,那么您将需要使用多个消费者。
否则你的第二种方法也是有效的。
我有一个 Kafka 消费者,当前配置为:
kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
但我真正想要的是能够改用 Kryo 解串器:
public class KryoPOJODeserializer<T> implements Deserializer<T> {
private Kryo kryo = new Kryo();
@Override
public void configure(Map props, boolean isKey) {
kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
kryo.register( Arrays.asList( "" ).getClass(), new ArraysAsListSerializer() );
}
@Override
public T deserialize(String topic, byte[] data) {
// Deserialize the serialized object.
return kryo.readObject(new Input(data), T.class);
}
@Override
public void close() {
}
}
我想不通的是,是否可以为不同的主题重用同一个消费者(每个主题都有不同类型的 POJO)?如果我的消费者配置是:
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KryoPOJODeserializer.class.getName());
或者,我是否必须为每个主题设置一个单独的消费者?
或者,我是否必须删除我的反序列化器的泛型部分,总是 return 一个对象,并将该对象转换为客户端代码中的相关 POJO?类似于:
public class KryoPOJODeserializer implements Deserializer {
private Kryo kryo = new Kryo();
@Override
public void configure(Map props, boolean isKey) {
kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
kryo.register( Arrays.asList( "" ).getClass(), new ArraysAsListSerializer() );
}
@Override
public Object deserialize(String topic, byte[] data) {
// Deserialize the serialized object.
return kryo.readClassAndObject(new Input(new ByteArrayInputStream(data)));
}
@Override
public void close() {
}
}
后者可以,但感觉有点脏。
感谢任何建议!
您可以通过将 Deserializer
个实例直接传递给消费者来使用您的原始方法:
KafkaConsumer<String, Foo> consumer = new KafkaConsumer<>(properties,
new StringDeserializer(), new KryoPOJODeserializer(Foo.class));
如果您想为多个主题重复使用相同的传入数据类型,则可以使用单个使用者设置对这些主题的订阅。如果您想要不同的对象类型作为值,那么您将需要使用多个消费者。
否则你的第二种方法也是有效的。