ElasticSearch: mapping the result of collapse / doing operations on grouped documents
There is a list of conversations, and each conversation has a list of messages. Each message has different fields and an action field. What we need to take into account is that action A is used in the first message of a conversation, action A.1 a few messages later, A.1.1 a while after that, and so on (there is a list of chatbot intents).
Grouping a conversation's message actions looks like: A > A > A > A.1 > A > A.1 > A.1.1 ...
Problem:
I need to create a report with ElasticSearch that returns the actions group of every conversation; next, I need to group similar actions groups and add a count; the final result would be a Map<actionsGroup, count>, e.g. 'A > A.1 > A > A.1 > A.1.1', 3.
While building the actions group I need to eliminate consecutive duplicates in each group: instead of A > A > A > A.1 > A > A.1 > A.1.1 I need A > A.1 > A > A.1 > A.1.1.
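The consecutive-duplicate elimination described above is easy to do client-side once the ordered action list of a conversation is available; a minimal sketch:

```python
def dedupe_consecutive(actions):
    """Collapse runs of repeated actions, keeping the overall order."""
    out = []
    for action in actions:
        if not out or out[-1] != action:  # keep only when it differs from the previous one
            out.append(action)
    return out

dedupe_consecutive(["A", "A", "A", "A.1", "A", "A.1", "A.1.1"])
# -> ["A", "A.1", "A", "A.1", "A.1.1"]
```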
The step I started with:
{
  "collapse": {
    "field": "context.conversationId",
    "inner_hits": {
      "name": "logs",
      "size": 10000,
      "sort": [
        { "@timestamp": "asc" }
      ]
    }
  },
  "aggs": {}
}
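One way to assemble the per-conversation actions group from a response to the collapse query above is on the client side, reading the sorted inner_hits. A sketch (assuming the response shape produced by that query; collapsed hits carry the collapse field under "fields"):

```python
from itertools import groupby

def actions_per_conversation(response):
    """Build an 'A > A.1 > ...' actions group per conversation
    from a parsed collapse response."""
    groups = {}
    for hit in response["hits"]["hits"]:
        conv_id = hit["fields"]["context.conversationId"][0]
        logs = hit["inner_hits"]["logs"]["hits"]["hits"]
        actions = [h["_source"]["context"]["action"] for h in logs]
        # drop consecutive duplicates (A > A > A.1 becomes A > A.1)
        actions = [key for key, _ in groupby(actions)]
        groups[conv_id] = " > ".join(actions)
    return groups
```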
What is needed next:
- I need to map the collapsed results into a single value per conversation, e.g. A > A.1 > A > A.1 > A.1.1. I have seen that an aggregation can use scripts on the results and could create the actions list I need, but the aggregation operates on all messages, not only on the grouped messages of my collapse. Is it possible to use an aggregation inside collapse, or is there a similar solution?
- I need to group all the collapsed result values (A > A.1 > A > A.1 > A.1.1), add a count, and obtain the Map<actionsGroup, count>.
Or:
- Use an aggregation to group the conversation messages by the conversationId field (I don't know how to do this).
- Use a script to iterate over all values and create the actions group for each conversation (not sure this is possible).
- Use another aggregation over all values to group the duplicates, returning the Map<actionsGroup, count>.
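Whichever way the per-conversation groups are obtained, the final grouping-and-counting step can also be done client-side; a sketch assuming a dict of conversationId to actions group:

```python
from collections import Counter

def count_action_groups(groups_by_conversation):
    """Group identical actions groups and count them: Map<actionsGroup, count>."""
    return dict(Counter(groups_by_conversation.values()))

count_action_groups({
    "conv_id1": "A > A.1 > A.1.1",
    "conv_id2": "A > A.1 > A.1.1",
    "conv_id3": "B > B.1",
})
# -> {"A > A.1 > A.1.1": 2, "B > B.1": 1}
```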
Update 2: I managed to get a partial result, but there is still one issue. Please check here what I still need to fix.
Update 1: adding some extra details.
Mapping:
"mappings": {
  "properties": {
    "@timestamp": {
      "type": "date",
      "format": "epoch_millis"
    },
    "context": {
      "properties": {
        "action": {
          "type": "keyword"
        },
        "conversationId": {
          "type": "keyword"
        }
      }
    }
  }
}
Sample conversation documents:
Conversation 1.
{
  "@timestamp": 1579632745000,
  "context": {
    "action": "A",
    "conversationId": "conv_id1"
  }
},
{
  "@timestamp": 1579632745001,
  "context": {
    "action": "A.1",
    "conversationId": "conv_id1"
  }
},
{
  "@timestamp": 1579632745002,
  "context": {
    "action": "A.1.1",
    "conversationId": "conv_id1"
  }
}
Conversation 2.
{
  "@timestamp": 1579632745000,
  "context": {
    "action": "A",
    "conversationId": "conv_id2"
  }
},
{
  "@timestamp": 1579632745001,
  "context": {
    "action": "A.1",
    "conversationId": "conv_id2"
  }
},
{
  "@timestamp": 1579632745002,
  "context": {
    "action": "A.1.1",
    "conversationId": "conv_id2"
  }
}
Conversation 3.
{
  "@timestamp": 1579632745000,
  "context": {
    "action": "B",
    "conversationId": "conv_id3"
  }
},
{
  "@timestamp": 1579632745001,
  "context": {
    "action": "B.1",
    "conversationId": "conv_id3"
  }
}
Expected result:
{
"A -> A.1 -> A.1.1": 2,
"B -> B.1": 1
}
Something similar; this or any other format would work.
Since I'm new to elasticsearch, every hint is very welcome.
Using a script in the Terms aggregation, we can create buckets on the first character of "context.action". With a similar scripted terms sub-aggregation we can get all the "context.action" values under each parent bucket, e.g. A -> A.1 -> A.1.1.
Query:
{
  "size": 0,
  "aggs": {
    "conversations": {
      "terms": {
        "script": {
          "source": "def term=doc['context.action'].value; return term.substring(0,1);"
        },
        "size": 10
      },
      "aggs": {
        "sub_conversations": {
          "terms": {
            "script": {
              "source": "if(doc['context.action'].value.length()>1) return doc['context.action'];"
            },
            "size": 10
          }
        },
        "count": {
          "cardinality": {
            "script": {
              "source": "if(doc['context.action'].value.length()>1) return doc['context.action'];"
            }
          }
        }
      }
    }
  }
}
The first script returns the first character (e.g. A, B, C) to form the parent buckets. The sub_conversations script returns every context.action under a parent (the length check skips the parent value itself, e.g. A), and the cardinality script counts the distinct context.action values under that parent.
Since joining different documents is not possible in elasticsearch, you will have to build the combined keys on the client side by iterating over the aggregation buckets.
Result:
"aggregations" : {
"conversations" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "A",
"doc_count" : 6,
"sub_conversations" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "A.1",
"doc_count" : 2
},
{
"key" : "A.1.1",
"doc_count" : 2
}
]
},
"count" : {
"value" : 2
}
},
{
"key" : "B",
"doc_count" : 2,
"sub_conversations" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "B.1",
"doc_count" : 1
}
]
},
"count" : {
"value" : 1
}
}
]
}
}
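The client-side combination mentioned above could look like the following sketch, written against the result shape shown. Note that the sub-buckets are sorted by key here only to approximate the hierarchy order; the bucket order itself does not preserve the order of actions within a conversation:

```python
def combined_paths(conversations_agg):
    """Build 'A -> A.1 -> A.1.1'-style keys from the conversations aggregation."""
    paths = {}
    for bucket in conversations_agg["buckets"]:
        keys = [bucket["key"]] + sorted(
            sub["key"] for sub in bucket["sub_conversations"]["buckets"]
        )
        # the cardinality sub-aggregation serves as the count
        paths[" -> ".join(keys)] = bucket["count"]["value"]
    return paths
```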
I solved it with elastic's scripted_metric. Also, the index has been changed from its initial state.
Scripts:
{
"size": 0,
"aggs": {
"intentPathsCountAgg": {
"scripted_metric": {
"init_script": "state.messagesList = new ArrayList();",
"map_script": "long currentMessageTime = doc['messageReceivedEvent.context.timestamp'].value.millis; Map currentMessage = ['conversationId': doc['messageReceivedEvent.context.conversationId.keyword'], 'time': currentMessageTime, 'intentsPath': doc['brainQueryRequestEvent.brainQueryRequest.user_data.intentsHistoryPath.keyword'].value]; state.messagesList.add(currentMessage);",
"combine_script": "return state",
"reduce_script": "List messages = new ArrayList(); Map conversationsMap = new HashMap(); Map intentsMap = new HashMap(); String[] ifElseWorkaround = new String[1]; for (state in states) { messages.addAll(state.messagesList);} messages.stream().forEach((message) -> { Map existingMessage = conversationsMap.get(message.conversationId); if(existingMessage == null || message.time > existingMessage.time) { conversationsMap.put(message.conversationId, ['time': message.time, 'intentsPath': message.intentsPath]); } else { ifElseWorkaround[0] = ''; } }); conversationsMap.entrySet().forEach(conversation -> { if (intentsMap.containsKey(conversation.getValue().intentsPath)) { long intentsCount = intentsMap.get(conversation.getValue().intentsPath) + 1; intentsMap.put(conversation.getValue().intentsPath, intentsCount); } else {intentsMap.put(conversation.getValue().intentsPath, 1L);} }); return intentsMap.entrySet().stream().map(intentPath -> [intentPath.getKey().toString(): intentPath.getValue()]).collect(Collectors.toSet()) "
}
}
}
}
Formatted scripts (for better readability - using .ts):
scripted_metric: {
init_script: 'state.messagesList = new ArrayList();',
map_script: `
long currentMessageTime = doc['messageReceivedEvent.context.timestamp'].value.millis;
Map currentMessage = [
'conversationId': doc['messageReceivedEvent.context.conversationId.keyword'],
'time': currentMessageTime,
'intentsPath': doc['brainQueryRequestEvent.brainQueryRequest.user_data.intentsHistoryPath.keyword'].value
];
state.messagesList.add(currentMessage);`,
combine_script: 'return state',
reduce_script: `
List messages = new ArrayList();
Map conversationsMap = new HashMap();
Map intentsMap = new HashMap();
boolean[] ifElseWorkaround = new boolean[1];
for (state in states) {
messages.addAll(state.messagesList);
}
messages.stream().forEach(message -> {
Map existingMessage = conversationsMap.get(message.conversationId);
if(existingMessage == null || message.time > existingMessage.time) {
conversationsMap.put(message.conversationId, ['time': message.time, 'intentsPath': message.intentsPath]);
} else {
ifElseWorkaround[0] = true;
}
});
conversationsMap.entrySet().forEach(conversation -> {
if (intentsMap.containsKey(conversation.getValue().intentsPath)) {
long intentsCount = intentsMap.get(conversation.getValue().intentsPath) + 1;
intentsMap.put(conversation.getValue().intentsPath, intentsCount);
} else {
intentsMap.put(conversation.getValue().intentsPath, 1L);
}
});
return intentsMap.entrySet().stream().map(intentPath -> [
'path': intentPath.getKey().toString(),
'count': intentPath.getValue()
]).collect(Collectors.toSet())`
Response:
{
"took": 2,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 11,
"relation": "eq"
},
"max_score": null,
"hits": []
},
"aggregations": {
"intentPathsCountAgg": {
"value": [
{
"smallTalk.greet -> smallTalk.greet2 -> smallTalk.greet3": 2
},
{
"smallTalk.greet -> smallTalk.greet2 -> smallTalk.greet3 -> smallTalk.greet4": 1
},
{
"smallTalk.greet -> smallTalk.greet2": 1
}
]
}
}
}
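The aggregation value above comes back as a list of single-entry maps; merging it into one Map<actionsGroup, count>-style dict on the client is a small final step:

```python
def merge_intent_paths(agg_value):
    """Merge the list of single-entry maps returned by intentPathsCountAgg."""
    merged = {}
    for entry in agg_value:
        merged.update(entry)  # each entry is {path: count}
    return merged
```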