Elasticsearch: Tricky aggregation with sum and comparison

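I'm trying to pull statistics out of my Elastic cluster and can't figure out how.

What I ultimately want is a count of streams (field: status) over time (field: timestamp) for a specific item (field: media).

The data are nginx logs with anonymized IPs (field: ip_hash) and user agents (field: http_user_agent). To get a valid count, I need to sum the bytes transferred (field: bytes_sent) for the same IP and user agent and compare that sum against a minimum threshold (an integer). A stream only counts as valid if XY bytes of it have been transferred in total.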

"_source": {
    "media": "my-stream.001",
    "http_user_agent": "Spotify/8.4.44 Android/29 (SM-T535)",
    "ip_hash": "fcd2653c44c1d8e33ef5d58ac5a33c2599b68f05d55270a8946166373d79a8212a49f75bcf3f71a62b9c71d3206c6343430a9ebec9c062a0b308a48838161ce8",
    "timestamp": "2022-02-05 01:32:23.941",
    "bytes_sent": 4893480,
    "status": 206
}
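
To make the counting rule concrete, here is a minimal plain-Python sketch of it on in-memory records (it is not an Elasticsearch query). The sample records, the shortened `ip_hash` and user agent values, the `count_valid_streams` helper, and the threshold value are all made up for illustration.

```python
from collections import defaultdict

# Hypothetical threshold in bytes; the real value is whatever "XY" is.
THRESHOLD = 5_000_000

# Made-up log records with the same fields as the document above.
records = [
    {"media": "my-stream.001", "ip_hash": "aaa", "http_user_agent": "Spotify",
     "bytes_sent": 4_893_480, "status": 206},
    {"media": "my-stream.001", "ip_hash": "aaa", "http_user_agent": "Spotify",
     "bytes_sent": 1_000_000, "status": 206},
    {"media": "my-stream.001", "ip_hash": "bbb", "http_user_agent": "Deezer",
     "bytes_sent": 843_367, "status": 206},
    {"media": "my-stream.001", "ip_hash": "aaa", "http_user_agent": "Spotify",
     "bytes_sent": 9_000_000, "status": 404},
]


def count_valid_streams(records, threshold):
    """Count (media, ip_hash, user agent) groups whose summed bytes reach the threshold."""
    totals = defaultdict(int)
    for r in records:
        if r["status"] != 206:
            continue  # only partial-content responses are considered
        key = (r["media"], r["ip_hash"], r["http_user_agent"])
        totals[key] += r["bytes_sent"]
    # A group counts as one stream once its total is >= the threshold.
    return sum(1 for total in totals.values() if total >= threshold)


print(count_valid_streams(records, THRESHOLD))
```

Here only the first client reaches the threshold (4,893,480 + 1,000,000 bytes), so the sketch counts one stream.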

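Where I'm stuck is summing the transferred bytes per unique user agent/IP hash combination and comparing that sum against the threshold.

Any pointers on how to solve this would be appreciated. Thanks!

This is what I have so far: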

GET /logdata_*/_search
{
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "timestamp": {
              "gte": "now-1w/d",
              "lt": "now/d"
            }
          }
        }
      ]
    }
  },
  "aggs": {
    "status206": {
      "filter": {
        "term": {
          "status": "206"
        }
      },
      "aggs": {
        "medias": {
          "terms": {
            "field": "media",
            "size": 10
          },
          "aggs": {
            "ips": {
              "terms": {
                "field": "ip_hash",
                "size": 10
              },
              "aggs": {
                "clients": {
                  "terms": {
                    "field": "http_user_agent",
                    "size": 10
                  },
                  "aggs": {
                    "transferred": {
                      "sum": {
                        "field": "bytes_sent"
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}

This gives something like:

{
  "took" : 1563,
  "timed_out" : false,
  "_shards" : {
    "total" : 12,
    "successful" : 12,
    "skipped" : 8,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 10000,
      "relation" : "gte"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "status206" : {
      "doc_count" : 1307130,
      "medias" : {
        "doc_count_error_upper_bound" : 7612,
        "sum_other_doc_count" : 1163149,
        "buckets" : [
          {
            "key" : "20220402_ETD_Podcast_2234_Eliten_-_VD_Hanson.mp3",
            "doc_count" : 21772,
            "ips" : {
              "doc_count_error_upper_bound" : 12,
              "sum_other_doc_count" : 21574,
              "buckets" : [
                {
                  "key" : "ae55a10beda61afd3641fe2a6ca8470262d5a0c07040d3b9b8285ea1a4dba661a0502a7974dc5a4fecbfbbe5b7c81544cdcea126271533e724feb3d7750913a5",
                  "doc_count" : 38,
                  "clients" : {
                    "doc_count_error_upper_bound" : 0,
                    "sum_other_doc_count" : 0,
                    "buckets" : [
                      {
                        "key" : "Deezer/7.0.0.xxx (Android; 10; Mobile; de) samsung SM-G960F",
                        "doc_count" : 38,
                        "transferred" : {
                          "value" : 7582635.0
                        }
                      }
                    ]
                  }
                },
                {
                  "key" : "60082e96eb57c4a8b7962dc623ef7446fbc08cea676e75c4ff94ab5324dec93a6db1848d45f6dcc6e7acbcb700bb891cf6bee66e1aa98fc228107104176734ff",
                  "doc_count" : 37,
                  "clients" : {
                    "doc_count_error_upper_bound" : 0,
                    "sum_other_doc_count" : 0,
                    "buckets" : [
                      {
                        "key" : "Deezer/7.0.0.xxx (Android; 12; Mobile; de) samsung SM-N770F",
                        "doc_count" : 36,
                        "transferred" : {
                          "value" : 7252448.0
                        }
                      },
                      {
                        "key" : "Mozilla/5.0 (Linux; Android 11; RMX2063) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.58 Mobile Safari/537.36",
                        "doc_count" : 1,
                        "transferred" : {
                          "value" : 843367.0
                        }
                      }
                    ]
                  }
                },
          

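Now I need to check whether "transferred" is gte the threshold; each bucket where it is counts as 1 stream. In the end I need the count of all streams that qualify.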
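
For reference, that check could also be done client-side by walking the nested buckets of a response shaped like the one above. This is only a sketch: `count_streams` is a hypothetical helper, the threshold is a made-up value, the bucket keys are shortened placeholders, and because every `terms` level is capped at `"size": 10` it would only see the buckets that were actually returned.

```python
def count_streams(response, threshold):
    """Count client buckets whose summed bytes_sent is >= threshold."""
    count = 0
    medias = response["aggregations"]["status206"]["medias"]["buckets"]
    for media in medias:
        for ip in media["ips"]["buckets"]:
            for client in ip["clients"]["buckets"]:
                if client["transferred"]["value"] >= threshold:
                    count += 1
    return count


# Trimmed-down response using the "transferred" values shown above;
# the keys are shortened placeholders, not the real hashes or user agents.
sample_response = {
    "aggregations": {"status206": {"medias": {"buckets": [
        {"key": "media-1", "ips": {"buckets": [
            {"key": "ip-1", "clients": {"buckets": [
                {"key": "ua-1", "transferred": {"value": 7582635.0}},
            ]}},
            {"key": "ip-2", "clients": {"buckets": [
                {"key": "ua-2", "transferred": {"value": 7252448.0}},
                {"key": "ua-3", "transferred": {"value": 843367.0}},
            ]}},
        ]}},
    ]}}}
}

# Hypothetical threshold of 1,000,000 bytes.
print(count_streams(sample_response, 1_000_000))
```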

You can try the following:

GET _search?filter_path=aggregations.valid_streams.count
{
    "size": 0,
    "query": {
        "bool": {
            "must": [
                {
                    "range": {
                        "timestamp": {
                            "gte": "now-1w/d",
                            "lt": "now/d"
                        }
                    }
                },
                {
                    "match": {
                        "status": "206"
                    }
                }
            ]
        }
    },
    "aggs": {
        "streams": {
            "multi_terms": {
                "size": 65536,
                "terms": [
                    {
                        "field": "media"
                    },
                    {
                        "field": "ip_hash"
                    },
                    {
                        "field": "http_user_agent"
                    }
                ]
            },
            "aggs": {
                "transferred": {
                    "sum": {
                        "field": "bytes_sent"
                    }
                },
                "threshold": {
                    "bucket_selector": {
                        "buckets_path": {
                            "total": "transferred"
                        },
                        "script": "params.total >= 12345"
                    }
                }
            }
        },
        "valid_streams": {
            "stats_bucket": {
                "buckets_path": "streams>transferred"
            }
        }
    }
}

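Explanation:

  1. streams - a multi_terms aggregation that combines the three fields, since a change in any one of them should count as a new stream. It is used here mainly for readability; change it if it doesn't match your logic.
  2. transferred - a sum aggregation that totals the bytes sent.
  3. threshold - a bucket_selector aggregation that filters out the streams that did not reach the XY threshold.
  4. valid_streams - a stats_bucket aggregation; the count field it returns is the number of remaining buckets, i.e. the number of valid streams. As a side effect it also gives you other statistics about the valid streams (e.g. the average number of bytes).
  5. The filter_path query parameter reduces the returned response to just the aggregation output.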
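
If the threshold needs to vary, the request body can be generated rather than edited by hand. The following is a minimal sketch that mirrors the query above as a Python dict and reads the count back from a response. `build_stream_count_query` and `extract_count` are hypothetical helper names, no HTTP request is made here, and sending the body is left to whatever client you use.

```python
import json


def build_stream_count_query(threshold_bytes):
    """Build the request body from the answer with a configurable threshold."""
    return {
        "size": 0,
        "query": {"bool": {"must": [
            {"range": {"timestamp": {"gte": "now-1w/d", "lt": "now/d"}}},
            {"match": {"status": "206"}},
        ]}},
        "aggs": {
            "streams": {
                "multi_terms": {
                    "size": 65536,
                    "terms": [
                        {"field": "media"},
                        {"field": "ip_hash"},
                        {"field": "http_user_agent"},
                    ],
                },
                "aggs": {
                    "transferred": {"sum": {"field": "bytes_sent"}},
                    "threshold": {
                        "bucket_selector": {
                            "buckets_path": {"total": "transferred"},
                            # The threshold is baked into the script text.
                            "script": f"params.total >= {int(threshold_bytes)}",
                        }
                    },
                },
            },
            "valid_streams": {
                "stats_bucket": {"buckets_path": "streams>transferred"}
            },
        },
    }


def extract_count(response):
    """Read the number of valid streams from the filtered response."""
    return response["aggregations"]["valid_streams"]["count"]


body = build_stream_count_query(12345)
print(json.dumps(body, indent=2))
```

With `filter_path=aggregations.valid_streams.count` on the request, the response passed to `extract_count` should contain only that one field.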