Kafka 连接集成

Kafka Connect Integration

我正在尝试使用 Kafka Connect 从 Oracle 数据库中获取数据。 Kafka 连接器提供的默认对象是 "GenericRecord" type.This 使其过于具体,导致通过执行 record.getAsString("someIDENTIFIER") 获取数据的情况。是否有可能获得特定类型的对象而不是 GenericRecord 类型。

Kafka Connect 源连接器与 SourceRecord 对象一起工作,Kafka Connect worker 配置为使用 converterSourceRecord 序列化为二进制文件然后写入 Kafka 主题的表单。 Kafka Connect 附带了一个 JSON 转换器,而 Confluent 提供了一个 Avro 转换器。因此,写入 Kafka 的消息的二进制形式取决于您使用的转换器。

(同样,接收器连接器与 SinkRecord 对象一起工作,Kafka Connect worker 使用其转换器将从 Kafka 读取的消息的二进制形式反序列化为连接器处理的 SinkRecord 对象与。)

听起来您正在编写一个 Kafka 消费者并在那里看到 GenericRecord 个对象。如果是这样,那么您可能已经将 Kafka Connect worker 配置为使用 Confluent 的 Avro 转换器,对于像 JDBC 连接器这样的源连接器,它会将 SourceRecord 转换为 Kafka Connect 然后写入的 Avro 二进制格式卡夫卡主题。然后,您的客户端可能会使用配置有 Avro 反序列化器的 Kafka 使用者,除非您为反序列化器提供一个 Avro 模式来使用它,否则会将 Avro 编码的消息反序列化为 Avro GenericRecord.

但是,您可以将应用程序配置为了解特定版本的 Avro 模式,并让构建系统为该版本的 Avro 模式生成代码,以创建将反序列化 Avro-将消息编码为模式描述的内存形式。在 Java 中,这意味着您将从架构中生成 class,然后在您的代码中使用生成的 class 将 GenericRecord 复制到您的 class。有关 GenericRecord 的转换,请参阅 this complete consumer example, and specifically this line。在该示例中,LogLine 是从 Avro 模式生成的 class:

GenericRecord genericEvent = (GenericRecord) messageAndMetadata.message();
LogLine event = (LogLine) SpecificData.get().deepCopy(LogLine.SCHEMA$, genericEvent);

Avro 的一个显着优势是它直接支持模式演变,而 Confluent 的模式注册表利用了这一点。因此,虽然源连接器可能会为 table 生成 Avro 模式以响应数据库中 table 的结构变化,但只要数据库模式发生变化,Avro 模式就会向后 and/or 向前兼容,您的客户端应用程序使用的 Avro 库将自动从消息的 Avro 模式转换为您的应用程序使用的 Avro 模式。

当然,在某些时候您会更改您的应用程序以使用新的 Avro 模式,但这不必同时进行。事实上,如果您将架构注册表配置为强制架构版本向前和向后兼容,您可以更改您的客户端应用程序 beforeafter数据库已更改,JDBC 源连接器开始使用新版本的 Avro 架构。