Splunk:Return 来自搜索的一个或 True,在另一个搜索中使用该结果

Splunk: Return One or True from a search, use that result in another search

在 Splunk 中,我正在寻找“以配置文件开头:[配置文件名称]”的日志,并从找到的事件中检索配置文件名称。然后我想使用配置文件名称来查找其他事件(来自不同的来源),如果发现一个或多个错误,我想让它在每个平台上算作一个发现的错误。

为了让事情更清楚,我有以下搜索查询(查询一个):

index="myIndex" "started with profile" BD_L*
| table _raw, platform, RUNID
| eval Platform=case(searchmatch("LINUX"),"LINUX",searchmatch("AIX"),"AIX",searchmatch("DB2"),"DB2", searchmatch("SQL"),"SQL", searchmatch("WEBSPHERE"),"WEBSPHERE", searchmatch("SYBASE"),"SYBASE", searchmatch("WINDOWS"),"WINDOWS", true(),"ZLINUX") 
| stats count by Platform
| rename count AS "Amount"

从上述查询中找到的事件包含以下(原始):

Discovery run, 2021101306351355 started with profile BD_L2_Windows

上述查询将 return 包含上述原始数据的事件列表,并会产生以下 table。这是一个 table,每个平台的发现数量 运行:

使用下面的代码,我可以从事件中提取 RUNID。 RUNID 是我在查找错误时需要在第二次搜索中使用的:

| rex "Discovery run, (?<RUNID>.+) started with profile"

使用RUNID我可以查找错误(查询二):

index="myIndex" source="/*/RUNID/*" CASE("ERROR") CTJT* 
| dedup _raw   
| stats  count
| rename count AS "Amount"

现在,我正在寻找一种方法将上述两个查询合并为一个,并计算至少有一个错误的平台数量。所以假设我们有以下模拟:

这应该会产生以下结果:

Platform     |     Amount
Linux        |          1

我需要从查询 2 中找到一些方法 return true 或者 one 并在查询 1 中使用它对结果进行分组,但由于缺乏经验,我无法进行。我还没有找到任何与我的问题相似的东西,希望这里的任何人都可以帮助我。

到目前为止,我能想到以下内容,但这仍然显示与上面显示的 table 相同的结果(每个平台的发现计数 运行,而不是具有至少一个错误):

index="myIndex" "started with profile" BD_L* 
| table _raw, platform, RUNID 
| eval Platform=case(searchmatch("LINUX"),"LINUX",searchmatch("AIX"),"AIX",searchmatch("DB2"),"DB2", searchmatch("SQL"),"SQL", searchmatch("WEBSPHERE"),"WEBSPHERE", searchmatch("SYBASE"),"SYBASE", searchmatch("WINDOWS"),"WINDOWS", true(),"ZLINUX") 
| join type=left RUNID
    [ search index="myIndex" source="/*/RUNID/*" CASE("ERROR") CTJT*
        | dedup _raw
        | stats count 
    ]
| stats count by Platform

如何解决这个问题?提前致谢。

编辑:

根据要求,数据样本:

查询 1:

2021-10-25 22:01:10,065 ProcessFlowManager [RMI TCP Connection(20)-127.0.0.1]  INFO processflowmgr.ProcessFlowManagerImpl - Discovery run, 2021102522011000 started with profile BD_L2_Windows

查询 2:

2021-10-25 22:02:11,537 DiscoverManager [DiscoverWorker-47] 2021102522011000#SessionSensor-XX.XXX.XXX.XX-[135,445] ERROR cdb.TivoliStdMsgLogger - CTJTD3028E Sensor SessionSensor encountered an error, Seed: XX.XXX.XXX.XX:[135, 445], Run ID: 2021102522011000.

编辑 2:

我已经尝试了最新的答案并得到了以下结果:

我期待平台列表和与平台相关的错误数量。类似于:

Platform | Amount (of errors)
zLinux | 2
Windows | 4

首先...不要 dedup _raw

_raw 事件从不 重复(除非你在摄取时做错了什么)

其次,针对您的实际问题 - 尝试按照以下方式进行操作:

index="myIndex" "started with profile" BD_L* 
| eval Platform=case(match(_raw,"LINUX"),"LINUX",match(_raw,"AIX"),"AIX",match(_raw,"DB2"),"DB2", match(_raw,"SQL"),"SQL", match(_raw,"WEBSPHERE"),"WEBSPHERE", match(_raw,"SYBASE"),"SYBASE", match(_raw,"WINDOWS"),"WINDOWS", true(),"ZLINUX") 
| stats count by Platform RUNID
| join type=left RUNID
    [ search index="myIndex" source="/*/RUNID/*" CASE("ERROR") CTJT*
        | stats count by RUNID
    ]
| stats count by Platform

如果您可以提供来自您的两个指数的一些示例数据,我们可以让您更接近一个好的解决方案 - 但这应该会让您找到答案

可能一种方法在此搜索中使用join - 但我们需要示例数据来验证: )

编辑示例数据

这应该为您提供一种不使用 join 的方法,并获得您要查找的内容:

index=ndx sourcetype=srctp 
| rex field=_raw " (?<runid>\d{10,})[\s\#]"
| rex field=_raw "BD_\w+_(?<platform>\w+)"
| rex field=_raw "(?<error>[eEoOrR]{5})"
| stats values(error) as error values(platform) as platform by runid
| where isnotnull(error)

假设 platform 总是以 BD_<string>_<platform> 的格式出现,这将拉取 runidplatform、和 error(如果有)来自事件,然后通过 values() stats 函数

将它们分组

最后,where 子句将确保只显示给定 runid

出现 error 的结果

编辑 2 - OP 进一步更新了问题:添加 | stats count by platform:

index=ndx sourcetype=srctp 
| rex field=_raw " (?<runid>\d{10,})[\s\#]"
| rex field=_raw "BD_\w+_(?<platform>\w+)"
| rex field=_raw "(?<error>[eEoOrR]{5})"
| stats values(error) as error values(platform) as platform by runid
| where isnotnull(error)
| stats count by platform

我在一位同事的帮助下成功完成了:

index="myIndex" "started with profile" BD_L* 
| eval platform=case(searchmatch("LINUX"),"LINUX",searchmatch("AIX"),"AIX",searchmatch("DB2"),"DB2", searchmatch("SQL"),"SQL", searchmatch("WEBSPHERE"),"WEBSPHERE", searchmatch("SYBASE"),"SYBASE", searchmatch("WINDOWS"),"WINDOWS", true(),"ZLINUX")
| rex "Discovery run, (?<RUNID>.+) started with profile"
| stats count by platform, RUNID
| join RUNID 
    [ search index="myIndex" source="/opt/XXX/XXXXX/XXXX/log/sensors/*/*" CASE("ERROR") CTJT* 
    | rex field=source "^/opt/XXX/XXXXX/XXXX/log/sensors/(?<RUNID>.+)/" 
    | stats count by RUNID ] 
| stats count by platform