Elasticsearch 基于公共字段合并多个索引

Elasticsearch merge multiple indexes based on common field

我正在使用 ELK 从来自两个不同数据库的数据中生成视图。一个是 mysql 另一个是 PostgreSQL。无法在这两个数据库实例之间编写连接查询。但是我有一个共同的领域叫“nic”。以下是每个索引中的文档。

MySQL

索引:user_detail

"_id": "871123365V",
"_source": {
    "type": "db-poc-user",
    "fname": "Iraj",
    "@version": "1",
    "field_lname": "Sanjeewa",
    "nic": "871456365V",
    "@timestamp": "2020-07-22T04:12:00.376Z",
    "id": 2,
    "lname": "Santhosh"
  }

PostgreSQL

索引:track_details

"_id": "871456365V",
"_source": {
   "@version": "1",
   "nic": "871456365V",
   "@timestamp": "2020-07-22T04:12:00.213Z",
   "track": "ELK",
   "type": "db-poc-ceg"
},

我想使用公共字段“nic”将两个索引合并为一个索引。并创建新索引。所以我可以在 Kibana 上创建可视化。如何实现?

Please note that each document in new index should have "nic,fname,lname,track" as fields. Not the aggregation.

我会利用 enrich processor 来实现这一目标。

首先,您需要创建一个丰富策略(使用最小的索引,假设它是 user_detail):

PUT /_enrich/policy/user-policy
{
  "match": {
    "indices": "user_detail",
    "match_field": "nic",
    "enrich_fields": ["fname", "lname"]
  }
}

然后您可以执行该策略以创建浓缩索引

POST /_enrich/policy/user-policy/_execute

下一步需要您创建一个使用上述丰富内容的摄取管道 policy/index:

PUT /_ingest/pipeline/user_lookup
{
  "description" : "Enriching user details with tracks",
  "processors" : [
    {
      "enrich" : {
        "policy_name": "user-policy",
        "field" : "nic",
        "target_field": "tmp",
        "max_matches": "1"
      }
    },
    {
      "script": {
        "if": "ctx.tmp != null",
        "source": "ctx.putAll(ctx.tmp); ctx.remove('tmp');"
      }
    },
    {
      "remove": {
        "field": ["@version", "@timestamp", "type"]
      }
    }
  ]
}

最后,您现在已准备好使用连接的数据创建目标索引。只需将 _reindex API 与我们刚刚创建的摄取管道结合使用即可:

POST _reindex
{
  "source": {
    "index": "track_details"
  },
  "dest": {
    "index": "user_tracks",
    "pipeline": "user_lookup"
  }
}

在 运行 之后,user_tracks 索引将包含您所需要的内容,例如:

  {
    "_index" : "user_tracks",
    "_type" : "_doc",
    "_id" : "0uA8dXMBU9tMsBeoajlw",
    "_score" : 1.0,
    "_source" : {
      "fname" : "Iraj",
      "nic" : "871456365V",
      "lname" : "Santhosh",
      "track" : "ELK"
    }
  }

如果您的源索引发生变化(新用户、更改名称等),您需要重新运行上述步骤,但在此之前,您需要删除摄取管道和摄取策略(按此顺序):

DELETE /_ingest/pipeline/user_lookup
DELETE /_enrich/policy/user-policy

之后您可以随意重新运行以上步骤

PS:请注意,我作弊了一点,因为 user_detail 中的记录在您的示例中没有相同的 nic,但我猜它是一个 copy/paste问题。