使用 fluentd 创建字段

Create Field using fluentd

我有使用 Fluentd 使用并发送到 Elasticsearch 的日志。如果找到字符串,我想创建一个新字段。

示例日志:

{
  "@timestamp": "2021-01-29T08:05:38.613Z",
  "@version": "1",
  "message": "Started Application in 110.374 seconds (JVM running for 113.187)",
  "level": "INFO"
}

我想创建一个新字段 STARTIME,在这种情况下,该值将是 113.187

我尝试过的是,使用 record_transformer 和 ruby 拆分来获取值,但是当它匹配时似乎从日志文件中删除了我想要的字符串。

<filter**>
  @type record_transformer
  enable_ruby true
  <record>
    STARTIME ${record["message"].split("JVM running").last.split(")")}
  </record>
</filter>

如何创建具有所需值的新字段?

我现在已经使用了下面建议的选项:

<filter**>
  @type record_transformer
  enable_ruby true
  <record>
    STARTIME ${record["message"].split("JVM running for ").last.split(")")[0]}
  </record>
</filter>

这让我离得更近了。现在发生的是字段 STARTIME 被创建,当日志条目匹配时,它的值为 113.187 这是正确的,但是与此模式不匹配的所有其他行都被添加到新的场.

也许这不是使用 Fluentd 转换解决此问题的直接答案,但您可以使用 Elasticsearch ingestion pipelines together with grok processor 来提取数据。这是一个模拟的例子:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Enrich logs",
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            "(JVM running for %{NUMBER:start_time})"
          ]
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "@timestamp": "2021-01-29T08:05:38.613Z",
        "@version": "1",
        "message": "Started Application in 110.374 seconds (JVM running for 113.187)",
        "level": "INFO"
      }
    }
  ]
}

_source 是您提供的文档,并且有一个 grok 处理器可以从 message 字段中提取 start_time。调用此管道会导致:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "start_time" : "113.187",
          "@timestamp" : "2021-01-29T08:05:38.613Z",
          "level" : "INFO",
          "@version" : "1",
          "message" : "Started Application in 110.374 seconds (JVM running for 113.187)"
        },
        "_ingest" : {
          "timestamp" : "2021-01-29T14:09:43.447147676Z"
        }
      }
    }
  ]
}

您可以看到,转换后,您的文档包含 "start_time" : "113.187" 个值。

您可以尝试这样的操作:

<record>
  STARTIME ${ s = record['message'][/JVM running for \d{3}.\d{3}/]; s ? s.split(' ')[-1] : nil }
</record>

STARTIME 将具有有效值,否则 null