如何用logstash动态定义elasticsearch索引?
How to dynamically define elasticsearch index with logstash?
请参阅下面的 logstash 配置文件,它使用 jdbc 插件将记录从 mysql 数据库提取到 elasticsearch 索引中。如何对其进行修改,以便根据在数据库中找到的 company_id 值生成单独的索引,如下所示:company_%{company_id}_user_events。
这可以动态完成吗,还是需要为每个公司 ID 分别创建预先配置、硬编码的 logstash 配置文件?是否有某种折中方案,例如脚本或模板?
如果有帮助,可以将 company_id 字段直接添加到数据库中的 ahoy_events 表,而不是像现在这样通过用户关联来“添加”。
当前logstash.conf
# Input stage: poll MySQL through the jdbc input plugin and emit one event
# per row returned from the ahoy_events table.
input {
  jdbc {
    # Connection target and credentials; ${...} values are resolved from
    # environment variables.
    jdbc_connection_string => "jdbc:mysql://mysql:3306/${DB_NAME}"
    jdbc_user => "${DB_USER}"
    jdbc_password => "${DB_PASSWORD}"

    # MySQL Connector/J jar and the driver class to load from it.
    jdbc_driver_library => "/opt/mysql-connector-java-5.1.47-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"

    # Cron-style schedule: run the statement every minute.
    schedule => "* * * * *"

    # Incremental fetch: only rows newer than the stored :sql_last_value.
    # NOTE(review): no tracking_column is configured, so :sql_last_value is
    # presumably the time of the previous run rather than the last seen
    # `time` value -- confirm this is the intended behaviour.
    statement => "select * from ahoy_events where time > :sql_last_value"
  }
}
# Filter stage: enrich each ahoy_events row with its user, visit and company
# records, then flatten the pieces that should be indexed.
# Filters run top to bottom, so the order below matters.
filter {
  # Look up the user row for this event; result rows land in [user] as an array.
  jdbc_streaming {
    statement => "select * from users where id = :user"
    parameters => { "user" => "user_id" }
    target => "user"
    jdbc_connection_string => "jdbc:mysql://mysql:3306/${DB_NAME}"
    jdbc_user => "${DB_USER}"
    jdbc_password => "${DB_PASSWORD}"
    jdbc_driver_library => "/opt/mysql-connector-java-5.1.47-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
  }

  # Look up the visit row for this event; result rows land in [visits].
  jdbc_streaming {
    statement => "select * from visits where id = :visits"
    parameters => { "visits" => "visit_id" }
    target => "visits"
    jdbc_connection_string => "jdbc:mysql://mysql:3306/${DB_NAME}"
    jdbc_user => "${DB_USER}"
    jdbc_password => "${DB_PASSWORD}"
    jdbc_driver_library => "/opt/mysql-connector-java-5.1.47-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
  }

  # Promote the user's company_id to a top-level field. It must exist before
  # the company lookup below, which reads it as a query parameter.
  mutate {
    add_field => { "company_id" => "%{[user][0][company_id]}"}
  }

  # Look up the company row by the company_id field added above.
  jdbc_streaming {
    statement => "select * from companies where id = :company_id"
    parameters => { "company_id" => "company_id" }
    target => "company"
    jdbc_connection_string => "jdbc:mysql://mysql:3306/${DB_NAME}"
    jdbc_user => "${DB_USER}"
    jdbc_password => "${DB_PASSWORD}"
    jdbc_driver_library => "/opt/mysql-connector-java-5.1.47-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
  }

  # Parse the JSON text held in "properties" and store the parsed structure
  # back into the same field.
  json {
    source => "properties"
    target => "properties"
  }

  # Copy the display names out of the first row of each lookup result.
  mutate {
    add_field => {
      "user_name" => "%{[user][0][name]}"
      "company_name" => "%{[company][0][name]}"
    }
  }

  # Keep the first visit row as a single "visit" object.
  mutate {
    rename => { "[visits][0]" => "visit" }
  }

  # Drop the raw lookup results now that the needed values were extracted.
  mutate {
    remove_field => ["visits", "company", "user"]
  }
}
# Output stage (current behaviour): every event goes to one shared daily
# index, regardless of which company it belongs to.
output {
  elasticsearch {
    # Index name is "user_events-" plus the event date (one index per day).
    index => "user_events-%{+YYYY.MM.dd}"
    # Use the row's id as the document id.
    document_id => "%{id}"
    hosts => ["http://elasticsearch:9200"]
  }
}
所需的结果是具有 company_id 命名空间的索引:
company_%{company_id}_user_events
这样我就可以稍后以相同的模式添加其他索引
company_%{company_id}_other_records
不是 100% 确定,但从技术上讲应该很简单:
# Output stage with a per-company index namespace.
# The index name is built per event from its company_id field, so no separate
# config file per company is required.
output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    # Resolves to e.g. company_42_user_events-2019.01.31. The "user_events"
    # segment keeps the requested company_<id>_user_events namespace, so
    # sibling indices such as company_<id>_other_records can follow the same
    # pattern later.
    # NOTE(review): if company_id could not be resolved for an event, the
    # %{...} reference is presumably left as literal text in the index name --
    # consider guarding this output with a conditional.
    index => "company_%{company_id}_user_events-%{+YYYY.MM.dd}"
    # Use the row's id as the document id.
    document_id => "%{id}"
  }
}
请参阅下面的 logstash 配置文件,它使用 jdbc 插件将记录从 mysql 数据库提取到 elasticsearch 索引中。如何对其进行修改,以便根据在数据库中找到的 company_id 值生成单独的索引,如下所示:company_%{company_id}_user_events。
这可以动态完成吗,还是需要为每个公司 ID 分别创建预先配置、硬编码的 logstash 配置文件?是否有某种折中方案,例如脚本或模板?
如果有帮助,可以将 company_id 字段直接添加到数据库中的 ahoy_events 表,而不是像现在这样通过用户关联来“添加”。
当前logstash.conf
# Input stage: poll MySQL through the jdbc input plugin and emit one event
# per row returned from the ahoy_events table.
input {
  jdbc {
    # Connection target and credentials; ${...} values are resolved from
    # environment variables.
    jdbc_connection_string => "jdbc:mysql://mysql:3306/${DB_NAME}"
    jdbc_user => "${DB_USER}"
    jdbc_password => "${DB_PASSWORD}"

    # MySQL Connector/J jar and the driver class to load from it.
    jdbc_driver_library => "/opt/mysql-connector-java-5.1.47-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"

    # Cron-style schedule: run the statement every minute.
    schedule => "* * * * *"

    # Incremental fetch: only rows newer than the stored :sql_last_value.
    # NOTE(review): no tracking_column is configured, so :sql_last_value is
    # presumably the time of the previous run rather than the last seen
    # `time` value -- confirm this is the intended behaviour.
    statement => "select * from ahoy_events where time > :sql_last_value"
  }
}
# Filter stage: enrich each ahoy_events row with its user, visit and company
# records, then flatten the pieces that should be indexed.
# Filters run top to bottom, so the order below matters.
filter {
  # Look up the user row for this event; result rows land in [user] as an array.
  jdbc_streaming {
    statement => "select * from users where id = :user"
    parameters => { "user" => "user_id" }
    target => "user"
    jdbc_connection_string => "jdbc:mysql://mysql:3306/${DB_NAME}"
    jdbc_user => "${DB_USER}"
    jdbc_password => "${DB_PASSWORD}"
    jdbc_driver_library => "/opt/mysql-connector-java-5.1.47-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
  }

  # Look up the visit row for this event; result rows land in [visits].
  jdbc_streaming {
    statement => "select * from visits where id = :visits"
    parameters => { "visits" => "visit_id" }
    target => "visits"
    jdbc_connection_string => "jdbc:mysql://mysql:3306/${DB_NAME}"
    jdbc_user => "${DB_USER}"
    jdbc_password => "${DB_PASSWORD}"
    jdbc_driver_library => "/opt/mysql-connector-java-5.1.47-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
  }

  # Promote the user's company_id to a top-level field. It must exist before
  # the company lookup below, which reads it as a query parameter.
  mutate {
    add_field => { "company_id" => "%{[user][0][company_id]}"}
  }

  # Look up the company row by the company_id field added above.
  jdbc_streaming {
    statement => "select * from companies where id = :company_id"
    parameters => { "company_id" => "company_id" }
    target => "company"
    jdbc_connection_string => "jdbc:mysql://mysql:3306/${DB_NAME}"
    jdbc_user => "${DB_USER}"
    jdbc_password => "${DB_PASSWORD}"
    jdbc_driver_library => "/opt/mysql-connector-java-5.1.47-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
  }

  # Parse the JSON text held in "properties" and store the parsed structure
  # back into the same field.
  json {
    source => "properties"
    target => "properties"
  }

  # Copy the display names out of the first row of each lookup result.
  mutate {
    add_field => {
      "user_name" => "%{[user][0][name]}"
      "company_name" => "%{[company][0][name]}"
    }
  }

  # Keep the first visit row as a single "visit" object.
  mutate {
    rename => { "[visits][0]" => "visit" }
  }

  # Drop the raw lookup results now that the needed values were extracted.
  mutate {
    remove_field => ["visits", "company", "user"]
  }
}
# Output stage (current behaviour): every event goes to one shared daily
# index, regardless of which company it belongs to.
output {
  elasticsearch {
    # Index name is "user_events-" plus the event date (one index per day).
    index => "user_events-%{+YYYY.MM.dd}"
    # Use the row's id as the document id.
    document_id => "%{id}"
    hosts => ["http://elasticsearch:9200"]
  }
}
所需的结果是具有 company_id 命名空间的索引:
company_%{company_id}_user_events
这样我就可以稍后以相同的模式添加其他索引
company_%{company_id}_other_records
不是 100% 确定,但从技术上讲应该很简单:
# Output stage with a per-company index namespace.
# The index name is built per event from its company_id field, so no separate
# config file per company is required.
output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    # Resolves to e.g. company_42_user_events-2019.01.31. The "user_events"
    # segment keeps the requested company_<id>_user_events namespace, so
    # sibling indices such as company_<id>_other_records can follow the same
    # pattern later.
    # NOTE(review): if company_id could not be resolved for an event, the
    # %{...} reference is presumably left as literal text in the index name --
    # consider guarding this output with a conditional.
    index => "company_%{company_id}_user_events-%{+YYYY.MM.dd}"
    # Use the row's id as the document id.
    document_id => "%{id}"
  }
}