Elasticsearch Entity-centric indexing with Transforms
Elasticsearch Entity-centric indexing with Transforms
我们是第一次使用 Elasticsearch,我们目前正在决定什么是解决我们手头问题的最佳解决方案。
我们正在从我们的应用程序直接接收基于事件的日志(以 JSON 形式)到 Elasticsearch 索引。这些日志高度相互关联(它们共享一个通用的唯一 ID),因此我们需要 convert/aggregate 以实体为中心的方式处理它们。
每个事件通常在目标字段中都有一个状态变化。状态不仅仅是 start/end。 Document 拥有更多数据,可用于创建多个以实体为中心的索引。
{
*uniqueID*: ain123in145512kn
name: Bob
target: {
eventStart: {timestamp: 2020-06-01T13:50:55.000Z}
}
}
{
*uniqueID*: ain123in145512kn
name: Bob
target: {
eventStop: {timestamp: 2021-06-01T13:50:55.000Z}
}
}
我们已经能够使用 Python 或 Logstash 加入这些文档。我们基本上创建了一个包含以下文档的索引:
{
*uniqueID*: ain123in145512kn
name: Bob
target: {
eventStart: {timestamp: 2020-06-01T13:50:55.000Z},
eventStop: {timestamp: 2021-06-01T13:50:55.000Z}
**time_dif_Start_Stop : xxxx**
}
}
我们分配了与自动更新它们的 uniqueID 相同的所有事件文档 ID。下一步只是计算了 eventStart 和 eventStop 时间戳之间的差异。
我们对我们的管道有一定的要求,所以我们希望数据永远不必离开 elasticsearch。因此,我们想知道 是否可以使用 ELK 堆栈中已经存在或托管在 Elastic 云中的任何工具来执行此操作? 我们尝试使用 Transforms,但我们只能够计算新索引中的聚合字段。是否可以使用此工具或任何其他工具将所有文档基本上 merge/update 合并为一个文档?这对我们来说是理想的,因为它是 运行 按计划进行的,我们不需要任何外部工具来修改文档。
任何其他建议或帮助也将不胜感激。
转换听起来不错。我尝试了以下快速示例:
PUT test/_doc/1
{
"uniqueID": "one",
"eventStart": {
"timestamp": "2020-06-01T13:50:55.000Z"
}
}
PUT test/_doc/2
{
"uniqueID": "one",
"eventStop": {
"timestamp": "2020-06-01T13:53:55.000Z"
}
}
PUT test/_doc/3
{
"uniqueID": "one",
"eventStop": {
"timestamp": "2020-06-01T13:54:55.000Z"
}
}
PUT test/_doc/4
{
"uniqueID": "other",
"eventStop": {
"timestamp": "2020-06-01T13:54:55.000Z"
}
}
GET test/_mapping
POST _transform/_preview
{
"source": {
"index": "test"
},
"dest": {
"index": "test_transformed"
},
"pivot": {
"group_by": {
"id": {
"terms": {
"field": "uniqueID.keyword"
}
}
},
"aggregations": {
"event_count": {
"value_count": {
"field": "_id"
}
},
"start": {
"min": {
"field": "eventStart.timestamp"
}
},
"stop": {
"max": {
"field": "eventStop.timestamp"
}
},
"duration": {
"bucket_script": {
"buckets_path": {
"start": "start.value",
"stop": "stop.value"
},
"script": """
return (params.stop - params.start)/1000; //in seconds (initially in ms)
"""
}
}
}
}
}
生成以下结果 — 聚合和计算看起来正确:
[
{
"duration" : 240.0,
"stop" : "2020-06-01T13:54:55.000Z",
"event_count" : 3,
"start" : "2020-06-01T13:50:55.000Z",
"id" : "one"
},
{
"stop" : "2020-06-01T13:54:55.000Z",
"event_count" : 1,
"start" : null,
"id" : "other"
}
]
PS:我已经将答案变成了 blog post,更深入地探讨了一般主题 :)
我们是第一次使用 Elasticsearch,我们目前正在决定什么是解决我们手头问题的最佳解决方案。
我们正在从我们的应用程序直接接收基于事件的日志(以 JSON 形式)到 Elasticsearch 索引。这些日志高度相互关联(它们共享一个通用的唯一 ID),因此我们需要 convert/aggregate 以实体为中心的方式处理它们。
每个事件通常在目标字段中都有一个状态变化。状态不仅仅是 start/end。 Document 拥有更多数据,可用于创建多个以实体为中心的索引。
{
*uniqueID*: ain123in145512kn
name: Bob
target: {
eventStart: {timestamp: 2020-06-01T13:50:55.000Z}
}
}
{
*uniqueID*: ain123in145512kn
name: Bob
target: {
eventStop: {timestamp: 2021-06-01T13:50:55.000Z}
}
}
我们已经能够使用 Python 或 Logstash 加入这些文档。我们基本上创建了一个包含以下文档的索引:
{
*uniqueID*: ain123in145512kn
name: Bob
target: {
eventStart: {timestamp: 2020-06-01T13:50:55.000Z},
eventStop: {timestamp: 2021-06-01T13:50:55.000Z}
**time_dif_Start_Stop : xxxx**
}
}
我们分配了与自动更新它们的 uniqueID 相同的所有事件文档 ID。下一步只是计算了 eventStart 和 eventStop 时间戳之间的差异。
我们对我们的管道有一定的要求,所以我们希望数据永远不必离开 elasticsearch。因此,我们想知道 是否可以使用 ELK 堆栈中已经存在或托管在 Elastic 云中的任何工具来执行此操作? 我们尝试使用 Transforms,但我们只能够计算新索引中的聚合字段。是否可以使用此工具或任何其他工具将所有文档基本上 merge/update 合并为一个文档?这对我们来说是理想的,因为它是 运行 按计划进行的,我们不需要任何外部工具来修改文档。
任何其他建议或帮助也将不胜感激。
转换听起来不错。我尝试了以下快速示例:
PUT test/_doc/1
{
"uniqueID": "one",
"eventStart": {
"timestamp": "2020-06-01T13:50:55.000Z"
}
}
PUT test/_doc/2
{
"uniqueID": "one",
"eventStop": {
"timestamp": "2020-06-01T13:53:55.000Z"
}
}
PUT test/_doc/3
{
"uniqueID": "one",
"eventStop": {
"timestamp": "2020-06-01T13:54:55.000Z"
}
}
PUT test/_doc/4
{
"uniqueID": "other",
"eventStop": {
"timestamp": "2020-06-01T13:54:55.000Z"
}
}
GET test/_mapping
POST _transform/_preview
{
"source": {
"index": "test"
},
"dest": {
"index": "test_transformed"
},
"pivot": {
"group_by": {
"id": {
"terms": {
"field": "uniqueID.keyword"
}
}
},
"aggregations": {
"event_count": {
"value_count": {
"field": "_id"
}
},
"start": {
"min": {
"field": "eventStart.timestamp"
}
},
"stop": {
"max": {
"field": "eventStop.timestamp"
}
},
"duration": {
"bucket_script": {
"buckets_path": {
"start": "start.value",
"stop": "stop.value"
},
"script": """
return (params.stop - params.start)/1000; //in seconds (initially in ms)
"""
}
}
}
}
}
生成以下结果 — 聚合和计算看起来正确:
[
{
"duration" : 240.0,
"stop" : "2020-06-01T13:54:55.000Z",
"event_count" : 3,
"start" : "2020-06-01T13:50:55.000Z",
"id" : "one"
},
{
"stop" : "2020-06-01T13:54:55.000Z",
"event_count" : 1,
"start" : null,
"id" : "other"
}
]
PS:我已经将答案变成了 blog post,更深入地探讨了一般主题 :)