如果所有源数据库中的主键都相同,我们可以为多个源数据库创建单个 JDBC 接收器连接器吗?
Can we make Single JDBC Sink Connector for multiple source db if primary key is same in all source DB?
下面是我的 JDBC 接收器连接器配置属性。
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"transforms.dropPrefix.replacement": "",
"table.name.format": "kafka_${topic}",
"connection.password": "********",
"tasks.max": "3",
"topics": "aiq.db1.Test1,aiq.db1.Test2,aiq.db2.Topic1,aiq.db2.Topic2",
"batch.size": "3000",
"transforms": "dropPrefix",
"transforms.dropPrefix.regex": "aiq.(.*)",
"transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"value.converter.schema.registry.url": "http://localhost:8081",
"value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.RecordNameStrategy",
"auto.evolve": "true",
"connection.user": "admin",
"name": "MSSQL_jdbc_sink_connect",
"errors.tolerance": "all",
"auto.create": "true",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"connection.url": "jdbc:sqlserver://mssql",
"insert.mode": "upsert",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"pk.mode": "record_value",
"pk.fields": "id"
如果我使用它,则连接器正在寻找 db1 或 db2,这是源数据库并给出此错误。
com.microsoft.sqlserver.jdbc.SQLServerException: Database 'db2' does not exist. Make sure that the name is entered correctly.
at io.confluent.connect.jdbc.sink.JdbcSinkTask.getAllMessagesException(JdbcSinkTask.java:150)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:102)
... 11 more
[2022-01-25 06:09:09,582] WARN Write of 500 records failed, remainingRetries=10 (io.confluent.connect.jdbc.sink.JdbcSinkTask:92)
com.microsoft.sqlserver.jdbc.SQLServerException: Database 'db2' does not exist. Make sure that the name is entered correctly.
请告诉我我能否创建一个 JDBC 接收器连接器,它使用多个数据库作为源主题。
如果这种情况可能发生,那么如何使用 JDBC Sink Connector 实现此目的?
据我所知,connection.url
一次只能引用一个数据库,对于该数据库的经过身份验证的用户。
如果您需要将不同的主题写入不同的数据库,请复制您的连接器配置,并更改相应的配置
我已经使用了这些属性并且在这种情况下对我有用(如果我们有多个源数据库和一个目标数据库来存储该数据)
table.name.format=iq_${topic}
transforms=dropPrefix
transforms.dropPrefix.replacement=_transferiq_
transforms.dropPrefix.regex=iq.(.*).transferiq.(.*)
transforms.dropPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
下面是我的 JDBC 接收器连接器配置属性。
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"transforms.dropPrefix.replacement": "",
"table.name.format": "kafka_${topic}",
"connection.password": "********",
"tasks.max": "3",
"topics": "aiq.db1.Test1,aiq.db1.Test2,aiq.db2.Topic1,aiq.db2.Topic2",
"batch.size": "3000",
"transforms": "dropPrefix",
"transforms.dropPrefix.regex": "aiq.(.*)",
"transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"value.converter.schema.registry.url": "http://localhost:8081",
"value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.RecordNameStrategy",
"auto.evolve": "true",
"connection.user": "admin",
"name": "MSSQL_jdbc_sink_connect",
"errors.tolerance": "all",
"auto.create": "true",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"connection.url": "jdbc:sqlserver://mssql",
"insert.mode": "upsert",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"pk.mode": "record_value",
"pk.fields": "id"
如果我使用它,则连接器正在寻找 db1 或 db2,这是源数据库并给出此错误。
com.microsoft.sqlserver.jdbc.SQLServerException: Database 'db2' does not exist. Make sure that the name is entered correctly.
at io.confluent.connect.jdbc.sink.JdbcSinkTask.getAllMessagesException(JdbcSinkTask.java:150)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:102)
... 11 more
[2022-01-25 06:09:09,582] WARN Write of 500 records failed, remainingRetries=10 (io.confluent.connect.jdbc.sink.JdbcSinkTask:92)
com.microsoft.sqlserver.jdbc.SQLServerException: Database 'db2' does not exist. Make sure that the name is entered correctly.
请告诉我我能否创建一个 JDBC 接收器连接器,它使用多个数据库作为源主题。 如果这种情况可能发生,那么如何使用 JDBC Sink Connector 实现此目的?
据我所知,connection.url
一次只能引用一个数据库,对于该数据库的经过身份验证的用户。
如果您需要将不同的主题写入不同的数据库,请复制您的连接器配置,并更改相应的配置
我已经使用了这些属性并且在这种情况下对我有用(如果我们有多个源数据库和一个目标数据库来存储该数据)
table.name.format=iq_${topic}
transforms=dropPrefix
transforms.dropPrefix.replacement=_transferiq_
transforms.dropPrefix.regex=iq.(.*).transferiq.(.*)
transforms.dropPrefix.type=org.apache.kafka.connect.transforms.RegexRouter