在 fluentd 中解析日志

Parse logs in fluentd

我在 docker 容器中有本地服务器 运行,它设置为使用 fluentd 作为日志驱动程序。 我有 docker compose 文件,它在自己的容器中运行 fluentd、nginx、elasticsearch 和 kibana。 所以 fluentd 从我的服务器获取日志,将其传递给 elasticsearch 并显示在 Kibana 上。

我的问题是,如何在 fluentd 中解析我的日志(elasticsearch 或 kibana,如果在 fluentd 中不可能)以制作新标签,这样我就可以对它们进行排序并更容易导航。

这是Kibana 中显示的当前日志。现在我希望将此日志字符串 'broken' 放入新标签中。在这种情况下:

2017/01/04 13:26:56.574909 UTC (Example deployment.web) [INFO] [GET] /api/device/ 200 10.562379ms

date: 2017/01/04
time: 13:26:56.574909 UTC
message: (Example deployment.web)
logType: [INFO]
other: [GET] /api/device/ 200 10.562379ms

我的docker-compose.yml

version: "2"

services:

  fluentd:
    image: fluent/fluentd:latest
    ports:
      - "24224:24224"
    volumes:
      - ./fluentd/etc:/fluentd/etc
    command: /fluentd/etc/start.sh
    networks:
      - lognet

  elasticsearch:
    image: elasticsearch
    ports:
      - "9200:9200"
      - "9300:9300"
    volumes:
      - /usr/share/elasticsearch/data:/usr/share/elasticsearch/data
    networks:
      - lognet

  kibana:
    image: kibana
    restart: always
    ports:
      - "5601:5601"
    environment:
      - ELASTICSEARCH_URL=http://localhost:9200
    networks:
      - lognet

  nginx:
    image: nginx
    ports:
      - "8084:80"
    logging:
      driver: fluentd
    networks:
      - lognet

    networks:
      lognet:
        driver: bridge

我的fluent.conf文件,没有解析,只是简单的转发

<source>
  type forward
</source>

<match *.*>
  type elasticsearch
  host elasticsearch
  logstash_format true
  flush_interval 10s
</match>

我尝试使用正则表达式,这里我尝试解析 logType

<source>
  @type forward
</source>

<match *.*>
  type stdout 
</match>

<filter docker.**>
  @type parser  
  format /(?<logType>\[([^\)]+)\])/ 
  key_name log
  reserve_data false
</filter>

我尝试了其他配置,但 none 导致解析了我的日志。

首先,使用 tag 标记您的来源。其次,在匹配部分包含您的标签键:

include_tag_key true tag_key fluentd_key

这对我有用。日志将按 fluentd_key.

分类

对于遇到类似问题的任何人,我找到了适合我的解决方案。

在 fluent.conf 文件中添加了新的 filter 标签。例如,如果我想创建一个名为 severity 的新字段,第一步是用正则表达式记录它。

例如 [DEBU].

<filter *.*>
  @type record_transformer
  enable_ruby
  <record>
    severity ${record["log"].scan(/\[([^\)]+)\]/).last} 
  </record>
</filter>

然后从原始消息中删除:

<filter *.*>
  @type record_transformer
  enable_ruby
  <record>
    log ${record["log"].gsub(/\[([^\)]+)\]/, '')}   
  </record>
</filter>

主要部分是:

severity ${record["log"].scan(/\[([^\)]+)\]/).last} 

其中 severity 是新字段的名称,record["log"] 是原始日志字符串,其中通过正则表达式找到字符串并将其附加到新字段。

log ${record["log"].gsub(/\[([^\)]+)\]/, '')}   

此命令修改字段 log,其中正则表达式被空字符串替换 - 已删除。

注意:顺序很重要,因为我们首先必须附加到新字段,然后从原始日志消息中删除字符串(如果需要)。

我们可以使用record_transformer选项。就像下面的配置:

      <filter kubernetes.**>
        @type record_transformer
        enable_ruby true
        <record>
          container_name ${record["kubernetes"]["container_name"]}
          namespace ${record["kubernetes"]["namespace_name"]}
          pod ${record["kubernetes"]["pod_name"]}
          host ${record["kubernetes"]["host"]}
        </record>
      </filter>

由此我们可以 container_name、命名空间、pod 和主机作为 labels/tags。然后我们可以进一步使用它。以下是示例用例之一。

      <match **>
        @type elasticsearch
        host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
        port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
        logstash_format true
        logstash_prefix ${namespace}_${container_name}
        <buffer tag, container_name, namespace>
          @type file
          path /var/log/${container_name}/app.log
        </buffer>
      </match>