How to filter out events before joining datasets with stats

I have some events (two different sourcetypes, process_events and socket_events) that look like this:

{
  "action": "added",
  "columns": {
    "time": "1527895541",
    "success": "1",
    "action": "connect",
    "auid": "1000",
    "family": "2",
    "local_address": "",
    "local_port": "0",
    "path": "/usr/bin/curl",
    "pid": "30220",
    "remote_address": "127.0.0.2",
    "remote_port": "80"
  },
  "unixTime": 1527895545,
  "hostIdentifier": "HOST_ONE",
  "name": "socket_events",
  "numerics": false
}
{
  "action": "added",
  "columns": {
    "time": "1527895541",
    "success": "1",
    "action": "connect",
    "auid": "1000",
    "family": "2",
    "local_address": "",
    "local_port": "0",
    "path": "/usr/bin/curl",
    "pid": "30220",
    "remote_address": "10.10.10.10",
    "remote_port": "12345"
  },
  "unixTime": 1527895545,
  "hostIdentifier": "HOST_ONE",
  "name": "socket_events",
  "numerics": false
}
{
  "action": "added",
  "columns": {
    "uid": "0",
    "time": "1527895541",
    "pid": "30220",
    "path": "/usr/bin/curl",
    "auid": "1000",
    "cmdline": "curl google.com",
    "ctime": "1503452096",
    "cwd": "",
    "egid": "0",
    "euid": "0",
    "gid": "0",
    "parent": ""
  },
  "unixTime": 1527895550,
  "hostIdentifier": "HOST_ONE",
  "name": "process_events",
  "numerics": false
}

Current query:

(name=socket_events OR name=process_events) columns.path=*bin*
| stats values(*) as * by hostIdentifier, columns.path, columns.pid

Results:

+-------------------------------------------------------------------------------------------+
| hostIdentifier | columns.path  | columns.pid | cmdline         | columns.remote_addr | columns.remote_p
+-------------------------------------------------------------------------------------------+
| HOST_ONE       | /usr/bin/curl | 30220       | curl google.com | 127.0.0.2           | 80
|                |               |             |                 | 10.10.10.10         | 12345
+-------------------------------------------------------------------------------------------+

Is there a way for me to apply filtering logic like this?

If columns.remote is multivalue AND one of the remote_address!=127.0.0.0/8 AND remote_port>5000, then pipe it to stats

If columns.remote is not multivalue AND remote_address!=127.0.0.0/8 AND remote_port>5000, then pipe it to stats()

Else, ignore

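I think I need to apply the filter before | stats ..., because I need to exclude every socket_events event that fails the condition before it is joined with process_events.

Any help would be great!

Also, the sample data is taken from https://osquery.readthedocs.io/en/stable/deployment/process-auditing/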

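You can't filter on multivalue fields before stats, because stats is what makes them multivalue. Try filtering out the unwanted IP addresses before joining the events.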

(name=socket_events OR name=process_events) columns.path=*bin*
| where isnull('columns.remote_address') OR NOT cidrmatch("127.0.0.0/8", 'columns.remote_address')
| stats values(*) as * by hostIdentifier, columns.path, columns.pid

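The isnull function keeps the rows that have no remote address field (the process_events rows), so they are still available for stats to combine with the remaining socket_events rows.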
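
The question also asked for a remote_port>5000 condition and for everything else to be ignored. A rough sketch of how the same approach might be extended follows. It assumes the fields are extracted as columns.remote_address and columns.remote_port, as in the sample events, and it has not been run against real data. Field names that contain dots are wrapped in single quotes inside where, because eval expressions otherwise treat the dot as the concatenation operator.

(name=socket_events OR name=process_events) columns.path=*bin*
| where name="process_events" OR (NOT cidrmatch("127.0.0.0/8", 'columns.remote_address') AND tonumber('columns.remote_port') > 5000)
| stats values(*) as * by hostIdentifier, columns.path, columns.pid
| where isnotnull('columns.remote_address')

The first where keeps every process_events row and only those socket_events rows that are non-loopback and use a port above 5000. The final where is optional: it drops groups that have no qualifying socket event left after stats, which is one way to read the "Else, ignore" branch.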