Serilog HTTP sink + Logstash: Splitting Serilog message array into individual log events
We are using the Serilog HTTP sink to send messages to Logstash. However, the HTTP request body looks like this:
{
"events": [
{
"Timestamp": "2016-11-03T00:09:11.4899425+01:00",
"Level": "Debug",
"MessageTemplate": "Logging {@Heartbeat} from {Computer}",
"RenderedMessage": "Logging { UserName: \"Mike\", UserDomainName: \"Home\" } from \"Workstation\"",
"Properties": {
"Heartbeat": {
"UserName": "Mike",
"UserDomainName": "Home"
},
"Computer": "Workstation"
}
},
{
"Timestamp": "2016-11-03T00:09:12.4905685+01:00",
"Level": "Debug",
"MessageTemplate": "Logging {@Heartbeat} from {Computer}",
"RenderedMessage": "Logging { UserName: \"Mike\", UserDomainName: \"Home\" } from \"Workstation\"",
"Properties": {
"Heartbeat": {
"UserName": "Mike",
"UserDomainName": "Home"
},
"Computer": "Workstation"
}
}
]
}
I.e., the logging events are batched into an array. The messages could be sent one at a time, but then the body would still be a one-item array.
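For reference, a logger producing these messages might be set up roughly like this; a minimal sketch where the endpoint URL is hypothetical and the exact WriteTo.Http overload depends on the Serilog.Sinks.Http version in use:
using Serilog;

var log = new LoggerConfiguration()
    .MinimumLevel.Debug()
    // hypothetical endpoint; the real host/port come from your setup
    .WriteTo.Http("http://logstash:8080")
    .CreateLogger();

// emits the Heartbeat events shown in the payload above
log.Debug("Logging {@Heartbeat} from {Computer}",
    new { UserName = "Mike", UserDomainName = "Home" },
    "Workstation");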
The events then show up in Kibana with a field message whose value is
{
"events": [
{
// ...
},
{
// ...
}
]
}
I.e., literally what came in from the HTTP input. How can I split the items of the events array into individual logging events and "pull up" the properties to the top level, so that I would end up with two logging events in ElasticSearch:
"Timestamp": "2016-11-03T00:09:11.4899425+01:00",
"Level": "Debug",
"MessageTemplate": "Logging {@Heartbeat} from {Computer}",
"RenderedMessage": "Logging { UserName: \"Mike\", UserDomainName: \"Home\" } from \"Workstation\"",
"Properties": {
"Heartbeat": {
"UserName": "Mike",
"UserDomainName": "Home"
},
"Computer": "Workstation"
}
"Timestamp": "2016-11-03T00:09:12.4905685+01:00",
"Level": "Debug",
"MessageTemplate": "Logging {@Heartbeat} from {Computer}",
"RenderedMessage": "Logging { UserName: \"Mike\", UserDomainName: \"Home\" } from \"Workstation\"",
"Properties": {
"Heartbeat": {
"UserName": "Mike",
"UserDomainName": "Home"
},
"Computer": "Workstation"
}
You can get the result you want by using an additional ruby filter that pulls the fields out of the sub-structure:
filter {
split {
field => "events"
}
ruby {
code => "
event.to_hash.update(event['events'].to_hash)  # merge the sub-event's fields into the top level
event.to_hash.delete_if {|k, v| k == 'events'} # drop the now-redundant 'events' field
"
}
}
The resulting event then looks like this:
{
"@version" => "1",
"@timestamp" => "2017-01-20T04:51:39.223Z",
"host" => "iMac.local",
"Timestamp" => "2016-11-03T00:09:12.4905685+01:00",
"Level" => "Debug",
"MessageTemplate" => "Logging {@Heartbeat} from {Computer}",
"RenderedMessage" => "Logging { UserName: \"Mike\", UserDomainName: \"Home\" } from \"Workstation\"",
"Properties" => {
"Heartbeat" => {
"UserName" => "Mike",
"UserDomainName" => "Home"
},
"Computer" => "Workstation"
}
}
After upgrading to Logstash 5.0, this stopped working due to a change in the Event API: updates to event.to_hash are no longer reflected in the original event. For Logstash 5.0+, the event.get('field') and event.set('field', value) accessors have to be used instead. The updated solution is now:
input {
http {
port => 8080
codec => json
}
}
filter {
split {
field => "events"
}
ruby {
code => "
# copy every field of the split-off sub-event to the top level
event.get('events').each do |k, v|
  event.set(k, v)
end
"
}
mutate {
remove_field => [ "events" ]
}
}
Nowadays you can fix this on the Serilog side by setting the batchFormatter. The default batch formatter produces the problematic events wrapper shown above, but the ArrayBatchFormatter resolves this:
logger.WriteTo.DurableHttpUsingFileSizeRolledBuffers(
requestUri: new Uri($"http://{elasticHost}:{elasticPort}").ToString(),
batchFormatter: new ArrayBatchFormatter());
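With the ArrayBatchFormatter, the events are posted as a plain JSON array instead of being wrapped in an "events" object, so the request body looks roughly like this:
[
  {
    "Timestamp": "2016-11-03T00:09:11.4899425+01:00",
    "Level": "Debug",
    // ...
  },
  {
    "Timestamp": "2016-11-03T00:09:12.4905685+01:00",
    // ...
  }
]
The json codec on the Logstash http input should then emit one event per array element, so the split and ruby filters above are no longer needed.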