logstash 每 30 秒创建一个新文件
logstash create new file every 30seconds
我的 logstatsh 管道中有以下过滤器配置。它所做的是,在事件开始时,第一个过滤器创建一个带有 header 的 CSV 文件,并将文件名设置为元数据。第二个过滤器将输出写入以上 CSV。
挑战(或)要求我有 is:Every X 秒,我们需要创建新的 CSV 文件并写入该文件。我不是 ruby 专家,无法从 Google 搜索中获得任何线索。有人可以请教吗?
filter {
ruby {
init => "
begin
randval = (0...8).map { (65 + rand(26)).chr }.join
@csv_file = 'output'+randval+'.csv'
csv_headers = ['YYYY-MM-ddTHH:mm:ss.SSSZ','Log Level','Event ID']
if File.zero?(@csv_file) || !File.exist?(@csv_file)
CSV.open(@csv_file, 'w') do |csv|
csv << csv_headers
end
end
end
"
code => '
event.set("[@metadata][suffix]",@csv_file)
'
}
}
output {
file {
path => "output.log"
}
csv {
fields => [ "created", "level", "code"]
path => "%{[@metadata][suffix]}"
}
}
有趣的是,昨天有人在 discuss.elastic.co 问了这个问题,用同样不必要的 init 选项完成,所以我碰巧知道答案是
ruby {
code => '
event.set("[@metadata][suffix]", 'output' + (Time.now.to_i / 30).to_s + '.csv')
'
}
对于每 30 秒间隔内到达的任何事件,这将导致文件输出写入不同的文件。
我不知道有什么方法可以为输出写入文件的第一个事件添加 header。 csv 输出可以为每一行添加 headers。一个csv编解码器可以写一次headers,但是当文件名改变时就不会再写了。
就是说,如果您只是写入一个文件,您可以在 ruby 过滤器中进行写入,并跟踪是否已为 [= 的当前值写入 header 23=] / 30。您可以使用与此类似的方法来做到这一点。我re-purposed一些代码来自csv output.
input { heartbeat { interval => 5 message => '{ "foo": 1, "bar": 2, "baz": 3 }' } }
filter {
json { source => "message" target => "data" remove_field => [ "message" ] }
ruby {
init => '
@fields = [ "[data][bar]", "[data][baz]" ]
@csv_options = Hash.new
@spreadsheet_safe = true
'
code => '
def event_to_csv(event)
csv_values = @fields.map {|name| get_value(name, event)}
csv_values.to_csv(@csv_options)
end
def get_value(name, event)
val = event.get(name)
val.is_a?(Hash) ? LogStash::Json.dump(val) : escape_csv(val)
end
def escape_csv(val)
(@spreadsheet_safe && val.is_a?(String) && val.start_with?("=")) ? "\'#{val}" : val
end
id = Time.now.to_i / 20
file = "output" + id.to_s + ".csv"
fd = open("/tmp/#{file}", "a")
if id != @last_id
chunk = "bar,baz\n"
fd.write(chunk)
end
chunk = event_to_csv(event)
fd.write(chunk)
fd.close
@last_id = id
event.cancel
'
}
}
我的 logstatsh 管道中有以下过滤器配置。它所做的是,在事件开始时,第一个过滤器创建一个带有 header 的 CSV 文件,并将文件名设置为元数据。第二个过滤器将输出写入以上 CSV。
挑战(或)要求我有 is:Every X 秒,我们需要创建新的 CSV 文件并写入该文件。我不是 ruby 专家,无法从 Google 搜索中获得任何线索。有人可以请教吗?
filter {
ruby {
init => "
begin
randval = (0...8).map { (65 + rand(26)).chr }.join
@csv_file = 'output'+randval+'.csv'
csv_headers = ['YYYY-MM-ddTHH:mm:ss.SSSZ','Log Level','Event ID']
if File.zero?(@csv_file) || !File.exist?(@csv_file)
CSV.open(@csv_file, 'w') do |csv|
csv << csv_headers
end
end
end
"
code => '
event.set("[@metadata][suffix]",@csv_file)
'
}
}
output {
file {
path => "output.log"
}
csv {
fields => [ "created", "level", "code"]
path => "%{[@metadata][suffix]}"
}
}
有趣的是,昨天有人在 discuss.elastic.co 问了这个问题,用同样不必要的 init 选项完成,所以我碰巧知道答案是
ruby {
code => '
event.set("[@metadata][suffix]", 'output' + (Time.now.to_i / 30).to_s + '.csv')
'
}
对于每 30 秒间隔内到达的任何事件,这将导致文件输出写入不同的文件。
我不知道有什么方法可以为输出写入文件的第一个事件添加 header。 csv 输出可以为每一行添加 headers。一个csv编解码器可以写一次headers,但是当文件名改变时就不会再写了。
就是说,如果您只是写入一个文件,您可以在 ruby 过滤器中进行写入,并跟踪是否已为 [= 的当前值写入 header 23=] / 30。您可以使用与此类似的方法来做到这一点。我re-purposed一些代码来自csv output.
input { heartbeat { interval => 5 message => '{ "foo": 1, "bar": 2, "baz": 3 }' } }
filter {
json { source => "message" target => "data" remove_field => [ "message" ] }
ruby {
init => '
@fields = [ "[data][bar]", "[data][baz]" ]
@csv_options = Hash.new
@spreadsheet_safe = true
'
code => '
def event_to_csv(event)
csv_values = @fields.map {|name| get_value(name, event)}
csv_values.to_csv(@csv_options)
end
def get_value(name, event)
val = event.get(name)
val.is_a?(Hash) ? LogStash::Json.dump(val) : escape_csv(val)
end
def escape_csv(val)
(@spreadsheet_safe && val.is_a?(String) && val.start_with?("=")) ? "\'#{val}" : val
end
id = Time.now.to_i / 20
file = "output" + id.to_s + ".csv"
fd = open("/tmp/#{file}", "a")
if id != @last_id
chunk = "bar,baz\n"
fd.write(chunk)
end
chunk = event_to_csv(event)
fd.write(chunk)
fd.close
@last_id = id
event.cancel
'
}
}