How to use custom serializers with KTable in Kafka streams?
When I introduced a groupBy into my KStream, I started getting serialization errors during the KStream-to-KTable conversion. As far as I understand, as soon as you apply an aggregate or reduce to a KStream, Kafka converts it into a KTable and, because of the required shuffle (repartitioning), has to serialize the records again. My original KStream simply mapped records from JSON to Avro like this, and it worked fine:
@StreamListener("notification-input-channel")
@SendTo("notification-output-avro-channel")
public KStream<String, NotificationAvro> process(KStream<String, PosInvoice> input) {
    log.info("received PosInvoice JSON: {}", input);
    KStream<String, NotificationAvro> notificationAvroKStream = input
            .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
            .mapValues(v -> recordBuilder.getNotificationAvro(v));
    notificationAvroKStream.foreach((k, v) -> log.info(String.format("Notification avro - key: %s, value: %s", k, v)));
    return notificationAvroKStream;
}
Then I introduced groupByKey and reduce, and I realized that this converts the stream to a KTable, which therefore needs Serdes configured in the application.yaml file. Unfortunately, I cannot configure a default Serdes there because I use other serialization types as well, so I decided to set the serialization directly on the KTable topology. I am trying to implement that solution.
The part where I try to use the custom serdes does not work (Materialized.with(CustomSerdes.String(), CustomSerdes.NotificationAvro())). At first I thought I did not need the KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("mystore");, but without it, it does not work either, and I cannot find a Materialized that is not KeyValueBytes... where I can define my serdes CustomSerdes.String(), CustomSerdes.NotificationAvro().
According to the answer I linked to, they also used final StreamsBuilder builder = new StreamsBuilder();. But since I compute this with spring-kafka, I do not have that option, or if I do, I do not know how to use it.
@Service
@Slf4j
@EnableBinding(PosListenerJsonAvroBinding.class)
public class NotificationJsonAvroProcessorService {

    @Autowired
    RecordBuilder recordBuilder;

    @StreamListener("notification-input-channel")
    @SendTo("notification-output-avro-channel")
    public KStream<String, NotificationAvro> process(KStream<String, PosInvoice> input) {
        log.info("received PosInvoice JSON: {}", input);
        KStream<String, NotificationAvro> notificationAvroKStream = input
                .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
                .map((k, v) -> new KeyValue<>(v.getCustomerCardNo(), recordBuilder.getNotificationAvro(v)));
        notificationAvroKStream.foreach((k, v) -> log.info(String.format("Notification avro - key: %s, value: %s", k, v)));

        // *********************************************
        // IS THERE A KeyValueStoreSupplier THAT I CAN PASS ALSO MY SERDES INSTEAD OF Bytes?
        // KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("mystore");
        KTable<String, NotificationAvro> convertedTable = notificationAvroKStream
                .toTable(
                        // *********************************************
                        // HOW TO MATERIALIZE KTABLE VALUES WITH SERDES?
                        Materialized
                                // .as(storeSupplier) // this is not necessary
                                .with(CustomSerdes.String(), CustomSerdes.NotificationAvro())
                        // *********************************************
                )
                .groupBy((cardNo, notificationAvro) -> KeyValue.pair(cardNo, notificationAvro))
                .reduce(
                        (aggValue, newValue) -> {
                            newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
                            return newValue;
                        },
                        (aggValue, oldValue) -> oldValue
                );

        KStream<String, NotificationAvro> notificationAggAvroKStream = convertedTable.toStream();
        notificationAggAvroKStream.foreach((k, v) -> log.info(String.format("Notification agg avro - key: %s, value: %s", k, v)));
        return notificationAggAvroKStream;
    }
}
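As for the question in the comments above: to the best of my knowledge, the store suppliers in the Streams DSL are always byte-based (KeyValueBytesStoreSupplier, i.e. KeyValueStore<Bytes, byte[]>), and the serdes are attached to the Materialized itself rather than to the supplier. A minimal sketch of what that would look like, assuming the same CustomSerdes class and a hypothetical store name "mystore":

// Sketch only: the supplier stays byte-oriented, the serdes go on the Materialized.
// Needs org.apache.kafka.common.utils.Bytes, org.apache.kafka.streams.state.KeyValueStore
// and org.apache.kafka.streams.state.Stores.
Materialized<String, NotificationAvro, KeyValueStore<Bytes, byte[]>> materialized =
        Materialized.<String, NotificationAvro>as(Stores.inMemoryKeyValueStore("mystore"))
                .withKeySerde(CustomSerdes.String())
                .withValueSerde(CustomSerdes.NotificationAvro());

KTable<String, NotificationAvro> table = notificationAvroKStream.toTable(materialized);

Even with that in place, the repartition topic created by the subsequent groupBy on the KTable would presumably still need its own serdes (for example via Grouped.with(...)), which is where the ChangedSerializer error further down seems to come from.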
The custom serdes:
@Service
public class CustomSerdes extends Serdes {

    private static final String schema_registry_url = "http://localhost:8081";
    private final static Map<String, String> serdeConfig = Collections
            .singletonMap("schema.registry.url", schema_registry_url);

    public static Serde<NotificationAvro> NotificationAvro() {
        final Serde<NotificationAvro> notificationAvroSerde = new SpecificAvroSerde<>();
        // second argument "false" configures this serde for values (not keys)
        notificationAvroSerde.configure(serdeConfig, false);
        return notificationAvroSerde;
    }
}
And the error:
Exception in thread "NotificationJsonAvroProcessorService-process-applicationId-3e262d96-19ca-438d-a2b8-9d3c2e9bb4ab-StreamThread-1"
org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to topic
NotificationJsonAvroProcessorService-process-applicationId-KTABLE-AGGREGATE-STATE-STORE-0000000010-repartition.
A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.streams.kstream.internals.ChangedSerializer)
is not compatible to the actual key or value type (key type: java.lang.String / value type: org.apache.kafka.streams.kstream.internals.Change).
Change the default Serdes in StreamConfig or provide correct Serdes via method parameters
(for example if using the DSL, #to(String topic, Produced<K, V> produced) with Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))).
...
...
Caused by: java.lang.ClassCastException: class com.github.felipegutierrez.explore.spring.model.NotificationAvro cannot be cast to class java.lang.String
(com.github.felipegutierrez.explore.spring.model.NotificationAvro is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
So, I solved the problem by reading that answer: it used .groupByKey(...) with the now-deprecated Serialized.with(...), and I am using Grouped.with(CustomSerdes.String(), CustomSerdes.NotificationAvro()) instead.
KStream<String, NotificationAvro> notificationAvroKStream = input
        .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
        .map((k, v) -> new KeyValue<>(v.getCustomerCardNo(), recordBuilder.getNotificationAvro(v)))
        .groupByKey(Grouped.with(CustomSerdes.String(), CustomSerdes.NotificationAvro()))
        .reduce((aggValue, newValue) -> {
            newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
            return newValue;
        })
        .toStream();
notificationAvroKStream.foreach((k, v) -> log.info(String.format("Notification avro agg - key: %s, value: %s", k, v)));
return notificationAvroKStream;
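As a side note, the Grouped.with(...) serdes cover the repartition topic; if one also wants to name and explicitly type the state store backing the reduce(), a Materialized with the same serdes can be passed as well. A sketch only, assuming the same CustomSerdes class and a hypothetical store name "notification-agg-store":

// Sketch, not the exact code from the post: same pipeline, but the reduce()
// state store also gets explicit serdes via Materialized.
KStream<String, NotificationAvro> aggregated = input
        .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
        .map((k, v) -> new KeyValue<>(v.getCustomerCardNo(), recordBuilder.getNotificationAvro(v)))
        .groupByKey(Grouped.with(CustomSerdes.String(), CustomSerdes.NotificationAvro()))
        .reduce(
                (aggValue, newValue) -> {
                    newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
                    return newValue;
                },
                // "notification-agg-store" is a hypothetical store name
                Materialized.<String, NotificationAvro, KeyValueStore<Bytes, byte[]>>as("notification-agg-store")
                        .withKeySerde(CustomSerdes.String())
                        .withValueSerde(CustomSerdes.NotificationAvro()))
        .toStream();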