How to prevent unwanted attributes in a Kafka message from being inserted by a Kafka Connect sink?
I am publishing messages to my Kafka topic from a Java producer application, using the code below.
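import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical class name; only the body of main() comes from the original snippet.
public class AvroProducerExample {
    public static void main(String[] args) {
        // Quotes inside the JSON schema must be escaped in a Java string literal.
        String mySchema = "{\"type\": \"record\",\"name\": \"MyData\",\"namespace\": \"com.qwe.rty\",\"doc\": \"MyData Schema\","
                + "\"fields\": [{\"name\": \"f1\",\"type\": [\"null\", \"string\"],\"default\" : null},"
                + " {\"name\": \"f2\",\"type\": [\"null\", \"string\"],\"default\" : null},"
                + " {\"name\": \"f3\",\"type\": [\"null\", \"string\"],\"default\" : null}]}";
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        props.put("schema.registry.url", "http://192.168.0.1:8081");
        // Typed producer instead of the raw KafkaProducer type
        KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(mySchema);
        // Build a record containing all three fields, including f3
        GenericRecord avroRecord = new GenericData.Record(schema);
        avroRecord.put("f1", "data1");
        avroRecord.put("f2", "data2");
        avroRecord.put("f3", "data3");
        // No key is set, so the record key is null
        ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("my_topic_1", avroRecord);
        try {
            System.out.println("send from producer.serialized.avro.Sender6n");
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        }
        try {
            System.out.println("flush from producer.serialized.avro.Sender6n");
            producer.flush();
        } catch (Exception e) {
            e.printStackTrace();
        }
        // Release network resources held by the producer
        producer.close();
    }
}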
The messages are in Avro. I am using the Kafka Connect JDBC sink to insert them into my Oracle table, with the configuration below:
name=jdbc-sink-avro
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=my_topic_1
connection.url=jdbc:oracle:thin:@192.168.0.1:1521:user01
connection.user=user_01
connection.password=user_01
auto.create=true
table.name.format=my_table_1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://192.168.0.1:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://192.168.0.1:8081
producer.retries=1
This works fine. But I don't want f3 to be inserted into my table; I only need a two-column table. In other words, I want to prevent f3 from being inserted. How can I do this?
Use Kafka Connect's Single Message Transform (SMT) feature, specifically ReplaceField with its blacklist option.
You can see an example here ("Field Masking and Whitelist/Blacklists").
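A minimal sketch of what this could look like, assuming it is appended to the jdbc-sink-avro configuration shown in the question; the alias dropF3 is a hypothetical name. ReplaceField$Value is used because f1, f2 and f3 live in the record value, not the key. In newer Apache Kafka releases blacklist is deprecated in favor of exclude (and whitelist in favor of include), so use whichever name your Connect version accepts.

```properties
# Append to the jdbc-sink-avro connector config above.
# "dropF3" is a hypothetical transform alias; any name works.
transforms=dropF3
# Apply the transform to the record value, where f1, f2 and f3 live
transforms.dropF3.type=org.apache.kafka.connect.transforms.ReplaceField$Value
# Remove f3 before the JDBC sink builds its INSERT (and, with auto.create=true, the table)
transforms.dropF3.blacklist=f3
# Equivalent on newer Kafka versions:
# transforms.dropF3.exclude=f3
# Alternatively, keep only the wanted fields:
# transforms.dropF3.whitelist=f1,f2
```

Note that if my_table_1 was already auto-created with an f3 column, the transform only stops f3 values from being written; it does not drop the existing column, so the table would need to be dropped or altered to end up with two columns.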