Remove duplicated records in ElasticSearch

I have millions of records in ElasticSearch. Today I realised that some of them are duplicated. Is there any way to remove these duplicated records?

This is my query:

{
    "query": {
        "filtered": {
            "query": {
                "bool": {
                    "must": [
                        { "match": { "sensorId": "14FA084408" } },
                        { "match": { "variableName": "FORWARD_FLOW" } }
                    ]
                }
            },
            "filter": {
                "range": {
                    "timestamp": {
                        "gt": "2015-07-04",
                        "lt": "2015-07-06"
                    }
                }
            }
        }
    }
}

This is what I get back:

{
"took": 2,
"timed_out": false,
"_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
},
"hits": {
    "total": 21,
    "max_score": 8.272615,
    "hits": [
        {
            "_index": "iotsens-summarizedmeasures",
            "_type": "summarizedmeasure",
            "_id": "AU5isxVcMpd7AZtvmZcK",
            "_score": 8.272615,
            "_source": {
                "id": null,
                "sensorId": "14FA084408",
                "variableName": "FORWARD_FLOW",
                "rawValue": "0.2",
                "value": "0.2",
                "timestamp": 1436047200000,
                "summaryTimeUnit": "DAYS"
            }
        },
        {
            "_index": "iotsens-summarizedmeasures",
            "_type": "summarizedmeasure",
            "_id": "AU5isxVnMpd7AZtvmZcL",
            "_score": 8.272615,
            "_source": {
                "id": null,
                "sensorId": "14FA084408",
                "variableName": "FORWARD_FLOW",
                "rawValue": "0.2",
                "value": "0.2",
                "timestamp": 1436047200000,
                "summaryTimeUnit": "DAYS"
            }
        },
        {
            "_index": "iotsens-summarizedmeasures",
            "_type": "summarizedmeasure",
            "_id": "AU5isxV6Mpd7AZtvmZcN",
            "_score": 8.0957,
            "_source": {
                "id": null,
                "sensorId": "14FA084408",
                "variableName": "FORWARD_FLOW",
                "rawValue": "0.2",
                "value": "0.2",
                "timestamp": 1436047200000,
                "summaryTimeUnit": "DAYS"
            }
        },
        {
            "_index": "iotsens-summarizedmeasures",
            "_type": "summarizedmeasure",
            "_id": "AU5isxWOMpd7AZtvmZcP",
            "_score": 8.0957,
            "_source": {
                "id": null,
                "sensorId": "14FA084408",
                "variableName": "FORWARD_FLOW",
                "rawValue": "0.2",
                "value": "0.2",
                "timestamp": 1436047200000,
                "summaryTimeUnit": "DAYS"
            }
        },
        {
            "_index": "iotsens-summarizedmeasures",
            "_type": "summarizedmeasure",
            "_id": "AU5isxW8Mpd7AZtvmZcT",
            "_score": 8.0957,
            "_source": {
                "id": null,
                "sensorId": "14FA084408",
                "variableName": "FORWARD_FLOW",
                "rawValue": "0.2",
                "value": "0.2",
                "timestamp": 1436047200000,
                "summaryTimeUnit": "DAYS"
            }
        },
        {
            "_index": "iotsens-summarizedmeasures",
            "_type": "summarizedmeasure",
            "_id": "AU5isxXFMpd7AZtvmZcU",
            "_score": 8.0957,
            "_source": {
                "id": null,
                "sensorId": "14FA084408",
                "variableName": "FORWARD_FLOW",
                "rawValue": "0.2",
                "value": "0.2",
                "timestamp": 1436047200000,
                "summaryTimeUnit": "DAYS"
            }
        },
        {
            "_index": "iotsens-summarizedmeasures",
            "_type": "summarizedmeasure",
            "_id": "AU5isxXbMpd7AZtvmZcW",
            "_score": 8.0957,
            "_source": {
                "id": null,
                "sensorId": "14FA084408",
                "variableName": "FORWARD_FLOW",
                "rawValue": "0.2",
                "value": "0.2",
                "timestamp": 1436047200000,
                "summaryTimeUnit": "DAYS"
            }
        },
        {
            "_index": "iotsens-summarizedmeasures",
            "_type": "summarizedmeasure",
            "_id": "AU5isxUtMpd7AZtvmZcG",
            "_score": 8.077545,
            "_source": {
                "id": null,
                "sensorId": "14FA084408",
                "variableName": "FORWARD_FLOW",
                "rawValue": "0.2",
                "value": "0.2",
                "timestamp": 1436047200000,
                "summaryTimeUnit": "DAYS"
            }
        },
        {
            "_index": "iotsens-summarizedmeasures",
            "_type": "summarizedmeasure",
            "_id": "AU5isxXPMpd7AZtvmZcV",
            "_score": 8.077545,
            "_source": {
                "id": null,
                "sensorId": "14FA084408",
                "variableName": "FORWARD_FLOW",
                "rawValue": "0.2",
                "value": "0.2",
                "timestamp": 1436047200000,
                "summaryTimeUnit": "DAYS"
            }
        },
        {
            "_index": "iotsens-summarizedmeasures",
            "_type": "summarizedmeasure",
            "_id": "AU5isxUZMpd7AZtvmZcE",
            "_score": 7.9553676,
            "_source": {
                "id": null,
                "sensorId": "14FA084408",
                "variableName": "FORWARD_FLOW",
                "rawValue": "0.2",
                "value": "0.2",
                "timestamp": 1436047200000,
                "summaryTimeUnit": "DAYS"
            }
        }
    ]
}
}

As you can see, I have 21 duplicated records for the same day. How can I delete the duplicates and keep just one record per day? Thanks.

Do a count (using the Count API for this), then issue a delete-by-query with a query size one less than the count (combining the Delete By Query API with the From/Size API to get this).

Count API

From/size API

Delete by query API

In this case you should write your query so that it fetches only the duplicated records.

Alternatively, query only for the IDs and then issue a bulk delete on all but one of them. However, I guess you can't do that, since you don't have the IDs. IMHO I don't see any other clever way of doing this.
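For illustration, here is a minimal sketch of that "query the IDs, bulk-delete all but one" idea using the official Python client (elasticsearch-py). The index and type names are taken from the response above; the host and the choice of client are assumptions, so adapt as needed.

from elasticsearch import Elasticsearch, helpers

# Assumed host; index/type names come from the question above.
es = Elasticsearch(["http://localhost:9200"])

query = {
    "query": {
        "filtered": {
            "query": {
                "bool": {"must": [
                    {"match": {"sensorId": "14FA084408"}},
                    {"match": {"variableName": "FORWARD_FLOW"}}
                ]}
            },
            "filter": {"range": {"timestamp": {"gt": "2015-07-04", "lt": "2015-07-06"}}}
        }
    }
}

# Collect the _id of every matching document.
ids = [hit["_id"]
       for hit in helpers.scan(es, query=query,
                               index="iotsens-summarizedmeasures",
                               doc_type="summarizedmeasure")]

# Keep the first document and delete the rest in one bulk request.
helpers.bulk(es, ({"_op_type": "delete",
                   "_index": "iotsens-summarizedmeasures",
                   "_type": "summarizedmeasure",
                   "_id": doc_id} for doc_id in ids[1:]))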

Here is a random thought that may not be exactly what you need, but it is what came to mind when I first read your question.

What about reindexing the whole data set with any Elasticsearch client library? While doing so, compute a hash code for each object (I mean document) and set it as the document's ID. Any documents whose fields are all exactly the same get reindexed to the same ID, so once the reindexing is complete the duplicates are gone.
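A minimal sketch of that reindex-with-a-hash idea, again assuming the Python client; the target index summarizedmeasures_dedup is purely hypothetical:

import hashlib
import json
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch(["http://localhost:9200"])

def hashed_actions():
    # Walk the whole source index and re-emit each document with a content hash as its _id.
    for hit in helpers.scan(es, index="iotsens-summarizedmeasures",
                            doc_type="summarizedmeasure"):
        source = hit["_source"]
        doc_id = hashlib.sha1(
            json.dumps(source, sort_keys=True).encode("utf-8")).hexdigest()
        yield {"_index": "summarizedmeasures_dedup",   # hypothetical target index
               "_type": "summarizedmeasure",
               "_id": doc_id,
               "_source": source}

# Identical documents hash to the same _id, so later copies simply overwrite earlier ones.
helpers.bulk(es, hashed_actions())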

Using aggregate queries you can find duplicated field values in your ES index:

For example, to find up to 3 values of the field Uuid that occur in more than one document (and return at most 5 of the duplicated documents for each Uuid):

curl -XPOST http://localhost:9200/logstash-2017.03.17/_search -d '
 {
  "size": 0,
  "aggs": {
    "duplicateCount": {
      "terms": {
        "field": "Uuid",
        "min_doc_count": 2,
        "size": 3
      },
      "aggs": {
        "duplicateDocuments": {
          "top_hits": {
            "size": 5
          }
        }
      }
    }
  }
}'

From the output you can easily filter out the document _ids and delete them. With jq:

cat es_response.json | jq -r '.aggregations.duplicateCount.buckets[].duplicateDocuments.hits.hits[]._id'

The naive approach would then be to remove them with DELETE requests:

 curl -XDELETE http://localhost:9200/{index}/{document type}/{_id value}

However, this deletes all of the duplicated documents without leaving a single unique copy in the index (usually; see below). Moreover, separate DELETE requests are extremely inefficient.

I wrote an es-deduplicator tool that keeps one document from each group of duplicated documents and deletes the rest via the Bulk API.

This way thousands of documents can be deleted within a few minutes:

ES query took 0:01:44.922958, retrieved 10000 unique docs
Deleted 232539 duplicates, in total 1093490. Batch processed in 0:00:07.550461, running time 0:09:03.853110
ES query took 0:01:38.117346, retrieved 10000 unique docs
Deleted 219259 duplicates, in total 1312749. Batch processed in 0:00:07.351001, running time 0:10:50.322695
ES query took 0:01:40.111385, retrieved 10000 unique docs

NOTE: when deleting documents in a loop it is very important to refresh the index after each bulk request, otherwise the next query might return documents that have already been deleted.

By design, aggregate queries are approximate, so it is very likely that a few documents will be missed (depending on how many shards and nodes you have). With multiple nodes (a typical cluster setup) it is better to query once more by the unique field afterwards (and delete any extra copies found).
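To tie this together, here is a rough sketch of the loop such a tool runs (keep one document per bucket, bulk-delete the rest, refresh, repeat). It assumes the Python client; the field Uuid and the index name come from the curl example above, while the host, the batch sizes and the 1.x-style doc types are assumptions:

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch(["http://localhost:9200"])
INDEX = "logstash-2017.03.17"

while True:
    resp = es.search(index=INDEX, body={
        "size": 0,
        "aggs": {
            "duplicateCount": {
                "terms": {"field": "Uuid", "min_doc_count": 2, "size": 1000},
                "aggs": {"duplicateDocuments": {"top_hits": {"size": 5, "_source": False}}}
            }
        }
    })
    buckets = resp["aggregations"]["duplicateCount"]["buckets"]
    if not buckets:
        break

    actions = []
    for bucket in buckets:
        hits = bucket["duplicateDocuments"]["hits"]["hits"]
        # Keep the first copy, queue the remaining ones for deletion.
        for hit in hits[1:]:
            actions.append({"_op_type": "delete",
                            "_index": hit["_index"],
                            "_type": hit["_type"],
                            "_id": hit["_id"]})
    helpers.bulk(es, actions)

    # Refresh so the next aggregation does not return documents we just deleted.
    es.indices.refresh(index=INDEX)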