实时向 ElasticSearch 提供 MySQL table 数据的最佳方式是什么?
What's the best way to feed MySQL table data to ElasticSearch in real-time?
我需要将 MySQL(在 AWS 上使用 RDS 部署)table 数据实时或接近实时(可能延迟几分钟)输入 ElasticSearch,加入一对table 正在处理中。
我考察的第一个方案是Flink。但经过一些研究后,我找不到一种方法来流式传输 table 数据更改,因为 table 不是仅附加的。
然后我发现有些人在谈论 CDC(Change Data Capture),基本上是将 MySQL binlog 更改流式传输到 lambda 并解析它然后 post 到 ElasticSearch,但这听起来太过分了复杂且容易出错。
是否有任何经过行业验证的方法可以将非附加 table 同步到 ElasticSearch?
您可以使用 logstash 脚本从 mysql 获取数据到 elasticsearch。
示例 Logstash 代码
input {
jdbc {
jdbc_driver_library => "<pathToYourDataBaseDriver>\mysql-connector-java-5.1.39.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/ecomdb"
jdbc_user => <db username>
jdbc_password => <db password>
tracking_column¹ => "regdate"
use_column_value²=>true
statement => "SELECT * FROM ecomdb.customer where regdate >:sql_last_value;"
schedule³ => " * * * * * *"
}
}
output {
elasticsearch {
document_id⁴=> "%{id}"
document_type => "doc"
index => "test"
hosts => ["http://localhost:9200"]
}
stdout{
codec => rubydebug
}
}
我需要将 MySQL(在 AWS 上使用 RDS 部署)table 数据实时或接近实时(可能延迟几分钟)输入 ElasticSearch,加入一对table 正在处理中。
我考察的第一个方案是Flink。但经过一些研究后,我找不到一种方法来流式传输 table 数据更改,因为 table 不是仅附加的。
然后我发现有些人在谈论 CDC(Change Data Capture),基本上是将 MySQL binlog 更改流式传输到 lambda 并解析它然后 post 到 ElasticSearch,但这听起来太过分了复杂且容易出错。
是否有任何经过行业验证的方法可以将非附加 table 同步到 ElasticSearch?
您可以使用 logstash 脚本从 mysql 获取数据到 elasticsearch。
示例 Logstash 代码
input {
jdbc {
jdbc_driver_library => "<pathToYourDataBaseDriver>\mysql-connector-java-5.1.39.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/ecomdb"
jdbc_user => <db username>
jdbc_password => <db password>
tracking_column¹ => "regdate"
use_column_value²=>true
statement => "SELECT * FROM ecomdb.customer where regdate >:sql_last_value;"
schedule³ => " * * * * * *"
}
}
output {
elasticsearch {
document_id⁴=> "%{id}"
document_type => "doc"
index => "test"
hosts => ["http://localhost:9200"]
}
stdout{
codec => rubydebug
}
}