我应该如何在 logstash 中使用 sql_last_value?
How should I use sql_last_value in logstash?
我不太清楚 sql_last_value
在我这样陈述时做了什么:
statement => "SELECT * from mytable where id > :sql_last_value"
我可以稍微理解使用它的原因,它不会浏览整个数据库 table 以更新字段,而是只更新新添加的记录。如果我错了请纠正我。
所以我想做的是,使用 logstash
创建索引:
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://hostmachine:3306/db"
jdbc_user => "root"
jdbc_password => "root"
jdbc_validate_connection => true
jdbc_driver_library => "/path/mysql_jar/mysql-connector-java-5.1.39-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
schedule => "* * * * *"
statement => "SELECT * from mytable where id > :sql_last_value"
use_column_value => true
tracking_column => id
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
}
}
output {
elasticsearch {
#protocol => http
index => "myindex"
document_type => "message_logs"
document_id => "%{id}"
action => index
hosts => ["http://myhostmachine:9402"]
}
}
执行此操作后,文档根本不会上传到索引。我哪里错了?
如有任何帮助,我们将不胜感激。
如果您的 table 中有时间戳列(例如 last_updated
),您最好使用它而不是 ID 列。因此,当一条记录被更新时,您也可以修改该时间戳,并且 jdbc
输入插件将获取该记录(即 ID 列不会更改其值,更新后的记录也不会被获取)
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://hostmachine:3306/db"
jdbc_user => "root"
jdbc_password => "root"
jdbc_validate_connection => true
jdbc_driver_library => "/path/mysql_jar/mysql-connector-java-5.1.39-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
schedule => "* * * * *"
statement => "SELECT * from mytable where last_updated > :sql_last_value"
}
}
如果您仍然决定保留 ID 列,则应删除 $HOME/.logstash_jdbc_last_run
文件并重试。
有几件事需要注意:
如果您之前 运行 Logstash 没有计划,那么在 运行 具有计划的 Logstash 之前,删除文件:
$HOME/.logstash_jdbc_last_run
在 Windows 中,此文件位于:
C:\Users\<Username>\.logstash_jdbc_last_run
Logstash 配置中的 "statement =>" 应该有 "order by" tracking_column.
tracking_column应该给对了
这是 Logstash 配置文件的示例:
input {
jdbc {
# MySQL DB jdbc connection string to our database, softwaredevelopercentral
jdbc_connection_string => "jdbc:mysql://localhost:3306/softwaredevelopercentral?autoReconnect=true&useSSL=false"
# The user we wish to execute our statement as
jdbc_user => "root"
# The user password
jdbc_password => ""
# The path to our downloaded jdbc driver
jdbc_driver_library => "D:\Programs\MySQLJava\mysql-connector-java-6.0.6.jar"
# The name of the driver class for MySQL DB
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
# our query
schedule => "* * * * *"
statement => "SELECT * FROM student WHERE studentid > :sql_last_value order by studentid"
use_column_value => true
tracking_column => "studentid"
}
}
output {
stdout { codec => json_lines }
elasticsearch {
hosts => ["localhost:9200"]
index => "students"
document_type => "student"
document_id => "%{studentid}"
}
}
要查看相同的工作示例,您可以查看我的博客 post:
http://softwaredevelopercentral.blogspot.com/2017/10/elasticsearch-logstash-kibana-tutorial.html
简而言之,sql_last_value
允许您保留上次 sql 运行 的数据作为其名称 sugets。
此值在您安排查询 时特别有用。但为什么 ... ?
因为您可以根据存储在 sql_last_value
和 中的值创建 sql 语句条件,所以避免检索已为您的 logstash 输入摄取或在上次管道执行后更新的行.
使用时的注意事项sql_last_value
- 默认情况下,此变量存储最后 运行 的时间戳。当您需要根据
creation_date
last_update
等列提取数据时很有用..
- 您可以通过使用特定 table 的列值跟踪它来定义
sql_last_value
的值。当您需要基于摄取自动增量数据时很有用。为此,您需要指定 use_column_value => true
and tracking_column => "column_name_to_track"
.
以下示例会将最后一个 mytable 行的 id 存储到 :sql_last_value
中,以便在下一次执行中摄取之前未摄取的行,这意味着其 id 大于最后摄取的 id 的行。
input {
jdbc {
# ...
schedule => "* * * * *"
statement => "SELECT * from mytable where id > :sql_last_value"
use_column_value => true
tracking_column => id
}
}
极其重要!!!
当您在管道中使用多个输入时,每个输入块都会覆盖最后一个输入块的 sql_last_value
值。为了避免这种行为,您可以使用 last_run_metadata_path => "/path/to/sql_last_value/of_your_pipeline.yml"
选项,这意味着每个管道将在不同的文件中存储自己的值。
我不太清楚 sql_last_value
在我这样陈述时做了什么:
statement => "SELECT * from mytable where id > :sql_last_value"
我可以稍微理解使用它的原因,它不会浏览整个数据库 table 以更新字段,而是只更新新添加的记录。如果我错了请纠正我。
所以我想做的是,使用 logstash
创建索引:
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://hostmachine:3306/db"
jdbc_user => "root"
jdbc_password => "root"
jdbc_validate_connection => true
jdbc_driver_library => "/path/mysql_jar/mysql-connector-java-5.1.39-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
schedule => "* * * * *"
statement => "SELECT * from mytable where id > :sql_last_value"
use_column_value => true
tracking_column => id
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
}
}
output {
elasticsearch {
#protocol => http
index => "myindex"
document_type => "message_logs"
document_id => "%{id}"
action => index
hosts => ["http://myhostmachine:9402"]
}
}
执行此操作后,文档根本不会上传到索引。我哪里错了?
如有任何帮助,我们将不胜感激。
如果您的 table 中有时间戳列(例如 last_updated
),您最好使用它而不是 ID 列。因此,当一条记录被更新时,您也可以修改该时间戳,并且 jdbc
输入插件将获取该记录(即 ID 列不会更改其值,更新后的记录也不会被获取)
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://hostmachine:3306/db"
jdbc_user => "root"
jdbc_password => "root"
jdbc_validate_connection => true
jdbc_driver_library => "/path/mysql_jar/mysql-connector-java-5.1.39-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
schedule => "* * * * *"
statement => "SELECT * from mytable where last_updated > :sql_last_value"
}
}
如果您仍然决定保留 ID 列,则应删除 $HOME/.logstash_jdbc_last_run
文件并重试。
有几件事需要注意:
如果您之前 运行 Logstash 没有计划,那么在 运行 具有计划的 Logstash 之前,删除文件:
$HOME/.logstash_jdbc_last_run
在 Windows 中,此文件位于:
C:\Users\<Username>\.logstash_jdbc_last_run
Logstash 配置中的 "statement =>" 应该有 "order by" tracking_column.
tracking_column应该给对了
这是 Logstash 配置文件的示例:
input {
jdbc {
# MySQL DB jdbc connection string to our database, softwaredevelopercentral
jdbc_connection_string => "jdbc:mysql://localhost:3306/softwaredevelopercentral?autoReconnect=true&useSSL=false"
# The user we wish to execute our statement as
jdbc_user => "root"
# The user password
jdbc_password => ""
# The path to our downloaded jdbc driver
jdbc_driver_library => "D:\Programs\MySQLJava\mysql-connector-java-6.0.6.jar"
# The name of the driver class for MySQL DB
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
# our query
schedule => "* * * * *"
statement => "SELECT * FROM student WHERE studentid > :sql_last_value order by studentid"
use_column_value => true
tracking_column => "studentid"
}
}
output {
stdout { codec => json_lines }
elasticsearch {
hosts => ["localhost:9200"]
index => "students"
document_type => "student"
document_id => "%{studentid}"
}
}
要查看相同的工作示例,您可以查看我的博客 post: http://softwaredevelopercentral.blogspot.com/2017/10/elasticsearch-logstash-kibana-tutorial.html
简而言之,sql_last_value
允许您保留上次 sql 运行 的数据作为其名称 sugets。
此值在您安排查询 时特别有用。但为什么 ... ?
因为您可以根据存储在 sql_last_value
和 中的值创建 sql 语句条件,所以避免检索已为您的 logstash 输入摄取或在上次管道执行后更新的行.
使用时的注意事项sql_last_value
- 默认情况下,此变量存储最后 运行 的时间戳。当您需要根据
creation_date
last_update
等列提取数据时很有用.. - 您可以通过使用特定 table 的列值跟踪它来定义
sql_last_value
的值。当您需要基于摄取自动增量数据时很有用。为此,您需要指定use_column_value => true
andtracking_column => "column_name_to_track"
.
以下示例会将最后一个 mytable 行的 id 存储到 :sql_last_value
中,以便在下一次执行中摄取之前未摄取的行,这意味着其 id 大于最后摄取的 id 的行。
input {
jdbc {
# ...
schedule => "* * * * *"
statement => "SELECT * from mytable where id > :sql_last_value"
use_column_value => true
tracking_column => id
}
}
极其重要!!!
当您在管道中使用多个输入时,每个输入块都会覆盖最后一个输入块的 sql_last_value
值。为了避免这种行为,您可以使用 last_run_metadata_path => "/path/to/sql_last_value/of_your_pipeline.yml"
选项,这意味着每个管道将在不同的文件中存储自己的值。