在 logstash 中为 sql_last_value 使用 table 的 id?
Using an id of a table for sql_last_value in logstash?
我在 logstash
输入的 jdbc
插件中有一个 MySQL 语句。
statement => "SELECT * from TEST where id > :sql_last_value"
我的 table 没有任何 date
或 datetime
字段。所以我试图通过使用 scheduler
逐分钟检查来更新索引,是否有任何新行已添加到 table.
我应该只能更新新记录,而不是更新现有记录的现有值更改。所以要做到这一点,我有这种 logstash
输入:
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://myhostmachine:3306/mydb"
jdbc_user => "root"
jdbc_password => "root"
jdbc_validate_connection => true
jdbc_driver_library => "/mypath/mysql-connector-java-5.1.39-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
schedule => "* * * * *"
statement => "SELECT * from mytable where id > :sql_last_value"
use_column_value => true
tracking_column => id
last_run_metadata_path => "/path/.logstash_jdbc_last_run"
clean_run => true
}
}
因此,每当我创建索引和 运行 这个 logstash
文件以上传文档时,它根本不会被上传。文档计数显示为零。我确保在 运行 logstash
conf 文件之前删除了 .logstash_jdbc_last_run
。
部分 logstash 控制台输出:
[2016-11-02T16:33:00,294][INFO ][logstash.inputs.jdbc ]
(0.002000s) SELECT count(*) AS count
FROM (SELECT * from TEST where
id > '2016-11-02 11:02:00') AS t1
LIMIT 1
这通过逐分钟检查正确的方式继续进行,但随后它没有获得记录。它是如何工作的?
我错过了什么吗?任何帮助都将不胜感激。
您需要像这样修改您的 logstash 配置:
jdbc {
jdbc_connection_string => "jdbc:mysql://myhostmachine:3306/mydb"
jdbc_user => "root"
jdbc_password => "root"
jdbc_validate_connection => true
jdbc_driver_library => "/mypath/mysql-connector-java-5.1.39-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
schedule => "* * * * *"
statement => "SELECT * from TEST where id > :sql_last_value"
use_column_value => true
tracking_column => "id"
tracking_column_type => "numeric"
clean_run => true
last_run_metadata_path => "/mypath/.logstash_jdbc_last_run"
}
最后五项设置对您的情况很重要。还要确保删除 .logstash_jdbc_last_run
文件,即使 clean_run => true
删除了它。
我在 logstash
输入的 jdbc
插件中有一个 MySQL 语句。
statement => "SELECT * from TEST where id > :sql_last_value"
我的 table 没有任何 date
或 datetime
字段。所以我试图通过使用 scheduler
逐分钟检查来更新索引,是否有任何新行已添加到 table.
我应该只能更新新记录,而不是更新现有记录的现有值更改。所以要做到这一点,我有这种 logstash
输入:
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://myhostmachine:3306/mydb"
jdbc_user => "root"
jdbc_password => "root"
jdbc_validate_connection => true
jdbc_driver_library => "/mypath/mysql-connector-java-5.1.39-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
schedule => "* * * * *"
statement => "SELECT * from mytable where id > :sql_last_value"
use_column_value => true
tracking_column => id
last_run_metadata_path => "/path/.logstash_jdbc_last_run"
clean_run => true
}
}
因此,每当我创建索引和 运行 这个 logstash
文件以上传文档时,它根本不会被上传。文档计数显示为零。我确保在 运行 logstash
conf 文件之前删除了 .logstash_jdbc_last_run
。
部分 logstash 控制台输出:
[2016-11-02T16:33:00,294][INFO ][logstash.inputs.jdbc ] (0.002000s) SELECT count(*) AS
count
FROM (SELECT * from TEST where id > '2016-11-02 11:02:00') ASt1
LIMIT 1
这通过逐分钟检查正确的方式继续进行,但随后它没有获得记录。它是如何工作的?
我错过了什么吗?任何帮助都将不胜感激。
您需要像这样修改您的 logstash 配置:
jdbc {
jdbc_connection_string => "jdbc:mysql://myhostmachine:3306/mydb"
jdbc_user => "root"
jdbc_password => "root"
jdbc_validate_connection => true
jdbc_driver_library => "/mypath/mysql-connector-java-5.1.39-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
schedule => "* * * * *"
statement => "SELECT * from TEST where id > :sql_last_value"
use_column_value => true
tracking_column => "id"
tracking_column_type => "numeric"
clean_run => true
last_run_metadata_path => "/mypath/.logstash_jdbc_last_run"
}
最后五项设置对您的情况很重要。还要确保删除 .logstash_jdbc_last_run
文件,即使 clean_run => true
删除了它。