基于负载类型的 Fluentd 路由

Fluentd route based on payload type

我是 Fluentd 的新手。我使用 kubernetes daemonset fluentd 从 docker 容器收集日志并将它们发送到 kafka。我有另一个 kubernetes 服务,它使用来自 kafka 的消息并将它们发送到 elasticsearh,然后是 kibana。我希望将 fluentd record 的 log 字段拆分为单独的字段,以便在 kibana 搜索查询中进一步使用。例如:

源记录:

"log" : "{\"foo\" : \"bar\"}"

输出:

"foo" : "bar"
"log" : "{\"foo\" : \"bar\"}"

我想出了以下配置:

<source>
@type kafka
brokers "#{ENV['FLUENT_KAFKA_BROKERS']}"
topics "#{ENV['FLUENT_KAFKA_TOPICS']}"
</source>

<filter "#{ENV['FLUENT_KAFKA_TOPICS']}">
@type parser
key_name log
reserve_data true
 <parse>
   @type json
 </parse>
</filter>

<match "#{ENV['FLUENT_KAFKA_TOPICS']}">
@type elasticsearch
host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
scheme "#{ENV['FLUENT_ELASTICSEARCH_SCHEME'] || 'http'}"
logstash_format true
</match>

但不幸的是,并非所有日志都是 json 格式,因此 json 解析器无法解析纯文本:ParserError error="pattern not match with data

是否可以仅在 log 字段是有效的 json 对象时应用 json 解析器?如果是纯文本,我希望按原样发送。

找到这个图书馆 https://github.com/ninadpage/fluent-plugin-parser-maybejson/

它不适用于最新的 fluentd,将创建一个 PR 来解决这个问题,只需添加:"require 'fluent/parser'"

更新:升级 fluentd 版本后插件无法按照 README 中的描述工作。我找到了另一个适用于 fluentd 1.4.0

fluent-plugin-multi-format-parser

最终得到以下配置:

<filter "#{ENV['FLUENT_KAFKA_TOPICS']}">
  @type parser
  key_name log
  reserve_data true
  <parse>
    @type multi_format
    <pattern>
      format json
    </pattern>
    <pattern>
      format none
    </pattern>
  </parse>
</filter>

<filter "#{ENV['FLUENT_KAFKA_TOPICS']}">
  @type record_transformer
  remove_keys message  # remove message from non jsom logs
</filter>