Elasticsearch Entity-centric indexing with Transforms

Elasticsearch Entity-centric indexing with Transforms

我们是第一次使用 Elasticsearch,我们目前正在决定什么是解决我们手头问题的最佳解决方案。

我们正在从我们的应用程序直接接收基于事件的日志(以 JSON 形式)到 Elasticsearch 索引。这些日志高度相互关联(它们共享一个通用的唯一 ID),因此我们需要 convert/aggregate 以实体为中心的方式处理它们。

每个事件通常在目标字段中都有一个状态变化。状态不仅仅是 start/end。 Document 拥有更多数据,可用于创建多个以实体为中心的索引。

{
*uniqueID*: ain123in145512kn
name: Bob
target: {
eventStart: {timestamp: 2020-06-01T13:50:55.000Z}
}
}
{
*uniqueID*: ain123in145512kn
name: Bob
target: {
eventStop: {timestamp: 2021-06-01T13:50:55.000Z}
}
}

我们已经能够使用 Python 或 Logstash 加入这些文档。我们基本上创建了一个包含以下文档的索引:

{
*uniqueID*: ain123in145512kn
name: Bob
target: {
eventStart: {timestamp: 2020-06-01T13:50:55.000Z},
eventStop: {timestamp: 2021-06-01T13:50:55.000Z}
**time_dif_Start_Stop : xxxx**
}
}

我们分配了与自动更新它们的 uniqueID 相同的所有事件文档 ID。下一步只是计算了 eventStart 和 eventStop 时间戳之间的差异。

我们对我们的管道有一定的要求,所以我们希望数据永远不必离开 elasticsearch。因此,我们想知道 是否可以使用 ELK 堆栈中已经存在或托管在 Elastic 云中的任何工具来执行此操作? 我们尝试使用 Transforms,但我们只能够计算新索引中的聚合字段。是否可以使用此工具或任何其他工具将所有文档基本上 merge/update 合并为一个文档?这对我们来说是理想的,因为它是 运行 按计划进行的,我们不需要任何外部工具来修改文档。

任何其他建议或帮助也将不胜感激。

转换听起来不错。我尝试了以下快速示例:

PUT test/_doc/1
{
  "uniqueID": "one",
  "eventStart": {
    "timestamp": "2020-06-01T13:50:55.000Z"
  }
}
PUT test/_doc/2
{
  "uniqueID": "one",
  "eventStop": {
    "timestamp": "2020-06-01T13:53:55.000Z"
  }
}
PUT test/_doc/3
{
  "uniqueID": "one",
  "eventStop": {
    "timestamp": "2020-06-01T13:54:55.000Z"
  }
}
PUT test/_doc/4
{
  "uniqueID": "other",
  "eventStop": {
    "timestamp": "2020-06-01T13:54:55.000Z"
  }
}

GET test/_mapping


POST _transform/_preview
{
  "source": {
    "index": "test"
  },
  "dest": {
    "index": "test_transformed"
  },
  "pivot": {
    "group_by": {
      "id": {
        "terms": {
          "field": "uniqueID.keyword"
        }
      }
    },
    "aggregations": {
      "event_count": {
        "value_count": {
          "field": "_id"
        }
      },
      "start": {
        "min": {
          "field": "eventStart.timestamp"
        }
      },
      "stop": {
        "max": {
          "field": "eventStop.timestamp"
        }
      },
      "duration": {
        "bucket_script": {
          "buckets_path": { 
            "start": "start.value",
            "stop": "stop.value"
          },
        "script": """
          return (params.stop - params.start)/1000; //in seconds (initially in ms)
          """
        }
      }
    }
  }
}

生成以下结果 — 聚合和计算看起来正确:

[
    {
      "duration" : 240.0,
      "stop" : "2020-06-01T13:54:55.000Z",
      "event_count" : 3,
      "start" : "2020-06-01T13:50:55.000Z",
      "id" : "one"
    },
    {
      "stop" : "2020-06-01T13:54:55.000Z",
      "event_count" : 1,
      "start" : null,
      "id" : "other"
    }
]

PS:我已经将答案变成了 blog post,更深入地探讨了一般主题 :)