How to process http posted files in logstash - line by line?
I successfully configured Logstash to process CSV files from the filesystem and push them into Elasticsearch for further analysis.
However, our ELK stack is strictly separated from the original source of the CSV files, so I thought about posting the CSV files to Logstash via HTTP instead of going through the filesystem.
The problem: if I use the "http" input, the whole file is treated as one big blob, and the csv filter only recognizes the first line. As mentioned, the same file works fine via the "file" input.
The Logstash configuration looks like this:
input {
    # http {
    #     host => "localhost"
    #     port => 8080
    # }
    file {
        path => "/media/sample_files/debit_201606.csv"
        type => "items"
        start_position => "beginning"
    }
}
filter {
    csv {
        columns => ["Created", "Direction", "Member", "Point Value", "Type", "Sub Type"]
        separator => " "
        convert => { "Point Value" => "integer" }
    }
    date {
        match => [ "Created", "YYYY-MM-dd HH:mm:ss" ]
        timezone => "UTC"
    }
}
output {
    # elasticsearch {
    #     action => "index"
    #     hosts => ["localhost"]
    #     index => "logstash-%{+YYYY.MM.dd}"
    #     workers => 1
    # }
    stdout {
        codec => rubydebug
    }
}
My goal is to pass the CSV via curl. So I switch to the commented-out part of the input section above and then pass a file with curl:
curl http://localhost:8080/ -T /media/samples/debit_201606.csv
What do I need to do so that Logstash processes the CSV line by line?
I tried this out, and I think what you need to do is split your input. Here is how you can do that:
My configuration:
input {
    http {
        port => 8787
    }
}
filter {
    split {}
    csv {}
}
output {
    stdout { codec => rubydebug }
}
For testing, I created a CSV file that looks like this:
artur@pandaadb:~/tmp/logstash$ cat test.csv
a,b,c
d,e,f
g,h,i
Now for the test:
artur@pandaadb:~/dev/logstash/conf3$ curl localhost:8787 -T ~/tmp/logstash/test.csv
Output:
{
    "message" => "a,b,c",
    "@version" => "1",
    "@timestamp" => "2016-08-01T15:27:17.477Z",
    "host" => "127.0.0.1",
    "headers" => {
        "request_method" => "PUT",
        "request_path" => "/test.csv",
        "request_uri" => "/test.csv",
        "http_version" => "HTTP/1.1",
        "http_host" => "localhost:8787",
        "http_user_agent" => "curl/7.47.0",
        "http_accept" => "*/*",
        "content_length" => "18",
        "http_expect" => "100-continue"
    },
    "column1" => "a",
    "column2" => "b",
    "column3" => "c"
}
{
    "message" => "d,e,f",
    "@version" => "1",
    "@timestamp" => "2016-08-01T15:27:17.477Z",
    "host" => "127.0.0.1",
    "headers" => {
        "request_method" => "PUT",
        "request_path" => "/test.csv",
        "request_uri" => "/test.csv",
        "http_version" => "HTTP/1.1",
        "http_host" => "localhost:8787",
        "http_user_agent" => "curl/7.47.0",
        "http_accept" => "*/*",
        "content_length" => "18",
        "http_expect" => "100-continue"
    },
    "column1" => "d",
    "column2" => "e",
    "column3" => "f"
}
{
    "message" => "g,h,i",
    "@version" => "1",
    "@timestamp" => "2016-08-01T15:27:17.477Z",
    "host" => "127.0.0.1",
    "headers" => {
        "request_method" => "PUT",
        "request_path" => "/test.csv",
        "request_uri" => "/test.csv",
        "http_version" => "HTTP/1.1",
        "http_host" => "localhost:8787",
        "http_user_agent" => "curl/7.47.0",
        "http_accept" => "*/*",
        "content_length" => "18",
        "http_expect" => "100-continue"
    },
    "column1" => "g",
    "column2" => "h",
    "column3" => "i"
}
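A side note on curl, not from the original test: "-T" uploads the file verbatim with an HTTP PUT, so the line breaks survive the transfer. If you POST instead, use "--data-binary", because plain "-d"/"--data" strips newlines from file input and would leave the split filter nothing to split on:

# POST variant (hypothetical alternative); --data-binary keeps the newlines intact
curl localhost:8787 --data-binary @test.csv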
What the split filter does:
It takes your input message (which is one string containing newlines) and splits it on the configured value (a newline by default). It then cancels the original event and re-submits the split events to Logstash. It is important that the split runs before the csv filter.
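Applied to the configuration from the question, a minimal sketch could look like the following (the explicit "field" and "terminator" options just spell out the split filter's defaults; the separator is left at the csv filter's default comma, so adjust it to match your actual file):

input {
    http {
        host => "localhost"
        port => 8080
    }
}
filter {
    # Split the posted body into one event per line before any parsing.
    # "field" and "terminator" are the plugin defaults, shown for clarity.
    split {
        field => "message"
        terminator => "\n"
    }
    # Each event now carries a single CSV line, so every row gets parsed.
    csv {
        columns => ["Created", "Direction", "Member", "Point Value", "Type", "Sub Type"]
        convert => { "Point Value" => "integer" }
    }
    date {
        match => [ "Created", "YYYY-MM-dd HH:mm:ss" ]
        timezone => "UTC"
    }
}
output {
    stdout { codec => rubydebug }
}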
I hope this answers your question!
Artur