如果输入相同,如何区分 logstash 过滤器中的普通日志和审核日志?

how to differentiate Normal logs and auditing logs in logstash filter, if the input is same?

我在 Kubernetes 中有一个应用程序 运行。我有两种类型的日志,

  1. 正常日志
  2. 审计日志

例如,

以下是一个日志文件的例子,它有两种数据,普通日志和审计

[INFO] [] [%{TraceId}] 2021-10-02_18:37:01.601 com.MyExample.Test Getting app config from Config server with Base URL BASE_URL for application.
[INFO] [] [%{TraceId}] 2021-10-02_18:25:11.807 Audit Hello world,test file,123454,123Addb,true

上面的例子,第一行是普通日志,第二行是审计数据。

我希望这两个日志都转到不同的索引(2 个不同的索引)并且我想为这两个日志应用不同类型的 logstash 过滤器。

关于如何解决上述问题有什么建议吗?

另外添加需求流程图: enter image description here

我当前的 logstash.conf 文件仅用于审计目的,我使用的是 .csv 文件

input {
  file {
    path => "<path_to_dir>/audit-*.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  csv {
      separator => ","
      skip_header => "false"
      columns => ["TraceId","timestamp","fcId","Ean","source","Quantity","updateSent","message"]
  }
}
output {
   elasticsearch {
     hosts => "http://localhost:9200"
     index => "audit"
  }
stdout {}
} 

提前致谢!

好吧,这不是很干净,但如果没有标签说 auditlog.

,现在我看不到其他方法

您可以尝试将 JAVACLASSwhich is part of logstash core, with grok, probably you are about to use grok anyways later to analyze your log. Grok can tag 事件与 _grokparsefailure 进行匹配,以防审核日志中没有匹配项,而根据您所说的,不应该有。在 grok 之后,只需使用 if 并检查标签是否存在,如果存在,则为审核日志,否则为基本日志,然后根据需要处理您的事件。

要使 JAVACLASS 模式在 grok 内工作,您需要将 ecs_compatibility 设置为 v1,如 here 所述。

免责声明,这是仅基于您提供的 2 个示例的想法,这可能是完全错误的,在这种情况下请提供更多示例,因为没有清晰的模式就不可能提出解决方案。

具体例子:

日志行

[INFO] [] [123trace456] 2021-10-02_18:37:01.601 com.MyExample.Test Getting app config from Config server with Base URL BASE_URL for application.

Grok 模式

\[%{LOGLEVEL:loglevel}\] \[\] \[(?<traceId>[0-9a-z]*)\] (?<timestamp>[0-9\-_:\.]*) %{JAVACLASS:class} %{GREEDYDATA:message}

结果

{
  "loglevel": [
    [
      "INFO"
    ]
  ],
  "traceId": [
    [
      "123trace456"
    ]
  ],
  "timestamp": [
    [
      "2021-10-02_18:37:01.601"
    ]
  ],
  "class": [
    [
      "com.MyExample.Test"
    ]
  ],
  "message": [
    [
      "Getting app config from Config server with Base URL BASE_URL for application."
    ]
  ]
}

我不知道那是什么类型的时间戳,也不知道你的跟踪 ID 是什么样子,所以我只是做了任何事情,重点是 JAVACLASS 模式的用法。我通过谷歌搜索找到了很多例子,请不要忘记,google 是你的朋友!