Debezium - MySQL 连接器 - Kinesis - 服务未启动
Debezium - MySQL Connector - Kinesis - Service Not Starting
我正在尝试让 Debezium 将我的 CDC 事件发送到 Kinesis,该服务似乎已启动但存在错误,而且它似乎没有向 Kinesis 发送任何内容。我在 Debezium 网站上遵循了这个设置指南:
https://debezium.io/documentation/reference/1.2/operations/debezium-server.html
这是我的配置文件:
debezium.sink.type=kinesis
debezium.sink.kinesis.region=us-east-1
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=127.0.0.1
debezium.source.database.id=12345
debezium.source.database.port=3306
debezium.source.database.user=*****
debezium.source.database.password=******
debezium.source.database.dbname=inventory
debezium.source.database.server.name=127.0.0.1
debezium.source.schema.whitelist=inventory
当我 运行 服务器使用 ./run.sh
时,服务器似乎开始启动然后我得到一个错误:
2020-07-17 19:11:22,380 INFO [io.deb.ser.BaseChangeConsumer] (main) Using 'io.debezium.server.BaseChangeConsumer$$Lambda/694452085@2abf4075' stream name mapper
2020-07-17 19:11:22,754 INFO [io.deb.ser.kin.KinesisChangeConsumer] (main) Using default KinesisClient 'software.amazon.awssdk.services.kinesis.DefaultKinesisClient@5d5f10b2'
2020-07-17 19:11:22,755 INFO [io.deb.ser.DebeziumServer] (main) Consumer 'io.debezium.server.kinesis.KinesisChangeConsumer' instantiated
2020-07-17 19:11:22,926 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = key
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true
2020-07-17 19:11:22,928 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = value
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = false
2020-07-17 19:11:22,936 INFO [io.deb.emb.EmbeddedEngine$EmbeddedConfig] (main) EmbeddedConfig values:
access.control.allow.methods =
access.control.allow.origin =
admin.listeners = null
bootstrap.servers = [localhost:9092]
client.dns.lookup = default
config.providers = []
connector.client.config.override.policy = None
header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
key.converter = class org.apache.kafka.connect.json.JsonConverter
listeners = null
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
offset.flush.interval.ms = 0
offset.flush.timeout.ms = 5000
offset.storage.file.filename = data/offsets.dat
offset.storage.partitions = null
offset.storage.replication.factor = null
offset.storage.topic =
plugin.path = null
rest.advertised.host.name = null
rest.advertised.listener = null
rest.advertised.port = null
rest.extension.classes = []
rest.host.name = null
rest.port = 8083
ssl.client.auth = none
task.shutdown.graceful.timeout.ms = 5000
topic.tracking.allow.reset = true
topic.tracking.enable = true
value.converter = class org.apache.kafka.connect.json.JsonConverter
2020-07-17 19:11:22,937 INFO [org.apa.kaf.con.run.WorkerConfig] (main) Worker configuration property 'internal.key.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
2020-07-17 19:11:22,937 INFO [org.apa.kaf.con.run.WorkerConfig] (main) Worker configuration property 'internal.value.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
2020-07-17 19:11:22,939 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = key
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true
2020-07-17 19:11:22,940 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = value
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true
2020-07-17 19:11:22,942 INFO [io.deb.ser.DebeziumServer] (main) Engine executor started
2020-07-17 19:11:22,948 INFO [org.apa.kaf.con.sto.FileOffsetBackingStore] (pool-3-thread-1) Starting FileOffsetBackingStore with file data/offsets.dat
2020-07-17 19:11:22,989 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) Starting MySqlConnectorTask with configuration:
2020-07-17 19:11:22,990 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) connector.class = io.debezium.connector.mysql.MySqlConnector
2020-07-17 19:11:22,990 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) offset.flush.interval.ms = 0
2020-07-17 19:11:22,990 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.user = mysqluser
2020-07-17 19:11:22,990 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.dbname = inventory
2020-07-17 19:11:22,990 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) offset.storage.file.filename = data/offsets.dat
2020-07-17 19:11:22,990 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.hostname = 127.0.0.1
2020-07-17 19:11:22,990 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.id = 12345
2020-07-17 19:11:22,990 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.password = ********
2020-07-17 19:11:22,990 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) name = kinesis
2020-07-17 19:11:22,990 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.server.name = 127.0.0.1
2020-07-17 19:11:22,990 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.port = 3306
2020-07-17 19:11:22,991 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) schema.whitelist = inventory
2020-07-17 19:11:23,063 INFO [io.quarkus] (main) debezium-server-dist 1.2.0.Final on JVM (powered by Quarkus 1.5.0.Final) started in 1.177s. Listening on: http://0.0.0.0:8080
2020-07-17 19:11:23,065 INFO [io.quarkus] (main) Profile prod activated.
2020-07-17 19:11:23,065 INFO [io.quarkus] (main) Installed features: [cdi, smallrye-health]
2020-07-17 19:11:23,276 ERROR [io.deb.rel.his.KafkaDatabaseHistory] (pool-3-thread-1) The 'database.history.kafka.topic' value is invalid: A value is required
2020-07-17 19:11:23,276 ERROR [io.deb.rel.his.KafkaDatabaseHistory] (pool-3-thread-1) The 'database.history.kafka.bootstrap.servers' value is invalid: A value is required**
2020-07-17 19:11:23,277 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) Stopping down connector
2020-07-17 19:11:23,277 INFO [io.deb.con.mys.MySqlConnectorTask] (pool-3-thread-1) Stopping MySQL connector task
2020-07-17 19:11:23,278 INFO [org.apa.kaf.con.sto.FileOffsetBackingStore] (pool-3-thread-1) Stopped FileOffsetBackingStore
2020-07-17 19:11:23,278 INFO [io.deb.ser.ConnectorLifecycle] (pool-3-thread-1) Connector completed: success = 'false', message = 'Unable to initialize and start connector's task class 'io.debezium.connector.mysql.MySqlConnectorTask' with config: {name=kinesis, connector.class=io.debezium.connector.mysql.MySqlConnector, database.id=12345, schema.whitelist=inventory, database.port=3306, database.user=username, database.hostname=127.0.0.1, offset.storage.file.filename=data/offsets.dat, database.password=********, offset.flush.interval.ms=0, database.server.name=127.0.0.1, database.dbname=inventory}', error = '{}': org.apache.kafka.connect.errors.ConnectException: Error configuring an instance of KafkaDatabaseHistory; check the logs for details
at io.debezium.relational.history.KafkaDatabaseHistory.configure(KafkaDatabaseHistory.java:180)
at io.debezium.connector.mysql.MySqlSchema.<init>(MySqlSchema.java:139)
at io.debezium.connector.mysql.MySqlTaskContext.<init>(MySqlTaskContext.java:86)
at io.debezium.connector.mysql.MySqlTaskContext.<init>(MySqlTaskContext.java:52)
at io.debezium.connector.mysql.MySqlConnectorTask.createAndStartTaskContext(MySqlConnectorTask.java:357)
at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:143)
at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:101)
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:756)
at io.debezium.embedded.ConvertingEngineBuilder.run(ConvertingEngineBuilder.java:170)
at io.debezium.server.DebeziumServer.lambda$start(DebeziumServer.java:133)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
^C2020-07-18 00:00:44,245 INFO [io.deb.ser.DebeziumServer] (main) Received request to stop the engine
2020-07-18 00:00:44,245 INFO [io.deb.emb.EmbeddedEngine] (main) Stopping the embedded engine
2020-07-18 00:00:44,254 INFO [io.quarkus] (main) debezium-server-dist stopped in 0.028s
你可以大约 3/4 的方式下来我得到两个相关的错误:
io.deb.rel.his.KafkaDatabaseHistory
使用 Kinesis 源我不确定它为什么抛出这些与 Kafka 相关的错误。有谁知道为什么会发生这种情况或解决方案?
您需要指定 debezium.source.database.history
property for the mysql 连接器。它的默认值为 io.debezium.relational.history.KafkaDatabaseHistory
,因此对于非 Kafka 部署请设置以下值之一:
io.debezium.relational.history.FileDatabaseHistory
(连同 debezium.source.database.history.file.filename
属性);
io.debezium.relational.history.MemoryDatabaseHistory
用于测试环境。
我正在尝试让 Debezium 将我的 CDC 事件发送到 Kinesis,该服务似乎已启动但存在错误,而且它似乎没有向 Kinesis 发送任何内容。我在 Debezium 网站上遵循了这个设置指南:
https://debezium.io/documentation/reference/1.2/operations/debezium-server.html
这是我的配置文件:
debezium.sink.type=kinesis
debezium.sink.kinesis.region=us-east-1
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=127.0.0.1
debezium.source.database.id=12345
debezium.source.database.port=3306
debezium.source.database.user=*****
debezium.source.database.password=******
debezium.source.database.dbname=inventory
debezium.source.database.server.name=127.0.0.1
debezium.source.schema.whitelist=inventory
当我 运行 服务器使用 ./run.sh
时,服务器似乎开始启动然后我得到一个错误:
2020-07-17 19:11:22,380 INFO [io.deb.ser.BaseChangeConsumer] (main) Using 'io.debezium.server.BaseChangeConsumer$$Lambda/694452085@2abf4075' stream name mapper
2020-07-17 19:11:22,754 INFO [io.deb.ser.kin.KinesisChangeConsumer] (main) Using default KinesisClient 'software.amazon.awssdk.services.kinesis.DefaultKinesisClient@5d5f10b2'
2020-07-17 19:11:22,755 INFO [io.deb.ser.DebeziumServer] (main) Consumer 'io.debezium.server.kinesis.KinesisChangeConsumer' instantiated
2020-07-17 19:11:22,926 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = key
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true
2020-07-17 19:11:22,928 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = value
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = false
2020-07-17 19:11:22,936 INFO [io.deb.emb.EmbeddedEngine$EmbeddedConfig] (main) EmbeddedConfig values:
access.control.allow.methods =
access.control.allow.origin =
admin.listeners = null
bootstrap.servers = [localhost:9092]
client.dns.lookup = default
config.providers = []
connector.client.config.override.policy = None
header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
key.converter = class org.apache.kafka.connect.json.JsonConverter
listeners = null
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
offset.flush.interval.ms = 0
offset.flush.timeout.ms = 5000
offset.storage.file.filename = data/offsets.dat
offset.storage.partitions = null
offset.storage.replication.factor = null
offset.storage.topic =
plugin.path = null
rest.advertised.host.name = null
rest.advertised.listener = null
rest.advertised.port = null
rest.extension.classes = []
rest.host.name = null
rest.port = 8083
ssl.client.auth = none
task.shutdown.graceful.timeout.ms = 5000
topic.tracking.allow.reset = true
topic.tracking.enable = true
value.converter = class org.apache.kafka.connect.json.JsonConverter
2020-07-17 19:11:22,937 INFO [org.apa.kaf.con.run.WorkerConfig] (main) Worker configuration property 'internal.key.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
2020-07-17 19:11:22,937 INFO [org.apa.kaf.con.run.WorkerConfig] (main) Worker configuration property 'internal.value.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
2020-07-17 19:11:22,939 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = key
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true
2020-07-17 19:11:22,940 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = value
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true
2020-07-17 19:11:22,942 INFO [io.deb.ser.DebeziumServer] (main) Engine executor started
2020-07-17 19:11:22,948 INFO [org.apa.kaf.con.sto.FileOffsetBackingStore] (pool-3-thread-1) Starting FileOffsetBackingStore with file data/offsets.dat
2020-07-17 19:11:22,989 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) Starting MySqlConnectorTask with configuration:
2020-07-17 19:11:22,990 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) connector.class = io.debezium.connector.mysql.MySqlConnector
2020-07-17 19:11:22,990 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) offset.flush.interval.ms = 0
2020-07-17 19:11:22,990 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.user = mysqluser
2020-07-17 19:11:22,990 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.dbname = inventory
2020-07-17 19:11:22,990 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) offset.storage.file.filename = data/offsets.dat
2020-07-17 19:11:22,990 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.hostname = 127.0.0.1
2020-07-17 19:11:22,990 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.id = 12345
2020-07-17 19:11:22,990 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.password = ********
2020-07-17 19:11:22,990 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) name = kinesis
2020-07-17 19:11:22,990 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.server.name = 127.0.0.1
2020-07-17 19:11:22,990 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.port = 3306
2020-07-17 19:11:22,991 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) schema.whitelist = inventory
2020-07-17 19:11:23,063 INFO [io.quarkus] (main) debezium-server-dist 1.2.0.Final on JVM (powered by Quarkus 1.5.0.Final) started in 1.177s. Listening on: http://0.0.0.0:8080
2020-07-17 19:11:23,065 INFO [io.quarkus] (main) Profile prod activated.
2020-07-17 19:11:23,065 INFO [io.quarkus] (main) Installed features: [cdi, smallrye-health]
2020-07-17 19:11:23,276 ERROR [io.deb.rel.his.KafkaDatabaseHistory] (pool-3-thread-1) The 'database.history.kafka.topic' value is invalid: A value is required
2020-07-17 19:11:23,276 ERROR [io.deb.rel.his.KafkaDatabaseHistory] (pool-3-thread-1) The 'database.history.kafka.bootstrap.servers' value is invalid: A value is required**
2020-07-17 19:11:23,277 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) Stopping down connector
2020-07-17 19:11:23,277 INFO [io.deb.con.mys.MySqlConnectorTask] (pool-3-thread-1) Stopping MySQL connector task
2020-07-17 19:11:23,278 INFO [org.apa.kaf.con.sto.FileOffsetBackingStore] (pool-3-thread-1) Stopped FileOffsetBackingStore
2020-07-17 19:11:23,278 INFO [io.deb.ser.ConnectorLifecycle] (pool-3-thread-1) Connector completed: success = 'false', message = 'Unable to initialize and start connector's task class 'io.debezium.connector.mysql.MySqlConnectorTask' with config: {name=kinesis, connector.class=io.debezium.connector.mysql.MySqlConnector, database.id=12345, schema.whitelist=inventory, database.port=3306, database.user=username, database.hostname=127.0.0.1, offset.storage.file.filename=data/offsets.dat, database.password=********, offset.flush.interval.ms=0, database.server.name=127.0.0.1, database.dbname=inventory}', error = '{}': org.apache.kafka.connect.errors.ConnectException: Error configuring an instance of KafkaDatabaseHistory; check the logs for details
at io.debezium.relational.history.KafkaDatabaseHistory.configure(KafkaDatabaseHistory.java:180)
at io.debezium.connector.mysql.MySqlSchema.<init>(MySqlSchema.java:139)
at io.debezium.connector.mysql.MySqlTaskContext.<init>(MySqlTaskContext.java:86)
at io.debezium.connector.mysql.MySqlTaskContext.<init>(MySqlTaskContext.java:52)
at io.debezium.connector.mysql.MySqlConnectorTask.createAndStartTaskContext(MySqlConnectorTask.java:357)
at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:143)
at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:101)
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:756)
at io.debezium.embedded.ConvertingEngineBuilder.run(ConvertingEngineBuilder.java:170)
at io.debezium.server.DebeziumServer.lambda$start(DebeziumServer.java:133)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
^C2020-07-18 00:00:44,245 INFO [io.deb.ser.DebeziumServer] (main) Received request to stop the engine
2020-07-18 00:00:44,245 INFO [io.deb.emb.EmbeddedEngine] (main) Stopping the embedded engine
2020-07-18 00:00:44,254 INFO [io.quarkus] (main) debezium-server-dist stopped in 0.028s
你可以大约 3/4 的方式下来我得到两个相关的错误: io.deb.rel.his.KafkaDatabaseHistory
使用 Kinesis 源我不确定它为什么抛出这些与 Kafka 相关的错误。有谁知道为什么会发生这种情况或解决方案?
您需要指定 debezium.source.database.history
property for the mysql 连接器。它的默认值为 io.debezium.relational.history.KafkaDatabaseHistory
,因此对于非 Kafka 部署请设置以下值之一:
io.debezium.relational.history.FileDatabaseHistory
(连同debezium.source.database.history.file.filename
属性);io.debezium.relational.history.MemoryDatabaseHistory
用于测试环境。