通过 fluentd 将接收到的数据解析为 json
Parse received data as json by fluentd
我正在尝试通过 fluentd 从外部系统接收数据,如下所示:
数据={"version":"0.0";"secret":null}
响应是:
400 错误请求
'json' 或 'msgpack' 参数是必需的
如果我发送(无法更改真实来源)相同的字符串 "json" 而不是 "data" (例如 json={"version":"0.0";"secret":null}),一切正常。我如何配置 fluentd 以同样的方式接受它?谢谢
fluent.conf 示例:
<source>
@type http
port 24224
bind 0.0.0.0
# accept "{"key":"value"} input
format json
# accept "json={"key":"value"} input
#format default
</source>
<match **>
@type file
@id output1
path /fluentd/log/data.*.log
symlink_path /fluentd/log/data.log
format json
append true
time_slice_format %Y%m%d
time_slice_wait 10m
time_format %Y%m%dT%H%M%S%z
</match>
我尝试过使用正则表达式或通过 nginx 修改数据。由于编码和复杂的数据,正则表达式是不可能的,并且没有找到如何使用 nginx 修改 POST 数据的方法(这也是不好的方法)。
http://docs.fluentd.org/articles/in_http
本文显示了可接受的格式。
How can i config fluentd to accept it same way?
意思是要用format json
解析data={"k":"v"}
?
如果是这样,它不能。
我自己回答。在尝试了很多配置(以及阅读 fluentd/nginx 和博客的官方文档数小时)之后,我决定创建插件 (http://docs.fluentd.org/articles/plugin-development#parser-plugins)。我已经用这个解决方案结束了:
解析器插件
module Fluent
class TextParser
class CMXParser < Parser
# Register this parser
Plugin.register_parser("parser_CMX", self)
config_param :format_hash, :string, :default => "data" # delimiter is configurable with " " as default
def configure(conf)
super
end
# This is the main method. The input "text" is the unit of data to be parsed.
def parse(text)
text = WEBrick::HTTPUtils.parse_query(text)
record = JSON.parse(text[@format_hash])
yield nil, record
end
end
end
end
Fluentd 配置
<source>
@type http
port 24224
bind 0.0.0.0
body_size_limit 32m
keepalive_timeout 5s
format parser_CMX
</source>
<match **>
@type file
@id output1
path /fluentd/log/data.*.log
symlink_path /fluentd/log/data.log
format json
append true
time_slice_format %Y%m%d
time_slice_wait 10m
time_format %Y%m%dT%H%M%S%z
</match>
我认为有 space 可以将此实现到核心代码,因为基础 in_http 脚本做同样的事情,只是它只使用硬编码字符串 "params['json']"。它可以使用像 "format_hash"/"format_map" 这样的新变量,其中可以包含用于此目的的地图。
我正在尝试通过 fluentd 从外部系统接收数据,如下所示: 数据={"version":"0.0";"secret":null}
响应是: 400 错误请求 'json' 或 'msgpack' 参数是必需的
如果我发送(无法更改真实来源)相同的字符串 "json" 而不是 "data" (例如 json={"version":"0.0";"secret":null}),一切正常。我如何配置 fluentd 以同样的方式接受它?谢谢
fluent.conf 示例:
<source>
@type http
port 24224
bind 0.0.0.0
# accept "{"key":"value"} input
format json
# accept "json={"key":"value"} input
#format default
</source>
<match **>
@type file
@id output1
path /fluentd/log/data.*.log
symlink_path /fluentd/log/data.log
format json
append true
time_slice_format %Y%m%d
time_slice_wait 10m
time_format %Y%m%dT%H%M%S%z
</match>
我尝试过使用正则表达式或通过 nginx 修改数据。由于编码和复杂的数据,正则表达式是不可能的,并且没有找到如何使用 nginx 修改 POST 数据的方法(这也是不好的方法)。
http://docs.fluentd.org/articles/in_http
本文显示了可接受的格式。
How can i config fluentd to accept it same way?
意思是要用format json
解析data={"k":"v"}
?
如果是这样,它不能。
我自己回答。在尝试了很多配置(以及阅读 fluentd/nginx 和博客的官方文档数小时)之后,我决定创建插件 (http://docs.fluentd.org/articles/plugin-development#parser-plugins)。我已经用这个解决方案结束了:
解析器插件
module Fluent class TextParser class CMXParser < Parser # Register this parser Plugin.register_parser("parser_CMX", self) config_param :format_hash, :string, :default => "data" # delimiter is configurable with " " as default def configure(conf) super end # This is the main method. The input "text" is the unit of data to be parsed. def parse(text) text = WEBrick::HTTPUtils.parse_query(text) record = JSON.parse(text[@format_hash]) yield nil, record end end end end
Fluentd 配置
<source> @type http port 24224 bind 0.0.0.0 body_size_limit 32m keepalive_timeout 5s format parser_CMX </source> <match **> @type file @id output1 path /fluentd/log/data.*.log symlink_path /fluentd/log/data.log format json append true time_slice_format %Y%m%d time_slice_wait 10m time_format %Y%m%dT%H%M%S%z </match>
我认为有 space 可以将此实现到核心代码,因为基础 in_http 脚本做同样的事情,只是它只使用硬编码字符串 "params['json']"。它可以使用像 "format_hash"/"format_map" 这样的新变量,其中可以包含用于此目的的地图。