KStream: Error Reading and Writing Avro records

I am trying to write the avro records read from one topic to another topic, with the intent of enriching the route with a transformation once I have it working. I have used KStream with the avro code from one of the examples, with some modifications to connect to the Schema Registry to retrieve the avro schema.

streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "mysql-stream-processing");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);

final Serde<GenericRecord> keySerde = new GenericAvroSerde(
        new CachedSchemaRegistryClient(schemaRegistryUrl, 100),
        Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
                schemaRegistryUrl));
final Serde<GenericRecord> valueSerde = new GenericAvroSerde(
        new CachedSchemaRegistryClient(schemaRegistryUrl, 100),
        Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
                schemaRegistryUrl));

final KStreamBuilder builder = new KStreamBuilder();

final KStream<GenericRecord, GenericRecord> record = builder.stream("dbserver1.employees.employees");

record.print(keySerde, valueSerde);

record.to(keySerde, valueSerde, "newtopic");
record.foreach((key, val) -> System.out.println(key.toString()+"  "+val.toString()));
final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);

streams.cleanUp();
streams.start();

While the application is running, print() works and I can see the records in the console, but writing the records to "newtopic" fails with the following error:

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=dbserver1.employees.employees, partition=0, offset=0
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:217)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:627)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: io.confluent.examples.streams.utils.GenericAvroSerializer / value: io.confluent.examples.streams.utils.GenericAvroSerializer) is not compatible to the actual key or value type (key type: [B / value type: [B). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:81)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:198)
        ... 2 more
Caused by: java.lang.ClassCastException: [B cannot be cast to org.apache.avro.generic.GenericRecord
        at io.confluent.examples.streams.utils.GenericAvroSerializer.serialize(GenericAvroSerializer.java:25)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:77)
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
        ... 5 more

I guess you need to configure the correct Serdes:

Either set the correct Serdes globally as the defaults, or specify the Serdes for each operator. If an operator requires a Serde, it has a corresponding overload that takes Serdes as parameters.
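Applied to the code in the question, both options might look like the sketch below (assuming the same 0.10.x-era `KStreamBuilder` API used above; for the default-Serde option, the Serde class must be configurable from the streams config map, which already contains the schema registry URL):

```java
// Option 1: set default (global) Serdes in the stream configuration.
// Any operator that is not given explicit Serdes will then use these
// instead of the byte[] defaults.
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);

// Option 2: pass the Serdes explicitly to the operators that need them.
// In the original code, builder.stream("dbserver1.employees.employees")
// was called WITHOUT Serdes, so the source fell back to the default
// byte[] Serdes -- which is why the sink saw key/value type [B and the
// GenericAvroSerializer threw a ClassCastException.
final KStream<GenericRecord, GenericRecord> record =
        builder.stream(keySerde, valueSerde, "dbserver1.employees.employees");

record.to(keySerde, valueSerde, "newtopic");
```

In other words, `record.to(keySerde, valueSerde, "newtopic")` was already correct; the mismatch came from the source side, where the records were deserialized as plain byte arrays rather than `GenericRecord`s.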