How to process http posted files in logstash - line by line?

I have successfully configured logstash to process csv files from the file system and push them into Elasticsearch for further analysis. However, our ELK stack is strictly separated from the original source of the csv files, so I am considering sending the csv files to logstash over http instead of through the file system.

The problem is that when I use the http input, the entire file is processed as a single event, and the csv filter only recognizes the first line. As mentioned, the same file works fine through the file input.

The logstash configuration looks like this:

input {
  # the http input I would like to switch to (commented out for now):
#  http {
#    host => "localhost"
#    port => 8080
#  }
  # the file input that works today, reading the csv line by line:
  file {
    path => "/media/sample_files/debit_201606.csv"
    type => "items"
    start_position => "beginning"
  }
}

filter {
    csv {
        columns => ["Created", "Direction", "Member", "Point Value", "Type", "Sub Type"]
        separator => "  "
        # index "Point Value" as an integer rather than a string
        convert => { "Point Value" => "integer" }
    }
    date {
        # use the "Created" column as the event timestamp
        match => [ "Created", "YYYY-MM-dd HH:mm:ss" ]
        timezone => "UTC"
    }
}

output {
#    elasticsearch {
#        action => "index"
#        hosts => ["localhost"]
#        index => "logstash-%{+YYYY.MM.dd}"
#        workers => 1
#    }
    stdout {
        codec => rubydebug
    }
}

My goal is to pass the csv via curl. So I switch over to the commented-out part of the input section above and then post the file with curl:

curl http://localhost:8080/ -T /media/samples/debit_201606.csv
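For reference, the input section with the http plugin enabled (simply the commented block above, uncommented) would be:

input {
  http {
    host => "localhost"
    port => 8080
  }
}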

What do I need to do so that logstash processes the csv line by line?

I tried this, and I think what you need to do is split your input. Here is how:

My config:

input {
  http {
      port => 8787
  }
}

filter {
  # split the posted body into one event per line (the default terminator is "\n"),
  # then parse each resulting line as csv
  split {}
  csv {}
}

output {
  stdout { codec => rubydebug }
}

For testing, I created a csv file that looks like this:

artur@pandaadb:~/tmp/logstash$ cat test.csv 
a,b,c
d,e,f
g,h,i

Now run the test:

artur@pandaadb:~/dev/logstash/conf3$ curl localhost:8787 -T ~/tmp/logstash/test.csv

The output:

{
       "message" => "a,b,c",
      "@version" => "1",
    "@timestamp" => "2016-08-01T15:27:17.477Z",
          "host" => "127.0.0.1",
       "headers" => {
         "request_method" => "PUT",
           "request_path" => "/test.csv",
            "request_uri" => "/test.csv",
           "http_version" => "HTTP/1.1",
              "http_host" => "localhost:8787",
        "http_user_agent" => "curl/7.47.0",
            "http_accept" => "*/*",
         "content_length" => "18",
            "http_expect" => "100-continue"
    },
       "column1" => "a",
       "column2" => "b",
       "column3" => "c"
}
{
       "message" => "d,e,f",
      "@version" => "1",
    "@timestamp" => "2016-08-01T15:27:17.477Z",
          "host" => "127.0.0.1",
       "headers" => {
         "request_method" => "PUT",
           "request_path" => "/test.csv",
            "request_uri" => "/test.csv",
           "http_version" => "HTTP/1.1",
              "http_host" => "localhost:8787",
        "http_user_agent" => "curl/7.47.0",
            "http_accept" => "*/*",
         "content_length" => "18",
            "http_expect" => "100-continue"
    },
       "column1" => "d",
       "column2" => "e",
       "column3" => "f"
}
{
       "message" => "g,h,i",
      "@version" => "1",
    "@timestamp" => "2016-08-01T15:27:17.477Z",
          "host" => "127.0.0.1",
       "headers" => {
         "request_method" => "PUT",
           "request_path" => "/test.csv",
            "request_uri" => "/test.csv",
           "http_version" => "HTTP/1.1",
              "http_host" => "localhost:8787",
        "http_user_agent" => "curl/7.47.0",
            "http_accept" => "*/*",
         "content_length" => "18",
            "http_expect" => "100-continue"
    },
       "column1" => "g",
       "column2" => "h",
       "column3" => "i"
}

What the split filter does:

It takes your input message (which is a single string containing newlines) and splits it on the configured value, which defaults to a newline. It then cancels the original event and re-submits the split events to logstash. It is important that the split runs before the csv filter.
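Applied to the configuration from the question, the filter section would look something like this (a sketch: the columns, separator, and date format are copied from the question, and the split terminator is just the default made explicit):

filter {
  split {
    # "terminator" defaults to "\n"; shown explicitly for clarity
    terminator => "\n"
  }
  csv {
    columns => ["Created", "Direction", "Member", "Point Value", "Type", "Sub Type"]
    separator => "  "
    convert => { "Point Value" => "integer" }
  }
  date {
    match => [ "Created", "YYYY-MM-dd HH:mm:ss" ]
    timezone => "UTC"
  }
}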

I hope this answers your question!

Artur