Logstash content-based filtering into multiple indices

I am currently pulling JSON log files from an S3 bucket. They contain different types of logs in a field defined as RawLog, along with another value, MessageSourceType (and more metadata fields I don't care about). Each line in the file is a separate log, in case that makes a difference.

I currently put all of these into a single index, as shown in the config below, but ideally I would like to separate them into individual indices. For example, if MessageSourceType = Syslog - Linux Host then I need logstash to extract the RawLog as syslog and put it into an index named logs-syslog, whereas if MessageSourceType = MS Windows Event Logging XML I want it to extract the RawLog as XML and place it into an index named logs-MS_Event_logs.

filter {
   mutate {
     replace => [ "message", "%{message}" ]
   }
   json {
        source => "message"
        remove_field => "message"
   }
}

output {
   elasticsearch {
     hosts => ["http://xx.xx.xx.xx:xxxx","http://xx.xx.xx.xx:xxxx"]
     index => "logs-received"
   }
}

Here is a sample of the logs as well:

{"MsgClassTypeId":"3000","Direction":"0","ImpactedZoneEnum":"0","message":"<30>Feb 13 23:45:24 xx.xx.xx.xx Account=\"\" Action=\"\" Aggregate=\"False\" Amount=\"\" Archive=\"True\" BytesIn=\"\" BytesOut=\"\" CollectionSequence=\"825328\" Command=\"\" CommonEventId=\"3\" CommonEventName=\"General Operations\" CVE=\"\" DateInserted=\"2/13/2021 11:45:24 PM\" DInterface=\"\" DIP=\"\" Direction=\"0\" DirectionName=\"Unknown\" DMAC=\"\" DName=\"\" DNameParsed=\"\" DNameResolved=\"\" DNATIP=\"\" DNATPort=\"-1\" Domain=\"\" DomainOrigin=\"\" DPort=\"-1\" DropLog=\"False\" DropRaw=\"False\" Duration=\"\" EntityId=\"" EventClassification=\"-1\" EventCommonEventID=\"-1\" FalseAlarmRating=\"0\" Forward=\"False\" ForwardToLogMart=\"False\" GLPRAssignedRBP=\"-1\" Group=\"\" HasBeenInserted_EMDB=\"False\" HasBeenQueued_Archiving=\"True\" HasBeenQueued_EventProcessor=\"False\" HasBeenQueued_LogProcessor=\"True\" Hash=\"\" HostID=\"44\" IgnoreGlobalRBPCriteria=\"False\" ImpactedEntityId=\"0\" ImpactedEntityName=\"\" ImpactedHostId=\"-1\" ImpactedHostName=\"\" ImpactedLocationKey=\"\" ImpactedLocationName=\"\" ImpactedNetworkId=\"-1\" ImpactedNetworkName=\"\" ImpactedZoneEnum=\"0\" ImpactedZoneName=\"\" IsDNameParsedValue=\"True\" IsRemote=\"True\" IsSNameParsedValue=\"True\" ItemsIn=\"\" ItemsOut=\"\" LDSVERSION=\"1.1\" Login=\"\" LogMartMode=\"13627389\" LogSourceId=\"158\" LogSourceName=\"ip-xx-xx-xx-xx.eu-west-2.computer.internal Linux Syslog\" MediatorMsgID=\"0\" MediatorSessionID=\"1640\" MsgClassId=\"3999\" MsgClassName=\"Other Operations\" MsgClassTypeId=\"3000\" MsgClassTypeName=\"Operations\" MsgCount=\"1\" MsgDate=\"2021-02-13T23:45:24.0000000+00:00\" MsgDateOrigin=\"0\" MsgSourceHostID=\"44\" MsgSourceTypeId=\"88\" MsgSourceTypeName=\"Syslog - Linux Host\" NormalMsgDate=\"2021-02-13T23:45:24.0540000Z\" Object=\"\" ObjectName=\"\" ObjectType=\"\" OriginEntityId=\"0\" OriginEntityName=\"\" OriginHostId=\"-1\" OriginHostName=\"\" OriginLocationKey=\"\" OriginLocationName=\"\" OriginNetworkId=\"-1\" OriginNetworkName=\"\" OriginZoneEnum=\"0\" OriginZoneName=\"\" ParentProcessId=\"\" ParentProcessName=\"\" ParentProcessPath=\"\" PID=\"-1\" Policy=\"\" Priority=\"4\" Process=\"\" ProtocolId=\"-1\" ProtocolName=\"\" Quantity=\"\" Rate=\"\" Reason=\"\" Recipient=\"\" RecipientIdentity=\"\" RecipientIdentityCompany=\"\" RecipientIdentityDepartment=\"\" RecipientIdentityDomain=\"\" RecipientIdentityID=\"-1\" RecipientIdentityTitle=\"\" ResolvedImpactedName=\"\" ResolvedOriginName=\"\" ResponseCode=\"\" Result=\"\" RiskRating=\"0\" RootEntityId=\"9\" Sender=\"\" SenderIdentity=\"\" SenderIdentityCompany=\"\" SenderIdentityDepartment=\"\" SenderIdentityDomain=\"\" SenderIdentityID=\"-1\" SenderIdentityTitle=\"\" SerialNumber=\"\" ServiceId=\"-1\" ServiceName=\"\" Session=\"\" SessionType=\"\" Severity=\"\" SInterface=\"\" SIP=\"\" Size=\"\" SMAC=\"\" SName=\"\" SNameParsed=\"\" SNameResolved=\"\" SNATIP=\"\" SNATPort=\"-1\" SPort=\"-1\" Status=\"\" Subject=\"\" SystemMonitorID=\"9\" ThreatId=\"\" ThreatName=\"\" UniqueID=\"7d4c4ed3-a2fc-44bc-a7ec-0b8b68e7f456\" URL=\"\" UserAgent=\"\" UserImpactedIdentity=\"\" UserImpactedIdentityCompany=\"\" UserImpactedIdentityDomain=\"\" UserImpactedIdentityID=\"-1\" UserImpactedIdentityTitle=\"\" UserOriginIdentity=\"\" UserOriginIdentityCompany=\"\" UserOriginIdentityDepartment=\"\" UserOriginIdentityDomain=\"\" UserOriginIdentityID=\"-1\" UserOriginIdentityTitle=\"\" VendorInfo=\"\" VendorMsgID=\"\" Version=\"\" RawLog=\"02 13 2021 23:45:24 xx.xx.xx.xx <SYSD:INFO> Feb 
13 23:45:24 euw2-ec2--001 metricbeat[3031]: 2021-02-13T23:45:24.264Z#011ERROR#011[logstash.node_stats]#011node_stats/node_stats.go:73#011error making http request: Get \\"https://xx.xx.xx.xx:9600/\\": dial tcp xx.xx.xx.xx:9600: connect: connection refused\"","CollectionSequence":"825328","NormalMsgDate":"2021-02-13T23:45:24.0540000Z"}

I'm a bit unsure of the best way to achieve this and thought you folks might have some suggestions. I've looked into grok and think it might accomplish my objective, but I'm not sure where to start.

You can use conditionals in your filter section and define the target index based on the type of log you are parsing:

filter {
  ... other filters ...

  if [MsgSourceTypeName] == "Syslog - Linux Host" {
    mutate {
      add_field => {
        "[@metadata][target_index]" => "logs-syslog"
      }
    }
  }
  else if [MsgSourceTypeName] == "MS Windows Event Logging XML" {
    mutate {
      add_field => {
        "[@metadata][target_index]" => "logs-ms_event_log"
      }
    }
  }
}
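
Note that if an event matches neither condition, [@metadata][target_index] never gets set, and because of how Logstash sprintf references work, the elasticsearch output would then write to an index literally named %{[@metadata][target_index]}. A minimal guard, assuming logs-received is acceptable as a catch-all, is an extra else at the end of the chain above:

  else {
    mutate {
      add_field => {
        "[@metadata][target_index]" => "logs-received"
      }
    }
  }

Then reference the metadata field in the output: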
output {
   elasticsearch {
     hosts => ["http://xx.xx.xx.xx:xxxx","http://xx.xx.xx.xx:xxxx"]
     index => "%{[@metadata][target_index]}"
   }
}
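
As for actually extracting the RawLog content per source type, the same conditionals apply. Below is an untested sketch, not a drop-in solution: the grok pattern and the winlog target field are assumptions you would need to adapt to your real RawLog values (the sample you posted starts with a custom prefix, so the pattern will need tuning). grok can pick apart the syslog lines, and the xml filter can expand the Windows event XML:

filter {
  if [MsgSourceTypeName] == "Syslog - Linux Host" {
    # assumed pattern; tune it against your actual RawLog format
    grok {
      match => { "RawLog" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_host} %{GREEDYDATA:syslog_message}" }
    }
  }
  else if [MsgSourceTypeName] == "MS Windows Event Logging XML" {
    # parse the XML payload into a structured field
    xml {
      source => "RawLog"
      target => "winlog"
    }
  }
}

One more thing to watch: Elasticsearch index names must be lowercase, so the logs-MS_Event_logs name from your question would be rejected; logs-ms_event_log as used above is safe.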