Splunk 查询根据同一索引中的其他事件过滤掉

Splunk query filter out based on other event in same index

我有一个名为 Events

的索引

它包含一堆不同的事件,所有事件都有一个名为 EventName 的 属性。 现在我想查询 return 匹配以下内容的所有内容:

如果 AccountId 存在于 EventName AccountCreated 的事件中,并且至少有 1 个 EventName FavoriteCreated 的事件具有相同的 AccountId -> return 所有 EventName == [= 的事件18=]

示例事件:

已创建帐户

{
   "AccountId": 1234,
   "EventName": "AccountCreated",
   "SomeOtherProperty": "Some value",
   "Brand": "My Brand",
   "DeviceType": "Mobile",
   "EventTime": "2020-06-01T12:13:14Z"
}

FavoriteCreated

{
   "AccountId": 1234,
   "EventName": "FavoritesCreated,
   "Brand": "My Brand",
   "DeviceType": "Mobile",
   "EventTime": "2020-06-01T12:13:14Z"
}

鉴于以下两个事件,我想创建 1 个 returnAccountCreated 事件的查询。

我已经尝试了以下但它不起作用,我一定是遗漏了一些简单的东西?

index=events EventName=AccountCreated 
  [search index=events EventName=FavoriteCreated | dedup AccountId | fields AccountId]
| table AccountId, SomeOtherProperty

我预计这里有 ~6000 次点击,但我只收到 2298 次事件。我错过了什么?

更新 根据下面@warren 给出的答案,以下查询有效。唯一的问题是它使用的 JOIN 限制了我们只能从子搜索中获得 50K 个结果。当 运行 这个查询时,我总共得到 5900 个结果 = 正确。

index=events EventName=AccountCreated AccountId=*
| stats count by AccountId, EventName
| fields - count
| join AccountId 
    [ | search index=events EventName=FavoriteCreated AccountId=*
    | stats count by AccountId ]
| fields - count
| table AccountId, EventName

然后我尝试像这样使用他更新的示例,但问题似乎是 returns FavoriteCreated 事件而不是 AccountCreated。 当 运行 这个查询时,我得到 25 494 次点击 = 不正确。

index=events AccountId=* (EventName=AccountCreated OR EventName=FavoriteCreated)
| stats values(EventName) as EventName by AccountId
| eval EventName=mvindex(EventName,-1)
| search EventName="FavoriteCreated"
| table AccountId, EventName

更新 2 - 工作中 @warren 太棒了,这是一个完整的工作查询,如果存在 1 个或多个 FavoriteCreated 事件,只有 returns 来自 AccountCreated 事件的数据。

index=events AccountId=* (EventName=AccountCreated OR EventName=FavoriteCreated)
| stats 
    values(Brand) as Brand,
    values(DeviceType) as DeviceType,
    values(Email) as Email,
    values(EventName) as EventName
    values(EventTime) as EventTime,
    values(Locale) as Locale,
    values(ClientIp) as ClientIp
  by AccountId
| where mvcount(EventName)>1
| eval EventName=mvindex(EventName,0)
| eval EventTime=mvindex(EventTime,0)
| eval ClientIp=mvindex(ClientIp,0)
| eval DeviceType=mvindex(DeviceType,0)

事实证明,如果 SubSearch 结果大于 10 000 个,Splunk 会截断它...

当您使用 "normal" 子搜索时,您可能找到了 of your issues - that subsearches are capped at 50,000 (when doing a join) events (or 60 seconds run time (or 10,000 个结果)。

首先放弃 dedup 以支持 stats:

index=events EventName=AccountCreated AccountId=*
| stats count by AccountId, SomeOtherProperty [, more, fields, as, desired]
| fields - count
| search 
    [ | search index=events EventName=FavoriteCreated AccountId=*
    | stats count by AccountId 
    | fields - count]
<rest of search>

如果这不能让您到达您想去的地方(即,您仍然 在您的子搜索中有太多结果),您可以尝试 join

index=events EventName=AccountCreated AccountId=*
| stats count by AccountId, SomeOtherProperty [, more, fields, as, desired]
| fields - count
| join AccountId 
    [ | search index=events EventName=FavoriteCreated AccountId=*
    | stats count by AccountId ]
| fields - count
<rest of search>

还有 种方法可以满足您的需求 - 但这两种方法应该可以帮助您实现目标

这是一种 join-less 方法,它只会显示 "FavoriteCreated" 个事件:

index=events AccountId=* (EventName=AccountCreated OR EventName=FavoriteCreated)
| stats values(EventName) as EventName values(SomeOtherProperty) as SomeOtherProperty by AccountId
| eval EventName=mvindex(EventName,-1)
| search EventName="FavoriteCreated"

如果在同一时间范围内还有一个 "AccountCreated" 事件,这里显示 "FavoriteCreated"

index=events AccountId=* (EventName=AccountCreated OR EventName=FavoriteCreated)
| stats values(EventName) as EventName values(SomeOtherProperty) as SomeOtherProperty by AccountId
| where mvcount(EventName)>1

如果你想 'pretend' values() 没有发生(即丢弃 "favoriteCreated" 条目),请添加:

| eval EventName=mvindex(EventName,0)