如何在 JDBC Sink Connector 配置中添加多个主题并获取多个目标表中的主题数据?
How to add multiple topics in JDBC Sink Connector configuration and get topics data in multiple target tables?
下面是我的 JDBC-Sink 连接器配置:
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
behavior.on.null.values=ignore
table.name.format=kafka_Address_V1, kafka_Attribute_V1
connection.password=***********
topics=Address,Attribute
task.max=3
batch.size=500
value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
value.converter.schema.registry.url=http://localhost:8081
auto.evolve=true
connection.user=user
name=sink-jdbc-connector
errors.tolerance=all
auto.create=true
value.converter=io.confluent.connect.avro.AvroConverter
connection.url=jdbc:sqlserver://localhost:DB;
insert.mode=upsert
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
pk.mode=record_value
pk.fields=id
如果我使用此配置,我将在目标数据库中以这种 kafka_Address_V1、kafka_Attribute_V1 格式获取单个 table,这是这两者的组合。
请告诉我如何使用 JDBC-Sink 连接器将不同的主题数据存储在不同的表中。
根据 the docs,table.name.format
采用单个值,默认使用主题名称本身。
编辑:由@OneCricketeer 提供,您也可以只使用 table.name.format=kafka_${topic}_V1
。下面的 SMT 可用于更复杂的名称转换。
要实现您想要的效果,您可以使用 RegExRouter
Single Message Transform 修改 Kafka Connect 处理的主题
试试这个:
transforms =changeTopicName
transforms.changeTopicName.type =org.apache.kafka.connect.transforms.RegexRouter
transforms.changeTopicName.regex =(.*)
transforms.changeTopicName.replacement =kafka__V1
下面是我的 JDBC-Sink 连接器配置:
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
behavior.on.null.values=ignore
table.name.format=kafka_Address_V1, kafka_Attribute_V1
connection.password=***********
topics=Address,Attribute
task.max=3
batch.size=500
value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
value.converter.schema.registry.url=http://localhost:8081
auto.evolve=true
connection.user=user
name=sink-jdbc-connector
errors.tolerance=all
auto.create=true
value.converter=io.confluent.connect.avro.AvroConverter
connection.url=jdbc:sqlserver://localhost:DB;
insert.mode=upsert
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
pk.mode=record_value
pk.fields=id
如果我使用此配置,我将在目标数据库中以这种 kafka_Address_V1、kafka_Attribute_V1 格式获取单个 table,这是这两者的组合。
请告诉我如何使用 JDBC-Sink 连接器将不同的主题数据存储在不同的表中。
根据 the docs,table.name.format
采用单个值,默认使用主题名称本身。
编辑:由@OneCricketeer 提供,您也可以只使用 table.name.format=kafka_${topic}_V1
。下面的 SMT 可用于更复杂的名称转换。
要实现您想要的效果,您可以使用 RegExRouter
Single Message Transform 修改 Kafka Connect 处理的主题
试试这个:
transforms =changeTopicName
transforms.changeTopicName.type =org.apache.kafka.connect.transforms.RegexRouter
transforms.changeTopicName.regex =(.*)
transforms.changeTopicName.replacement =kafka__V1