
Can we make Single JDBC Sink Connector for multiple source db if primary key is same in all source DB?

Below are my JDBC sink connector configuration properties:

    {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "transforms.dropPrefix.replacement": "",
        "table.name.format": "kafka_${topic}",
        "connection.password": "********",
        "tasks.max": "3",
        "topics": "aiq.db1.Test1,aiq.db1.Test2,aiq.db2.Topic1,aiq.db2.Topic2",
        "batch.size": "3000",
        "transforms": "dropPrefix",
        "transforms.dropPrefix.regex": "aiq.(.*)",
        "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "value.converter.schema.registry.url": "http://localhost:8081",
        "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.RecordNameStrategy",
        "auto.evolve": "true",
        "connection.user": "admin",
        "name": "MSSQL_jdbc_sink_connect",
        "errors.tolerance": "all",
        "auto.create": "true",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "connection.url": "jdbc:sqlserver://mssql",
        "insert.mode": "upsert",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "pk.mode": "record_value",
        "pk.fields": "id"
    }

When I use this configuration, the connector looks for db1 or db2 (the source databases) and fails with this error:

com.microsoft.sqlserver.jdbc.SQLServerException: Database 'db2' does not exist. Make sure that the name is entered correctly.

at io.confluent.connect.jdbc.sink.JdbcSinkTask.getAllMessagesException(JdbcSinkTask.java:150)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:102)
... 11 more
[2022-01-25 06:09:09,582] WARN Write of 500 records failed, remainingRetries=10 (io.confluent.connect.jdbc.sink.JdbcSinkTask:92)
com.microsoft.sqlserver.jdbc.SQLServerException: Database 'db2' does not exist. Make sure that the name is entered correctly.

Please tell me whether I can create a single JDBC sink connector that consumes topics from multiple source databases. If this is possible, how can I achieve it with the JDBC Sink Connector?

As far as I know, connection.url can only refer to one database at a time, for the user authenticated against that database.

If you need to write different topics to different databases, duplicate your connector config and change the relevant settings accordingly.
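For example, a minimal sketch of one per-database sink config (the connector name and the `databaseName` URL parameter here are assumptions based on the topics above; the converter, transform, and pk settings stay as in the original config):

    {
        "name": "jdbc_sink_db1",
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:sqlserver://mssql;databaseName=db1",
        "topics": "aiq.db1.Test1,aiq.db1.Test2"
    }

A second copy would use a different name (say `jdbc_sink_db2`), point `connection.url` at `databaseName=db2`, and list `aiq.db2.Topic1,aiq.db2.Topic2` as its topics.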

I have used the following properties, and they worked for me in this scenario (multiple source databases with one target database to store the data):

    table.name.format=iq_${topic}
    transforms=dropPrefix
    transforms.dropPrefix.replacement=_transferiq_
    transforms.dropPrefix.regex=iq.(.*).transferiq.(.*)
    transforms.dropPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
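The rename step that RegexRouter performs can be sketched roughly in Python (an illustration, not the connector's actual code; RegexRouter itself uses Java regex syntax with `$1`-style backreferences, and the topic names below are just examples). After the rename, `table.name.format` substitutes the new topic into `${topic}`, so any dots left in the routed name flow into the table name unchanged:

```python
import re

def regex_router(topic, regex, replacement):
    """Rough sketch of Kafka Connect's RegexRouter: the topic is
    renamed only when the pattern matches the ENTIRE topic name."""
    pattern = re.compile(regex)
    if pattern.fullmatch(topic):
        return pattern.sub(replacement, topic, count=1)
    return topic

# With the question's pattern and a "$1"-style replacement ("\1" in Python),
# the routed name keeps the db-qualified part of the topic:
print(regex_router("aiq.db2.Topic1", r"aiq.(.*)", r"\1"))  # db2.Topic1
# A topic that does not match the pattern is left untouched:
print(regex_router("other.topic", r"aiq.(.*)", r"\1"))     # other.topic
```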