Kafka 连接自定义转换以将无模式 Json 转换为 Avro
Kafka connect custom transforms to convert schema-less Json to Avro
我正在尝试构建一个从 Kafka 读取 json 数据(无模式)的系统,将其转换为 avro 并将其推送到 s3。
我已经能够使用 KStreams 和 KSQL 实现 json 到 avro 的转换。我想知道使用 Kafka Connect 的自定义转换是否可以实现同样的事情。
这是我目前尝试过的方法:
public class JsontoAvroConverter<R extends ConnectRecord<R>> implements Transformation<R> {
public static final String OVERVIEW_DOC = "Transform Payload to Custom Format";
private static final String PURPOSE = "transforming payload";
public static final ConfigDef CONFIG_DEF = new ConfigDef();
@Override
public void configure(Map<String, ?> props) {
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
@Override
public void close() {
}
@Override
public R apply(R record) {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
properties.setProperty("acks", "1");
properties.setProperty("retries", "10");
properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty("value.serializer", KafkaAvroSerializer.class.getName());
properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");
avro_Schema updatedSchema = makeUpdatedSchema();
return newRecord(record, updatedSchema);
}
private avro_Schema makeUpdatedSchema() {
avro_Schema.Builder avro_record = avro_Schema.newBuilder()
.setName("test")
.setTry$(1);
return avro_record.build();
}
protected Object operatingValue(R record) {
return record.value();
}
protected R newRecord(R record, avro_Schema updatedSchema) {
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, record.value(), record.timestamp());
}
}
其中 avro_schema 是我在 avsc 文件中指定的架构名称。
我不确定这是否是正确的方法,但我面临的问题是,当调用 newRecord() 函数时,它期望 updatedSchema 是 Schema 类型,但我为其提供自定义 avro_Schema 类型。
此外,我保存到 updatedSchema 中的 avro_record.build() 并不是真正的架构,而是转换后的记录本身。但是我不能只将记录主题、key(=null) 和 updatedRecord 传递给 newRecord 函数。它分别需要架构和值。
我的问题是:
- 甚至可以使用 KafkaConnect 并且不使用 KStreams 或 KSQL 将 json 转换为 avro 吗? - 因为这两种选择都需要设置独立的服务。
- 如何将自定义 avro 架构传递给 newRecord 函数,然后单独提供数据。
很抱歉,如果这个问题已经得到解答,我确实回答了一些其他问题,但 none 似乎回答了我的疑问。如果您需要任何其他详细信息,请告诉我。谢谢!
KafkaConnect 自定义转换器只需要向传入的 JSON 添加模式。接收器 属性 format.class=io.confluent.connect.s3.format.avro.AvroFormat 将负责其余的工作。
没有模式,记录值是一个映射,有了模式,它就变成了一个结构。我不得不修改我的代码如下:
@Override
public R apply(R record) {
final Map<String,?> value = requireMap(record.value(),PURPOSE);
Schema updatedSchema = makeUpdatedSchema();
final Struct updatedValue = new Struct(updatedSchema);
for (Field field : updatedSchema.fields()) {
updatedValue.put(field.name(), value.get(field.name()));
}
return newRecord(record, updatedSchema, updatedValue);
}
private Schema makeUpdatedSchema() {
final SchemaBuilder builder = SchemaBuilder.struct()
.name("json_schema")
.field("name",Schema.STRING_SCHEMA)
.field("try",Schema.INT64_SCHEMA);
return builder.build();
}
感谢@OneCricketeer 澄清我的疑惑!
我正在尝试构建一个从 Kafka 读取 json 数据(无模式)的系统,将其转换为 avro 并将其推送到 s3。
我已经能够使用 KStreams 和 KSQL 实现 json 到 avro 的转换。我想知道使用 Kafka Connect 的自定义转换是否可以实现同样的事情。
这是我目前尝试过的方法:
public class JsontoAvroConverter<R extends ConnectRecord<R>> implements Transformation<R> {
public static final String OVERVIEW_DOC = "Transform Payload to Custom Format";
private static final String PURPOSE = "transforming payload";
public static final ConfigDef CONFIG_DEF = new ConfigDef();
@Override
public void configure(Map<String, ?> props) {
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
@Override
public void close() {
}
@Override
public R apply(R record) {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
properties.setProperty("acks", "1");
properties.setProperty("retries", "10");
properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty("value.serializer", KafkaAvroSerializer.class.getName());
properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");
avro_Schema updatedSchema = makeUpdatedSchema();
return newRecord(record, updatedSchema);
}
private avro_Schema makeUpdatedSchema() {
avro_Schema.Builder avro_record = avro_Schema.newBuilder()
.setName("test")
.setTry$(1);
return avro_record.build();
}
protected Object operatingValue(R record) {
return record.value();
}
protected R newRecord(R record, avro_Schema updatedSchema) {
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, record.value(), record.timestamp());
}
}
其中 avro_schema 是我在 avsc 文件中指定的架构名称。
我不确定这是否是正确的方法,但我面临的问题是,当调用 newRecord() 函数时,它期望 updatedSchema 是 Schema 类型,但我为其提供自定义 avro_Schema 类型。
此外,我保存到 updatedSchema 中的 avro_record.build() 并不是真正的架构,而是转换后的记录本身。但是我不能只将记录主题、key(=null) 和 updatedRecord 传递给 newRecord 函数。它分别需要架构和值。
我的问题是:
- 甚至可以使用 KafkaConnect 并且不使用 KStreams 或 KSQL 将 json 转换为 avro 吗? - 因为这两种选择都需要设置独立的服务。
- 如何将自定义 avro 架构传递给 newRecord 函数,然后单独提供数据。
很抱歉,如果这个问题已经得到解答,我确实回答了一些其他问题,但 none 似乎回答了我的疑问。如果您需要任何其他详细信息,请告诉我。谢谢!
KafkaConnect 自定义转换器只需要向传入的 JSON 添加模式。接收器 属性 format.class=io.confluent.connect.s3.format.avro.AvroFormat 将负责其余的工作。
没有模式,记录值是一个映射,有了模式,它就变成了一个结构。我不得不修改我的代码如下:
@Override
public R apply(R record) {
final Map<String,?> value = requireMap(record.value(),PURPOSE);
Schema updatedSchema = makeUpdatedSchema();
final Struct updatedValue = new Struct(updatedSchema);
for (Field field : updatedSchema.fields()) {
updatedValue.put(field.name(), value.get(field.name()));
}
return newRecord(record, updatedSchema, updatedValue);
}
private Schema makeUpdatedSchema() {
final SchemaBuilder builder = SchemaBuilder.struct()
.name("json_schema")
.field("name",Schema.STRING_SCHEMA)
.field("try",Schema.INT64_SCHEMA);
return builder.build();
}
感谢@OneCricketeer 澄清我的疑惑!