kafka 流 joinWindow 并自动创建一个 avrò 模式
kafka streams joinWindow and auto create a avrò schema
当我使用 kafka 流加入时 windows,自动创建一个 avro 模式
像这样
" * KSTREAM-JOINTHIS-0000000125-store-changelog-value"**
我想知道,为什么这样可以创建avro schema?
有我的代码:
Serde<FactCallProviderMessage> specificAvroSerdeForCallProviderMessage = ProcessStreamUtil.getAndRegisterSerde(isKeySerde);
KStream<String, FactCallProviderMessage> callProviderMessageKStream = builder.stream(
callProviderMessageTopic /* input topic */,
Consumed.with(Serdes.String(), specificAvroSerdeForCallProviderMessage));
public static <T extends SpecificRecord> Serde<T> getAndRegisterSerde(boolean isKeySerde) {
Serde<T> specificAvroSerde = new SpecificAvroSerde<T>();
specificAvroSerde.configure(Collections.singletonMap(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
MyConfig.getSchemaRegistryUrl()),
isKeySerde);
return specificAvroSerde;
}
Kafka Stream 在后台为 join 等有状态操作符创建所谓的 changelog 主题,以在 Kafka 集群中以容错方式备份状态。
如果您对输入消息使用 Avro 格式,您的输入消息将以 Avro 格式写入此更新日志主题。因此,写入时将为此更新日志主题注册相应的模式。
当我使用 kafka 流加入时 windows,自动创建一个 avro 模式
像这样 " * KSTREAM-JOINTHIS-0000000125-store-changelog-value"**
我想知道,为什么这样可以创建avro schema?
有我的代码:
Serde<FactCallProviderMessage> specificAvroSerdeForCallProviderMessage = ProcessStreamUtil.getAndRegisterSerde(isKeySerde);
KStream<String, FactCallProviderMessage> callProviderMessageKStream = builder.stream(
callProviderMessageTopic /* input topic */,
Consumed.with(Serdes.String(), specificAvroSerdeForCallProviderMessage));
public static <T extends SpecificRecord> Serde<T> getAndRegisterSerde(boolean isKeySerde) {
Serde<T> specificAvroSerde = new SpecificAvroSerde<T>();
specificAvroSerde.configure(Collections.singletonMap(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
MyConfig.getSchemaRegistryUrl()),
isKeySerde);
return specificAvroSerde;
}
Kafka Stream 在后台为 join 等有状态操作符创建所谓的 changelog 主题,以在 Kafka 集群中以容错方式备份状态。
如果您对输入消息使用 Avro 格式,您的输入消息将以 Avro 格式写入此更新日志主题。因此,写入时将为此更新日志主题注册相应的模式。