通过 fluentd 将接收到的数据解析为 json

Parse received data as json by fluentd

我正在尝试通过 fluentd 从外部系统接收数据,如下所示: 数据={"version":"0.0";"secret":null}

响应是: 400 错误请求 'json' 或 'msgpack' 参数是必需的

如果我发送(无法更改真实来源)相同的字符串 "json" 而不是 "data" (例如 json={"version":"0.0";"secret":null}),一切正常。我如何配置 fluentd 以同样的方式接受它?谢谢

fluent.conf 示例:

<source>                                                  
  @type http                                              
  port 24224                                              
  bind 0.0.0.0          

  # accept "{"key":"value"} input                                    
  format json   

  # accept "json={"key":"value"} input                                    
  #format default
</source>                                               
<match **>                                              
  @type file                                            
  @id   output1                                         
  path         /fluentd/log/data.*.log                  
  symlink_path /fluentd/log/data.log                   
  format json                                           
  append       true                                     
  time_slice_format %Y%m%d                              
  time_slice_wait   10m                                 
  time_format       %Y%m%dT%H%M%S%z                     
</match>

我尝试过使用正则表达式或通过 nginx 修改数据。由于编码和复杂的数据,正则表达式是不可能的,并且没有找到如何使用 nginx 修改 POST 数据的方法(这也是不好的方法)。

http://docs.fluentd.org/articles/in_http

本文显示了可接受的格式。

How can i config fluentd to accept it same way?

意思是要用format json解析data={"k":"v"}? 如果是这样,它不能。

我自己回答。在尝试了很多配置(以及阅读 fluentd/nginx 和博客的官方文档数小时)之后,我决定创建插件 (http://docs.fluentd.org/articles/plugin-development#parser-plugins)。我已经用这个解决方案结束了:

  1. 解析器插件

    module Fluent
      class TextParser
        class CMXParser < Parser
          # Register this parser
          Plugin.register_parser("parser_CMX", self)
    
          config_param :format_hash, :string, :default => "data" #  delimiter is configurable with " " as default
    
          def configure(conf)
            super
          end
    
          # This is the main method. The input "text" is the unit of data to be parsed.
          def parse(text)
            text = WEBrick::HTTPUtils.parse_query(text)
            record = JSON.parse(text[@format_hash])
            yield nil, record
          end
        end
      end
    end
    
  2. Fluentd 配置

    <source>                                
      @type http                            
      port 24224                            
      bind 0.0.0.0                          
      body_size_limit 32m                   
      keepalive_timeout 5s                  
      format parser_CMX
    </source>                     
    
    <match **>                            
      @type file                          
      @id   output1                       
      path         /fluentd/log/data.*.log
      symlink_path /fluentd/log/data.log  
      format json                         
      append       true                   
      time_slice_format %Y%m%d            
      time_slice_wait   10m               
      time_format       %Y%m%dT%H%M%S%z   
    </match>                    
    

我认为有 space 可以将此实现到核心代码,因为基础 in_http 脚本做同样的事情,只是它只使用硬编码字符串 "params['json']"。它可以使用像 "format_hash"/"format_map" 这样的新变量,其中可以包含用于此目的的地图。