Spring Cloud Stream 中的 Kafka Schema 注册是如何发生的?

How does Kafka Schema registration happen in Spring Cloud Stream?

我正在尝试了解如何将 Spring Cloud Streams 与 Kafka Binder 一起使用。

目前,我正在尝试用我的 Confluent Schema Registry 注册一个 AVRO schema 并向一个主题发送消息。

我无法理解 Spring Cloud Streams 在幕后如何进行模式注册。

让我们从 Spring 个 Cloud Stream 示例中提取 this example

AVRO 架构位于 src/resources/avro

mvn:compile 目标是 运行 时,将生成 AVRO 模式的 POJO,生产者可以 post 数据。

但我无法理解的是 Spring Cloud Stream 如何向 AVRO 注册模式?

    @Autowired
    StreamBridge streamBridge;

    @Bean
    public Supplier<Sensor> supplier() {
        return () -> {
            Sensor sensor = new Sensor();
            sensor.setId(UUID.randomUUID().toString() + "-v1");
            sensor.setAcceleration(random.nextFloat() * 10);
            sensor.setVelocity(random.nextFloat() * 100);
            sensor.setTemperature(random.nextFloat() * 50);
            return sensor;
        };
    }

    @Bean
    public Consumer<Sensor> receiveAndForward() {
        return s -> streamBridge.send("sensor-out-0", s);
    }

    @Bean
    Consumer<Sensor> receive() {
        return s -> System.out.println("Received Sensor: " + s);
    }

创建bean的时候就完成了吗?

还是在发送第一条消息时完成?如果是这样,那么 Spring Stream 如何知道从哪里找到 .avsc 文件?

基本上幕后发生了什么?

文档中似乎没有提到这一点。

谢谢。

您的序列化策略(在本例中为 AVRO)始终在序列化器(对于生产者)和反序列化器(对于消费者)中处理。

您可以拥有 Avro(反)序列化键 and/or Avro(反)序列化值。这意味着应该将 KafkaAvroSerializer.class/KafkaAvroDeserializer.class 分别传递给 producer/consumer 配置。除此之外,还必须将 schema.registry.url 传递给客户端配置。

所以在幕后,spring 云流在创建您的 producers/consumers 时使您的应用程序与 avro 兼容(使用在 application.properties 或其他地方找到的配置)。您的客户端将在启动时连接到模式注册表(如果连接失败,日志会告诉您),但不会立即进行任何模式注册。

架构注册在发送的第一条消息上完成。如果您还没有,您会看到生成的 POJO 已经包含模式,因此 spring 云流根本不需要 .avsc 文件。例如,我最后生成的 Avro pojo 包含(第 4 行):

@org.apache.avro.specific.AvroGenerated
public class AvroBalanceMessage extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
    private static final long serialVersionUID = -539731109258473824L;
    public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse({\"type\":\"record\",\"name\":\"AvroBalanceMessage\",\"namespace\":\"tech.nermindedovic\",\"fields\"[{\"name\":\"accountNumber\",\"type\":\"long\",\"default\":0},{\"name\":\"routingNumber\",\"type\":\"long\",\"default\":0},{\"name\":\"balance\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"default\":\"0.00\"},{\"name\":\"errors\",\"type\":\"boolean\",\"default\":false}]}");
    public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
    ....... 

当生产者发送这个 pojo 时,它会向注册中心传达关于模式的当前版本。如果模式不在注册表中,则注册表将存储它并通过 ID 识别它。生产者将消息及其模式 ID 发送到 Kafka 代理。另一方面,消费者将收到此消息并检查它是否看到了 ID(存储在缓存中,因此您不必总是从注册表中检索架构),如果没有,它将与注册表通信以获取关于消息的此类信息。

有点超出 spring 云流的范围,但也可以使用 REST API 为 SR 手动注册模式。