Logstash:同一管道的多个 tracking_columns

Logstash: multiple tracking_columns for the same pipeline

我有一个 Logstash 管道从连接到 tables A 和 B 的 MS SQL 视图获取数据,并将非规范化数据放入 ES。

最初,INSERTS 或 UPDATES 只能发生在 table A 上。因此,要配置 Logstash 以仅获取自上次轮询循环迭代以来新插入或更新的记录,我定义了tracking_column 字段引用 table 中的 updatedDate 时间戳列 A:

jdbc {
  #Program Search
  jdbc_connection_string => "jdbc:sqlserver://__DB_LISTNER__"
  jdbc_user => “admin”
  jdbc_password => “admin”
  jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
  jdbc_driver_library => "/usr/share/logstash/drivers/mssql-jdbc-6.2.2.jre8.jar"
  sql_log_level => "info"
  tracking_column => "updated_date_timestamp"
  use_column_value => true
  tracking_column_type => "timestamp"
  schedule => "*/1 * * * *"
  statement => "select *, updateDate as updated_date_timestamp from dbo.MyView where (updateDate > :sql_last_value and updateDate < getdate()) order by updateDate ASC"
  last_run_metadata_path => "/usr/share/logstash/myfolder/.logstash_jdbc_last_run"
}

现在,更新也可以发生在 table B 中。有了这个新要求,我很困惑如何配置 Logstash 来跟踪 table B 上的更改以及同一条管道。我可以为同一管道定义多个 tracking_columns 吗?

我想到但不确定的另外两个选项是:

  1. 从 table A 和 B 的 updateDate 字段生成一个复合值,该值将由 tracking_column 引用。但是我不确定 SQL 查询应该是什么样子?
  2. 创建另一个管道,仅跟踪 table B 的更改。不过,我看到这种方法的缺点是现有管道和新管道将在初始迭代中进行重复工作,以便处理数据库视图中的所有记录。

请告诉我我应该怎么走?

我找到了这个 ES discussion that suggests to use a function to select greatest value of provided dates in the SQL query. For the SQL server there is GREATEST function, but it is not recognised by SQL server I am currently using. Long story short, as a workaround I found iff() 函数,我用它来比较日期。所以我的 SQL 查询如下所示:

select *, iif(A.updatedDate>B.updatedDate, A.updatedDate, B.updatedDate) as updated_date_timestamp from dbo.MyView where (iif(A.updatedDate>B.updatedDate, A.updatedDate, B.updatedDate) > :sql_last_value and iif(A.updatedDate>B.updatedDate, A.updatedDate, B.updatedDate) < getdate()) order by iif(A.updatedDate>B.updatedDate, A.updatedDate, B.updatedDate) ASC, id ASC