Creating a state store with Avro inside a KStream consumer processor

I have a consumer defined as below. It reads an Avro message from a topic and builds a state store of aggregated data, which is also an Avro type.

    @Bean
    public Consumer<KStream<String, InputEvent>> avroTest() {
        Serde<OutputEvent> serdeOutEvent = new SpecificAvroSerde<>(schemaRegistryClient);
        return st -> st.groupByKey()
                .aggregate(OutputEvent::new, (key, currentEvent, outputEvent) -> {
                    // aggregate here
                    return outputEvent;
                }, Materialized.with(new Serdes.StringSerde(), serdeOutEvent))
                .toStream();
    }

The function is able to read messages from the topic and create the first aggregated result, but when it tries to store that result in the state store it receives a 404 indicating the schema does not exist.

    Exception in thread "odoAvroTest-e4ef8e3e-ea1e-458c-b309-b2afefbeacec-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=odometer, partition=0, offset=0, stacktrace=org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema: {"type":"record","name": "" .... }
    Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:226)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:319)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:307)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getIdFromRegistry(CachedSchemaRegistryClient.java:165)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getId(CachedSchemaRegistryClient.java:297)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:73)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:59)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:50)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:27)
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:166)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:486)
    at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:103)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:363)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:425)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:912)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)

Please advise whether any other configuration adjustments are needed to make this work. When I change the input to a HashMap and/or a simple POJO and use a JSONSerde, the code works and creates the aggregate.

The problem here is the Schema Registry that the Avro serde requires. When you set the value serde via Materialized.with(), you must also set the schema registry configuration on that serde.
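
A minimal sketch of the fix, assuming the Confluent SpecificAvroSerde API; the registry URL is a placeholder, so substitute your own:

    @Bean
    public Consumer<KStream<String, InputEvent>> avroTest() {
        Serde<OutputEvent> serdeOutEvent = new SpecificAvroSerde<>(schemaRegistryClient);

        // Point the serde at the schema registry. If configure() is never
        // called, the serializer cannot auto-register the schema, so it tries
        // to look up the changelog subject and fails with 40401 "Subject not found".
        Map<String, String> serdeConfig = Collections.singletonMap(
                AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
                "http://localhost:8081"); // placeholder registry URL

        serdeOutEvent.configure(serdeConfig, false); // false = value serde, not key serde

        return st -> st.groupByKey()
                .aggregate(OutputEvent::new, (key, currentEvent, outputEvent) -> {
                    // aggregate here
                    return outputEvent;
                }, Materialized.with(new Serdes.StringSerde(), serdeOutEvent))
                .toStream();
    }

Once the serde is configured, the serializer can register the OutputEvent schema under the state store's changelog subject (auto.register.schemas defaults to true) instead of failing the lookup.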