Kafka JDBC sink connector to MySQL: table not found

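I am trying to configure a Kafka sink connector that writes to a MySQL database. The Kafka topic has values in Avro format, and I want to dump the data into MySQL. I am getting an error saying the table was not found (Table 'airflow.mytopic' doesn't exist). I expect the table to be created as 'myschema.mytopic', but the connector is looking for it in the airflow database. I have set "auto.create": "true", expecting the table to be created wherever it is needed.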

I am using Confluent Kafka 5.4.1 and starting it manually.

Configuration:

"topics": "mytopic",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://<mysqlDB>:3306/myschema",
"connection.user": "db_user",
"connection.password": "db_pwd",
"tasks.max": "1",
"auto.evolve": "true",
"auto.create": "true",
"transforms": "routeRecords",
"transforms.routeRecords.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.routeRecords.replacement": "",
"transforms.routeRecords.regex": "(.*)",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8084",
"connection.attempts": "1",
"dialect.name": "MySqlDatabaseDialect",
"table.name.format": "myschema.mytopic"
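
As a side note on the routeRecords transform above, here is a minimal Python sketch of what the RegexRouter settings do to the topic name. It only approximates the transform's documented behaviour (rewrite the topic when the whole name matches the regex, using a replace-first) and is not the Connect code itself; the function name `regex_router` is hypothetical. Whether this renaming contributes to the error is not established here, since "table.name.format" is a literal table name in this configuration.

```python
import re


def regex_router(topic, regex, replacement):
    """Approximate RegexRouter: rewrite the topic only if the whole name matches."""
    pattern = re.compile(regex)
    if pattern.fullmatch(topic):
        # count=1 mirrors a replace-first on the matched topic name
        return pattern.sub(replacement, topic, count=1)
    # No full match: the topic name is left unchanged
    return topic


# With regex "(.*)" and an empty replacement, every topic name becomes empty.
routed = regex_router("mytopic", "(.*)", "")
print(repr(routed))
```

With the values from the configuration, the routed topic name is an empty string, which is probably not what was intended.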

Error stack:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Exception chain:
java.sql.SQLSyntaxErrorException: Table 'airflow.mytopic' doesn't exist

    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:122)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
    ... 10 more
Caused by: java.sql.SQLException: Exception chain:
java.sql.SQLSyntaxErrorException: Table 'airflow.mytopic' doesn't exist
    
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.getAllMessagesException(JdbcSinkTask.java:150)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:102)
    ... 11 more

Any idea what is causing the error?

The problem was resolved by downgrading the MySQL driver (to mysql-connector-java-5.1.17.jar). Here is the configuration:

"topics": "mytopic",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://<mysqlDB>:3306/myschema",
"connection.user": "db_user",
"connection.password": "db_pwd",
"tasks.max": "1",
"insert.mode": "insert",
"auto.evolve": "true",
"auto.create": "true",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8084",
"connection.attempts": "1",
"dialect.name": "MySqlDatabaseDialect",
"table.name.format": "myschema.mytopic"
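
The fragment above is only the inner set of properties. As a rough sketch, assuming the connector is registered through the Kafka Connect REST API (which accepts a JSON body with "name" and "config" keys on POST /connectors), the properties could be wrapped like this in Python. The connector name "mysql-sink" and the helper `build_connector_payload` are hypothetical, and the `<mysqlDB>` placeholder is kept as in the post.

```python
import json


def build_connector_payload(name, config):
    """Wrap flat connector properties in the {"name", "config"} envelope."""
    # Connector property values are passed as strings
    return {"name": name, "config": {key: str(value) for key, value in config.items()}}


sink_config = {
    "topics": "mytopic",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:mysql://<mysqlDB>:3306/myschema",
    "connection.user": "db_user",
    "connection.password": "db_pwd",
    "tasks.max": "1",
    "insert.mode": "insert",
    "auto.evolve": "true",
    "auto.create": "true",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8084",
    "connection.attempts": "1",
    "dialect.name": "MySqlDatabaseDialect",
    "table.name.format": "myschema.mytopic",
}

# "mysql-sink" is a made-up connector name for illustration
payload = build_connector_payload("mysql-sink", sink_config)
print(json.dumps(payload, indent=2))
```

The printed JSON can be saved to a file and submitted to the Connect worker; the worker's host and port depend on how Connect was started, so they are left out here.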