Java - 通过特定 AvroSerde 的输入动态更改 Kafka 数据类型
Java - Dynamically Change Kafka Data Type By Input for SpecificAvroSerde
import KafkaDataType;
...
...
final Serde<KafkaDataType> eventSchema = new SpecificAvroSerde<>();
...
...
StreamsBuilder builder = new StreamsBuilder();
KStream<String, KafkaDataType> eventStream = builder.stream(STREAM_TOPIC);
我的 KafkaDataType
是一个从关联的 .avsc
文件自动生成的 avro 模式。我的理解是 KafkaDataType
必须预先定义。但是,是否存在允许动态或通用 KafkaDataType
的现有方法?如果是这样,将不胜感激示例代码块。
目标是让 KafkaDataType
成为通用数据类型,这样具有不同 avro 模式的不同 Kafka 流可以交换进出并由 Java 代码处理。目前,对于每个不同的 avro 架构,我需要将 KafkaDataType
更改为特定的 Java 从 .avsc
架构自动生成 类。
如果事情需要更多说明,请告诉我。
My understanding is that KafkaDataType must be pre-defined.
对于使用 SpecificRecord / serde,是
However are there existing methods that allow for a dynamic or generic KafkaDataType?
Avro SpecificRecord 类(例如那些生成的)从 GenericRecord
扩展而来,您可以将其与 GenericSerde 一起使用而不是您的特定类型来处理同一流中的多种类型的记录
import KafkaDataType;
...
...
final Serde<KafkaDataType> eventSchema = new SpecificAvroSerde<>();
...
...
StreamsBuilder builder = new StreamsBuilder();
KStream<String, KafkaDataType> eventStream = builder.stream(STREAM_TOPIC);
我的 KafkaDataType
是一个从关联的 .avsc
文件自动生成的 avro 模式。我的理解是 KafkaDataType
必须预先定义。但是,是否存在允许动态或通用 KafkaDataType
的现有方法?如果是这样,将不胜感激示例代码块。
目标是让 KafkaDataType
成为通用数据类型,这样具有不同 avro 模式的不同 Kafka 流可以交换进出并由 Java 代码处理。目前,对于每个不同的 avro 架构,我需要将 KafkaDataType
更改为特定的 Java 从 .avsc
架构自动生成 类。
如果事情需要更多说明,请告诉我。
My understanding is that KafkaDataType must be pre-defined.
对于使用 SpecificRecord / serde,是
However are there existing methods that allow for a dynamic or generic KafkaDataType?
Avro SpecificRecord 类(例如那些生成的)从 GenericRecord
扩展而来,您可以将其与 GenericSerde 一起使用而不是您的特定类型来处理同一流中的多种类型的记录