Keep SQL Server database and Elasticsearch index synced
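I am trying to sync data from SQL Server. I tried following the approach from this link.
The problem is that the statement in the Logstash configuration there is written for MySQL. I tried to convert it into a SQL Server statement, but after I ran it, no documents were indexed.
The statement in my Logstash configuration file looks like this: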
statement => "SELECT TOP 100 PERCENT *, DATEDIFF(s, '1970-01-01 00:00:00', modification_time) AS unix_ts_in_secs FROM es_table WHERE (DATEDIFF(s, '1970-01-01 00:00:00', modification_time) > :sql_last_value AND modification_time < getutcdate()) ORDER BY modification_time ASC"
And I get the following output:
[2020-02-25T11:55:50,092][INFO ][logstash.inputs.jdbc ][main] (0.007739s) SELECT TOP (1) count(*) AS [COUNT] FROM (SELECT TOP 100 PERCENT *, DATEDIFF(s, '1970-01-01 00:00:00', modification_time) AS unix_ts_in_secs FROM es_table WHERE (DATEDIFF(s, '1970-01-01 00:00:00', modification_time) > 0 AND modification_time < getutcdate()) ORDER BY modification_time ASC) AS [T1]
[2020-02-25T11:55:55,202][INFO ][logstash.inputs.jdbc ][main] (0.001840s) SELECT CAST(SERVERPROPERTY('ProductVersion') AS varchar)
[2020-02-25T11:55:55,208][INFO ][logstash.inputs.jdbc ][main] (0.001305s) SELECT TOP (1) count(*) AS [COUNT] FROM (SELECT TOP 100 PERCENT *, DATEDIFF(s, '1970-01-01 00:00:00', modification_time) AS unix_ts_in_secs FROM es_table WHERE (DATEDIFF(s, '1970-01-01 00:00:00', modification_time) > 0 AND modification_time < getutcdate()) ORDER BY modification_time ASC) AS [T1]
[2020-02-25T11:56:00,338][INFO ][logstash.inputs.jdbc ][main] (0.002202s) SELECT CAST(SERVERPROPERTY('ProductVersion') AS varchar)
[2020-02-25T11:56:00,349][INFO ][logstash.inputs.jdbc ][main] (0.002047s) SELECT TOP (1) count(*) AS [COUNT] FROM (SELECT TOP 100 PERCENT *, DATEDIFF(s, '1970-01-01 00:00:00', modification_time) AS unix_ts_in_secs FROM es_table WHERE (DATEDIFF(s, '1970-01-01 00:00:00', modification_time) > 0 AND modification_time < getutcdate()) ORDER BY modification_time ASC) AS [T1]
So the statement is running, but nothing is being indexed.
My Logstash configuration looks like this:
input {
  jdbc {
    jdbc_driver_library => "<driver>"
    jdbc_driver_class => "<class>"
    jdbc_connection_string => "<connection>"
    jdbc_user => <user>
    jdbc_password => <pw>
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT TOP 100 PERCENT *, DATEDIFF(s, '1970-01-01 00:00:00', modification_time) AS unix_ts_in_secs FROM es_table WHERE (DATEDIFF(s, '1970-01-01 00:00:00', modification_time) > :sql_last_value AND modification_time < getutcdate()) ORDER BY modification_time ASC"
  }
}
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]" }
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}
output {
  stdout { codec => "rubydebug" }
  elasticsearch {
    index => "rdbms_sync_idx"
    document_id => "%{[@metadata][_id]}"
  }
}
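You may be missing "last_run_metadata_path".
This tracker file is used to capture the "state", i.e. the last value of the tracking column seen between runs.
The example from the official documentation is: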
input {
  jdbc {
    statement => "SELECT * FROM mgd.seq_sequence WHERE _sequence_key > ? AND _sequence_key < ? + ? ORDER BY _sequence_key ASC"
    prepared_statement_bind_values => [":sql_last_value", ":sql_last_value", 4]
    prepared_statement_name => "foobar"
    use_prepared_statements => true
    use_column_value => true
    tracking_column_type => "numeric"
    tracking_column => "_sequence_key"
    last_run_metadata_path => "/elastic/tmp/testing/confs/test-jdbc-int-sql_last_value.yml"
    # ... other configuration bits
  }
}
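Applied to the jdbc input from the question, this might look like the sketch below. The only addition is last_run_metadata_path, and the path shown is a hypothetical placeholder, so point it at a file the Logstash process can write to. I have not verified that this change alone makes documents appear in the index.

```
input {
  jdbc {
    jdbc_driver_library => "<driver>"
    jdbc_driver_class => "<class>"
    jdbc_connection_string => "<connection>"
    jdbc_user => <user>
    jdbc_password => <pw>
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    # Hypothetical path (an assumption, not from the question): the file that
    # stores the last unix_ts_in_secs value between scheduled runs.
    last_run_metadata_path => "/path/to/rdbms_sync_idx_last_run.yml"
    schedule => "*/5 * * * * *"
    statement => "SELECT TOP 100 PERCENT *, DATEDIFF(s, '1970-01-01 00:00:00', modification_time) AS unix_ts_in_secs FROM es_table WHERE (DATEDIFF(s, '1970-01-01 00:00:00', modification_time) > :sql_last_value AND modification_time < getutcdate()) ORDER BY modification_time ASC"
  }
}
```

If the tracker file ever ends up holding a stale value, deleting it or running once with clean_run => true should start :sql_last_value from 0 again.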