Extracting and logging only value of a key in fluentd
I have an input file containing line-delimited JSON messages. The message format is as follows:
{"level":"info","message":{"accountId":99,"end_dateTime":"","id":0.22837359658442535,"log":[]}}
{"level":"info","message":{"accountId":100,"end_dateTime":"","id":0.2583,"log":[]}}
{"level":"info","message":{"accountId":200,"end_dateTime":"","id":0.5783,"log":[]}}
I want to use Fluentd to save only the message part of these records to a MongoDB database. In Mongo, the data in the collection should look like this:
{
"_id": ObjectId("626a1b813c04335a858e5926"),
"accountId": 99,
"end_dateTime": "",
"id": 0.22837359658442535,
"log": []
}
I only want to extract and save the value of the message key from the input payload.
I have tried the following configuration, but it does not work:
<source>
@type tail
@id input_tail2
read_from_head true
path "/opt/webhook-logs/webhook.log"
pos_file "/opt/webhook-logs/webhook.log.pos"
tag "td.mongo.events"
<parse>
@type "json"
unmatched_lines
</parse>
</source>
<match td.mongo.**>
@type mongo
host "127.0.0.1"
port 27017
database "vonnect"
collection "webhooklog"
user "vonnect"
password xxxxxx
buffer_chunk_limit 8m
time_key time
<buffer>
flush_interval 2s
</buffer>
<inject>
time_key time
</inject>
</match>
I have also tried the record_transformer filter, without success. The filter configuration looks like this:
<filter td.mongo.**>
@type record_transformer
renew_record true
# enable_ruby
# auto_typecast true
<record>
${record["message"]}
</record>
</filter>
Here is a working example using the record_transformer and parser filter plugins:
fluent.conf
<source>
@type sample
@id in_sample
sample [
{"level":"info","message":{"accountId":99,"end_dateTime":"","id":0.22837359658442535,"log":[]}},
{"level":"info","message":{"accountId":100,"end_dateTime":"","id":0.2583,"log":[]}},
{"level":"info","message":{"accountId":200,"end_dateTime":"","id":0.5783,"log":[]}}
]
tag sample
</source>
<filter sample>
@type record_transformer
renew_record true
keep_keys message
enable_ruby true
<record>
message ${record["message"].to_json.to_s}
</record>
</filter>
<filter sample>
@type parser
key_name message
reserve_data true
remove_key_name_field true
<parse>
@type json
</parse>
</filter>
<match sample>
@type stdout
</match>
Run:
fluentd -c fluent.conf
Output:
2022-04-28 14:04:29.094892632 +0500 sample: {"accountId":99,"end_dateTime":"","id":0.22837359658442535,"log":[]}
2022-04-28 14:04:30.097973274 +0500 sample: {"accountId":100,"end_dateTime":"","id":0.2583,"log":[]}
2022-04-28 14:04:31.000677835 +0500 sample: {"accountId":200,"end_dateTime":"","id":0.5783,"log":[]}
record_transformer drops the extra keys, keeps only message, and converts its value to a plain JSON string. The parser filter then removes the message key and expands its string value back into JSON fields on the record.
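To wire this into the tail-to-MongoDB pipeline from the question, the same two filters can be placed between the source and the match block. This is a sketch assuming the question's td.mongo.** tag and its existing source/match sections; it has not been tested against a live MongoDB:

```
<filter td.mongo.**>
  @type record_transformer
  renew_record true
  keep_keys message
  enable_ruby true
  <record>
    # serialize the nested "message" hash to a JSON string
    message ${record["message"].to_json.to_s}
  </record>
</filter>

<filter td.mongo.**>
  @type parser
  key_name message
  reserve_data true
  # drop the "message" key after expanding its fields
  remove_key_name_field true
  <parse>
    @type json
  </parse>
</filter>
```

With these filters in place, the existing `<match td.mongo.**>` mongo output should receive records containing only the expanded message fields (accountId, end_dateTime, id, log), which is the shape shown in the desired collection document.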