Fluentd - JSON within the log field is enclosed in double quotes

Logs forwarded through Fluentd have a formatting problem: a backslash is added before every double quote.

Example:

2022-02-14T10:17:46+13:00     myapp            {"log":"{\"name\":\"contents\",\"hostname\":\"vcr-amyapp1-yyut-4uh57vb-rr73g\",\"pid\":876265,\"level\":20,\"req_id\":\"1644787066643:vcr-myapp1-03-e263f-v4.0.5:876265:kwljxg59:30317\",\"data\":{\"method\":\"get\",\"url\":\"/api/content/heartbeat\",\"agent\":\"Go-http-client/1.1\"},\"msg\":\"\",\"time\":\"2022-02-13T21:17:46.644Z\",\"v\":0}","container_name":"vcr-myapp1-03-e263f"}

The backslashes invalidate everything inside the log JSON field; in other words, the log field is treated as a plain string rather than as JSON. We need the fields nested inside the log field to be valid JSON as well.
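To illustrate what is happening (a minimal Python sketch using a shortened version of the record above): the `log` field is a JSON-encoded *string*, which is why every inner double quote is escaped. Parsing it a second time recovers the nested object.

```python
import json

# The record as Fluentd emits it: the "log" value is a JSON string,
# so each inner double quote is escaped with a backslash.
record = '{"log":"{\\"name\\":\\"contents\\",\\"level\\":20}","container_name":"vcr-myapp1-03-e263f"}'

outer = json.loads(record)
print(type(outer["log"]))   # <class 'str'> -- still a string, not an object

# Parsing the string a second time yields the nested object.
inner = json.loads(outer["log"])
print(inner["name"])        # contents
```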

What should be changed in Fluentd so that the backslashes are not added? I have spent a week digging into this and still can't find anything that helps.

Any help is much appreciated.

The current Fluentd configuration file is shown below:

<system>
  workers 1
</system>

<source>
  @type  forward
  @id    input1
  @label @mainstream
  port  24224
</source>

# Used for docker health check: healthcheck http://localhost:5000/healthcheck?json=%7B%22log%22%3A+%22health+check%22%7D
# The query parameter in the URL defines a URL-encoded JSON object that looks like this:
# {"log": "health check"}
# The container health check inputs a log message of “health check”. While the query parameter in the URL defines the log message, the path, which is /healthcheck, sets the tag for the log message. In Fluentd, log messages are tagged, which allows them to be routed to different destinations.
<source>
  @type http
  port 5000
  bind 0.0.0.0
</source>

# records sent for health checking won't be forwarded anywhere
<match health*>
  @type null
</match>

<label @mainstream>
  <filter **>
    @type record_modifier
    remove_keys container_id,source
  </filter>
  <match **>
    @type copy
    <store>
      @type file
      @id   output1
      path         /fluentd/log/data.*.log
      symlink_path /fluentd/log/data.log
      append       true
      time_slice_format %Y%m%d
      time_slice_wait   10m
      time_format       %Y%m%dT%H%M%S%z
    </store>
    <store>
      @type elasticsearch
      host {{ env "efk__elasticsearch_host" }}
      port {{ env "efk__elasticsearch_port" }}
      logstash_format true
      logstash_prefix fluentd
      logstash_dateformat %Y%m%d
      include_tag_key true
      type_name access_log
      tag_key @log_name
      flush_interval 1s
    </store>
{{ if eq (env "efk__fluent_splunk_hec_enabled") "true" }}
    <store>
      @type splunk_hec
      protocol {{ env "efk__fluent_splunk_hec_protocol" }}
      hec_host {{ env "efk__fluent_splunk_hec_host" }}
      hec_port {{ env "efk__fluent_splunk_hec_port" }}
      {{ if env "efk__fluent_splunk_hec_token" | regexMatch "^secret/.*" -}}
      hec_token {{ with printf "%s" (env "efk__fluent_splunk_hec_token") | secret }}{{ .Data.value }}{{ end }}
      {{ else -}}
      hec_token {{ env "efk__fluent_splunk_hec_token" }}
      {{ end }}
      sourcetype ${tag}
    </store>
{{ end }}
  </match>
</label>

Posting the answer in case it helps someone.

A few things had to be added to make this work.

The filter section needs to be modified as follows: a second filter of `@type parser` re-parses the JSON string stored in the log field.

  <filter **>
    @type record_modifier
    remove_keys container_id,source
  </filter>
  <filter **>
    @type parser
    key_name log
    hash_value_field log
    # keep container_name and the other original fields
    reserve_data true
    <parse>
      @type json
    </parse>
  </filter>

Note that each `<filter>` block can only have a single `@type`; combining `record_modifier` and `parser` in one block is invalid, so two separate filters are used.
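Conceptually, the parser filter performs the transformation sketched below in Python (a simplified illustration only: parameter names mirror the config, and parse-failure handling is omitted).

```python
import json

def parse_log_field(record, key="log", hash_value_field="log"):
    """Rough sketch of the parser filter: parse the JSON string stored
    under `key` and store the resulting object under `hash_value_field`."""
    parsed = json.loads(record[key])
    out = dict(record)            # mimic reserve_data true: keep other fields
    out[hash_value_field] = parsed
    return out

before = {
    "log": '{"name":"contents","level":20,"msg":""}',
    "container_name": "vcr-myapp1-03-e263f",
}
after = parse_log_field(before)
print(after["log"]["name"])       # contents -- now a real nested object
print(after["container_name"])    # vcr-myapp1-03-e263f
```

After this filter runs, downstream outputs such as Elasticsearch receive `log` as a structured object instead of an escaped string.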

The complete configuration is below:

<system>
  workers 1
</system>

<source>
  @type  forward
  @id    input1
  @label @mainstream
  port  24224
</source>

# Used for docker health check: healthcheck http://localhost:5000/healthcheck?json=%7B%22log%22%3A+%22health+check%22%7D
# The query parameter in the URL defines a URL-encoded JSON object that looks like this:
# {"log": "health check"}
# The container health check inputs a log message of “health check”. While the query parameter in the URL defines the log message, the path, which is /healthcheck, sets the tag for the log message. In Fluentd, log messages are tagged, which allows them to be routed to different destinations.
<source>
  @type http
  port 5000
  bind 0.0.0.0
</source>

# records sent for health checking won't be forwarded anywhere
<match health*>
  @type null
</match>

<label @mainstream>
  <filter **>
    @type record_modifier
    remove_keys container_id,source
  </filter>
  <filter **>
    @type parser
    key_name log
    hash_value_field log
    # keep container_name and the other original fields
    reserve_data true
    <parse>
      @type json
    </parse>
  </filter>
  <match **>
    @type copy
    <store>
      @type file
      @id   output1
      path         /fluentd/log/data.*.log
      symlink_path /fluentd/log/data.log
      append       true
      time_slice_format %Y%m%d
      time_slice_wait   10m
      time_format       %Y%m%dT%H%M%S%z
    </store>
    <store>
      @type elasticsearch
      host {{ env "efk__elasticsearch_host" }}
      port {{ env "efk__elasticsearch_port" }}
      logstash_format true
      logstash_prefix fluentd
      logstash_dateformat %Y%m%d
      include_tag_key true
      type_name access_log
      tag_key @log_name
      flush_interval 1s
    </store>
{{ if eq (env "efk__fluent_splunk_hec_enabled") "true" }}
    <store>
      @type splunk_hec
      protocol {{ env "efk__fluent_splunk_hec_protocol" }}
      hec_host {{ env "efk__fluent_splunk_hec_host" }}
      hec_port {{ env "efk__fluent_splunk_hec_port" }}
      {{ if env "efk__fluent_splunk_hec_token" | regexMatch "^secret/.*" -}}
      hec_token {{ with printf "%s" (env "efk__fluent_splunk_hec_token") | secret }}{{ .Data.value }}{{ end }}
      {{ else -}}
      hec_token {{ env "efk__fluent_splunk_hec_token" }}
      {{ end }}
      sourcetype ${tag}
    </store>
{{ end }}
  </match>
</label>