弹性搜索仅索引数据库中最近添加的记录并忽略以前添加的记录
elastic search is indexing only recently added record in db and ignoring previously added records
我已经在我的 Windows 机器上安装了 Logstash 和 Elasticsearch,配置如下(按照配置,以 1 分钟的间隔轮询客户表 customer 中的记录):
#1 Logstash 配置文件加载客户 table 数据并为其编制索引
logstash-config.conf
input {
  jdbc {
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/postgres"
    jdbc_driver_library => "C:\org\postgresql\postgresql\42.2.11\postgresql-42.2.11.jar"
    jdbc_user => "test"
    jdbc_password => "test"
    jdbc_driver_class => "org.postgresql.Driver"
    # Poll the table once per minute.
    schedule => "* * * * *"
    # Only fetch rows newer than the last run; :sql_last_value is the
    # persisted tracking-column value from the previous run.
    statement => "SELECT * FROM public.customer WHERE id > :sql_last_value"
    tracking_column_type => "numeric"
    use_column_value => true
    # Option values must be quoted strings; a bare `id` is not valid config.
    tracking_column => "id"
    # Persist :sql_last_value explicitly. Without this the plugin uses
    # $HOME/.logstash_jdbc_last_run, so rows inserted later with an id <=
    # the highest id already seen are never re-fetched.
    last_run_metadata_path => "C:\logstash\.customer_id_tracker_file"
  }
}
output {
  elasticsearch {
    index => "customer_index"
    # NOTE(review): document_type is deprecated in Elasticsearch 7.x
    # (mapping types were removed); drop it when moving past 7.x.
    document_type => "customer"
    # Use the row's id as the document _id so each row maps to exactly one
    # document. If the referenced field does not exist (e.g. "%{customer_id}"
    # when the column is `id`), the literal placeholder string becomes the
    # _id of EVERY event, and each new row overwrites the previous document —
    # which is why only the most recently indexed record was searchable.
    document_id => "%{id}"
    hosts => "localhost:9200"
  }
  stdout {
    codec => rubydebug
  }
}
#2 在带有一些记录的数据库中创建 table
-- Demo table for the Logstash JDBC example.
-- Deliberately no PRIMARY KEY: the question later inserts a duplicate id,
-- so adding one would change the demonstrated behavior.
create table customer (id integer, name varchar);
-- Explicit column list instead of SELECT * so the output is stable.
select id, name from customer;
-- Explicit column lists so the inserts survive column reordering/additions.
insert into customer (id, name) values (1, 'test1');
insert into customer (id, name) values (2, 'test2');
#3 启动logstash
e:\Software\logstash-7.8.0\bin>logstash -f logstash-config.conf
http://localhost:9600/
#4 启动弹性搜索
e:\Software\elasticsearch-7.8.0\bin>elasticsearch.bat
http://localhost:9200/_cat/indices
#5 调用 GET 查询 API
4.1:没有返回任何记录
http://localhost:9200/customer_index/_search?q=1
e.g.
"took": 86,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 0,
"relation": "eq"
},
"max_score": null,
"hits": []
}
}
4.2 Returns record
http://localhost:9200/customer_index/_search?q=2
e.g.
{
"took": 371,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 1,
"relation": "eq"
},
"max_score": 1.0,
"hits": [
{
"_index": "customer_index",
"_type": "customer",
"_id": "%{customer_id}",
"_score": 1.0,
"_source": {
"name": "test2",
"id": 17,
"@timestamp": "2020-07-06T06:41:00.343Z",
"@version": "1"
}
}
]
}
}
**Also what is customer_id here and how can I get the whole record as I indexed whole row i.e. select * from customer (Which means I should get all columns)**
4.3
Looks like index contains only the record which was added to index last
e.g. if I will execute in db
insert into customer values (2,'test2');
http://localhost:9200/customer_index/_search?q=2
will not return record
4.4 however Returns record
http://localhost:9200/customer_index/_search?q=3
input {
  jdbc {
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/postgres"
    jdbc_driver_library => "C:\org\postgresql\postgresql\42.2.11\postgresql-42.2.11.jar"
    jdbc_user => "test"
    jdbc_password => "test"
    jdbc_driver_class => "org.postgresql.Driver"
    schedule => "* * * * *"
    statement => "SELECT * FROM public.customer WHERE id > :sql_last_value"
    tracking_column_type => "numeric"
    use_column_value => true
    # Option values must be quoted strings; a bare `id` is not valid config.
    tracking_column => "id"
    # Persist the last-seen tracking-column value here (instead of the
    # default $HOME/.logstash_jdbc_last_run) so restarts resume from the
    # correct :sql_last_value.
    last_run_metadata_path => "C:\logstash\.sch_id_tracker_file"
  }
}
Here `last_run_metadata_path` stores the tracking column's value from the last run, so that on each scheduled run only records with id > :sql_last_value are fetched and pushed to Elasticsearch.
我已经在我的 Windows 机器上安装了 Logstash 和 Elasticsearch,配置如下(按照配置,以 1 分钟的间隔轮询客户表 customer 中的记录):
#1 Logstash 配置文件加载客户 table 数据并为其编制索引 logstash-config.conf
input {
  jdbc {
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/postgres"
    jdbc_driver_library => "C:\org\postgresql\postgresql\42.2.11\postgresql-42.2.11.jar"
    jdbc_user => "test"
    jdbc_password => "test"
    jdbc_driver_class => "org.postgresql.Driver"
    # Poll the table once per minute.
    schedule => "* * * * *"
    # Only fetch rows newer than the last run; :sql_last_value is the
    # persisted tracking-column value from the previous run.
    statement => "SELECT * FROM public.customer WHERE id > :sql_last_value"
    tracking_column_type => "numeric"
    use_column_value => true
    # Option values must be quoted strings; a bare `id` is not valid config.
    tracking_column => "id"
    # Persist :sql_last_value explicitly. Without this the plugin uses
    # $HOME/.logstash_jdbc_last_run, so rows inserted later with an id <=
    # the highest id already seen are never re-fetched.
    last_run_metadata_path => "C:\logstash\.customer_id_tracker_file"
  }
}
output {
  elasticsearch {
    index => "customer_index"
    # NOTE(review): document_type is deprecated in Elasticsearch 7.x
    # (mapping types were removed); drop it when moving past 7.x.
    document_type => "customer"
    # Use the row's id as the document _id so each row maps to exactly one
    # document. If the referenced field does not exist (e.g. "%{customer_id}"
    # when the column is `id`), the literal placeholder string becomes the
    # _id of EVERY event, and each new row overwrites the previous document —
    # which is why only the most recently indexed record was searchable.
    document_id => "%{id}"
    hosts => "localhost:9200"
  }
  stdout {
    codec => rubydebug
  }
}
#2 在带有一些记录的数据库中创建 table
-- Demo table for the Logstash JDBC example.
-- Deliberately no PRIMARY KEY: the question later inserts a duplicate id,
-- so adding one would change the demonstrated behavior.
create table customer (id integer, name varchar);
-- Explicit column list instead of SELECT * so the output is stable.
select id, name from customer;
-- Explicit column lists so the inserts survive column reordering/additions.
insert into customer (id, name) values (1, 'test1');
insert into customer (id, name) values (2, 'test2');
#3 启动logstash e:\Software\logstash-7.8.0\bin>logstash -f logstash-config.conf http://localhost:9600/
#4 启动弹性搜索 e:\Software\elasticsearch-7.8.0\bin>elasticsearch.bat http://localhost:9200/_cat/indices
#5 调用 GET 查询 API 4.1:没有返回任何记录 http://localhost:9200/customer_index/_search?q=1
e.g.
"took": 86,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 0,
"relation": "eq"
},
"max_score": null,
"hits": []
}
}
4.2 Returns record
http://localhost:9200/customer_index/_search?q=2
e.g.
{
"took": 371,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 1,
"relation": "eq"
},
"max_score": 1.0,
"hits": [
{
"_index": "customer_index",
"_type": "customer",
"_id": "%{customer_id}",
"_score": 1.0,
"_source": {
"name": "test2",
"id": 17,
"@timestamp": "2020-07-06T06:41:00.343Z",
"@version": "1"
}
}
]
}
}
**Also what is customer_id here and how can I get the whole record as I indexed whole row i.e. select * from customer (Which means I should get all columns)**
4.3
Looks like index contains only the record which was added to index last
e.g. if I will execute in db
insert into customer values (2,'test2');
http://localhost:9200/customer_index/_search?q=2
will not return record
4.4 however Returns record
http://localhost:9200/customer_index/_search?q=3
input {
  jdbc {
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/postgres"
    jdbc_driver_library => "C:\org\postgresql\postgresql\42.2.11\postgresql-42.2.11.jar"
    jdbc_user => "test"
    jdbc_password => "test"
    jdbc_driver_class => "org.postgresql.Driver"
    schedule => "* * * * *"
    statement => "SELECT * FROM public.customer WHERE id > :sql_last_value"
    tracking_column_type => "numeric"
    use_column_value => true
    # Option values must be quoted strings; a bare `id` is not valid config.
    tracking_column => "id"
    # Persist the last-seen tracking-column value here (instead of the
    # default $HOME/.logstash_jdbc_last_run) so restarts resume from the
    # correct :sql_last_value.
    last_run_metadata_path => "C:\logstash\.sch_id_tracker_file"
  }
}
Here `last_run_metadata_path` stores the tracking column's value from the last run, so that on each scheduled run only records with id > :sql_last_value are fetched and pushed to Elasticsearch.