Elasticsearch 基于公共字段合并多个索引
Elasticsearch merge multiple indexes based on common field
我正在使用 ELK 从来自两个不同数据库的数据中生成视图。一个是 mysql 另一个是 PostgreSQL。无法在这两个数据库实例之间编写连接查询。但是我有一个共同的领域叫“nic”。以下是每个索引中的文档。
MySQL
索引:user_detail
"_id": "871123365V",
"_source": {
"type": "db-poc-user",
"fname": "Iraj",
"@version": "1",
"field_lname": "Sanjeewa",
"nic": "871456365V",
"@timestamp": "2020-07-22T04:12:00.376Z",
"id": 2,
"lname": "Santhosh"
}
PostgreSQL
索引:track_details
"_id": "871456365V",
"_source": {
"@version": "1",
"nic": "871456365V",
"@timestamp": "2020-07-22T04:12:00.213Z",
"track": "ELK",
"type": "db-poc-ceg"
},
我想使用公共字段“nic”将两个索引合并为一个索引。并创建新索引。所以我可以在 Kibana 上创建可视化。如何实现?
Please note that each document in new index should have
"nic,fname,lname,track" as fields. Not the aggregation.
我会利用 enrich processor 来实现这一目标。
首先,您需要创建一个丰富策略(使用最小的索引,假设它是 user_detail
):
PUT /_enrich/policy/user-policy
{
"match": {
"indices": "user_detail",
"match_field": "nic",
"enrich_fields": ["fname", "lname"]
}
}
然后您可以执行该策略以创建浓缩索引
POST /_enrich/policy/user-policy/_execute
下一步需要您创建一个使用上述丰富内容的摄取管道 policy/index:
PUT /_ingest/pipeline/user_lookup
{
"description" : "Enriching user details with tracks",
"processors" : [
{
"enrich" : {
"policy_name": "user-policy",
"field" : "nic",
"target_field": "tmp",
"max_matches": "1"
}
},
{
"script": {
"if": "ctx.tmp != null",
"source": "ctx.putAll(ctx.tmp); ctx.remove('tmp');"
}
},
{
"remove": {
"field": ["@version", "@timestamp", "type"]
}
}
]
}
最后,您现在已准备好使用连接的数据创建目标索引。只需将 _reindex
API 与我们刚刚创建的摄取管道结合使用即可:
POST _reindex
{
"source": {
"index": "track_details"
},
"dest": {
"index": "user_tracks",
"pipeline": "user_lookup"
}
}
在 运行 之后,user_tracks
索引将包含您所需要的内容,例如:
{
"_index" : "user_tracks",
"_type" : "_doc",
"_id" : "0uA8dXMBU9tMsBeoajlw",
"_score" : 1.0,
"_source" : {
"fname" : "Iraj",
"nic" : "871456365V",
"lname" : "Santhosh",
"track" : "ELK"
}
}
如果您的源索引发生变化(新用户、更改名称等),您需要重新运行上述步骤,但在此之前,您需要删除摄取管道和摄取策略(按此顺序):
DELETE /_ingest/pipeline/user_lookup
DELETE /_enrich/policy/user-policy
之后您可以随意重新运行以上步骤
PS:请注意,我作弊了一点,因为 user_detail
中的记录在您的示例中没有相同的 nic
,但我猜它是一个 copy/paste问题。
我正在使用 ELK 从来自两个不同数据库的数据中生成视图。一个是 mysql 另一个是 PostgreSQL。无法在这两个数据库实例之间编写连接查询。但是我有一个共同的领域叫“nic”。以下是每个索引中的文档。
MySQL
索引:user_detail
"_id": "871123365V",
"_source": {
"type": "db-poc-user",
"fname": "Iraj",
"@version": "1",
"field_lname": "Sanjeewa",
"nic": "871456365V",
"@timestamp": "2020-07-22T04:12:00.376Z",
"id": 2,
"lname": "Santhosh"
}
PostgreSQL
索引:track_details
"_id": "871456365V",
"_source": {
"@version": "1",
"nic": "871456365V",
"@timestamp": "2020-07-22T04:12:00.213Z",
"track": "ELK",
"type": "db-poc-ceg"
},
我想使用公共字段“nic”将两个索引合并为一个索引。并创建新索引。所以我可以在 Kibana 上创建可视化。如何实现?
Please note that each document in new index should have "nic,fname,lname,track" as fields. Not the aggregation.
我会利用 enrich processor 来实现这一目标。
首先,您需要创建一个丰富策略(使用最小的索引,假设它是 user_detail
):
PUT /_enrich/policy/user-policy
{
"match": {
"indices": "user_detail",
"match_field": "nic",
"enrich_fields": ["fname", "lname"]
}
}
然后您可以执行该策略以创建浓缩索引
POST /_enrich/policy/user-policy/_execute
下一步需要您创建一个使用上述丰富内容的摄取管道 policy/index:
PUT /_ingest/pipeline/user_lookup
{
"description" : "Enriching user details with tracks",
"processors" : [
{
"enrich" : {
"policy_name": "user-policy",
"field" : "nic",
"target_field": "tmp",
"max_matches": "1"
}
},
{
"script": {
"if": "ctx.tmp != null",
"source": "ctx.putAll(ctx.tmp); ctx.remove('tmp');"
}
},
{
"remove": {
"field": ["@version", "@timestamp", "type"]
}
}
]
}
最后,您现在已准备好使用连接的数据创建目标索引。只需将 _reindex
API 与我们刚刚创建的摄取管道结合使用即可:
POST _reindex
{
"source": {
"index": "track_details"
},
"dest": {
"index": "user_tracks",
"pipeline": "user_lookup"
}
}
在 运行 之后,user_tracks
索引将包含您所需要的内容,例如:
{
"_index" : "user_tracks",
"_type" : "_doc",
"_id" : "0uA8dXMBU9tMsBeoajlw",
"_score" : 1.0,
"_source" : {
"fname" : "Iraj",
"nic" : "871456365V",
"lname" : "Santhosh",
"track" : "ELK"
}
}
如果您的源索引发生变化(新用户、更改名称等),您需要重新运行上述步骤,但在此之前,您需要删除摄取管道和摄取策略(按此顺序):
DELETE /_ingest/pipeline/user_lookup
DELETE /_enrich/policy/user-policy
之后您可以随意重新运行以上步骤
PS:请注意,我作弊了一点,因为 user_detail
中的记录在您的示例中没有相同的 nic
,但我猜它是一个 copy/paste问题。