Elasticsearch 中的 Logstash 重复数据问题
Logstash duplicate data in elasticsearch
有人可以帮我看看 logstash 的配置吗?我想把 PostgreSQL 同步到 Elasticsearch,问题是当前的设置会在 ES 中产生重复数据。我知道设置 "id" 字段会有帮助,但 logstash 仍然每次都会重新导入整个索引。
SQL
-- Incremental-sync query used by the jdbc input via statement_filepath.
-- Half-open window: rows newer than the last tracked value but strictly
-- older than NOW(), so rows committed "right now" are safely picked up on
-- the next scheduled run instead of being straddled by the cutoff.
-- FIX: ORDER BY is required because the input sets jdbc_paging_enabled =>
-- true; without a deterministic order, pages may overlap or skip rows and
-- :sql_last_value can advance past rows that were never emitted. The
-- official reference statement (quoted further down) orders the same way.
SELECT id, title, description, type, "updatedAt"
FROM public.videos
WHERE ("updatedAt" > :sql_last_value AND "updatedAt" < NOW())
ORDER BY "updatedAt" ASC
# jdbc input: polls public.videos every 5 seconds and emits changed rows.
jdbc {
  jdbc_driver_library => "/usr/share/logstash/postgresql-jdbc.jar"
  jdbc_driver_class => "org.postgresql.Driver"
  # FIX: the original read "jdbc:postgresql://{POSTGRES_URL}" — the env-var
  # reference was missing its "$", so Logstash would try to connect to the
  # literal host "{POSTGRES_URL}" instead of the configured one.
  jdbc_connection_string => "jdbc:postgresql://${POSTGRES_URL}"
  jdbc_user => "${POSTGRES_USERNAME}"
  jdbc_password => "${POSTGRES_PASSWORD}"
  jdbc_paging_enabled => true
  # FIX: the jdbc input lowercases result column names by default
  # (lowercase_column_names => true), so a tracking_column of "updatedAt"
  # never matches any field, :sql_last_value never advances past the epoch,
  # and every scheduled run re-imports the whole table — the "recreates the
  # entire index" symptom described above. Track the lowercased name (or
  # alternatively set lowercase_column_names => false).
  tracking_column => "updatedat"
  tracking_column_type => "timestamp"
  use_column_value => true
  schedule => "*/5 * * * * *"   # every 5 seconds
  statement_filepath => "/usr/share/logstash/sql/video.sql"
  tags => "video"
}
# Output: route rows tagged by the jdbc input into the "video" index.
output {
  if "video" in [tags] {
    elasticsearch {
      hosts => ["${ES_HOSTS}"]
      index => "video"
      # FIX: document_id must be a stable per-row key. With it commented
      # out, Elasticsearch auto-generates a fresh _id for every event, so
      # each sync run indexes the same rows again as new documents — this
      # is the duplication being reported. [@metadata][_id] is never
      # populated by the jdbc input, so use the primary-key column that the
      # SQL statement selects.
      document_id => "%{id}"
      user => "${ES_USER}"
      password => "${ES_PASSWORD}"
      cacert => '/etc/logstash/certificates/ca.crt'
      # FIX: doc_as_upsert only takes effect with action => "update"; with
      # the default "index" action it is silently ignored.
      action => "update"
      doc_as_upsert => true
    }
  }
}
Elastic 官方文档中针对 MySQL 的参考配置
# Reference jdbc input quoted from Elastic's official guide ("How to keep
# Elasticsearch synchronized with a relational database using Logstash"),
# shown here for comparison with the PostgreSQL input above. Note the
# already-lowercase tracking column and the ORDER BY in the statement —
# both things the PostgreSQL configuration above is missing.
jdbc {
jdbc_driver_library => "<path>/mysql-connector-java-8.0.16.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db"
jdbc_user => <my username>
jdbc_password => <my password>
jdbc_paging_enabled => true
# Tracks a numeric UNIX-timestamp alias computed in the SELECT below.
tracking_column => "unix_ts_in_secs"
use_column_value => true
tracking_column_type => "numeric"
schedule => "*/5 * * * * *"
statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
}
关于输入部分:您需要确保 tracking_column 指向查询结果中实际存在、可以被跟踪的字段(注意 jdbc 输入默认会把列名转成小写):
tracking_column => "updatedAt"
您的输出部分应该如下所示。正确设置 document_id 非常重要:[@metadata][_id] 并没有被赋过值,导致每个文档都拿到随机生成的 _id,重复数据正是由此产生的。
# Corrected output block from the answer. The original snippet had the bare
# text "<---- change this line" after document_id, which is not valid
# pipeline-config syntax; it is folded into a comment here so the block can
# be copy-pasted as-is.
elasticsearch {
  hosts => ["${ES_HOSTS}"]
  index => "video"
  # Use the primary-key column as the document id so re-synced rows
  # overwrite their existing documents instead of being indexed again under
  # a fresh auto-generated _id (the cause of the duplicates).
  document_id => "%{id}"   # <---- this is the changed line
  user => "${ES_USER}"
  password => "${ES_PASSWORD}"
  cacert => '/etc/logstash/certificates/ca.crt'
}
有人可以帮我看看 logstash 的配置吗?我想把 PostgreSQL 同步到 Elasticsearch,问题是当前的设置会在 ES 中产生重复数据。我知道设置 "id" 字段会有帮助,但 logstash 仍然每次都会重新导入整个索引。
SQL
-- Incremental-sync query used by the jdbc input via statement_filepath.
-- Half-open window: rows newer than the last tracked value but strictly
-- older than NOW(), so rows committed "right now" are safely picked up on
-- the next scheduled run instead of being straddled by the cutoff.
-- FIX: ORDER BY is required because the input sets jdbc_paging_enabled =>
-- true; without a deterministic order, pages may overlap or skip rows and
-- :sql_last_value can advance past rows that were never emitted. The
-- official reference statement (quoted further down) orders the same way.
SELECT id, title, description, type, "updatedAt"
FROM public.videos
WHERE ("updatedAt" > :sql_last_value AND "updatedAt" < NOW())
ORDER BY "updatedAt" ASC
# jdbc input: polls public.videos every 5 seconds and emits changed rows.
jdbc {
  jdbc_driver_library => "/usr/share/logstash/postgresql-jdbc.jar"
  jdbc_driver_class => "org.postgresql.Driver"
  # FIX: the original read "jdbc:postgresql://{POSTGRES_URL}" — the env-var
  # reference was missing its "$", so Logstash would try to connect to the
  # literal host "{POSTGRES_URL}" instead of the configured one.
  jdbc_connection_string => "jdbc:postgresql://${POSTGRES_URL}"
  jdbc_user => "${POSTGRES_USERNAME}"
  jdbc_password => "${POSTGRES_PASSWORD}"
  jdbc_paging_enabled => true
  # FIX: the jdbc input lowercases result column names by default
  # (lowercase_column_names => true), so a tracking_column of "updatedAt"
  # never matches any field, :sql_last_value never advances past the epoch,
  # and every scheduled run re-imports the whole table — the "recreates the
  # entire index" symptom described above. Track the lowercased name (or
  # alternatively set lowercase_column_names => false).
  tracking_column => "updatedat"
  tracking_column_type => "timestamp"
  use_column_value => true
  schedule => "*/5 * * * * *"   # every 5 seconds
  statement_filepath => "/usr/share/logstash/sql/video.sql"
  tags => "video"
}
# Output: route rows tagged by the jdbc input into the "video" index.
output {
  if "video" in [tags] {
    elasticsearch {
      hosts => ["${ES_HOSTS}"]
      index => "video"
      # FIX: document_id must be a stable per-row key. With it commented
      # out, Elasticsearch auto-generates a fresh _id for every event, so
      # each sync run indexes the same rows again as new documents — this
      # is the duplication being reported. [@metadata][_id] is never
      # populated by the jdbc input, so use the primary-key column that the
      # SQL statement selects.
      document_id => "%{id}"
      user => "${ES_USER}"
      password => "${ES_PASSWORD}"
      cacert => '/etc/logstash/certificates/ca.crt'
      # FIX: doc_as_upsert only takes effect with action => "update"; with
      # the default "index" action it is silently ignored.
      action => "update"
      doc_as_upsert => true
    }
  }
}
Elastic 官方文档中针对 MySQL 的参考配置
# Reference jdbc input quoted from Elastic's official guide ("How to keep
# Elasticsearch synchronized with a relational database using Logstash"),
# shown here for comparison with the PostgreSQL input above. Note the
# already-lowercase tracking column and the ORDER BY in the statement —
# both things the PostgreSQL configuration above is missing.
jdbc {
jdbc_driver_library => "<path>/mysql-connector-java-8.0.16.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db"
jdbc_user => <my username>
jdbc_password => <my password>
jdbc_paging_enabled => true
# Tracks a numeric UNIX-timestamp alias computed in the SELECT below.
tracking_column => "unix_ts_in_secs"
use_column_value => true
tracking_column_type => "numeric"
schedule => "*/5 * * * * *"
statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
}
关于输入部分:您需要确保 tracking_column 指向查询结果中实际存在、可以被跟踪的字段(注意 jdbc 输入默认会把列名转成小写):
tracking_column => "updatedAt"
您的输出部分应该如下所示。正确设置 document_id 非常重要:[@metadata][_id] 并没有被赋过值,导致每个文档都拿到随机生成的 _id,重复数据正是由此产生的。
# Corrected output block from the answer. The original snippet had the bare
# text "<---- change this line" after document_id, which is not valid
# pipeline-config syntax; it is folded into a comment here so the block can
# be copy-pasted as-is.
elasticsearch {
  hosts => ["${ES_HOSTS}"]
  index => "video"
  # Use the primary-key column as the document id so re-synced rows
  # overwrite their existing documents instead of being indexed again under
  # a fresh auto-generated _id (the cause of the duplicates).
  document_id => "%{id}"   # <---- this is the changed line
  user => "${ES_USER}"
  password => "${ES_PASSWORD}"
  cacert => '/etc/logstash/certificates/ca.crt'
}