Splunk - 匹配来自同一数据源的不同事件中的不同字段

Splunk - Match different fields in different events from same data source

我有一个数据源,我需要在其中 return 来自单个数据源的所有事件对(event1、event2),其中 event1 的 field1 匹配 event2 的 field2。

例如,假设我有以下数据。

我需要 return 一对事件,其中事件 1 的字段 id 匹配字段 referrer_id来自事件 2。比方说,要获得以下报告。

在 sql 中,我可以使用以下命令轻松完成此操作。

select a.first_name as first1, a.last_name as last1, b.first_name as first2,
       b.last_name as last2, b.date as date
from myTable a
inner join myTable b on a.id = b.referrer_id;

return以下table,

这正是我需要的数据。

现在,我一直在尝试在 splunk 查询中复制它,并且 运行 遇到了很多问题。首先,我尝试使用事务命令,但它将所有相关事件聚合在一起,而不是一次将它们配对。

接下来,我尝试使用子搜索,首先找到 id 然后在子搜索中搜索,首先是 id[=51 的第一个事件=] 并通过 referral_id 附加第二个事件。然后,由于 append 创建了一个新行而不是附加到同一行,因此使用 stats 通过匹配的 id 字段聚合结果行。我确实尝试过使用 appendcols,但这对我来说 return 没有任何用处。

...
| table id
| map search="search id=$id$
  | fields first_name, last_name, id
  | rename first_name as first1
  | rename last_name as last1
  | rename id as match_id
  | append [search $id$
    | search referral_id=$id$
    | fields first_name, last_name, referral_id, date
    | rename first_name as first2
    | rename last_name as span2
    | rename referral_id as match_id]"
| fields first1, last1, first2, last2, match_id, time
| stats values(first1) as first1, values(last1) as last1, values(first2) as first2,
  values(last2) as last2, values(time) as time by id

上面的查询对我有效,并给出了我需要的 table,但由于在整个时间范围内重复搜索,而且还受到地图 的限制,速度非常慢maxsearches,无论出于何种原因,都不能设置为无限制。

这似乎是一个过于复杂的解决方案,尤其是与 sql 查询相比。当然,必须存在一种更简单、更快速的方法可以做到这一点,它不受任意限制设置或多次重复搜索查询的限制。如果有任何帮助,我将不胜感激。

我最终使用了追加。使用连接给了我更快的结果,但并没有产生每个匹配对,对于我的例子,它将 return 2 行而不是三行,returning Adam 和 Betty,但不是 return与卡罗尔一起爱上亚当。

使用 append returned 一个完整的列表,并使用 id 的 stats 给了我我正在寻找的结果,每个匹配对的完整列表。它还提供了额外的空字段,所以我不得不删除它们,然后将生成的 mv 处理到它们自己的单独行中。 Splunk 不提供多字段 mv 扩展,所以我使用了一个解决方法。

...
| rename id as matchId, first_name as first1, last_name as last1
| table matchId, first1, last1
| append [
  search ... 
  | rename referrer_id as matchId, first_name as first2, last_name as last2
  | table matchId, first2, last2, date]
| stats list(first1) as first1, list(last1) as last1, list(first2) as first2, list(last2) as last2, list(date) as date by matchId
| search first1!=null last1!=null first2!=null last2!=null
| eval zipped=mvzip(mvzip(first2, last2, ","), date, ",")
| mvexpand zipped
| makemv zipped delim=","
| eval first2=mvindex(zipped, 0)
| eval last2=mvindex(zipped, 1)
| eval date=mvindex(zipped, 2)
| fields - zipped

这比使用具有多个子搜索的地图要快,并且会给出所有结果。还是受限于子搜索的最大尺寸,但至少提供了必要的数据。