Confluent:Topic 不是在加载连接器后创建的
Confluent:Topic was not created after loading a Connector
我尝试连接到多个数据库,包括 MySQL 和 MSSQL,我没有遇到任何问题。
但是当我尝试连接到某个远程 MySQL 数据库(只能从我公司的网络访问),其中包含一个包含近 300 万条记录的视图时,连接器被加载并且状态说是 运行ning 但没有创建任何主题来使用其中的数据。
可能是什么原因?我在哪里可以找到正确的日志文件以了解发生了什么?
下面是连接器的示例:
{
"name": "mysql-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"incrementing.column.name": "Id",
"tasks.max": "1",
"table.types": "VIEW",
"table.whitelist": "ticket_rep",
"mode": "incrementing",
"topic.prefix": "mysql-",
"name": "mysql-source",
"validate.non.null": "false",
"connection.url": "jdbc:mysql://XX.XXX.XX.XX:3306/database?
user=user&password=password"
}
}
这些是我 运行 融合日志连接时的日志结果:
> [2018-09-11 16:37:57,382] ERROR Failed to run query for table TimestampIncrementingTableQuerier{name='foo', query='null', topicPrefix='mysql-', timestampColumn='null', incrementingColumn='id'}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:247)
java.sql.SQLException: Java heap space
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:975)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:1025)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:201)
at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:84)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:55)
at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:225)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:179)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2018-09-11 16:38:02,523] ERROR Failed to run query for table TimestampIncrementingTableQuerier{name='foo', query='null', topicPrefix='mysql-', timestampColumn='null', incrementingColumn='id'}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:247)
com.mysql.cj.jdbc.exceptions.PacketTooBigException: Packet for query is too large (7,562,612 > 4,194,304). You can change this value on the server by setting the 'max_allowed_packet' variable.
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:107)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:975)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:1025)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:201)
at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:84)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:55)
at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:225)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:179)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
com.mysql.cj.jdbc.exceptions.PacketTooBigException: Packet for query is too large
是 MySQL 端的问题,可以通过增加 max_allowed_packet
变量的值来解决。为此,您需要包括
max_allowed_packet=512M
在my.cnf
(或my.ini
,取决于OS你是运行)文件(在[mysqld]
部分)然后重新启动MySQL。重启后MySQL,
SHOW VARIABLES LIKE 'max_allowed_packet';
应该 return 您在 MySQL 配置中设置的值 file.For 有关此错误的更多详细信息,您可以参考 MySQL documentation。
java.sql.SQLException: Java heap space
,表示 Kafka 连接 运行 堆外 space。可以通过 运行
来控制起始和最大堆大小
KAFKA_HEAP_OPTS="-Xms512m -Xmx1g" connect-standalone connect-worker.properties mysql-source-connector.properties
设置起始堆大小为 512 MB,最大大小为 1 GB。您可能需要根据需要更改尺寸。
我尝试连接到多个数据库,包括 MySQL 和 MSSQL,我没有遇到任何问题。
但是当我尝试连接到某个远程 MySQL 数据库(只能从我公司的网络访问),其中包含一个包含近 300 万条记录的视图时,连接器被加载并且状态说是 运行ning 但没有创建任何主题来使用其中的数据。
可能是什么原因?我在哪里可以找到正确的日志文件以了解发生了什么?
下面是连接器的示例:
{
"name": "mysql-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"incrementing.column.name": "Id",
"tasks.max": "1",
"table.types": "VIEW",
"table.whitelist": "ticket_rep",
"mode": "incrementing",
"topic.prefix": "mysql-",
"name": "mysql-source",
"validate.non.null": "false",
"connection.url": "jdbc:mysql://XX.XXX.XX.XX:3306/database?
user=user&password=password"
}
}
这些是我 运行 融合日志连接时的日志结果:
> [2018-09-11 16:37:57,382] ERROR Failed to run query for table TimestampIncrementingTableQuerier{name='foo', query='null', topicPrefix='mysql-', timestampColumn='null', incrementingColumn='id'}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:247)
java.sql.SQLException: Java heap space
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:975)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:1025)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:201)
at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:84)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:55)
at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:225)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:179)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2018-09-11 16:38:02,523] ERROR Failed to run query for table TimestampIncrementingTableQuerier{name='foo', query='null', topicPrefix='mysql-', timestampColumn='null', incrementingColumn='id'}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:247)
com.mysql.cj.jdbc.exceptions.PacketTooBigException: Packet for query is too large (7,562,612 > 4,194,304). You can change this value on the server by setting the 'max_allowed_packet' variable.
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:107)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:975)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:1025)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:201)
at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:84)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:55)
at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:225)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:179)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
com.mysql.cj.jdbc.exceptions.PacketTooBigException: Packet for query is too large
是 MySQL 端的问题,可以通过增加 max_allowed_packet
变量的值来解决。为此,您需要包括
max_allowed_packet=512M
在my.cnf
(或my.ini
,取决于OS你是运行)文件(在[mysqld]
部分)然后重新启动MySQL。重启后MySQL,
SHOW VARIABLES LIKE 'max_allowed_packet';
应该 return 您在 MySQL 配置中设置的值 file.For 有关此错误的更多详细信息,您可以参考 MySQL documentation。
java.sql.SQLException: Java heap space
,表示 Kafka 连接 运行 堆外 space。可以通过 运行
KAFKA_HEAP_OPTS="-Xms512m -Xmx1g" connect-standalone connect-worker.properties mysql-source-connector.properties
设置起始堆大小为 512 MB,最大大小为 1 GB。您可能需要根据需要更改尺寸。