如何在 Fluentd 配置中定义散列(JSON 或对象)?

How to define hash (JSON or object) in Fluentd configuration?

我在为 Fluentd 的 Google 云插件格式化标签时遇到问题。我想用我自己的标签设置 logging.googleapis.com/labels 字段。这就是 Google Cloud plugin documentation 对这个字段的描述:

The value of this field should be a structured record.

Fluentd documentation提到有一个hash数据类型:

hash: the field is parsed as a JSON object. It also supports the shorthand syntax. These are the same values:

  • normal: {"key1": "value1", "key2": "value2"}
  • shorthand: key1:value1,key2:value2

所以,我尝试了这个(引号 + 大括号):

<filter app.**>
  @type record_transformer

  renew_record true

  <record>
    severity ${record["severity"]}
    message ${record["message"]}
    logging.googleapis.com/trace ${record["trace"]}
    logging.googleapis.com/spanId ${record["spanId"]}
    logging.googleapis.com/labels {
      "kubernetes-host": "${record.dig("kubernetes", "host")}",
      "kubernetes-pod-name": "${record.dig("kubernetes", "pod_name")}",
      "kubernetes-pod-id": "${record.dig("kubernetes", "pod_id")}",
      "kubernetes-pod-ip": "${record.dig("kubernetes", "pod_ip")}",
      "kubernetes-container-name": "${record.dig("kubernetes", "container_name")}",
      "kubernetes-namespace-name": "${record.dig("kubernetes", "namespace_name")}",
      "kubernetes-namespace-id": "${record.dig("kubernetes", "namespace_id")}"
    }
  </record>
</filter>

我也试过这个(没有引号,但有大括号):

<filter app.**>
  @type record_transformer

  renew_record true

  <record>
    severity ${record["severity"]}
    message ${record["message"]}
    logging.googleapis.com/trace ${record["trace"]}
    logging.googleapis.com/spanId ${record["spanId"]}
    logging.googleapis.com/labels {
      "kubernetes-host": ${record.dig("kubernetes", "host")},
      "kubernetes-pod-name": ${record.dig("kubernetes", "pod_name")},
      "kubernetes-pod-id": ${record.dig("kubernetes", "pod_id")},
      "kubernetes-pod-ip": ${record.dig("kubernetes", "pod_ip")},
      "kubernetes-container-name": ${record.dig("kubernetes", "container_name")},
      "kubernetes-namespace-name": ${record.dig("kubernetes", "namespace_name")},
      "kubernetes-namespace-id": ${record.dig("kubernetes", "namespace_id")}
    }
  </record>
</filter>

我也试过这个(没有引号和大括号):

<filter app.**>
  @type record_transformer

  renew_record true

  <record>
    severity ${record["severity"]}
    message ${record["message"]}
    logging.googleapis.com/trace ${record["trace"]}
    logging.googleapis.com/spanId ${record["spanId"]}
    logging.googleapis.com/labels {
      "kubernetes-host": record.dig("kubernetes", "host"),
      "kubernetes-pod-name": record.dig("kubernetes", "pod_name"),
      "kubernetes-pod-id": record.dig("kubernetes", "pod_id"),
      "kubernetes-pod-ip": record.dig("kubernetes", "pod_ip"),
      "kubernetes-container-name": record.dig("kubernetes", "container_name"),
      "kubernetes-namespace-name": record.dig("kubernetes", "namespace_name"),
      "kubernetes-namespace-id": record.dig("kubernetes", "namespace_id")
    }
  </record>
</filter>

但每次 Fluentd 都失败并出现以下错误:

/opt/bitnami/fluentd/gems/fluentd-1.12.0/lib/fluent/config/basic_parser.rb:92:in `parse_error!': got incomplete JSON hash configuration at fluentd.conf line 92,9 (Fluent::ConfigParseError)

我的整个配置(错误消息的第 92 行是带有 </match> 的配置的最后一行):

# Ignore fluentd own events
<match fluent.**>
  @type null
</match>

# TCP input to receive logs from
<source>
  @type forward

  bind 0.0.0.0
  port 24224
</source>

# HTTP input for the liveness and readiness probes
<source>
  @type http

  bind 0.0.0.0
  port 9880
</source>

# Throw the healthcheck to the standard output instead of forwarding it
<match fluentd.healthcheck>
  @type stdout
</match>

<filter kubernetes.var.log.containers.**.log>
  @type grep

  <regexp>
    key log
    pattern /^\[(?<logtime>[0-9T:.+-]+)\] \[(?<trace>.*?)\/(?<spanId>.*?)\] \[(?<channel>.+?)\] (?<severity>[A-Z]+): (?<message>.+)$/
  </regexp>
</filter>

<filter kubernetes.var.log.containers.**.log>
  @type parser

  key_name log

  <parse>
    @type regexp

    expression /^\[(?<logtime>[0-9T:.+-]+)\] \[(?<trace>.*?)\/(?<spanId>.*?)\] \[(?<channel>.+?)\] (?<severity>[A-Z]+): (?<message>.+)$/
    time_key logtime
    time_format %Y-%m-%dT%H:%M:%S.%L%z
  </parse>
</filter>

<match kubernetes.var.log.containers.**.log>
  @type rewrite_tag_filter

  <rule>
    key channel
    pattern /^(\w+)$/
    tag app.
  </rule>
</match>

<filter app.**>
  @type record_transformer

  renew_record true

  <record>
    severity ${record["severity"]}
    message ${record["message"]}
    logging.googleapis.com/trace ${record["trace"]}
    logging.googleapis.com/spanId ${record["spanId"]}
    logging.googleapis.com/labels {
      "kubernetes-host": record.dig("kubernetes", "host"),
      "kubernetes-pod-name": record.dig("kubernetes", "pod_name"),
      "kubernetes-pod-id": record.dig("kubernetes", "pod_id"),
      "kubernetes-pod-ip": record.dig("kubernetes", "pod_ip"),
      "kubernetes-container-name": record.dig("kubernetes", "container_name"),
      "kubernetes-namespace-name": record.dig("kubernetes", "namespace_name"),
      "kubernetes-namespace-id": record.dig("kubernetes", "namespace_id")
    }
  </record>
</filter>

<match app.**>
  @type google_cloud

  autoformat_stackdriver_trace true

  <inject>
    time_key time
    time_type string
    time_format %Y-%m-%dT%H:%M:%S.%NZ
  </inject>
</match>

如何正确定义这个值?我还没有找到任何示例,既没有使用 hash 数据类型(使用 Ruby 表达式作为值),也没有在 Google 云插件中使用 logging.googleapis.com/labels 属性。

刚刚用这个测试过(双引号内的单引号):

<filter app.**>
  @type record_transformer

  renew_record true

  <record>
    severity ${record["severity"]}
    message ${record["message"]}
    logging.googleapis.com/trace ${record["trace"]}
    logging.googleapis.com/spanId ${record["spanId"]}

    logging.googleapis.com/labels {
      "kubernetes-host": "${record.dig('kubernetes', 'host')}",
      "kubernetes-pod-name": "${record.dig('kubernetes', 'pod_name')}",
      "kubernetes-pod-id": "${record.dig('kubernetes', 'pod_id')}",
      "kubernetes-pod-ip": "${record.dig('kubernetes', 'pod_ip')}",
      "kubernetes-container-name": "${record.dig('kubernetes', 'container_name')}",
      "kubernetes-namespace-name": "${record.dig('kubernetes', 'namespace_name')}",
      "kubernetes-namespace-id": "${record.dig('kubernetes', 'namespace_id')}"
    }
  </record>
</filter>

并且,使用 --dry-run(对于不可用的插件有一些注释配置):

$ fluentd -c ./fluent-hash-test.conf --dry-run
2021-01-14 17:57:13 +0500 [info]: parsing config file is succeeded path="./fluent-hash-test.conf"
...
2021-01-14 17:57:13 +0500 [info]: using configuration file: <ROOT>
  <source>
    @type http
    bind "0.0.0.0"
    port 9880
  </source>
  <filter app.**>
    @type record_transformer
    renew_record true
    <record>
      severity ${record["severity"]}
      message ${record["message"]}
      logging.googleapis.com/trace ${record["trace"]}
      logging.googleapis.com/spanId ${record["spanId"]}
      logging.googleapis.com/labels {"kubernetes-host":"${record.dig('kubernetes', 'host')}","kubernetes-pod-name":"${record.dig('kubernetes', 'pod_name')}","kubernetes-pod-id":"${record.dig('kubernetes', 'pod_id')}","kubernetes-pod-ip":"${record.dig('kubernetes', 'pod_ip')}","kubernetes-container-name":"${record.dig('kubernetes', 'container_name')}","kubernetes-namespace-name":"${record.dig('kubernetes', 'namespace_name')}","kubernetes-namespace-id":"${record.dig('kubernetes', 'namespace_id')}"}
    </record>
  </filter>
</ROOT>
2021-01-14 17:57:13 +0500 [info]: finished dry run mode