ElasticSearch: Avg aggregation for datetime format
I'm confused about writing an Elasticsearch query with Python.
I have data like this:
{
  "_index": "user_log",
  "_type": "logs",
  "_id": "gdUJpXIBAoADuwvHTK29",
  "_score": 1,
  "_source": {
    "user_name": "prathameshsalap@gmail.com",
    "working_hours": "2019-10-21 09:00:01"
  }
}
{
  "_index": "user_log",
  "_type": "logs",
  "_id": "gtUJpXIBAoADuwvHTK29",
  "_version": 1,
  "_score": 0,
  "_source": {
    "user_name": "vaishusawant143@gmail.com",
    "working_hours": "2019-10-21 09:15:01"
  }
}
{
  "_index": "user_log",
  "_type": "logs",
  "_id": "g9UJpXIBAoADuwvHTK29",
  "_version": 1,
  "_score": 0,
  "_source": {
    "user_name": "prathameshsalap@gmail.com",
    "working_hours": "2019-10-22 07:50:00"
  }
}
{
  "_index": "user_log",
  "_type": "logs",
  "_id": "g8UJpXIBAoADuwvHTK29",
  "_version": 1,
  "_score": 0,
  "_source": {
    "user_name": "vaishusawant143@gmail.com",
    "working_hours": "2019-10-22 04:15:01"
  }
}
Here, each user has working hours on different days (the 21st and the 22nd). I want to take the average of each user's working hours.
{
  "size": 0,
  "query": {"match_all": {}},
  "aggs": {
    "users": {
      "terms": {
        "field": "user_name"
      },
      "aggs": {
        "avg_hours": {
          "avg": {
            "field": "working_hours"
          }
        }
      }
    }
  }
}
This query does not work. How can I find each user's average working hours across all days? Also, I want to run this query using the Python Elasticsearch client.
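For the Python side, a minimal sketch of executing an aggregation body with the official `elasticsearch` client might look like this (the host URL is an assumption; the index name `user_log` comes from the question, and the aggregation body itself will still fail on `working_hours` until the field issue is fixed as described in the answer):

```python
# The aggregation body from the question, expressed as a Python dict.
query_body = {
    "size": 0,
    "query": {"match_all": {}},
    "aggs": {
        "users": {
            "terms": {"field": "user_name"},
            "aggs": {
                "avg_hours": {"avg": {"field": "working_hours"}}
            }
        }
    }
}

def run_query(body, index="user_log", host="http://localhost:9200"):
    """Execute the aggregation and return the per-user buckets."""
    from elasticsearch import Elasticsearch  # assumed installed
    es = Elasticsearch(host)
    resp = es.search(index=index, body=body)
    return resp["aggregations"]["users"]["buckets"]
```

With `size: 0`, only the aggregation results are returned, not the matching documents themselves.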
Update
When I used the ingest pipeline as @Val mentioned, I got an error:
{
  "error" : {
    "root_cause" : [
      {
        "type" : "script_exception",
        "reason" : "compile error",
        "processor_type" : "script",
        "script_stack" : [
          "\n def workDate = /\s+/.split(ctx.working_h ...",
          " ^---- HERE"
        ],
        "script" : "\n def workDate = /\s+/.split(ctx.working_hours);\n def workHours = /:/.split(workDate[1]);\n ctx.working_minutes = (Integer.parseInt(workHours[0]) * 60) + Integer.parseInt(workHours[1]);\n ",
        "lang" : "painless",
        "position" : {
          "offset" : 24,
          "start" : 0,
          "end" : 49
        }
      }
.....
How do I fix this?
The problem is that your working_hours
field is a point in time and does not denote a duration.
For this use case, it is best to store the working day and the working hours in two separate fields, and to store the working hours in minutes.
So instead of having documents like this:
{
  "user_name": "prathameshsalap@gmail.com",
  "working_hours": "2019-10-21 09:00:01"
}
Create documents like this:
{
  "user_name": "prathameshsalap@gmail.com",
  "working_day": "2019-10-21",
  "working_hours": "09:00:01",
  "working_minutes": 540
}
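If you reshape the documents client-side before indexing, a small helper like the following sketch could do the split (field names match the answer; the timestamp format is the one shown in the question):

```python
from datetime import datetime

def split_working_hours(doc):
    """Split a 'YYYY-MM-DD HH:MM:SS' working_hours value into
    working_day, a time-only working_hours, and working_minutes."""
    ts = datetime.strptime(doc["working_hours"], "%Y-%m-%d %H:%M:%S")
    return {
        "user_name": doc["user_name"],
        "working_day": ts.strftime("%Y-%m-%d"),
        "working_hours": ts.strftime("%H:%M:%S"),
        "working_minutes": ts.hour * 60 + ts.minute,
    }
```

For example, `"2019-10-21 09:00:01"` yields `working_minutes` of 9 * 60 + 0 = 540, matching the sample document above.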
Then you can use your query on the working_minutes
field:
{
  "size": 0,
  "query": {"match_all": {}},
  "aggs": {
    "users": {
      "terms": {
        "field": "user_name.keyword",
        "order": {
          "avg_hours": "desc"
        }
      },
      "aggs": {
        "avg_hours": {
          "avg": {
            "field": "working_minutes"
          }
        }
      }
    }
  }
}
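Extracting the per-user averages from the response is then a matter of walking the terms buckets. A sketch, with an illustrative response fragment built from the question's four sample documents (9:00 and 7:50 average to 505 minutes; 9:15 and 4:15 average to 405):

```python
def average_minutes_per_user(response):
    """Extract {user_name: avg_working_minutes} from the
    terms/avg aggregation response."""
    buckets = response["aggregations"]["users"]["buckets"]
    return {b["key"]: b["avg_hours"]["value"] for b in buckets}

# Illustrative response fragment (shape only; real responses
# carry additional fields such as doc_count and took).
sample = {
    "aggregations": {
        "users": {
            "buckets": [
                {"key": "prathameshsalap@gmail.com",
                 "avg_hours": {"value": 505.0}},
                {"key": "vaishusawant143@gmail.com",
                 "avg_hours": {"value": 405.0}},
            ]
        }
    }
}
```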
If computing the working_minutes
field in your client code is inconvenient, you can use an ingest pipeline to achieve the same thing. Let's define the pipeline first:
PUT _ingest/pipeline/working-hours
{
  "processors": [
    {
      "dissect": {
        "field": "working_hours",
        "pattern": "%{?date} %{tmp_hours}:%{tmp_minutes}:%{?seconds}"
      }
    },
    {
      "convert": {
        "field": "tmp_hours",
        "type": "integer"
      }
    },
    {
      "convert": {
        "field": "tmp_minutes",
        "type": "integer"
      }
    },
    {
      "script": {
        "source": """
          ctx.working_minutes = (ctx.tmp_hours * 60) + ctx.tmp_minutes;
        """
      }
    },
    {
      "remove": {
        "field": [
          "tmp_hours",
          "tmp_minutes"
        ]
      }
    }
  ]
}
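The same pipeline can be registered from Python instead of the Kibana console. A sketch using the client's ingest API (host URL is an assumption; the body mirrors the PUT request above):

```python
# Pipeline definition as a Python dict, mirroring the PUT body above.
pipeline_body = {
    "processors": [
        {"dissect": {
            "field": "working_hours",
            "pattern": "%{?date} %{tmp_hours}:%{tmp_minutes}:%{?seconds}",
        }},
        {"convert": {"field": "tmp_hours", "type": "integer"}},
        {"convert": {"field": "tmp_minutes", "type": "integer"}},
        {"script": {
            "source": "ctx.working_minutes = (ctx.tmp_hours * 60) + ctx.tmp_minutes;"
        }},
        {"remove": {"field": ["tmp_hours", "tmp_minutes"]}},
    ]
}

def create_pipeline(host="http://localhost:9200"):
    """Register the pipeline under the id 'working-hours'."""
    from elasticsearch import Elasticsearch  # assumed installed
    es = Elasticsearch(host)
    es.ingest.put_pipeline(id="working-hours", body=pipeline_body)
```

Note the dissect pattern avoids regular expressions entirely, which sidesteps the Painless `compile error` shown in the update (Painless regexes like `/\s+/` are disabled by default).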
Then you need to update your Python client code to use the new pipeline, which will create the working_minutes
field for you:
helpers.bulk(es, reader, index='user_log', doc_type='logs', pipeline='working-hours')
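Since the averages come back in minutes, you may want to format them as hours and minutes for display. A small helper sketch:

```python
def minutes_to_hhmm(avg_minutes):
    """Format an average minute count as 'HH:MM'."""
    total = int(round(avg_minutes))
    return f"{total // 60:02d}:{total % 60:02d}"
```

For example, an average of 505 minutes formats as "08:25".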