聚合日志将第一个时间戳设置为@timestamp
Aggregated logs set first timestamp as @timestamp
我正在为“389 目录服务器”日志开发过滤器,这些日志显示用户在服务器上的操作、连接、搜索、添加、修改等的历史记录...
我正在使用聚合过滤器将所有这些日志行合并为一个事件。
但是,我希望事件的最终@timestamp(用户断开连接后)是第一个事件的@timestamp(首次建立连接时)
我试过使用日期过滤器,虽然它确实改变了每个事件(每个日志行)的@timestamp,但聚合过滤器生成的最终映射仍然使用处理日志的时间。
我可以将第一个@timestamp 保存到地图中的另一个字段,但如何用该字段替换@timestamp?
由于过滤器很长,我将只包括开始和结束:
filter {
grok {
match => { "message" => [
"^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} fd=%{NUMBER:file_descriptor} slot=%{NUMBER} %{WORD:connection_method} connection from %{IP:source} to %{IP:destination}$",
"^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} %{NOTSPACE:ssl_version} (?<encryption_method>%{NOTSPACE} %{NOTSPACE})$",
"^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} op=%{NUMBER:op_number} %{WORD:ldap_operation} dn=%{QUOTEDSTRING:user_dn} method=%{NOTSPACE:bind_method} version=%{NUMBER:ldap_version}($)?(mech=%{NOTSPACE:auth_mechanism}$)?",
"^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} op=%{NUMBER:op_number} %{WORD:ldap_operation} err=%{NUMBER:error_code} tag=%{NUMBER:tag_number} nentries=%{NUMBER:number_of_entries} etime=%{NUMBER:operation_time}($)?(dn=%{QUOTEDSTRING}$)?",
"^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} op=%{NUMBER:op_number} %{WORD:ldap_operation} base=%{QUOTEDSTRING:search_base} scope=%{NUMBER:search_scope} filter=%{QUOTEDSTRING:search_filter} attrs=%{QUOTEDSTRING:search_attributes}$",
"^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} op=%{NUMBER:op_number} %{WORD:ldap_operation}$",
"^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} op=%{NUMBER:op_number} fd=%{NUMBER:file_descriptor} %{WORD:connection_result} - %{WORD:connection_code}$"
]
}
}
if "" in [connection_method] {
aggregate {
task_id => "%{connection_id}"
code => "
map['timestamp'] = event['@timestamp']
map['tags'] ||= ['aggregated']
map['source'] = event['source']
map['destination'] = event['destination']
map['file_descriptor'] = event['file_descriptor']
map['connection_method'] = event['connection_method']
"
map_action => "create"
}
}
else if "" in [connection_code] {
mutate {
add_tag => [ "map_finished" ]
}
aggregate {
task_id => "%{connection_id}"
code => "
map['operations'][event['op_number']]['connection_code'] = event['connection_code']
map['operations'][event['op_number']]['connection_result'] = event['connection_result']
"
map_action => "update"
}
}
else {
aggregate {
task_id => "%{connection_id}"
code => "
map['@timestamp'] = map['timestamp']
"
timeout => 0
push_map_as_event_on_timeout => true
}
}
}
在过滤器中使用日期?
ISO8601 表示您的日期类型
date {
# timezone => "America/Asuncion"
match => ["timestamp", "ISO8601"]
target => "@timestamp"
}
弄清楚了,我没有realize/understand的是,当我到达最终事件并推出地图时,它(地图)将被 logstash 作为一个新事件(如来自文件的新日志行)和 logstash 将尝试将该事件与其中一个过滤器匹配,并且失败,因为最终映射不包含将匹配任何过滤器的 massage 字段。
创建新过滤器解决了问题。
filter {
if "aggregated" in [tags] {
date {
match => [ "timestamp", "dd/MMM/YYYY:HH:mm:ss Z" ]
target => "@timestamp"
add_tag => [ "tmatch" ]
}
}
}
您还可以使用以下代码行根据特定事件时间戳更新聚合事件时间戳。但是,如果您使用日志文件时间戳作为事件时间戳,请确保在聚合操作之前应用日期过滤器。
map['@timestamp'] ||= event.get('@timestamp');
我正在为“389 目录服务器”日志开发过滤器,这些日志显示用户在服务器上的操作、连接、搜索、添加、修改等的历史记录... 我正在使用聚合过滤器将所有这些日志行合并为一个事件。
但是,我希望事件的最终@timestamp(用户断开连接后)是第一个事件的@timestamp(首次建立连接时) 我试过使用日期过滤器,虽然它确实改变了每个事件(每个日志行)的@timestamp,但聚合过滤器生成的最终映射仍然使用处理日志的时间。
我可以将第一个@timestamp 保存到地图中的另一个字段,但如何用该字段替换@timestamp?
由于过滤器很长,我将只包括开始和结束:
filter {
grok {
match => { "message" => [
"^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} fd=%{NUMBER:file_descriptor} slot=%{NUMBER} %{WORD:connection_method} connection from %{IP:source} to %{IP:destination}$",
"^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} %{NOTSPACE:ssl_version} (?<encryption_method>%{NOTSPACE} %{NOTSPACE})$",
"^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} op=%{NUMBER:op_number} %{WORD:ldap_operation} dn=%{QUOTEDSTRING:user_dn} method=%{NOTSPACE:bind_method} version=%{NUMBER:ldap_version}($)?(mech=%{NOTSPACE:auth_mechanism}$)?",
"^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} op=%{NUMBER:op_number} %{WORD:ldap_operation} err=%{NUMBER:error_code} tag=%{NUMBER:tag_number} nentries=%{NUMBER:number_of_entries} etime=%{NUMBER:operation_time}($)?(dn=%{QUOTEDSTRING}$)?",
"^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} op=%{NUMBER:op_number} %{WORD:ldap_operation} base=%{QUOTEDSTRING:search_base} scope=%{NUMBER:search_scope} filter=%{QUOTEDSTRING:search_filter} attrs=%{QUOTEDSTRING:search_attributes}$",
"^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} op=%{NUMBER:op_number} %{WORD:ldap_operation}$",
"^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} op=%{NUMBER:op_number} fd=%{NUMBER:file_descriptor} %{WORD:connection_result} - %{WORD:connection_code}$"
]
}
}
if "" in [connection_method] {
aggregate {
task_id => "%{connection_id}"
code => "
map['timestamp'] = event['@timestamp']
map['tags'] ||= ['aggregated']
map['source'] = event['source']
map['destination'] = event['destination']
map['file_descriptor'] = event['file_descriptor']
map['connection_method'] = event['connection_method']
"
map_action => "create"
}
}
else if "" in [connection_code] {
mutate {
add_tag => [ "map_finished" ]
}
aggregate {
task_id => "%{connection_id}"
code => "
map['operations'][event['op_number']]['connection_code'] = event['connection_code']
map['operations'][event['op_number']]['connection_result'] = event['connection_result']
"
map_action => "update"
}
}
else {
aggregate {
task_id => "%{connection_id}"
code => "
map['@timestamp'] = map['timestamp']
"
timeout => 0
push_map_as_event_on_timeout => true
}
}
}
在过滤器中使用日期?
ISO8601 表示您的日期类型
date {
# timezone => "America/Asuncion"
match => ["timestamp", "ISO8601"]
target => "@timestamp"
}
弄清楚了,我没有realize/understand的是,当我到达最终事件并推出地图时,它(地图)将被 logstash 作为一个新事件(如来自文件的新日志行)和 logstash 将尝试将该事件与其中一个过滤器匹配,并且失败,因为最终映射不包含将匹配任何过滤器的 massage 字段。
创建新过滤器解决了问题。
filter {
if "aggregated" in [tags] {
date {
match => [ "timestamp", "dd/MMM/YYYY:HH:mm:ss Z" ]
target => "@timestamp"
add_tag => [ "tmatch" ]
}
}
}
您还可以使用以下代码行根据特定事件时间戳更新聚合事件时间戳。但是,如果您使用日志文件时间戳作为事件时间戳,请确保在聚合操作之前应用日期过滤器。
map['@timestamp'] ||= event.get('@timestamp');