如何使用 Elasticsearch 按 @timestamp 对分页日志进行排序?

How to sort paginated logs by @timestamp with Elasticsearch?

我的目标是根据我从 Elasticsearch 收到的时间戳对数百万条日志进行排序。

示例日志:

{"realIp": "192.168.0.2", "@timestamp": "2020-12-06T02:00:09.000Z"}
{"realIp": "192.168.0.2", "@timestamp": "2020-12-06T02:01:09.000Z"}
{"realIp": "192.168.0.2", "@timestamp": "2020-12-06T02:02:09.000Z"}
{"realIp": "192.168.0.2", "@timestamp": "2020-12-06T02:04:09.000Z"}

不幸的是,我无法从 Elastic 中整理出所有日志。看来得自己动手了

我尝试将数据从 elastic 中整理出来的方法:

es = Search(index="somelogs-*").using(client).params(preserve_order=True)
for hit in es.scan():
    print(hit['@timestamp'])

另一种方法:

notifications = (es
    .query("range", **{
        "@timestamp": {
            'gte': 'now-48h',
            'lt' : 'now'
        }
    })
    .sort("@timestamp")
    .scan()
)

所以我正在寻找一种方法来自己或直接通过Elasticsearch对这些日志进行排序。目前,我正在将所有数据保存在本地 'logs.json' 中,在我看来我必须自己迭代并对其进行排序。

你绝对应该让 Elasticsearch 进行排序,然后 return 给你的数据已经排序了。

问题是您正在使用 .scan(). It uses Elasticsearch's scan/scroll API, which unfortunately only applies the sorting params on each page/slice, not the entire search result. This is noted in the elasticsearch-dsl docs on Pagination:

Pagination

...
If you want to access all the documents matched by your query you can use the scan method which uses the scan/scroll elasticsearch API:

for hit in s.scan():
    print(hit.title)

Note that in this case the results won’t be sorted.

(强调我的)

使用分页绝对是一种选择,尤其是当您有“数百万条日志”时,如您所说。有一个search_after pagination API:

Search after

You can use the search_after parameter to retrieve the next page of hits using a set of sort values from the previous page.
...
To get the first page of results, submit a search request with a sort argument.
...
The search response includes an array of sort values for each hit.
...
To get the next page of results, rerun the previous search using the last hit’s sort values as the search_after argument. ... The search’s query and sort arguments must remain unchanged. If provided, the from argument must be 0 (default) or -1.
...
You can repeat this process to get additional pages of results.

(省略了原始 JSON 请求,因为我将在下面的 Python 中展示示例)

下面是如何使用 elasticsearch-dsl for Python 进行操作的示例。请注意,我限制了 fields 和结果的数量,以便于测试。这里的重要部分是 sortextra(search_after=).

search = Search(using=client, index='some-index')

# The main query
search = search.extra(size=100)
search = search.query('range', **{'@timestamp': {'gte': '2020-12-29T09:00', 'lt': '2020-12-29T09:59'}})
search = search.source(fields=('@timestamp', ))
search = search.sort({
    '@timestamp': {
        'order': 'desc'
    },
})

# Store all the results (it would be better to be wrap all this in a generator to be performant)
hits = []

# Get the 1st page
results = search.execute()
hits.extend(results.hits)
total = results.hits.total
print(f'Expecting {total}')

# Get the next pages
# Real use-case condition should be "until total" or "until no more results.hits"
while len(hits) < 1000:  
    print(f'Now have {len(hits)}')
    last_hit_sort_id = hits[-1].meta.sort[0]
    search = search.extra(search_after=[last_hit_sort_id])
    results = search.execute()
    hits.extend(results.hits)

with open('results.txt', 'w') as out:
    for hit in hits:
        out.write(f'{hit["@timestamp"]}\n')

这将导致已排序数据:

# 1st 10 lines
2020-12-29T09:58:57.749Z
2020-12-29T09:58:55.736Z
2020-12-29T09:58:53.627Z
2020-12-29T09:58:52.738Z
2020-12-29T09:58:47.221Z
2020-12-29T09:58:45.676Z
2020-12-29T09:58:44.523Z
2020-12-29T09:58:43.541Z
2020-12-29T09:58:40.116Z
2020-12-29T09:58:38.206Z
...
# 250-260
2020-12-29T09:50:31.117Z
2020-12-29T09:50:27.754Z
2020-12-29T09:50:25.738Z
2020-12-29T09:50:23.601Z
2020-12-29T09:50:17.736Z
2020-12-29T09:50:15.753Z
2020-12-29T09:50:14.491Z
2020-12-29T09:50:13.555Z
2020-12-29T09:50:07.721Z
2020-12-29T09:50:05.744Z
2020-12-29T09:50:03.630Z 
...
# 675-685
2020-12-29T09:43:30.609Z
2020-12-29T09:43:30.608Z
2020-12-29T09:43:30.602Z
2020-12-29T09:43:30.570Z
2020-12-29T09:43:30.568Z
2020-12-29T09:43:30.529Z
2020-12-29T09:43:30.475Z
2020-12-29T09:43:30.474Z
2020-12-29T09:43:30.468Z
2020-12-29T09:43:30.418Z
2020-12-29T09:43:30.417Z
...
# 840-850
2020-12-29T09:43:27.953Z
2020-12-29T09:43:27.929Z
2020-12-29T09:43:27.927Z
2020-12-29T09:43:27.920Z
2020-12-29T09:43:27.897Z
2020-12-29T09:43:27.895Z
2020-12-29T09:43:27.886Z
2020-12-29T09:43:27.861Z
2020-12-29T09:43:27.860Z
2020-12-29T09:43:27.853Z
2020-12-29T09:43:27.828Z
...
# Last 3
2020-12-29T09:43:25.878Z
2020-12-29T09:43:25.876Z
2020-12-29T09:43:25.869Z 

API 文档中讨论了使用 search_after 的一些注意事项:

  • 使用 Point In Time or PIT parameter
    • If a refresh occurs between these requests, the order of your results may change, causing inconsistent results across pages. To prevent this, you can create a point in time (PIT) to preserve the current index state over your searches.

    • 您需要先发出 POST 请求以获取 PIT ID
    • 然后在每个请求中添加一个extra 'pit': {'id':xxxx, 'keep_alive':5m}参数
    • 确保使用上次回复中的 PIT ID
  • 使用决胜局
    • We recommend you include a tiebreaker field in your sort. This tiebreaker field should contain a unique value for each document. If you don’t include a tiebreaker field, your paged results could miss or duplicate hits.

    • 这取决于您的文档架构
      # Add some ID as a tiebreaker to the `sort` call
      search = search.sort(
          {'@timestamp': {
              'order': 'desc'
          }},
          {'some.id': {
              'order': 'desc'
          }}
      )
      
      # Include both the sort ID and the some.ID in `search_after`
      last_hit_sort_id, last_hit_route_id = hits[-1].meta.sort
      search = search.extra(search_after=[last_hit_sort_id, last_hit_route_id])
      

谢谢 Gino Mempin。有效!

但我也发现,一个简单的改变就可以完成同样的工作。

通过添加.params(preserve_order=True) elasticsearch 将对所有数据进行排序。

es = Search(index="somelog-*").using(client)
notifications = (es
    .query("range", **{
        "@timestamp": {
            'gte': 'now-48h',
            'lt' : 'now'
        }
    })
    .sort("@timestamp")
    .params(preserve_order=True)
    .scan()
)