在 fluentd 中仅提取和记录键的值

Extracting and logging only value of a key in fluentd

我有一个包含 JSON 消息(行分隔)的输入文件。留言格式如下

{"level":"info","message":{"accountId":99,"end_dateTime":"","id":0.22837359658442535,"log":[]}}
{"level":"info","message":{"accountId":100,"end_dateTime":"","id":0.2583,"log":[]}}
{"level":"info","message":{"accountId":200,"end_dateTime":"","id":0.5783,"log":[]}}

我想使用 fluentd 将其中的消息部分保存在 mongo 数据库中。在 mongo 中,集合中的数据应如下所示。

{
    "_id": ObjectId("626a1b813c04335a858e5926"),
    "accountId": 99,
    "end_dateTime": "",
    "id": 0.22837359658442535,
    "log": []
}

我只想提取并保存输入负载的 message 键的值。

我试过使用以下配置,但它不起作用。

<source>
  @type tail
  @id input_tail2
  read_from_head true
  path "/opt/webhook-logs/webhook.log"
  pos_file "/opt/webhook-logs/webhook.log.pos"
  tag "td.mongo.events"
  <parse>
    @type "json"
    unmatched_lines 
  </parse>
</source>
<match td.mongo.**>
  @type mongo
  host "127.0.0.1"
  port 27017
  database "vonnect"
  collection "webhooklog"
  user "vonnect"
  password xxxxxx
  buffer_chunk_limit 8m
  time_key time
  <buffer>
    flush_interval 2s
  </buffer>
  <inject>
    time_key time
  </inject>
</match>

我也试过记录变压器过滤器但没有成功。过滤器配置看起来像

<filter td.mongo.**>
  @type record_transformer
  renew_record true
  # enable_ruby
  # auto_typecast true
  <record>
    ${record["message"]}
  </record>
</filter>

这是一个使用 record_transformerparser 过滤器插件的工作示例:

fluent.conf

<source>
  @type sample
  @id in_sample
  sample [
    {"level":"info","message":{"accountId":99,"end_dateTime":"","id":0.22837359658442535,"log":[]}},
    {"level":"info","message":{"accountId":100,"end_dateTime":"","id":0.2583,"log":[]}},
    {"level":"info","message":{"accountId":200,"end_dateTime":"","id":0.5783,"log":[]}}
  ]
  tag sample
</source>

<filter sample>
  @type record_transformer
  renew_record true
  keep_keys message
  enable_ruby true
  <record>
    message ${record["message"].to_json.to_s}
  </record>
</filter>

<filter sample>
  @type parser
  key_name message
  reserve_data true
  remove_key_name_field true
  <parse>
    @type json
  </parse>
</filter>

<match sample>
  @type stdout
</match>

运行:

fluentd -c fluent.conf

输出:

2022-04-28 14:04:29.094892632 +0500 sample: {"accountId":99,"end_dateTime":"","id":0.22837359658442535,"log":[]}
2022-04-28 14:04:30.097973274 +0500 sample: {"accountId":100,"end_dateTime":"","id":0.2583,"log":[]}
2022-04-28 14:04:31.000677835 +0500 sample: {"accountId":200,"end_dateTime":"","id":0.5783,"log":[]}

record_transformer 删除多余的键并保留 message 并将其值转换为纯字符串。 parser 然后删除 message 键并将其字符串值扩展为 JSON.

参考文献: