EFK JSON 键作为字段

EFK JSON Keys as Fields

我在我的 kubernetes 集群中使用 EFK(elasticsearch、fluentd、kibana)堆栈进行日志记录。 一切正常,但包含最有用信息的 log 字段在 kibana 中以普通 JSON 显示。

有没有办法从 log 字段中提取这些键值并将它们显示为单独的字段?

示例:

我已经将 fluentd.conf 提取到配置映射中并尝试使用过滤器解析器

实现结果
    <filter kubernetes.var.log.containers.dealing-**.log>
      @type parser
      key_name log
      <parse>
        @type regexp
        expression  {{tried different regexes without luck}}
      </parse>
    </filter>

此时,我不确定应该配置这 3 个(elasticsearch、fluentd 或 kibana)中的哪一个来实现想要的结果。

Fluentd 配置:

    <source>
      @type prometheus
      bind "0.0.0.0"
      port 24231
      metrics_path "/metrics"
    </source>

    <source>
      @type prometheus_output_monitor
    </source>

    <match fluent.**>
      @type null
    </match>

    <source>
      @type tail
      @id in_tail_container_logs
      path "/var/log/containers/*.log"
      pos_file "/var/log/fluentd-containers.log.pos"
      tag "kubernetes.*"
      read_from_head true
      <parse>
        @type "json"
        time_format "%Y-%m-%dT%H:%M:%S.%NZ"
        time_type string
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_minion
      path "/var/log/salt/minion"
      pos_file "/var/log/fluentd-salt.pos"
      tag "salt"
      <parse>
        @type "regexp"
        expression /^(?<time>[^ ]* [^ ,]*)[^\[]*\[[^\]]*\]\[(?<severity>[^ \]]*) *\] (?<message>.*)$/
        time_format "%Y-%m-%d %H:%M:%S"
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_startupscript
      path "/var/log/startupscript.log"
      pos_file "/var/log/fluentd-startupscript.log.pos"
      tag "startupscript"
      <parse>
        @type "syslog"
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_docker
      path "/var/log/docker.log"
      pos_file "/var/log/fluentd-docker.log.pos"
      tag "docker"
      <parse>
        @type "regexp"
        expression /^time="(?<time>[^)]*)" level=(?<severity>[^ ]*) msg="(?<message>[^"]*)"( err="(?<error>[^"]*)")?( statusCode=($<status_code>\d+))?/
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_etcd
      path "/var/log/etcd.log"
      pos_file "/var/log/fluentd-etcd.log.pos"
      tag "etcd"
      <parse>
        @type "none"
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_kubelet
      multiline_flush_interval 5s
      path "/var/log/kubelet.log"
      pos_file "/var/log/fluentd-kubelet.log.pos"
      tag "kubelet"
      <parse>
        @type "kubernetes"
        expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
        time_format "%m%d %H:%M:%S.%N"
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_kube_proxy
      multiline_flush_interval 5s
      path "/var/log/kube-proxy.log"
      pos_file "/var/log/fluentd-kube-proxy.log.pos"
      tag "kube-proxy"
      <parse>
        @type "kubernetes"
        expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
        time_format "%m%d %H:%M:%S.%N"
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_kube_apiserver
      multiline_flush_interval 5s
      path "/var/log/kube-apiserver.log"
      pos_file "/var/log/fluentd-kube-apiserver.log.pos"
      tag "kube-apiserver"
      <parse>
        @type "kubernetes"
        expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
        time_format "%m%d %H:%M:%S.%N"
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_kube_controller_manager
      multiline_flush_interval 5s
      path "/var/log/kube-controller-manager.log"
      pos_file "/var/log/fluentd-kube-controller-manager.log.pos"
      tag "kube-controller-manager"
      <parse>
        @type "kubernetes"
        expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
        time_format "%m%d %H:%M:%S.%N"
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_kube_scheduler
      multiline_flush_interval 5s
      path "/var/log/kube-scheduler.log"
      pos_file "/var/log/fluentd-kube-scheduler.log.pos"
      tag "kube-scheduler"
      <parse>
        @type "kubernetes"
        expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
        time_format "%m%d %H:%M:%S.%N"
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_rescheduler
      multiline_flush_interval 5s
      path "/var/log/rescheduler.log"
      pos_file "/var/log/fluentd-rescheduler.log.pos"
      tag "rescheduler"
      <parse>
        @type "kubernetes"
        expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
        time_format "%m%d %H:%M:%S.%N"
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_glbc
      multiline_flush_interval 5s
      path "/var/log/glbc.log"
      pos_file "/var/log/fluentd-glbc.log.pos"
      tag "glbc"
      <parse>
        @type "kubernetes"
        expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
        time_format "%m%d %H:%M:%S.%N"
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_cluster_autoscaler
      multiline_flush_interval 5s
      path "/var/log/cluster-autoscaler.log"
      pos_file "/var/log/fluentd-cluster-autoscaler.log.pos"
      tag "cluster-autoscaler"
      <parse>
        @type "kubernetes"
        expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
        time_format "%m%d %H:%M:%S.%N"
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_kube_apiserver_audit
      multiline_flush_interval 5s
      path "/var/log/kubernetes/kube-apiserver-audit.log"
      pos_file "/var/log/kube-apiserver-audit.log.pos"
      tag "kube-apiserver-audit"
      <parse>
        @type "multiline"
        format_firstline "/^\S+\s+AUDIT:/"
        format1 /^(?<time>\S+) AUDIT:(?: (?:id="(?<id>(?:[^"\]|\.)*)"|ip="(?<ip>(?:[^"\]|\.)*)"|method="(?<method>(?:[^"\]|\.)*)"|user="(?<user>(?:[^"\]|\.)*)"|groups="(?<groups>(?:[^"\]|\.)*)"|as="(?<as>(?:[^"\]|\.)*)"|asgroups="(?<asgroups>(?:[^"\]|\.)*)"|namespace="(?<namespace>(?:[^"\]|\.)*)"|uri="(?<uri>(?:[^"\]|\.)*)"|response="(?<response>(?:[^"\]|\.)*)"|\w+="(?:[^"\]|\.)*"))*/
        time_format "%Y-%m-%dT%T.%L%Z"
      </parse>
    </source>

    <filter kubernetes.**>
      @type kubernetes_metadata
      @id filter_kube_metadata
      kubernetes_url "https://172.20.0.1:443/api"
      verify_ssl true
      ca_file ""
    </filter>

    <match **>
      @type elasticsearch
      @id out_es
      @log_level "info"
      include_tag_key true
      host "elasticsearch.logging.svc.cluster.local"
      port 9200
      path ""
      scheme http
      ssl_verify true
      ssl_version TLSv1
      user ""
      password xxxxxx
      reload_connections false
      reconnect_on_error true
      reload_on_failure true
      log_es_400_reason false
      logstash_prefix "logstash"
      logstash_format true
      index_name "logstash"
      type_name "fluentd"
      template_name 
      template_file 
      template_overwrite false
      <buffer>
        flush_thread_count 8
        flush_interval 5s
        chunk_limit_size 2M
        queue_limit_length 32
        retry_max_interval 30
        retry_forever true
      </buffer>
    </match>

PS:我是堆栈的新手。

尝试将此过滤器添加到匹配部分之前的 Fluentd 配置中

    <filter kubernetes.**>
      @type parser
      key_name log
      reserve_data true
      <parse>
        @type json
      </parse>
    </filter>

我能够使用以下过滤器达到想要的结果

    <filter kubernetes.**>
      @type parser
      key_name log
      <parse>
        @type json
        json_parser json
      </parse>
      replace_invalid_sequence true
      reserve_data true # this preserves unparsable log lines
      emit_invalid_record_to_error false # In case of unparsable log lines keep the error log clean
      reserve_time # the time was already parsed in the source, we don't want to overwrite it with current time.
    </filter>

感谢@Al-waleed Shihadeh 的时间和见解!