Logstash :sql_last_value 显示错误的垃圾日期(显示 6 个月前的日期为最后 运行 时间)

Logstash :sql_last_value is showing wrong junk date (Showing 6 month's old date as last run time)

我观察到一个非常奇怪的问题 我正在使用 logstash + jdbc 将数据从 Oracle 数据库加载到 Elasticsearch 下面是我的配置文件的样子

input{
  jdbc{
    clean_run => "false"
    jdbc_driver_library => "<path_to_ojdbc8-12.1.0.jar>"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "<connection_string>"
    jdbc_user => "<usename>"
    jdbc_password_filepath => ".\pwd.txt"
    statement=> "SELECT * FROM customers WHERE CUSTOMER_NAME  LIKE 'PE%' AND UPD_DATE  > :sql_last_value "
    schedule=>"*/1 * * * * "
    use_column_value => true
    tracking_column_type => "timestamp"
    
    tracking_column => "upd_date"
    
    last_run_metadata_path =>"<path to logstash_metadata>"
    record_last_run => true
    }
}

filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["@version","@timestamp"]
  }
}
output {
      elasticsearch{
    hosts => ["<host>"]
    index => "<index_name>"
    document_id=>"%{[@metadata][_id]}"
    user => "<user>"
    password => "<pwd>"
}

   stdout{
      codec => dots
   }
}

现在,今天是 2021 年 3 月 8 日,我每分钟都会触发此文件。 当我加载 first-time 时,一切都很好 -:sql_last_value 是 '1970-01-01 00:00:00.000000 +00: 00'

但是在第一次加载之后,理想情况下 logstash_metadata 应该显示“2021-03-08 ”但奇怪的是它正在更新为 --- 2020 -09-11 01:05:09.000000000 Z in logstash_metadata (:sql_last_value)

如您所见,差异接近 180 天

我尝试了很多次,但它仍然以相同的方式更新,因此我的增量加载被搞砸了

我的logstash版本是7.10.2

非常感谢您的帮助!

注意:我没有使用分页,因为结果集中的结果数量对于我的查询来说总是非常少

记录的日期是最后处理行的日期。

看到您的查询,您对从数据库读取的记录没有特定的顺序。 Logstash jdbc 输入插件将您的查询包含在按 [1] 排序行的查询中,1 是它排序的列的序号。

因此,要以正确的顺序处理记录并获得最新的 upd_date 值,您需要 upd_date 成为 select 语句中的第一列。

input{
  jdbc{
    clean_run => "false"
    jdbc_driver_library => "<path_to_ojdbc8-12.1.0.jar>"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "<connection_string>"
    jdbc_user => "<usename>"
    jdbc_password_filepath => ".\pwd.txt"
    statement=> "SELECT c.UPD_DATE, c.CUSTOMER_NAME, c.<Other field> 
                 FROM customers c 
                 WHERE c.CUSTOMER_NAME LIKE 'PE%' AND c.UPD_DATE > :sql_last_value 
                 ORDER BY c.UPD_DATE ASC"
    schedule=>"*/1 * * * * "
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "upd_date"        
    last_run_metadata_path =>"<path to logstash_metadata>"
    record_last_run => true
    }
}

另请注意,即使您设置了 jdbc_page_size,此方法也会在第一次 logstash 运行 时耗尽 table。如果你想要这个,那很好。

但是如果你想让logstash每分钟运行一批X行并停止直到下一次执行,那么你必须结合使用jdbc_page_size和带限制的查询来让logstash检索以正确的顺序准确记录您想要的记录数量。在 SQL 服务器中它是这样工作的:

input{
  jdbc{
    jdbc_driver_library => ...
    jdbc_driver_class => ...
    jdbc_connection_string => ...
    jdbc_user => ...
    jdbc_password_filepath => ...
    statement=> "SELECT TOP 10000 c.UPD_DATE, c.CUSTOMER_NAME
                 FROM customers c 
                 WHERE c.CUSTOMER_NAME  LIKE 'PE%' AND c.UPD_DATE  > :sql_last_value 
                 ORDER BY c.UPD_DATE ASC"
    schedule=>"*/1 * * * * "
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "upd_date"
    jdbc_page_size => 10000
    last_run_metadata_path =>"<path to logstash_metadata>"
    record_last_run => true
    }
}

对于 Oracle DB,您必须根据版本更改查询,或者使用 仅获取第一行 x 行;使用 Oracle 12,或旧版本的 ROWNUM。

无论如何,我建议您查看日志以检查查询 logstash 运行s。