Fluentd - JSON within the log field is enclosed by double quotes
Logs forwarded through fluentd have a formatting problem: a backslash is added before every double quote.
Example -
2022-02-14T10:17:46+13:00 myapp {"log":"{\"name\":\"contents\",\"hostname\":\"vcr-amyapp1-yyut-4uh57vb-rr73g\",\"pid\":876265,\"level\":20,\"req_id\":\"1644787066643:vcr-myapp1-03-e263f-v4.0.5:876265:kwljxg59:30317\",\"data\":{\"method\":\"get\",\"url\":\"/api/content/heartbeat\",\"agent\":\"Go-http-client/1.1\"},\"msg\":\"\",\"time\":\"2022-02-13T21:17:46.644Z\",\"v\":0}","container_name":"vcr-myapp1-03-e263f"}
The backslashes invalidate everything inside the log JSON field. In other words, the log field is treated as a string rather than as JSON. We need the fields inside the log field to be valid JSON as well.
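In other words, instead of log holding an escaped string, the record should come out roughly like this (trimmed to a few of the fields from the example above):
2022-02-14T10:17:46+13:00 myapp {"log":{"name":"contents","hostname":"vcr-amyapp1-yyut-4uh57vb-rr73g","level":20,"msg":"","time":"2022-02-13T21:17:46.644Z"},"container_name":"vcr-myapp1-03-e263f"}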
What should be changed in fluentd so that the backslashes are not added? I have spent a week digging into this and still can't find anything useful.
Any help is greatly appreciated.
The current fluentd config file is below -
<system>
workers 1
</system>
<source>
@type forward
@id input1
@label @mainstream
port 24224
</source>
# Used for docker health check: healthcheck http://localhost:5000/healthcheck?json=%7B%22log%22%3A+%22health+check%22%7D
# The query parameter in the URL defines a URL-encoded JSON object that looks like this:
# {"log": "health check"}
# The container health check inputs a log message of “health check”. While the query parameter in the URL defines the log message, the path, which is /healthcheck, sets the tag for the log message. In Fluentd, log messages are tagged, which allows them to be routed to different destinations.
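# Equivalent manual test against this endpoint (assumes fluentd is reachable on port 5000 from this host):
#   curl -X POST -d 'json={"log": "health check"}' http://localhost:5000/healthcheck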
<source>
@type http
port 5000
bind 0.0.0.0
</source>
# records sent for health checking won't be forwarded anywhere
<match health*>
@type null
</match>
<label @mainstream>
<filter **>
@type record_modifier
remove_keys container_id,source
</filter>
<match **>
@type copy
<store>
@type file
@id output1
path /fluentd/log/data.*.log
symlink_path /fluentd/log/data.log
append true
time_slice_format %Y%m%d
time_slice_wait 10m
time_format %Y%m%dT%H%M%S%z
</store>
<store>
@type elasticsearch
host {{ env "efk__elasticsearch_host" }}
port {{ env "efk__elasticsearch_port" }}
logstash_format true
logstash_prefix fluentd
logstash_dateformat %Y%m%d
include_tag_key true
type_name access_log
tag_key @log_name
flush_interval 1s
</store>
{{ if eq (env "efk__fluent_splunk_hec_enabled") "true" }}
<store>
@type splunk_hec
protocol {{ env "efk__fluent_splunk_hec_protocol" }}
hec_host {{ env "efk__fluent_splunk_hec_host" }}
hec_port {{ env "efk__fluent_splunk_hec_port" }}
{{ if env "efk__fluent_splunk_hec_token" | regexMatch "^secret/.*" -}}
hec_token {{ with printf "%s" (env "efk__fluent_splunk_hec_token") | secret }}{{ .Data.value }}{{ end }}
{{ else -}}
hec_token {{ env "efk__fluent_splunk_hec_token" }}
{{ end }}
sourcetype ${tag}
</store>
{{ end }}
</match>
</label>
Posting the answer in case it helps someone.
A few things had to be added to make it work.
The filter section needs to be modified as follows -
<filter **>
@type record_modifier
remove_keys "container_id,source"
</filter>
# re-parse the escaped string in the log field so it is stored as nested JSON
<filter **>
@type parser
key_name log
hash_value_field log
<parse>
@type json
</parse>
</filter>
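Each <filter> directive takes a single @type, so record_modifier and the parser filter live in separate blocks. The parser filter re-parses the string value of log as JSON and stores the parsed hash back under log (hash_value_field log), so the example record from the question comes out roughly like this:
{"log":{"name":"contents","hostname":"vcr-amyapp1-yyut-4uh57vb-rr73g","pid":876265,"level":20,"msg":"","time":"2022-02-13T21:17:46.644Z","v":0}}
One caveat: by default filter_parser keeps only the parsed result, so other top-level fields such as container_name are dropped unless reserve_data true is also added to the parser filter.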
The complete config is below -
<system>
workers 1
</system>
<source>
@type forward
@id input1
@label @mainstream
port 24224
</source>
# Used for docker health check: healthcheck http://localhost:5000/healthcheck?json=%7B%22log%22%3A+%22health+check%22%7D
# The query parameter in the URL defines a URL-encoded JSON object that looks like this:
# {"log": "health check"}
# The container health check inputs a log message of “health check”. While the query parameter in the URL defines the log message, the path, which is /healthcheck, sets the tag for the log message. In Fluentd, log messages are tagged, which allows them to be routed to different destinations.
<source>
@type http
port 5000
bind 0.0.0.0
</source>
# records sent for health checking won't be forwarded anywhere
<match health*>
@type null
</match>
<label @mainstream>
<filter **>
@type record_modifier
remove_keys "container_id,source"
</filter>
# re-parse the escaped string in the log field so it is stored as nested JSON
<filter **>
@type parser
key_name log
hash_value_field log
<parse>
@type json
</parse>
</filter>
<match **>
@type copy
<store>
@type file
@id output1
path /fluentd/log/data.*.log
symlink_path /fluentd/log/data.log
append true
time_slice_format %Y%m%d
time_slice_wait 10m
time_format %Y%m%dT%H%M%S%z
</store>
<store>
@type elasticsearch
host {{ env "efk__elasticsearch_host" }}
port {{ env "efk__elasticsearch_port" }}
logstash_format true
logstash_prefix fluentd
logstash_dateformat %Y%m%d
include_tag_key true
type_name access_log
tag_key @log_name
flush_interval 1s
</store>
{{ if eq (env "efk__fluent_splunk_hec_enabled") "true" }}
<store>
@type splunk_hec
protocol {{ env "efk__fluent_splunk_hec_protocol" }}
hec_host {{ env "efk__fluent_splunk_hec_host" }}
hec_port {{ env "efk__fluent_splunk_hec_port" }}
{{ if env "efk__fluent_splunk_hec_token" | regexMatch "^secret/.*" -}}
hec_token {{ with printf "%s" (env "efk__fluent_splunk_hec_token") | secret }}{{ .Data.value }}{{ end }}
{{ else -}}
hec_token {{ env "efk__fluent_splunk_hec_token" }}
{{ end }}
sourcetype ${tag}
</store>
{{ end }}
</match>
</label>
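For a quick check (assuming the fluentd gem's fluent-cat utility is available and the forward port 24224 is reachable; the tag debug.test below is arbitrary), a record with an embedded JSON string in log can be pushed through the forward input and then inspected in the file output at /fluentd/log/data.log:
printf '%s\n' '{"log": "{\"name\":\"contents\",\"level\":20}", "container_name": "test"}' | fluent-cat debug.test
The resulting entry should show log as a nested object rather than a backslash-escaped string.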