如何为我的 kafka 消费者使用现有的 avro 模式?

How to utilize existing avro schema for my kafka consumer?

我正在使用 Debezium SQL 服务器连接器来捕获更改数据,连接器会自动生成模式并将模式注册到模式注册表中,这意味着我没有 avro 模式文件。在这种情况下,如何使用此模式编写消费者阅读数据?看过很多文章使用avro schema文件读取消费者数据,schema registry中这个payload只有一个schema

如果我在本地创建一个 avro 文件并让我的消费者使用它,那么我必须注册一个具有不同名称的重复模式。

我的问题是如何使用由 kafka 连接器注册的模式编写 Java 消费者 API。非常感谢。

这是我的价值观架构:

{"subject":"new.dbo.locations-value","version":1,"id":102,"schema":"{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"new.dbo.locations\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"display_id\",\"type\":\"string\"},{\"name\":\"first_name\",\"type\":\"string\"},{\"name\":\"last_name\",\"type\":\"string\"},{\"name\":\"location_id\",\"type\":\"string\"},{\"name\":\"location_name\",\"type\":\"string\"},{\"name\":\"location_sub_type_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"location_time_zone\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"parent_organization_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"type\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"new.dbo.locations.Value\"}],\"default\":null},{\"name\":\"after\",\"type\":[\"null\",\"Value\"],\"default\":null},{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"Source\",\"namespace\":\"io.debezium.connector.sqlserver\",\"fields\":[{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"connector\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"snapshot\",\"type\":[{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"true,last,false\"},\"connect.default\":\"false\",\"connect.name\":\"io.debezium.data.Enum\"},\"null\"],\"default\":\"false\"},{\"name\":\"db\",\"type\":\"string\"},{\"name\":\"schema\",\"type\":\"string\"},{\"name\":\"table\",\"type\":\"string\"},{\"name\":\"change_lsn\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"commit_lsn\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"event_serial_no\",\"type\":[\"null\",\"long\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.sqlserver.Source\"}},{\"name\":\"op\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"transaction\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"ConnectDefault\",\"namespace\":\"io.confluent.connect.avro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"total_order\",\"type\":\"long\"},{\"name\":\"data_collection_order\",\"type\":\"long\"}]}],\"default\":null}],\"connect.name\":\"new.dbo.locations.Envelope\"}"}%

您不需要本地架构文件。您可以使用 KafkaConsumer<?, GenericRecord> 进行消费,这将使反序列化器下载并缓存每条消息的相应 ID+架构。

这种方法的缺点是您需要小心解析数据(很像原始数据 JSON)

如果您需要静态模式和编译后的 class 将允许进行严格的类型检查,请从 /subjects/:name/versions/latest

的注册表下载它