日志存储未在 Elasticsearch 中加载准确数量的记录,并且每次命中结果都在不断变化
Log stash not loading exact number of records in Elasticsearch and on every hit results are keep changing
问题陈述:Logstash 没有正确地将所有记录从数据库加载到 elasticsearch,每次我点击相同的 api 都会得到不同的结果(但有时是正确的,但每次点击都会发生变化,并且只显示嵌套称呼下的记录子集场地)。 logstash 机制看起来很零散,加载结果不一致,尤其是在一对多场景中。
http://localhost:9200/staffsalutation/_search
我观察到 logstash logstash-7.8.0 的奇怪行为,同时从 2 个表加载记录,查询和配置如下
查询:
select s.update_time, s.staff_id 作为 staff_id, birth_date, first_name, last_name, 性别, hire_date,
st.title AS title_nm, st.from_date AS title_frm_dt, st.to_date AS title_to_dt
来自工作人员
LEFT JOIN 称呼 st ON s.staff_id = st.staff_id
按 s.update_time
排序
input {
jdbc {
jdbc_connection_string => "jdbc:postgresql://localhost:5432/postgres"
jdbc_driver_library => "C:\Users\NS\.m2\repository\org\postgresql\postgresql\42.2.11\postgresql-42.2.11.jar"
jdbc_user => "postgres"
jdbc_password => "postgres"
jdbc_driver_class => "org.postgresql.Driver"
schedule => "* * * * *"
statement => "select e.update_time, e.emp_no as staff_id, birth_date, first_name, last_name, gender, hire_date, t.title AS title_nm, t.from_date AS title_frm_dt, t.to_date AS title_to_dt
from employees e
LEFT JOIN titles t
ON e.emp_no = t.emp_no
order by e.update_time"
add_field => { "doctype" => "employee" }
tracking_column_type => "timestamp"
use_column_value =>true
tracking_column => update_time
jdbc_fetch_size => "50000"
}
}
filter {
aggregate {
task_id => "%{staff_id}"
code => "
map['staff_id'] = event.get('staff_id')
map['birth_date'] = event.get('birth_date')
map['first_name'] = event.get('first_name')
map['last_name'] = event.get('last_name')
map['gender'] = event.get('gender')
map['hire_date'] = event.get('hire_date')
map['salutations'] ||= []
map['salutations'] << {
'title_nm' => event.get('title_nm'),'title_frm_dt' => event.get('title_frm_dt'),
'title_to_dt' => event.get('title_to_dt')
}
event.cancel()
"
push_previous_map_as_event => true
timeout => 30
}
}
output {
elasticsearch {
document_id => "%{staff_id}"
index => "staffsalutation"
}
file {
path => "test.log"
codec => line
}
}
找到解决方案!
- 需要在查询中使用 order by 子句,以便记录按 emp_no 排序并且
logstash 可以搜索和聚合依赖实体,如标题(如一对多)。
select e.update_time, e.emp_no as staff_id, birth_date, first_name,
last_name, gender, hire_date, t.title AS title_nm, t.from_date AS
title_frm_dt, t.to_date AS title_to_dt from employees e LEFT
JOIN titles t ON e.emp_no = t.emp_no order by e.emp_no
- 由于这里使用聚合需要有单线程来处理其他记录
它会导致聚合问题(这就是您将根据上面的 url 多次调用以搜索索引时获得的随机结果)。虽然它看起来是一个性能损失,因为只有 1 个工作线程将处理记录,但可以通过调用具有异构记录集的多个 logstash 配置文件来缓解它,例如一个文件中的第一个 100 emp_no 和另一个文件中的第二百个,以便 logstash 可以并行执行它们。
所以像下面这样执行
logstash -f logstash_config.conf -w 1
问题陈述:Logstash 没有正确地将所有记录从数据库加载到 elasticsearch,每次我点击相同的 api 都会得到不同的结果(但有时是正确的,但每次点击都会发生变化,并且只显示嵌套称呼下的记录子集场地)。 logstash 机制看起来很零散,加载结果不一致,尤其是在一对多场景中。 http://localhost:9200/staffsalutation/_search 我观察到 logstash logstash-7.8.0 的奇怪行为,同时从 2 个表加载记录,查询和配置如下
查询:
select s.update_time, s.staff_id 作为 staff_id, birth_date, first_name, last_name, 性别, hire_date,
st.title AS title_nm, st.from_date AS title_frm_dt, st.to_date AS title_to_dt
来自工作人员
LEFT JOIN 称呼 st ON s.staff_id = st.staff_id
按 s.update_time
input {
jdbc {
jdbc_connection_string => "jdbc:postgresql://localhost:5432/postgres"
jdbc_driver_library => "C:\Users\NS\.m2\repository\org\postgresql\postgresql\42.2.11\postgresql-42.2.11.jar"
jdbc_user => "postgres"
jdbc_password => "postgres"
jdbc_driver_class => "org.postgresql.Driver"
schedule => "* * * * *"
statement => "select e.update_time, e.emp_no as staff_id, birth_date, first_name, last_name, gender, hire_date, t.title AS title_nm, t.from_date AS title_frm_dt, t.to_date AS title_to_dt
from employees e
LEFT JOIN titles t
ON e.emp_no = t.emp_no
order by e.update_time"
add_field => { "doctype" => "employee" }
tracking_column_type => "timestamp"
use_column_value =>true
tracking_column => update_time
jdbc_fetch_size => "50000"
}
}
filter {
aggregate {
task_id => "%{staff_id}"
code => "
map['staff_id'] = event.get('staff_id')
map['birth_date'] = event.get('birth_date')
map['first_name'] = event.get('first_name')
map['last_name'] = event.get('last_name')
map['gender'] = event.get('gender')
map['hire_date'] = event.get('hire_date')
map['salutations'] ||= []
map['salutations'] << {
'title_nm' => event.get('title_nm'),'title_frm_dt' => event.get('title_frm_dt'),
'title_to_dt' => event.get('title_to_dt')
}
event.cancel()
"
push_previous_map_as_event => true
timeout => 30
}
}
output {
elasticsearch {
document_id => "%{staff_id}"
index => "staffsalutation"
}
file {
path => "test.log"
codec => line
}
}
找到解决方案!
- 需要在查询中使用 order by 子句,以便记录按 emp_no 排序并且 logstash 可以搜索和聚合依赖实体,如标题(如一对多)。
select e.update_time, e.emp_no as staff_id, birth_date, first_name, last_name, gender, hire_date, t.title AS title_nm, t.from_date AS title_frm_dt, t.to_date AS title_to_dt from employees e LEFT JOIN titles t ON e.emp_no = t.emp_no order by e.emp_no
- 由于这里使用聚合需要有单线程来处理其他记录 它会导致聚合问题(这就是您将根据上面的 url 多次调用以搜索索引时获得的随机结果)。虽然它看起来是一个性能损失,因为只有 1 个工作线程将处理记录,但可以通过调用具有异构记录集的多个 logstash 配置文件来缓解它,例如一个文件中的第一个 100 emp_no 和另一个文件中的第二百个,以便 logstash 可以并行执行它们。 所以像下面这样执行
logstash -f logstash_config.conf -w 1