Docker, Debezium not streaming data from mssql to elasticsearch
I followed this example to stream data from mysql to elasticsearch:
https://github.com/debezium/debezium-examples/tree/master/unwrap-smt#elasticsearch-sink
The example itself runs fine on my local machine.
But in my case I want to stream data from mssql (on another server, not in docker) to elasticsearch.
So in the "docker-compose-es.yaml" file I removed the "mysql" section and removed the mysql links,
and created my own connectors/sink for elastic and mssql:
{
  "name": "Test-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.hostname": "192.168.1.234",
    "database.port": "1433",
    "database.user": "user",
    "database.password": "pass",
    "database.dbname": "Test",
    "database.server.name": "MyServer",
    "table.include.list": "dbo.TEST_A",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.testA"
  }
}
{
  "name": "elastic-sink-test",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "TEST_A",
    "connection.url": "http://localhost:9200/",
    "transforms": "unwrap,key",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.key.field": "SQ",
    "key.ignore": "false",
    "type.name": "TEST_A",
    "behavior.on.null.values": "delete"
  }
}
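(For reference: the linked example registers connector configs like these through Kafka Connect's REST API, which its docker-compose file publishes on port 8083. Assuming the two JSON documents above are saved as files, say mssql-source.json and es-sink-test.json (illustrative names), registration would look roughly like this:)

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d @mssql-source.json
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d @es-sink-test.json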
After adding these, Kafka Connect I/O started working hard, with more than 40GB of input; see the image below:
In the kafka logs it looks like it goes through all the tables. Here is the log for one of the tables:
2021-06-17 10:20:10,414 - INFO [data-plane-kafka-request-handler-5:Logging@66] - [Partition MyServer.dbo.TemplateGroup-0 broker=1] Log loaded for partition MyServer.dbo.TemplateGroup-0 with initial high watermark 0
2021-06-17 10:20:10,509 - INFO [data-plane-kafka-request-handler-3:Logging@66] - Creating topic MyServer.dbo.TemplateMeter with configuration {} and initial partition assignment Map(0 -> ArrayBuffer(1))
2021-06-17 10:20:10,516 - INFO [data-plane-kafka-request-handler-3:Logging@66] - [KafkaApi-1] Auto creation of topic MyServer.dbo.TemplateMeter with 1 partitions and replication factor 1 is successful
2021-06-17 10:20:10,526 - INFO [data-plane-kafka-request-handler-7:Logging@66] - [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(MyServer.dbo.TemplateMeter-0)
2021-06-17 10:20:10,528 - INFO [data-plane-kafka-request-handler-7:Logging@66] - [Log partition=MyServer.dbo.TemplateMeter-0, dir=/kafka/data/1] Loading producer state till offset 0 with message format version 2
The database is only 2GB. I don't understand why the input is so high.
No index was created in elasticsearch for test_a when running this command:
curl http://localhost:9200/_aliases?pretty=true
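(A related check, assuming the Connect REST API is exposed on localhost:8083 as in the example: the status endpoint reports whether each connector task is RUNNING or FAILED, and a failed task includes the stack trace of the exception that killed it:)

curl -s http://localhost:8083/connectors/Test-connector/status
curl -s http://localhost:8083/connectors/elastic-sink-test/status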
Does anyone know how I can troubleshoot from here, or point me in the right direction?
Thanks in advance!
"how I troubleshoot from here" — docker compose logs?
Modify Kafka Connect's log4j.properties and/or the Elasticsearch process to get more logs?
Use a plain Kafka consumer to check whether the data is actually being read into the TEST_A topic?
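(A sketch of that consumer check, assuming the kafka service name from docker-compose-es.yaml and the debezium/kafka image layout, where Kafka lives under /kafka. Note that the Debezium SQL Server connector names its topics serverName.schemaName.tableName, so with the source config above the data would land in MyServer.dbo.TEST_A rather than a bare TEST_A:)

docker-compose -f docker-compose-es.yaml exec kafka \
  /kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --topic MyServer.dbo.TEST_A \
  --from-beginning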
in the "docker-compose-es.yaml" ....
如果 Debezium 在容器中 运行,则 Elasticsearch 在 localhost:9200
时不可用
将该值更改为 http://elastic:9200
、like shown in the es-sink.json
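(Concretely, assuming the Elasticsearch service is named elastic in docker-compose-es.yaml, as it is in the example project, the sink config from the question with only connection.url changed would read:)

{
  "name": "elastic-sink-test",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "TEST_A",
    "connection.url": "http://elastic:9200",
    "transforms": "unwrap,key",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.key.field": "SQ",
    "key.ignore": "false",
    "type.name": "TEST_A",
    "behavior.on.null.values": "delete"
  }
}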