将 Logstash 中的时间戳时区转换为输出索引名称
Convert timestamp timezone in Logstash for output index name
在我的场景中,Logstash 收到的系统日志行的 "timestamp" 是 UTC,我们在 Elasticsearch 输出中使用事件 "timestamp":
output {
elasticsearch {
embedded => false
host => localhost
port => 9200
protocol => http
cluster => 'elasticsearch'
index => "syslog-%{+YYYY.MM.dd}"
}
}
我的问题是,在 UTC 午夜,Logstash 在外时区 (GMT-4 => America/Montreal) 结束前将日志发送到不同的索引,并且索引在 20 小时 (8 小时) 后没有日志下午)因为 "timestamp" 是 UTC。
我们已经完成了转换时区的工作,但我们遇到了显着的性能下降:
filter {
mutate {
add_field => {
# Create a new field with string value of the UTC event date
"timestamp_zoned" => "%{@timestamp}"
}
}
date {
# Parse UTC string value and convert it to my timezone into a new field
match => [ "timestamp_zoned", "yyyy-MM-dd HH:mm:ss Z" ]
timezone => "America/Montreal"
locale => "en"
remove_field => [ "timestamp_zoned" ]
target => "timestamp_zoned_obj"
}
ruby {
# Output the zoned date to a new field
code => "event['index_day'] = event['timestamp_zoned_obj'].strftime('%Y.%m.%d')"
remove_field => [ "timestamp_zoned_obj" ]
}
}
output {
elasticsearch {
embedded => false
host => localhost
port => 9200
protocol => http
cluster => 'elasticsearch'
# Use of the string value
index => "syslog-%{index_day}"
}
}
有没有办法优化这个配置?
这是优化配置,请尝试测试性能。
您无需使用 mutate
和 date
插件。直接使用 ruby
插件。
input {
stdin {
}
}
filter {
ruby {
code => "
event['index_day'] = event['@timestamp'].localtime.strftime('%Y.%m.%d')
"
}
}
output {
stdout { codec => rubydebug }
}
示例输出:
{
"message" => "test",
"@version" => "1",
"@timestamp" => "2015-03-30T05:27:06.310Z",
"host" => "BEN_LIM",
"index_day" => "2015.03.29"
}
在1.5.0版本中,我们可以将时间戳转换为索引名称的本地时区。这是我的配置:
filter {
ruby {
code => "event['index_day'] = event.timestamp.time.localtime.strftime('%Y.%m.%d')"
}
}
output {
elasticsearch {
host => localhost
index => "thrall-%{index_day}"
}
}
在Logstash Version 5.0.2中,修改了API。我们可以通过本地时区将时间戳转换为索引名称。这是我的配置:
filter {
ruby {
code => "event['index_day'] = event.timestamp.time.localtime.strftime('%Y.%m.%d')"
}
}
在logstash version 5.0 and later中,你可以这样使用:
filter{
ruby {
code => "event.set('index_day', event.get('[@timestamp]').time.localtime.strftime('%Y%m%d'))"
}
}
类似用例 - 但使用 logstash file output plugin 并写入日期为 事件到达 当地时间的文件。
在 logstash version 7.12
.
上验证
改编自discuss.elastic.co,主要是用零填充偏移时间。注意!如果您的偏移量有半小时,您将需要相应地进行调整。
filter {
ruby {
code => "
require 'tzinfo'
tz = 'Europe/Oslo'
offset = TZInfo::Timezone.get(tz).current_period.utc_total_offset / (60*60)
event.set('[@metadata][local_date]',
event.get('@timestamp').time.localtime(
sprintf('+%02i:00', offset.to_s)
).strftime('%Y%m%d'))
"
}
if ([agent][type] == "filebeat") {
mutate {
add_field => ["file_path", "%{[host][name]}_%{[log][file][path]}.%{[@metadata][local_date]}"]
}
} else {
mutate {
add_field => ["file_path", "%{[agent][hostname]}_%{[agent][type]}.%{[@metadata][local_date]}"]
}
}
}
在我的场景中,Logstash 收到的系统日志行的 "timestamp" 是 UTC,我们在 Elasticsearch 输出中使用事件 "timestamp":
output {
elasticsearch {
embedded => false
host => localhost
port => 9200
protocol => http
cluster => 'elasticsearch'
index => "syslog-%{+YYYY.MM.dd}"
}
}
我的问题是,在 UTC 午夜,Logstash 在外时区 (GMT-4 => America/Montreal) 结束前将日志发送到不同的索引,并且索引在 20 小时 (8 小时) 后没有日志下午)因为 "timestamp" 是 UTC。
我们已经完成了转换时区的工作,但我们遇到了显着的性能下降:
filter {
mutate {
add_field => {
# Create a new field with string value of the UTC event date
"timestamp_zoned" => "%{@timestamp}"
}
}
date {
# Parse UTC string value and convert it to my timezone into a new field
match => [ "timestamp_zoned", "yyyy-MM-dd HH:mm:ss Z" ]
timezone => "America/Montreal"
locale => "en"
remove_field => [ "timestamp_zoned" ]
target => "timestamp_zoned_obj"
}
ruby {
# Output the zoned date to a new field
code => "event['index_day'] = event['timestamp_zoned_obj'].strftime('%Y.%m.%d')"
remove_field => [ "timestamp_zoned_obj" ]
}
}
output {
elasticsearch {
embedded => false
host => localhost
port => 9200
protocol => http
cluster => 'elasticsearch'
# Use of the string value
index => "syslog-%{index_day}"
}
}
有没有办法优化这个配置?
这是优化配置,请尝试测试性能。
您无需使用 mutate
和 date
插件。直接使用 ruby
插件。
input {
stdin {
}
}
filter {
ruby {
code => "
event['index_day'] = event['@timestamp'].localtime.strftime('%Y.%m.%d')
"
}
}
output {
stdout { codec => rubydebug }
}
示例输出:
{
"message" => "test",
"@version" => "1",
"@timestamp" => "2015-03-30T05:27:06.310Z",
"host" => "BEN_LIM",
"index_day" => "2015.03.29"
}
在1.5.0版本中,我们可以将时间戳转换为索引名称的本地时区。这是我的配置:
filter {
ruby {
code => "event['index_day'] = event.timestamp.time.localtime.strftime('%Y.%m.%d')"
}
}
output {
elasticsearch {
host => localhost
index => "thrall-%{index_day}"
}
}
在Logstash Version 5.0.2中,修改了API。我们可以通过本地时区将时间戳转换为索引名称。这是我的配置:
filter {
ruby {
code => "event['index_day'] = event.timestamp.time.localtime.strftime('%Y.%m.%d')"
}
}
在logstash version 5.0 and later中,你可以这样使用:
filter{
ruby {
code => "event.set('index_day', event.get('[@timestamp]').time.localtime.strftime('%Y%m%d'))"
}
}
类似用例 - 但使用 logstash file output plugin 并写入日期为 事件到达 当地时间的文件。
在 logstash version 7.12
.
改编自discuss.elastic.co,主要是用零填充偏移时间。注意!如果您的偏移量有半小时,您将需要相应地进行调整。
filter {
ruby {
code => "
require 'tzinfo'
tz = 'Europe/Oslo'
offset = TZInfo::Timezone.get(tz).current_period.utc_total_offset / (60*60)
event.set('[@metadata][local_date]',
event.get('@timestamp').time.localtime(
sprintf('+%02i:00', offset.to_s)
).strftime('%Y%m%d'))
"
}
if ([agent][type] == "filebeat") {
mutate {
add_field => ["file_path", "%{[host][name]}_%{[log][file][path]}.%{[@metadata][local_date]}"]
}
} else {
mutate {
add_field => ["file_path", "%{[agent][hostname]}_%{[agent][type]}.%{[@metadata][local_date]}"]
}
}
}