使用 fluentd 创建字段
Create Field using fluentd
我有使用 Fluentd 使用并发送到 Elasticsearch 的日志。如果找到字符串,我想创建一个新字段。
示例日志:
{
"@timestamp": "2021-01-29T08:05:38.613Z",
"@version": "1",
"message": "Started Application in 110.374 seconds (JVM running for 113.187)",
"level": "INFO"
}
我想创建一个新字段 STARTIME,在这种情况下,该值将是 113.187
我尝试过的是,使用 record_transformer 和 ruby 拆分来获取值,但是当它匹配时似乎从日志文件中删除了我想要的字符串。
<filter**>
@type record_transformer
enable_ruby true
<record>
STARTIME ${record["message"].split("JVM running").last.split(")")}
</record>
</filter>
如何创建具有所需值的新字段?
我现在已经使用了下面建议的选项:
<filter**>
@type record_transformer
enable_ruby true
<record>
STARTIME ${record["message"].split("JVM running for ").last.split(")")[0]}
</record>
</filter>
这让我离得更近了。现在发生的是字段 STARTIME 被创建,当日志条目匹配时,它的值为 113.187 这是正确的,但是与此模式不匹配的所有其他行都被添加到新的场.
也许这不是使用 Fluentd 转换解决此问题的直接答案,但您可以使用 Elasticsearch ingestion pipelines together with grok processor 来提取数据。这是一个模拟的例子:
POST _ingest/pipeline/_simulate
{
"pipeline": {
"description": "Enrich logs",
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"(JVM running for %{NUMBER:start_time})"
]
}
}
]
},
"docs": [
{
"_source": {
"@timestamp": "2021-01-29T08:05:38.613Z",
"@version": "1",
"message": "Started Application in 110.374 seconds (JVM running for 113.187)",
"level": "INFO"
}
}
]
}
_source
是您提供的文档,并且有一个 grok 处理器可以从 message
字段中提取 start_time
。调用此管道会导致:
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"start_time" : "113.187",
"@timestamp" : "2021-01-29T08:05:38.613Z",
"level" : "INFO",
"@version" : "1",
"message" : "Started Application in 110.374 seconds (JVM running for 113.187)"
},
"_ingest" : {
"timestamp" : "2021-01-29T14:09:43.447147676Z"
}
}
}
]
}
您可以看到,转换后,您的文档包含 "start_time" : "113.187"
个值。
您可以尝试这样的操作:
<record>
STARTIME ${ s = record['message'][/JVM running for \d{3}.\d{3}/]; s ? s.split(' ')[-1] : nil }
</record>
STARTIME
将具有有效值,否则 null
。
我有使用 Fluentd 使用并发送到 Elasticsearch 的日志。如果找到字符串,我想创建一个新字段。
示例日志:
{
"@timestamp": "2021-01-29T08:05:38.613Z",
"@version": "1",
"message": "Started Application in 110.374 seconds (JVM running for 113.187)",
"level": "INFO"
}
我想创建一个新字段 STARTIME,在这种情况下,该值将是 113.187
我尝试过的是,使用 record_transformer 和 ruby 拆分来获取值,但是当它匹配时似乎从日志文件中删除了我想要的字符串。
<filter**>
@type record_transformer
enable_ruby true
<record>
STARTIME ${record["message"].split("JVM running").last.split(")")}
</record>
</filter>
如何创建具有所需值的新字段?
我现在已经使用了下面建议的选项:
<filter**>
@type record_transformer
enable_ruby true
<record>
STARTIME ${record["message"].split("JVM running for ").last.split(")")[0]}
</record>
</filter>
这让我离得更近了。现在发生的是字段 STARTIME 被创建,当日志条目匹配时,它的值为 113.187 这是正确的,但是与此模式不匹配的所有其他行都被添加到新的场.
也许这不是使用 Fluentd 转换解决此问题的直接答案,但您可以使用 Elasticsearch ingestion pipelines together with grok processor 来提取数据。这是一个模拟的例子:
POST _ingest/pipeline/_simulate
{
"pipeline": {
"description": "Enrich logs",
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"(JVM running for %{NUMBER:start_time})"
]
}
}
]
},
"docs": [
{
"_source": {
"@timestamp": "2021-01-29T08:05:38.613Z",
"@version": "1",
"message": "Started Application in 110.374 seconds (JVM running for 113.187)",
"level": "INFO"
}
}
]
}
_source
是您提供的文档,并且有一个 grok 处理器可以从 message
字段中提取 start_time
。调用此管道会导致:
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"start_time" : "113.187",
"@timestamp" : "2021-01-29T08:05:38.613Z",
"level" : "INFO",
"@version" : "1",
"message" : "Started Application in 110.374 seconds (JVM running for 113.187)"
},
"_ingest" : {
"timestamp" : "2021-01-29T14:09:43.447147676Z"
}
}
}
]
}
您可以看到,转换后,您的文档包含 "start_time" : "113.187"
个值。
您可以尝试这样的操作:
<record>
STARTIME ${ s = record['message'][/JVM running for \d{3}.\d{3}/]; s ? s.split(' ')[-1] : nil }
</record>
STARTIME
将具有有效值,否则 null
。