基于负载类型的 Fluentd 路由
Fluentd route based on payload type
我是 Fluentd 的新手。我使用 kubernetes daemonset fluentd 从 docker 容器收集日志并将它们发送到 kafka。我有另一个 kubernetes 服务,它使用来自 kafka 的消息并将它们发送到 elasticsearh,然后是 kibana。我希望将 fluentd record 的 log
字段拆分为单独的字段,以便在 kibana 搜索查询中进一步使用。例如:
源记录:
"log" : "{\"foo\" : \"bar\"}"
输出:
"foo" : "bar"
"log" : "{\"foo\" : \"bar\"}"
我想出了以下配置:
<source>
@type kafka
brokers "#{ENV['FLUENT_KAFKA_BROKERS']}"
topics "#{ENV['FLUENT_KAFKA_TOPICS']}"
</source>
<filter "#{ENV['FLUENT_KAFKA_TOPICS']}">
@type parser
key_name log
reserve_data true
<parse>
@type json
</parse>
</filter>
<match "#{ENV['FLUENT_KAFKA_TOPICS']}">
@type elasticsearch
host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
scheme "#{ENV['FLUENT_ELASTICSEARCH_SCHEME'] || 'http'}"
logstash_format true
</match>
但不幸的是,并非所有日志都是 json 格式,因此 json 解析器无法解析纯文本:ParserError error="pattern not match with data
是否可以仅在 log
字段是有效的 json 对象时应用 json 解析器?如果是纯文本,我希望按原样发送。
找到这个图书馆
https://github.com/ninadpage/fluent-plugin-parser-maybejson/
它不适用于最新的 fluentd,将创建一个 PR 来解决这个问题,只需添加:"require 'fluent/parser'"
更新:升级 fluentd 版本后插件无法按照 README 中的描述工作。我找到了另一个适用于 fluentd 1.4.0
的
fluent-plugin-multi-format-parser
最终得到以下配置:
<filter "#{ENV['FLUENT_KAFKA_TOPICS']}">
@type parser
key_name log
reserve_data true
<parse>
@type multi_format
<pattern>
format json
</pattern>
<pattern>
format none
</pattern>
</parse>
</filter>
<filter "#{ENV['FLUENT_KAFKA_TOPICS']}">
@type record_transformer
remove_keys message # remove message from non jsom logs
</filter>
我是 Fluentd 的新手。我使用 kubernetes daemonset fluentd 从 docker 容器收集日志并将它们发送到 kafka。我有另一个 kubernetes 服务,它使用来自 kafka 的消息并将它们发送到 elasticsearh,然后是 kibana。我希望将 fluentd record 的 log
字段拆分为单独的字段,以便在 kibana 搜索查询中进一步使用。例如:
源记录:
"log" : "{\"foo\" : \"bar\"}"
输出:
"foo" : "bar"
"log" : "{\"foo\" : \"bar\"}"
我想出了以下配置:
<source>
@type kafka
brokers "#{ENV['FLUENT_KAFKA_BROKERS']}"
topics "#{ENV['FLUENT_KAFKA_TOPICS']}"
</source>
<filter "#{ENV['FLUENT_KAFKA_TOPICS']}">
@type parser
key_name log
reserve_data true
<parse>
@type json
</parse>
</filter>
<match "#{ENV['FLUENT_KAFKA_TOPICS']}">
@type elasticsearch
host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
scheme "#{ENV['FLUENT_ELASTICSEARCH_SCHEME'] || 'http'}"
logstash_format true
</match>
但不幸的是,并非所有日志都是 json 格式,因此 json 解析器无法解析纯文本:ParserError error="pattern not match with data
是否可以仅在 log
字段是有效的 json 对象时应用 json 解析器?如果是纯文本,我希望按原样发送。
找到这个图书馆 https://github.com/ninadpage/fluent-plugin-parser-maybejson/
它不适用于最新的 fluentd,将创建一个 PR 来解决这个问题,只需添加:"require 'fluent/parser'"
更新:升级 fluentd 版本后插件无法按照 README 中的描述工作。我找到了另一个适用于 fluentd 1.4.0
的fluent-plugin-multi-format-parser
最终得到以下配置:
<filter "#{ENV['FLUENT_KAFKA_TOPICS']}">
@type parser
key_name log
reserve_data true
<parse>
@type multi_format
<pattern>
format json
</pattern>
<pattern>
format none
</pattern>
</parse>
</filter>
<filter "#{ENV['FLUENT_KAFKA_TOPICS']}">
@type record_transformer
remove_keys message # remove message from non jsom logs
</filter>