Spring Cloud Stream 正在 Confluent Schema Registry 中生成没有任何字段的 Avro Schema

Spring Cloud Stream is generating Avro Schema in Confluent Schema Registry without any fields

项目描述

我在 Spring Cloud Data Flow 中有一个流,它来自具有 Avro 模式的 Kafka 主题(通过 Confluent Schema Registry),进行一些轻量级处理,然后汇入另一个主题。

问题描述

从第一个主题开始使用效果很好,并在日志中得到验证。但是,当我尝试制作第二个主题时,它不起作用。如果我将 spring.cloud.stream.kafka.binder.producer-properties.auto.register.schemas 设置为 true,那么它会 returns 这个异常:

org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "bytes", failedMessage=GenericMessage [payload=byte[214], headers={deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=, kafka_receivedTopic=<my-source-topic>, target-protocol=kafka, kafka_offset=10209, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@180bdb9b, id=fbb4b321-0ece-f5c1-2399-29a1f41cd024, kafka_receivedPartitionId=0, kafka_receivedTimestamp=1630351912519, contentType=application/avro, kafka_groupId=<my-scdf-stream-name>, timestamp=1630352217225}
…
    Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "bytes"
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409 

如果我将 spring.cloud.stream.kafka.binder.producer-properties.auto.register.schemas 设置为 false,我会得到这个:

Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema: "bytes"
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403

如果我删除模式注册表中的现有模式并重试,将 auto.register.schemas 设置为 true,那么它将显示为它为我生成的模式:

{"subject":"<my-sink-topic>-value","version":12,"id":28,"schema":"\"bytes\""}

注意:当我看到 Error retrieving Avro schema: "bytes" 时,我想知道那里是否有问题,这就是为什么它在模式注册表中生成一个仅使用 "bytes" 作为模式的主题。这是否表明这里出了什么问题?

无论如何,记录现在显示在主题中,并且只是没有字段的单个字符串,即,如下所示:

null    "R220e2388-60ef-4887-b766-9f11ffa948a7\fcreate\u0010<schema-name>\u0000\u0002\u0012Kalamazoo\u0000\u0000\u0002\u00142021-07-07\u0002\u00142021-07-07\u0002\u0012fake-dataB61138 \u0000\u0002\u00142021-07-07\u0002\n49006"

null 部分是密钥,鉴于没有密钥,这在某种程度上是可以预料的)。但这与它应该是的有很大不同(请注意,我正在使用 kafka-avro-console-consumer 来讨论这个主题)。

这就是我的处理器 Java 代码的样子:

  @Bean
  public Function<GenericRecord, GenericRecord> process() {
    return genericRecord -> {
      try {
        logger.info(genericRecord.toString());
        genericRecord.put("val", "fake-data");


      } catch (Exception e) {
        logger.error("Failed to process record");
        e.printStackTrace();
      }

      return genericRecord;
    };
  }
}

似乎 Spring 中发生了某些事情,导致无法正确生成此 GenericRecord 的架构。

关于为什么为这些 GenericRecords 生成的写入模式没有字段的任何想法?

相关资源

我查看了以下指南和帖子,但据我所知,我正在按照那里的说明做事:

一些相关配置:

spring.cloud.stream.kafka.binder.producer-properties.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.producer-properties.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.cloud.stream.bindings.output.content-type=application/*+avro

重现问题的示例代码:

https://github.com/Anant/avro-scdf-schema-registry

事实证明,我们需要根据 this example.

useNativeEncoding 设置为 true

有关相关文档,请参见此处:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_producer_properties