fluentd 不解析 JSON 日志文件条目

fluentd not parsing JSON log file entry

我在 Stack Overflow 上看到了很多类似的问题,包括这一个。但是没有一个能解决我的具体问题。

应用程序部署在 Kubernetes (v1.15) 集群中。我使用的 Docker 镜像基于 fluent/fluentd-docker-image GitHub 仓库的 v1.9/armhf,并修改为包含 elasticsearch 插件。Elasticsearch 和 Kibana 的版本都是 7.6.0。

日志将输出到标准输出,如下所示:

{"Application":"customer","HTTPMethod":"GET","HostName":"","RemoteAddr":"10.244.4.154:51776","URLPath":"/customers","level":"info","msg":"HTTP request received","time":"2020-03-10T20:17:32Z"}

在 Kibana 中,我看到了这样的东西:

{
  "_index": "logstash-2020.03.10",
  "_type": "_doc",
  "_id": "p-UZxnABBcooPsDQMBy_",
  "_version": 1,
  "_score": null,
  "_source": {
    "log": "{\"Application\":\"customer\",\"HTTPMethod\":\"GET\",\"HostName\":\"\",\"RemoteAddr\":\"10.244.4.154:46160\",\"URLPath\":\"/customers\",\"level\":\"info\",\"msg\":\"HTTP request received\",\"time\":\"2020-03-10T20:18:18Z\"}\n",
    "stream": "stdout",
    "docker": {
      "container_id": "cd1634b0ce410f3c89fe63f508fe6208396be87adf1f27fa9d47a01d81ff7904"
    },
    "kubernetes": {

我期望看到从 log 值中提取出的 JSON 字段,大致如下(有删节):

{
  "_index": "logstash-2020.03.10",
  ...
  "_source": {
    "log": "...",   
    "Application":"customer",
    "HTTPMethod":"GET",
    "HostName":"",
    "RemoteAddr":"10.244.4.154:46160",
    "URLPath":"/customers",
    "level":"info",
    "msg":"HTTP request received",
    "time":"2020-03-10T20:18:18Z",
    "stream": "stdout",
    "docker": {
      "container_id": "cd1634b0ce410f3c89fe63f508fe6208396be87adf1f27fa9d47a01d81ff7904"
    },
    "kubernetes": {

我的 fluentd 配置是:

# Discard (null output) every event whose tag matches fluent.**.
<match fluent.**>
  @type null
</match>

# Tail every container log file under /var/log/containers, starting from the
# beginning of each file, and tag the events kubernetes.*.
<source>
  @type tail
  path /var/log/containers/*.log
  pos_file /var/log/fluentd-containers.log.pos
  tag kubernetes.*
  read_from_head true
  # Parser settings live in a <parse> section (fluentd v1 syntax) instead of
  # the deprecated top-level `format` / `time_format` parameters.
  <parse>
    @type json
    time_format %Y-%m-%dT%H:%M:%S.%NZ
  </parse>
</source>

# Discard events whose tag matches kubernetes.var.log.containers.**fluentd**.log.
<match kubernetes.var.log.containers.**fluentd**.log>
  @type null
</match>
# Discard events whose tag matches kubernetes.var.log.containers.**kube-system**.log.
<match kubernetes.var.log.containers.**kube-system**.log>
  @type null
</match>
# Run the kubernetes_metadata filter on every event tagged kubernetes.**.
<filter kubernetes.**>
  @type kubernetes_metadata
</filter>

# Send all matching events to Elasticsearch. Host, port and path are read from
# the FLUENT_ELASTICSEARCH_* environment variables.
<match **>
  @type elasticsearch
  @id out_es
  @log_level info
  host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
  port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
  path "#{ENV['FLUENT_ELASTICSEARCH_PATH']}"
  include_tag_key true
  <format>
    @type json
  </format>
</match>

我确定我遗漏了什么。谁能指出我正确的方向?

谢谢,Rich

这个配置对我有用:

# Tail the container log files and parse each line as JSON, taking the event
# time from the "time" key (ISO 8601).
<source>
  @type tail
  # The same glob was listed twice; one entry is enough. Add further
  # comma-separated paths here if your logs live elsewhere.
  path /var/log/containers/*.log
  pos_file /opt/bitnami/fluentd/logs/buffers/fluentd-docker.pos
  tag kubernetes.*
  read_from_head true
  <parse>
    @type json
    time_key time
    time_format %iso8601
  </parse>
</source>

# Second parsing pass: the "log" field of each kubernetes.** record holds a
# JSON document serialized as a string, so parse that string as JSON.
<filter kubernetes.**>
  @type parser
  key_name "$.log"
  reserve_data true
  hash_value_field "log"
  <parse>
    @type json
  </parse>
</filter>

# Run the kubernetes_metadata filter on every event tagged kubernetes.**.
<filter kubernetes.**>
  @type kubernetes_metadata
</filter>

确保编辑路径以使其符合您的用例。

之所以会这样,是因为 Docker 在 /var/log/containers/*.log 中把容器的 STDOUT 以字符串形式放在 'log' 键下,所以这些 JSON 日志必须先被序列化为字符串才能存进去。您需要做的是添加一个额外的步骤,对 'log' 键下的这个字符串再做一次解析:

# Extra parsing step: treat the string stored under the "log" key of each
# kubernetes.** record as JSON.
<filter kubernetes.**>
  @type parser
  key_name "$.log"
  reserve_data true
  hash_value_field "log"
  <parse>
    @type json
  </parse>
</filter>

我有一个 json 像这样从我的容器中发出:

{"asctime": "2020-06-28 23:40:37,184", "filename": "streaming_pull_manager.py", "funcName": "_should_recover", "lineno": 648, "processName": "MainProcess", "threadName": "Thread-6", "message": "Observed recoverable stream error 504 Deadline Exceeded", "severity": "INFO"}

Kibana 显示“找不到消息”。于是我去 Google 搜索了一番,并通过将以下配置附加到我的 kubernetes.conf 解决了这个问题:

# Step 1: copy the value of "log" into a new field, log_json.
<filter **>
  @type record_transformer
  <record>
    log_json ${record["log"]}
  </record>
</filter>

# Step 2: parse log_json as JSON. Records that fail to parse are not sent to
# the error stream (emit_invalid_record_to_error false).
<filter **>
  @type parser
  @log_level debug
  key_name log_json
  remove_key_name_field true
  reserve_data true
  emit_invalid_record_to_error false
  <parse>
    @type json
  </parse>
</filter>

最终的 kubernetes.conf 文件如下所示:

# Events routed to the @FLUENT_LOG label with a tag matching fluent.** are
# discarded (null output).
<label @FLUENT_LOG>
  <match fluent.**>
    @type null
  </match>
</label>

# Tail the container log files under /var/log/containers from the beginning.
# The tag, the excluded paths and the parser type can each be overridden via a
# FLUENT_CONTAINER_TAIL_* environment variable; the fallbacks are 'kubernetes.*',
# use_default and 'json' respectively.
<source>
  @type tail
  @id in_tail_container_logs
  path /var/log/containers/*.log
  pos_file /var/log/fluentd-containers.log.pos
  tag "#{ENV['FLUENT_CONTAINER_TAIL_TAG'] || 'kubernetes.*'}"
  # NOTE(review): use_default is not defined in this file; presumably provided
  # by fluentd's embedded-Ruby config evaluation - confirm.
  exclude_path "#{ENV['FLUENT_CONTAINER_TAIL_EXCLUDE_PATH'] || use_default}"
  read_from_head true
  <parse>
    @type "#{ENV['FLUENT_CONTAINER_TAIL_PARSER_TYPE'] || 'json'}"
    time_format %Y-%m-%dT%H:%M:%S.%NZ
  </parse>
</source>

# Salt minion log: a regular expression extracts time, severity and message.
<source>
  @id in_tail_minion
  @type tail
  tag salt
  path /var/log/salt/minion
  pos_file /var/log/fluentd-salt.pos
  <parse>
    @type regexp
    time_format %Y-%m-%d %H:%M:%S
    expression /^(?<time>[^ ]* [^ ,]*)[^\[]*\[[^\]]*\]\[(?<severity>[^ \]]*) *\] (?<message>.*)$/
  </parse>
</source>

# Startup script log, parsed with the syslog parser.
<source>
  @id in_tail_startupscript
  @type tail
  tag startupscript
  path /var/log/startupscript.log
  pos_file /var/log/fluentd-startupscript.log.pos
  <parse>
    @type syslog
  </parse>
</source>

# Docker daemon log: a regular expression extracts time, severity and message,
# plus the optional err and statusCode fields.
<source>
  @type tail
  @id in_tail_docker
  path /var/log/docker.log
  pos_file /var/log/fluentd-docker.log.pos
  tag docker
  <parse>
    @type regexp
    # The status code group must be a named capture, (?<status_code>...).
    # The previous "($<status_code>...)" was not a named group, so status_code
    # was never extracted.
    expression /^time="(?<time>[^)]*)" level=(?<severity>[^ ]*) msg="(?<message>[^"]*)"( err="(?<error>[^"]*)")?( statusCode=(?<status_code>\d+))?/
  </parse>
</source>

# etcd log, read with the "none" parser.
<source>
  @id in_tail_etcd
  @type tail
  tag etcd
  path /var/log/etcd.log
  pos_file /var/log/fluentd-etcd.log.pos
  <parse>
    @type none
  </parse>
</source>

# The eight sources below share one shape: tail a single component log file
# under /var/log, parse it with the "kubernetes" parser, and use a 5s
# multiline flush interval. Only the id, path, pos_file and tag differ.

# kubelet
<source>
  @id in_tail_kubelet
  @type tail
  tag kubelet
  path /var/log/kubelet.log
  pos_file /var/log/fluentd-kubelet.log.pos
  multiline_flush_interval 5s
  <parse>
    @type kubernetes
  </parse>
</source>

# kube-proxy
<source>
  @id in_tail_kube_proxy
  @type tail
  tag kube-proxy
  path /var/log/kube-proxy.log
  pos_file /var/log/fluentd-kube-proxy.log.pos
  multiline_flush_interval 5s
  <parse>
    @type kubernetes
  </parse>
</source>

# kube-apiserver
<source>
  @id in_tail_kube_apiserver
  @type tail
  tag kube-apiserver
  path /var/log/kube-apiserver.log
  pos_file /var/log/fluentd-kube-apiserver.log.pos
  multiline_flush_interval 5s
  <parse>
    @type kubernetes
  </parse>
</source>

# kube-controller-manager
<source>
  @id in_tail_kube_controller_manager
  @type tail
  tag kube-controller-manager
  path /var/log/kube-controller-manager.log
  pos_file /var/log/fluentd-kube-controller-manager.log.pos
  multiline_flush_interval 5s
  <parse>
    @type kubernetes
  </parse>
</source>

# kube-scheduler
<source>
  @id in_tail_kube_scheduler
  @type tail
  tag kube-scheduler
  path /var/log/kube-scheduler.log
  pos_file /var/log/fluentd-kube-scheduler.log.pos
  multiline_flush_interval 5s
  <parse>
    @type kubernetes
  </parse>
</source>

# rescheduler
<source>
  @id in_tail_rescheduler
  @type tail
  tag rescheduler
  path /var/log/rescheduler.log
  pos_file /var/log/fluentd-rescheduler.log.pos
  multiline_flush_interval 5s
  <parse>
    @type kubernetes
  </parse>
</source>

# glbc
<source>
  @id in_tail_glbc
  @type tail
  tag glbc
  path /var/log/glbc.log
  pos_file /var/log/fluentd-glbc.log.pos
  multiline_flush_interval 5s
  <parse>
    @type kubernetes
  </parse>
</source>

# cluster-autoscaler
<source>
  @id in_tail_cluster_autoscaler
  @type tail
  tag cluster-autoscaler
  path /var/log/cluster-autoscaler.log
  pos_file /var/log/fluentd-cluster-autoscaler.log.pos
  multiline_flush_interval 5s
  <parse>
    @type kubernetes
  </parse>
</source>

# kube-apiserver audit log, parsed with the multiline parser. A new event
# starts at every line matching "<timestamp> AUDIT:".
# Example:
# 2017-02-09T00:15:57.992775796Z AUDIT: id="90c73c7c-97d6-4b65-9461-f94606ff825f" ip="104.132.1.72" method="GET" user="kubecfg" as="<self>" asgroups="<lookup>" namespace="default" uri="/api/v1/namespaces/default/pods"
# 2017-02-09T00:15:57.993528822Z AUDIT: id="90c73c7c-97d6-4b65-9461-f94606ff825f" response="200"
<source>
  @type tail
  @id in_tail_kube_apiserver_audit
  multiline_flush_interval 5s
  path /var/log/kubernetes/kube-apiserver-audit.log
  pos_file /var/log/kube-apiserver-audit.log.pos
  tag kube-apiserver-audit
  <parse>
    @type multiline
    format_firstline /^\S+\s+AUDIT:/
    # Fields must be explicitly captured by name to be parsed into the record.
    # Fields may not always be present, and order may change, so this just looks
    # for a list of key="\"quoted\" value" pairs separated by spaces.
    # Unknown fields are ignored.
    # Note: We can't separate query/response lines as format1/format2 because
    #       they don't always come one after the other for a given query.
    # Each value is matched by (?:[^"\\]|\\.)* - any character other than a
    # quote or backslash, or a backslash-escaped character. The backslashes
    # must be doubled; with single backslashes the character class is never
    # closed and the expression is invalid.
    format1 /^(?<time>\S+) AUDIT:(?: (?:id="(?<id>(?:[^"\\]|\\.)*)"|ip="(?<ip>(?:[^"\\]|\\.)*)"|method="(?<method>(?:[^"\\]|\\.)*)"|user="(?<user>(?:[^"\\]|\\.)*)"|groups="(?<groups>(?:[^"\\]|\\.)*)"|as="(?<as>(?:[^"\\]|\\.)*)"|asgroups="(?<asgroups>(?:[^"\\]|\\.)*)"|namespace="(?<namespace>(?:[^"\\]|\\.)*)"|uri="(?<uri>(?:[^"\\]|\\.)*)"|response="(?<response>(?:[^"\\]|\\.)*)"|\w+="(?:[^"\\]|\\.)*"))*/
    time_format %Y-%m-%dT%T.%L%Z
  </parse>
</source>

# Run the kubernetes_metadata filter on every event tagged kubernetes.**.
# Each setting is read from an environment variable, with a fallback where one
# is given after "||".
<filter kubernetes.**>
  @type kubernetes_metadata
  @id filter_kube_metadata
  # Falls back to https://<KUBERNETES_SERVICE_HOST>:<KUBERNETES_SERVICE_PORT>/api.
  # ENV.fetch raises if either of those variables is unset.
  kubernetes_url "#{ENV['FLUENT_FILTER_KUBERNETES_URL'] || 'https://' + ENV.fetch('KUBERNETES_SERVICE_HOST') + ':' + ENV.fetch('KUBERNETES_SERVICE_PORT') + '/api'}"
  verify_ssl "#{ENV['KUBERNETES_VERIFY_SSL'] || true}"
  ca_file "#{ENV['KUBERNETES_CA_FILE']}"
  skip_labels "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_LABELS'] || 'false'}"
  skip_container_metadata "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_CONTAINER_METADATA'] || 'false'}"
  skip_master_url "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_MASTER_URL'] || 'false'}"
  skip_namespace_metadata "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_NAMESPACE_METADATA'] || 'false'}"
</filter>

# Step 1: copy the value of "log" into a new field, log_json.
<filter **>
  @type record_transformer
  <record>
    log_json ${record["log"]}
  </record>
</filter>

# Step 2: parse log_json as JSON. Records that fail to parse are not sent to
# the error stream (emit_invalid_record_to_error false).
<filter **>
  @type parser
  @log_level debug
  key_name log_json
  remove_key_name_field true
  reserve_data true
  emit_invalid_record_to_error false
  <parse>
    @type json
  </parse>
</filter>

编辑:如果有人想了解如何覆盖 fluentd 的 .conf 文件(尤其是 kubernetes.conf),这里有一个很棒的教程:here

我是通过下面的解析配置解决这个问题的。

首先用 http 输入进行验证,确认日志能被正确解析,然后再接入您的容器日志。

fluentd.conf

# HTTP input listening on 0.0.0.0:5170, used to post test records by hand.
<source>
  @type http
  bind 0.0.0.0
  port 5170
</source>

# Parse the JSON string stored under the "log" key of every record.
# The pattern must be "**": a single "*" matches exactly one tag part and so
# would skip multi-part tags such as test.cycle.
<filter **>
  @type parser
  key_name "$.log"
  hash_value_field "log"
  reserve_data true
  <parse>
    @type json
  </parse>
</filter>

# Print every event to stdout so the parsed result can be inspected.
<match **>
  @type stdout
</match>

并在您的终端中使用 curl

检查 http
# POST one sample record to the HTTP input on port 5170; the URL path makes the tag test.cycle.
curl -i -X POST -d 'json={"source":"stderr","log":"{\"applicationName\":\"api-producer-go\",\"level\":\"info\",\"msg\":\"Development is Running\",\"time\":\"2020-09-04T14:32:29Z\"}","container_id":"f9975c6a7bc6dcc21dbdacca8ff98152cd04ae28b3bc36707eba5453f6ff9960","container_name":"/api-producer-golang"}' http://localhost:5170/test.cycle