如何根据 fluentd 中的严重性过滤日志并将其发送到 2 个不同的日志系统

How to filter logs based on severity in fluentd and send it to 2 different logging systems

我需要帮助配置 Fluentd 以根据严重性过滤日志。

我们有 2 个不同的监控系统 Elasticsearch 和 Splunk,当我们在应用程序中启用日志级别 DEBUG 时,它每天都会生成大量日志,因此我们希望根据严重性过滤日志并将其推送到 2 个不同的日志系统。

当日志严重性为:INFO 和 ERROR 时,然后将容器日志转发到 Splunk,除了那些 DEBUG、TRACE、WARN 和其他日志应该去 elastocsearch,请帮助我如何过滤它。

日志生成格式如下:

event.log:{"@severity":"DEBUG","@timestamp":"2019-01-18T00:15:34.416Z","@traceId" :

event.log:{"@severity":"INFO","@timestamp":"2019-01-18T00:15:34.397Z","@traceId" :

event.log:{"@severity":"WARN","@timestamp":"2019-01-18T00:15:34.920Z","@traceId" :

请在下面找到 fluentd 配置。

我在过滤器中添加了排除方法,还安装了 grep 插件添加了 grep 方法,但它不起作用。

添加了用于测试的过滤器:

<exclude>
       @type grep
       key severity 
       pattern DEBUG
      </exclude>

还添加了:

<filter kubernetes.**>
@type grep
exclude1 severity (DEBUG|NOTICE|WARN)
</filter>

kind: ConfigMap
apiVersion: v1
metadata:
  name: fluentd-config
  namespace: logging
  labels:
    k8s-app: fluentd
data:
  fluentd-standalone.conf: |
    <match fluent.**>
      @type null
    </match>
    # include other configs
    @include systemd.conf
    @include kubernetes.conf
  fluentd.conf: |
    @include systemd.conf
    @include kubernetes.conf
  fluentd.conf: |
    # Use the config specified by the FLUENTD_CONFIG environment variable, or
    # default to fluentd-standalone.conf
    @include "#{ENV['FLUENTD_CONFIG'] || 'fluentd-standalone.conf'}"
  kubernetes.conf: |
    <source>
      @type tail
      @log_level debug
      path /var/log/containers/*.log
      pos_file /var/log/kubernetes.log.pos
      time_format %Y-%m-%dT%H:%M:%S.%NZ
      tag kubernetes.*
      format json
    </source>
    <filter kubernetes.**>
      @type kubernetes_metadata
      verify_ssl false
      <exclude>
       @type grep
       key severity 
       pattern DEBUG
      </exclude>
    </filter>
    <filter kubernetes.**>
      @type record_transformer
      enable_ruby
      <record>
        event ${record}
      </record>
      renew_record
      auto_typecast
    </filter>
    <filter kubernetes.**>
    @type grep
    exclude1 severity (DEBUG|NOTICE|WARN)
    </filter>
  kubernetes.conf: |
    <source>
      @type tail
      @log_level debug
      path /var/log/containers/*.log
      pos_file /var/log/kubernetes.log.pos
      time_format %Y-%m-%dT%H:%M:%S.%NZ
      tag kubernetes.*
      format json
    </source>
    <filter kubernetes.**>
      @type kubernetes_metadata
      verify_ssl false
    </filter>
    <filter kubernetes.**>
      @type record_transformer
      enable_ruby
      <record>
        event ${record}
      </record>
      renew_record
      auto_typecast
    </filter>
    # The `all_items` paramater isn't documented, but it is necessary in order for
    # us to be able to send k8s events to splunk in a useful manner
    <match kubernetes.**>
      @type copy
      <store>
        @type splunk-http-eventcollector
        all_items true
        server localhost:8088
        protocol https
        verify false
      </store>
      <store>
        @type elasticsearch
        host localhost
        port 9200
        scheme http
        ssl_version TLSv1_2
        ssl_verify false
        </buffer>
      </store>
    </match>

下面的怎么样? (未测试)

<source>
  @type tail
  @log_level debug
  path /var/log/containers/*.log
  pos_file /var/log/kubernetes.log.pos
  time_format %Y-%m-%dT%H:%M:%S.%NZ
  tag kubernetes.*
  format json
  @label @INPUT
</source>

<label @INPUT>
  <filter kubernetes.**>
    @type kubernetes_metadata
    verify_ssl false
  </filter>
  <filter kubernetes.**>
    @type record_transformer
    enable_ruby
    <record>
      event ${record}
    </record>
    renew_record
    auto_typecast
  </filter>
  <match>
    @type relabel
    @label @RETAG
  </match>
</label>

<label @RETAG>
  <match>
    @type rewrite_tag_filter
    <rule>
      key @severity
      pattern /(INFO|ERROR)/
      tag splunk.${tag}
    </rule>
    <rule>
      key @severity
      pattern /(DEBUG|TRACE|WARN)/
      tag elasticsearch.${tag}
    </rule>
    @label @OUTPUT
  </match>
</label>

<label @OUTPUT>
  <match splunk.**>
    @type splunk-http-eventcollector
    # ... snip
  </match>
  <match elasticsearch.**>
    @type elasticsearch
    # ... snip
  </match>
</label>