Elasticsearch:带有求和和比较的棘手聚合
Elasticsearch: Tricky aggregation with sum and comparison
我正在尝试从我无法弄清楚的弹性集群中提取统计信息。
最后我想要实现的是 count 流(字段:状态)随时间(字段:时间戳)针对特定项目(字段:媒体)。
数据是来自 nginx 的日志,带有匿名 IP(字段:ip_hash)和用户代理(字段:http_user_agent)。要获得有效的 count,我需要总结传输的字节数(字段:bytes_sent)并将其与考虑到相同 IP 和用户代理的最小阈值(整数)进行比较。它只是一个有效的流/仅当该流的 XY 字节已被总计传输时才算数。
"_source": {
"media": "my-stream.001",
"http_user_agent": "Spotify/8.4.44 Android/29 (SM-T535)",
"ip_hash": "fcd2653c44c1d8e33ef5d58ac5a33c2599b68f05d55270a8946166373d79a8212a49f75bcf3f71a62b9c71d3206c6343430a9ebec9c062a0b308a48838161ce8",
"timestamp": "2022-02-05 01:32:23.941",
"bytes_sent": 4893480,
"status": 206
}
我遇到问题的地方是根据唯一的用户代理/IP 哈希组合总结传输的字节并将其与阈值进行比较。
感谢任何指点我如何解决这个问题。谢谢!
到目前为止我得到了这个:
GET /logdata_*/_search
{
"size": 0,
"query": {
"bool": {
"must": [
{
"range": {
"timestamp": {
"gte": "now-1w/d",
"lt": "now/d"
}
}
}
]
}
},
"aggs": {
"status206":{
"filter": {
"term": {
"status": "206"
}
},
"aggs": {
"medias": {
"terms": {
"field": "media",
"size": 10
},
"aggs": {
"ips": {
"terms": {
"field": "ip_hash",
"size": 10
},
"aggs": {
"clients": {
"terms": {
"field": "http_user_agent",
"size": 10
},
"aggs": {
"transferred": {
"sum": {
"field": "bytes_sent"
}
}
}
}
}
}
}
}
}
}
}
}
这给出了这样的东西:
{
"took" : 1563,
"timed_out" : false,
"_shards" : {
"total" : 12,
"successful" : 12,
"skipped" : 8,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 10000,
"relation" : "gte"
},
"max_score" : null,
"hits" : [ ]
},
"aggregations" : {
"status206" : {
"doc_count" : 1307130,
"medias" : {
"doc_count_error_upper_bound" : 7612,
"sum_other_doc_count" : 1163149,
"buckets" : [
{
"key" : "20220402_ETD_Podcast_2234_Eliten_-_VD_Hanson.mp3",
"doc_count" : 21772,
"ips" : {
"doc_count_error_upper_bound" : 12,
"sum_other_doc_count" : 21574,
"buckets" : [
{
"key" : "ae55a10beda61afd3641fe2a6ca8470262d5a0c07040d3b9b8285ea1a4dba661a0502a7974dc5a4fecbfbbe5b7c81544cdcea126271533e724feb3d7750913a5",
"doc_count" : 38,
"clients" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "Deezer/7.0.0.xxx (Android; 10; Mobile; de) samsung SM-G960F",
"doc_count" : 38,
"transferred" : {
"value" : 7582635.0
}
}
]
}
},
{
"key" : "60082e96eb57c4a8b7962dc623ef7446fbc08cea676e75c4ff94ab5324dec93a6db1848d45f6dcc6e7acbcb700bb891cf6bee66e1aa98fc228107104176734ff",
"doc_count" : 37,
"clients" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "Deezer/7.0.0.xxx (Android; 12; Mobile; de) samsung SM-N770F",
"doc_count" : 36,
"transferred" : {
"value" : 7252448.0
}
},
{
"key" : "Mozilla/5.0 (Linux; Android 11; RMX2063) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.58 Mobile Safari/537.36",
"doc_count" : 1,
"transferred" : {
"value" : 843367.0
}
}
]
}
},
现在我需要检查“transferred”是否是 gte 阈值,这将算作 1 个流。最后我需要所有适用流的计数。
您可以尝试以下方法:
> GET _search?filter_path=aggregations.valid_streams.count
{
"size": 0,
"query": {
"bool": {
"must": [
{
"range": {
"timestamp": {
"gte": "now-1w/d",
"lt": "now/d"
}
}
},
{
"match": {
"status": "206"
}
}
]
}
},
"aggs": {
"streams": {
"multi_terms": {
"size": "65536",
"terms": [
{
"field": "media"
},
{
"field": "ip_hash"
},
{
"field": "http_user_agent"
}
]
},
"aggs": {
"transferred": {
"sum": {
"field": "bytes_sent"
}
},
"threshold": {
"bucket_selector": {
"buckets_path": {
"total": "transferred"
},
"script": "params.total > 12345"
}
}
}
},
"valid_streams": {
"stats_bucket": {
"buckets_path": "streams>transferred"
}
}
}
}
解释:
streams
- 组合术语聚合,因为其中每个更改的字段都应计为新流。这主要是为了更好的可读性,如果它不符合你的逻辑就改变它。
transferred
- sum
汇总发送的字节。
threshold
- bucket_selector
聚合过滤掉未达到 XY 阈值的流。
valid_streams
- stats_bucket
聚合,其中 returns 包含桶数量 = 有效流的 count
字段。顺便说一句,它还为您提供有关有效流的信息(即平均字节数)
filter_path
queryparam 用于将返回的响应减少为仅包含聚合输出。
我正在尝试从我无法弄清楚的弹性集群中提取统计信息。
最后我想要实现的是 count 流(字段:状态)随时间(字段:时间戳)针对特定项目(字段:媒体)。
数据是来自 nginx 的日志,带有匿名 IP(字段:ip_hash)和用户代理(字段:http_user_agent)。要获得有效的 count,我需要总结传输的字节数(字段:bytes_sent)并将其与考虑到相同 IP 和用户代理的最小阈值(整数)进行比较。它只是一个有效的流/仅当该流的 XY 字节已被总计传输时才算数。
"_source": {
"media": "my-stream.001",
"http_user_agent": "Spotify/8.4.44 Android/29 (SM-T535)",
"ip_hash": "fcd2653c44c1d8e33ef5d58ac5a33c2599b68f05d55270a8946166373d79a8212a49f75bcf3f71a62b9c71d3206c6343430a9ebec9c062a0b308a48838161ce8",
"timestamp": "2022-02-05 01:32:23.941",
"bytes_sent": 4893480,
"status": 206
}
我遇到问题的地方是根据唯一的用户代理/IP 哈希组合总结传输的字节并将其与阈值进行比较。
感谢任何指点我如何解决这个问题。谢谢!
到目前为止我得到了这个:
GET /logdata_*/_search
{
"size": 0,
"query": {
"bool": {
"must": [
{
"range": {
"timestamp": {
"gte": "now-1w/d",
"lt": "now/d"
}
}
}
]
}
},
"aggs": {
"status206":{
"filter": {
"term": {
"status": "206"
}
},
"aggs": {
"medias": {
"terms": {
"field": "media",
"size": 10
},
"aggs": {
"ips": {
"terms": {
"field": "ip_hash",
"size": 10
},
"aggs": {
"clients": {
"terms": {
"field": "http_user_agent",
"size": 10
},
"aggs": {
"transferred": {
"sum": {
"field": "bytes_sent"
}
}
}
}
}
}
}
}
}
}
}
}
这给出了这样的东西:
{
"took" : 1563,
"timed_out" : false,
"_shards" : {
"total" : 12,
"successful" : 12,
"skipped" : 8,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 10000,
"relation" : "gte"
},
"max_score" : null,
"hits" : [ ]
},
"aggregations" : {
"status206" : {
"doc_count" : 1307130,
"medias" : {
"doc_count_error_upper_bound" : 7612,
"sum_other_doc_count" : 1163149,
"buckets" : [
{
"key" : "20220402_ETD_Podcast_2234_Eliten_-_VD_Hanson.mp3",
"doc_count" : 21772,
"ips" : {
"doc_count_error_upper_bound" : 12,
"sum_other_doc_count" : 21574,
"buckets" : [
{
"key" : "ae55a10beda61afd3641fe2a6ca8470262d5a0c07040d3b9b8285ea1a4dba661a0502a7974dc5a4fecbfbbe5b7c81544cdcea126271533e724feb3d7750913a5",
"doc_count" : 38,
"clients" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "Deezer/7.0.0.xxx (Android; 10; Mobile; de) samsung SM-G960F",
"doc_count" : 38,
"transferred" : {
"value" : 7582635.0
}
}
]
}
},
{
"key" : "60082e96eb57c4a8b7962dc623ef7446fbc08cea676e75c4ff94ab5324dec93a6db1848d45f6dcc6e7acbcb700bb891cf6bee66e1aa98fc228107104176734ff",
"doc_count" : 37,
"clients" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "Deezer/7.0.0.xxx (Android; 12; Mobile; de) samsung SM-N770F",
"doc_count" : 36,
"transferred" : {
"value" : 7252448.0
}
},
{
"key" : "Mozilla/5.0 (Linux; Android 11; RMX2063) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.58 Mobile Safari/537.36",
"doc_count" : 1,
"transferred" : {
"value" : 843367.0
}
}
]
}
},
现在我需要检查“transferred”是否是 gte 阈值,这将算作 1 个流。最后我需要所有适用流的计数。
您可以尝试以下方法:
> GET _search?filter_path=aggregations.valid_streams.count
{
"size": 0,
"query": {
"bool": {
"must": [
{
"range": {
"timestamp": {
"gte": "now-1w/d",
"lt": "now/d"
}
}
},
{
"match": {
"status": "206"
}
}
]
}
},
"aggs": {
"streams": {
"multi_terms": {
"size": "65536",
"terms": [
{
"field": "media"
},
{
"field": "ip_hash"
},
{
"field": "http_user_agent"
}
]
},
"aggs": {
"transferred": {
"sum": {
"field": "bytes_sent"
}
},
"threshold": {
"bucket_selector": {
"buckets_path": {
"total": "transferred"
},
"script": "params.total > 12345"
}
}
}
},
"valid_streams": {
"stats_bucket": {
"buckets_path": "streams>transferred"
}
}
}
}
解释:
streams
- 组合术语聚合,因为其中每个更改的字段都应计为新流。这主要是为了更好的可读性,如果它不符合你的逻辑就改变它。transferred
-sum
汇总发送的字节。threshold
-bucket_selector
聚合过滤掉未达到 XY 阈值的流。valid_streams
-stats_bucket
聚合,其中 returns 包含桶数量 = 有效流的count
字段。顺便说一句,它还为您提供有关有效流的信息(即平均字节数)filter_path
queryparam 用于将返回的响应减少为仅包含聚合输出。