如何使用 filebeat 和处理器解析混合自定义日志

How to parse a mixed custom log using filebeat and processors

我正在尝试仅使用 filebeat 和处理器来解析自定义日志。我不想使用 Logstash 和管道。

下面是日志示例:

TID: [-1234] [] [2021-08-25 16:25:52,021]  INFO {org.wso2.carbon.event.output.adapter.logger.LoggerEventAdapter} - Unique ID: Evento_Teste, Event: {"event":{"host":"example.com","server":"WSO2 API Manager"}}

然后,我需要获取日期 2021-08-25 16:25:52,021 并将其设为我的 _doc 时间戳并获取 Event 并将其设为我的 message.

经过多次尝试,我只能使用以下配置剖析日志:

filebeat.inputs:

- type: log

  enabled: true
  paths:
    - /tmp/a.log
  processors:
    - dissect:
        tokenizer: "TID: [-1234] [] [%{@timestamp}]  INFO {org.wso2.carbon.event.output.adapter.logger.LoggerEventAdapter} - Unique ID: Evento_Teste, Event: %{event}"
        field: "message"

output.console:
  pretty: true

并得到以下输出:

{
  "@timestamp": "2021-08-25T19:58:00.525Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "7.12.1"
  },
  "input": {
    "type": "log"
  },
  "dissect": {
    "@timestamp": "2021-08-25 16:25:52,021",
    "event": "{\"event\":{\"host\":\"example.com\",\"server\":\"WSO2 API Manager\"}}"
  },
  "host": {
    "name": "dtrsrvhomapim301"
  },
  "agent": {
    "ephemeral_id": "1555da2b-234f-444e-a0fe-42b49fb73b38",
    "id": "1b43e769-87be-4087-9876-70281ceb3cf5",
    "name": "dtrsrvhomapim301",
    "type": "filebeat",
    "version": "7.12.1",
    "hostname": "dtrsrvhomapim301"
  },
  "ecs": {
    "version": "1.8.0"
  },
  "log": {
    "offset": 0,
    "file": {
      "path": "/tmp/a.log"
    }
  },
  "message": "TID: [-1234] [] [2021-08-25 16:25:52,021]  INFO {org.wso2.carbon.event.output.adapter.logger.LoggerEventAdapter} - Unique ID: Evento_Teste, Event: {\"event\":{\"host\":\"example.com\",\"server\":\"WSO2 API Manager\"}}"
}

我不知道如何将 dissect.@timestamp 作为我的 @timestamp,以及如何将 dissect.event 解析为 json 并使其成为我的 message.

这些怎么做到的?

您可以使用 target_prefix: "" 来避免“解剖”前缀。 Json 字段可以使用 decode_json_fields 处理器提取。您可能希望使用 script 将日志时间戳中的“,”转换为“.”。因为时间戳处理器不支持用逗号解析时间戳。 timestamp 处理器的目标字段默认为@timestamp

 processors:
    - dissect:
        tokenizer: "TID: [-1234] [] [%{time}]  INFO {org.wso2.carbon.event.output.adapter.logger.LoggerEventAdapter} - Unique ID: Evento_Teste, Event: %{event}"
        field: "message"
        target_prefix: ""
        overwrite_keys: true
    - decode_json_fields:
        fields: ["event"]
        process_array: false
        max_depth: 2
        target: ""
        overwrite_keys: true
        add_error_key: true
    - script:
        lang: javascript
        source: >
          function process(evt) {
            var ts = evt.Get('time').replace(',', '.');
            evt.Put('time', ts);
          }
    - timestamp:
        field: "time"
        layouts:
          - '2006-01-02 15:04:05.999'
  output.console:
    pretty: true

根据 Swarna 的回答,我想出了以下代码:

filebeat.inputs:

- type: log
  enabled: true
  paths:
    - /tmp/a.log
  processors:
    - dissect:
        tokenizer: "TID: [-1234] [] [%{wso2timestamp}]  INFO {org.wso2.carbon.event.output.adapter.logger.LoggerEventAdapter} - Unique ID: Evento_Teste, Event: %{event}"
        field: "message"
    - decode_json_fields:
        fields: ["dissect.event"]
        process_array: false
        max_depth: 1
        target: "message"
        overwrite_keys: false
        add_error_key: true
    - script:
        lang: javascript
        id: myscript1
        source: >
          function process(event) {
            var wso2ts = event.Get("dissect.wso2timestamp")
            wso2ts = new Date(wso2ts.replace(' ', 'T').replace(',','.'))
            event.Put("dissect.wso2ts", wso2ts)
          }
    - timestamp:
        field: dissect.wso2ts
        layouts:
          - '2006-01-02T15:04:05.999Z'
    - drop_fields:
        fields: [dissect]

output.console:
  pretty: true

获取输出:

{
  "@timestamp": "2021-08-25T16:25:52.021Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "7.12.1"
  },
  "ecs": {
    "version": "1.8.0"
  },
  "log": {
    "offset": 0,
    "file": {
      "path": "/tmp/a.log"
    }
  },
  "message": {
    "event": {
      "host": "example.com",
      "server": "WSO2 API Manager"
    }
  },
  "input": {
    "type": "log"
  },
  "host": {
    "name": "dtrsrvhomapim301"
  },
  "agent": {
    "type": "filebeat",
    "version": "7.12.1",
    "hostname": "dtrsrvhomapim301",
    "ephemeral_id": "c3393f80-e924-44d8-92ff-5c09a0926dbd",
    "id": "34259a65-2480-495c-8ec6-dc28989ebdd0",
    "name": "dtrsrvhomapim301"
  }
}