How to use custom serializers with KTable in Kafka streams?

When I introduced a groupBy into my KStream, I started getting serialization errors during the KStream to KTable conversion. As I understand it, once you have an aggregate or reduce on a KStream, Kafka Streams converts it into a KTable and, because of the repartitioning (shuffle) this requires, it has to serialize the records again. My original KStream simply mapped the records from JSON to Avro like this, and it worked fine:

    @StreamListener("notification-input-channel")
    @SendTo("notification-output-avro-channel")
    public KStream<String, NotificationAvro> process(KStream<String, PosInvoice> input) {
        log.info("received PosInvoice JSON: {}", input);
        KStream<String, NotificationAvro> notificationAvroKStream = input
                .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
                .mapValues(v -> recordBuilder.getNotificationAvro(v));
        notificationAvroKStream.foreach((k, v) -> log.info(String.format("Notification avro - key: %s, value: %s", k, v)));
        return notificationAvroKStream;
    }

Then I introduced groupByKey and reduce, and realized that this converts the stream into a KTable, which therefore needs Serdes configured in the application.yaml file. Unfortunately I cannot configure default Serdes there, because I have other serialization types in the same application. So I decided to define the serialization directly on the KTable topology, and I am trying to implement this solution.
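What I mean by defining the serialization on the topology is passing the Serdes to the individual DSL operations instead of relying on application-wide defaults, roughly like this (just a sketch, using the CustomSerdes class shown further down):

    // Sketch: per-operation Serdes instead of application-wide defaults
    // (Grouped for grouping/repartitioning, Produced for writing out).
    Grouped<String, NotificationAvro> grouped =
            Grouped.with(CustomSerdes.String(), CustomSerdes.NotificationAvro());
    Produced<String, NotificationAvro> produced =
            Produced.with(CustomSerdes.String(), CustomSerdes.NotificationAvro());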

The part of the code where I try to use my custom Serdes, Materialized.with(CustomSerdes.String(), CustomSerdes.NotificationAvro()), does not work. At first I thought I would not need the KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("mystore");, but without it it does not work either, and I cannot find a Materialized that is not KeyValueBytes... in which I can define my Serdes CustomSerdes.String(), CustomSerdes.NotificationAvro().
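As far as I can tell, the store type parameter of Materialized in the key-value DSL is always KeyValueStore<Bytes, byte[]> (the underlying store works on bytes and the Serdes sit on top of it), so there is no Materialized over a typed store supplier; the Serdes would be attached like this (a sketch only, not my final fix, assuming a Kafka Streams version where KStream#toTable(Materialized) exists, i.e. 2.5+):

    // Sketch: the state store stays byte-based; the custom Serdes are attached
    // to Materialized itself. "mystore" is the store name from my snippet below.
    // (Or Materialized.as(Stores.inMemoryKeyValueStore("mystore")) to keep the in-memory supplier.)
    KTable<String, NotificationAvro> table = notificationAvroKStream
            .toTable(Materialized.<String, NotificationAvro, KeyValueStore<Bytes, byte[]>>as("mystore")
                    .withKeySerde(CustomSerdes.String())
                    .withValueSerde(CustomSerdes.NotificationAvro()));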

According to the answer I mentioned at the link, they also use final StreamsBuilder builder = new StreamsBuilder();. But because I am building this with spring-kafka, I do not have that option, or if I do, I do not know how to use it.
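For reference, outside of Spring Cloud Stream the builder is created by hand and the Serdes go into Consumed/Produced, roughly like this; the topic names and posInvoiceJsonSerde here are placeholders, not my real configuration:

    // Sketch of the plain Kafka Streams setup the linked answer seems to assume.
    // "pos-invoice-json", "notification-avro" and posInvoiceJsonSerde are placeholders.
    final StreamsBuilder builder = new StreamsBuilder();
    builder.stream("pos-invoice-json", Consumed.with(Serdes.String(), posInvoiceJsonSerde))
            .mapValues(v -> recordBuilder.getNotificationAvro(v))
            .to("notification-avro", Produced.with(Serdes.String(), CustomSerdes.NotificationAvro()));
    Topology topology = builder.build();

As far as I understand, with spring-kafka the binder manages the StreamsBuilder behind the @StreamListener, which is why I never create one myself. Here is my current version that does not work: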

    @Service
    @Slf4j
    @EnableBinding(PosListenerJsonAvroBinding.class)
    public class NotificationJsonAvroProcessorService {
        @Autowired
        RecordBuilder recordBuilder;

        @StreamListener("notification-input-channel")
        @SendTo("notification-output-avro-channel")
        public KStream<String, NotificationAvro> process(KStream<String, PosInvoice> input) {
            log.info("received PosInvoice JSON: {}", input);
            KStream<String, NotificationAvro> notificationAvroKStream = input
                    .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
                    .map((k, v) -> new KeyValue<>(v.getCustomerCardNo(), recordBuilder.getNotificationAvro(v)));
            notificationAvroKStream.foreach((k, v) -> log.info(String.format("Notification avro - key: %s, value: %s", k, v)));

            // *********************************************
            // IS THERE A KeyValueStoreSupplier THAT I CAN PASS ALSO MY SERDES INSTEAD OF Bytes?
            // KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("mystore");
            KTable<String, NotificationAvro> convertedTable = notificationAvroKStream
                    .toTable(
                            // *********************************************
                            // HOW TO MATERIALIZE KTABLE VALUES WITH SERDES ?
                            Materialized
                                    // .as(storeSupplier) // this is not necessary
                                    .with(CustomSerdes.String(), CustomSerdes.NotificationAvro())
                            // *********************************************
                    )
                    .groupBy((cardNo, notificationAvro) -> KeyValue.pair(cardNo, notificationAvro))
                    .reduce(
                            (aggValue, newValue) -> {
                                newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
                                return newValue;
                            },
                            (aggValue, oldValue) -> oldValue
                    );
            KStream<String, NotificationAvro> notificationAggAvroKStream = convertedTable.toStream();
            notificationAggAvroKStream.foreach((k, v) -> log.info(String.format("Notification agg avro - key: %s, value: %s", k, v)));

            return notificationAggAvroKStream;
        }
    }

The custom Serdes class:

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.springframework.stereotype.Service;
    import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

    @Service
    public class CustomSerdes extends Serdes {
        private static final String schema_registry_url = "http://localhost:8081";
        private final static Map<String, String> serdeConfig = Collections
                .singletonMap("schema.registry.url", schema_registry_url);

        // Avro Serde for NotificationAvro values; isKey = false because it serializes values.
        public static Serde<NotificationAvro> NotificationAvro() {
            final Serde<NotificationAvro> notificationAvroSerde = new SpecificAvroSerde<>();
            notificationAvroSerde.configure(serdeConfig, false);
            return notificationAvroSerde;
        }
    }
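Note that CustomSerdes.String() simply resolves to the inherited static Serdes.String(), since CustomSerdes extends Serdes, so both Serdes come from the same helper class (the @Service annotation is probably unnecessary because everything here is static):

    // Both Serdes are obtained from the one helper class:
    Serde<String> keySerde = CustomSerdes.String();                       // inherited from Serdes
    Serde<NotificationAvro> valueSerde = CustomSerdes.NotificationAvro(); // configured Avro Serde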

And the error:

    Exception in thread "NotificationJsonAvroProcessorService-process-applicationId-3e262d96-19ca-438d-a2b8-9d3c2e9bb4ab-StreamThread-1" org.apache.kafka.streams.errors.StreamsException:
    ClassCastException while producing data to topic NotificationJsonAvroProcessorService-process-applicationId-KTABLE-AGGREGATE-STATE-STORE-0000000010-repartition.
    A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.streams.kstream.internals.ChangedSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: org.apache.kafka.streams.kstream.internals.Change).
    Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, #to(String topic, Produced<K, V> produced) with Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))).
    ... ...
    Caused by: java.lang.ClassCastException: class com.github.felipegutierrez.explore.spring.model.NotificationAvro cannot be cast to class java.lang.String (com.github.felipegutierrez.explore.spring.model.NotificationAvro is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
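
Reading the stack trace, the topic that fails is the ...KTABLE-AGGREGATE-STATE-STORE-0000000010-repartition topic, so it looks like the repartition created by the KTable#groupBy(...) still falls back to the default String Serde for the value. As far as I can tell, KTable#groupBy also accepts a Grouped, which would be the place to plug the custom Serdes into that repartition (a sketch of the toTable variant; the reducers are simplified here):

    // Sketch: give the repartition created by KTable#groupBy its own Serdes too.
    notificationAvroKStream
            .toTable(Materialized.with(CustomSerdes.String(), CustomSerdes.NotificationAvro()))
            .groupBy((cardNo, notification) -> KeyValue.pair(cardNo, notification),
                    Grouped.with(CustomSerdes.String(), CustomSerdes.NotificationAvro()))
            .reduce(
                    (aggValue, newValue) -> newValue,   // adder (real loyalty-point logic goes here)
                    (aggValue, oldValue) -> oldValue);  // subtractor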

So, I solved the problem after reading an answer that uses the now-deprecated .groupByKey(...) with Serialized.with(...). I am using Grouped.with(CustomSerdes.String(), CustomSerdes.NotificationAvro()) instead:

    KStream<String, NotificationAvro> notificationAvroKStream = input
            .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
            .map((k, v) -> new KeyValue<>(v.getCustomerCardNo(), recordBuilder.getNotificationAvro(v)))
            .groupByKey(Grouped.with(CustomSerdes.String(), CustomSerdes.NotificationAvro()))
            .reduce((aggValue, newValue) -> {
                newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
                return newValue;
            })
            .toStream();
    notificationAvroKStream.foreach((k, v) -> log.info(String.format("Notification avro agg - key: %s, value: %s", k, v)));
    return notificationAvroKStream;
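
One last note: since groupByKey gives a KGroupedStream, its reduce only needs the adder (unlike the KGroupedTable reduce above, which also wants a subtractor). If the aggregate should additionally live in a named, queryable store, the same custom Serdes can, as far as I know, be passed there through Materialized as well; a sketch with a made-up store name:

    // Sketch: same Grouped-based fix, additionally materializing the aggregate in a
    // named store. "notification-agg-store" is a made-up name, not from my real code.
    KTable<String, NotificationAvro> aggregated = input
            .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
            .map((k, v) -> new KeyValue<>(v.getCustomerCardNo(), recordBuilder.getNotificationAvro(v)))
            .groupByKey(Grouped.with(CustomSerdes.String(), CustomSerdes.NotificationAvro()))
            .reduce(
                    (aggValue, newValue) -> {
                        newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
                        return newValue;
                    },
                    Materialized.<String, NotificationAvro, KeyValueStore<Bytes, byte[]>>as("notification-agg-store")
                            .withKeySerde(CustomSerdes.String())
                            .withValueSerde(CustomSerdes.NotificationAvro()));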