Logstash duplicate data in Elasticsearch

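Can someone help me with Logstash? I want to sync PostgreSQL with Elasticsearch. The problem is that my current setup duplicates data in ES. I know that setting the "id" field as the document ID would help, but Logstash would still recreate the whole index on every run.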

SQL

SELECT id, title, description, type,"updatedAt" FROM public.videos WHERE ("updatedAt" > :sql_last_value AND "updatedAt" < NOW())

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/postgresql-jdbc.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://{POSTGRES_URL}"
    jdbc_user => "${POSTGRES_USERNAME}"
    jdbc_password => "${POSTGRES_PASSWORD}"
    jdbc_paging_enabled => true
    tracking_column => "updatedAt"
    tracking_column_type => "timestamp"
    use_column_value => true
    schedule => "*/5 * * * * *"
    statement_filepath => "/usr/share/logstash/sql/video.sql"
    tags => "video"
  }
}

output {
  if "video" in [tags] {
    elasticsearch {
      hosts => ["${ES_HOSTS}"]
      index => "video"
      # document_id => "%{[@metadata][_id]}"
      user => "${ES_USER}"
      password => "${ES_PASSWORD}"
      cacert => '/etc/logstash/certificates/ca.crt'
      doc_as_upsert => true
    }
  }
}

Settings from the official documentation (MySQL)

  jdbc {
    jdbc_driver_library => "<path>/mysql-connector-java-8.0.16.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db"
    jdbc_user => <my username>
    jdbc_password => <my password>
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
  }

Regarding your input section, you need to make sure that your tracking_column is actually a real field that can be tracked:

tracking_column => "updatedAt"
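
To see why the tracking column matters, here is a minimal Python sketch of the polling loop. It is a simplified model, not the plugin's code: poll, rows_per_run and the sample table are hypothetical names. It assumes the jdbc input lowercases column names (the lowercase_column_names option, which I believe is on by default), so a column selected as "updatedAt" would reach the event as updatedat. If tracking_column does not match a field in the event, the saved marker never advances and every run selects the whole table again.

```python
from datetime import datetime

EPOCH = datetime(1970, 1, 1)  # assumed starting marker for a timestamp column

# Hypothetical sample data standing in for public.videos
table = [
    {"id": 1, "updatedAt": datetime(2021, 1, 1)},
    {"id": 2, "updatedAt": datetime(2021, 1, 2)},
]


def poll(rows, last_value, tracking_column):
    """One scheduled run: select rows newer than last_value, then advance it."""
    # Models: WHERE "updatedAt" > :sql_last_value, oldest first
    selected = sorted(
        (r for r in rows if r["updatedAt"] > last_value),
        key=lambda r: r["updatedAt"],
    )
    for row in selected:
        # Assumption: event keys are lowercased before tracking is applied
        event = {key.lower(): value for key, value in row.items()}
        # If the tracking column is missing from the event, the marker stays put
        last_value = event.get(tracking_column, last_value)
    return selected, last_value


def rows_per_run(tracking_column, runs=3):
    """Count how many rows each of several consecutive runs would select."""
    last_value, counts = EPOCH, []
    for _ in range(runs):
        selected, last_value = poll(table, last_value, tracking_column)
        counts.append(len(selected))
    return counts
```

In this model, rows_per_run("updatedAt") selects both rows on every run, while rows_per_run("updatedat") selects them once and then nothing. If that matches what you see, try tracking_column => "updatedat".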

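Your output section should look like the following. Setting document_id correctly is important: [@metadata][_id] holds no value here, so each event is stored under a random ID, hence the duplicates.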

elasticsearch {
  hosts => ["${ES_HOSTS}"]
  index => "video"
  document_id => "%{id}"                   # <---- change this line
  user => "${ES_USER}"
  password => "${ES_PASSWORD}"
  cacert => '/etc/logstash/certificates/ca.crt'
}
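
As a rough illustration of why a stable document_id removes the duplicates, the sketch below models an index as a Python dict. index_events is a hypothetical helper, not an Elasticsearch API. It only assumes that indexing with an existing ID overwrites that document, while indexing without an ID creates a new document under a generated one.

```python
import uuid


def index_events(index, events, id_field=None):
    """Store events in a dict standing in for an index (illustrative only)."""
    for event in events:
        if id_field is not None:
            # Stable ID taken from the row, like document_id => "%{id}"
            doc_id = str(event[id_field])
        else:
            # No document_id configured: a fresh random ID for every event
            doc_id = uuid.uuid4().hex
        index[doc_id] = event  # same ID overwrites, new ID adds a document
    return index


# Hypothetical rows, delivered twice as if two scheduled runs selected them
events = [{"id": 1, "title": "a"}, {"id": 2, "title": "b"}]

without_id = index_events(index_events({}, events), events)
with_id = index_events(index_events({}, events, "id"), events, "id")
```

Here without_id ends up with four documents for two rows, while with_id stays at two, because the second pass overwrites the first instead of adding to it.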