如何在使用统计信息加入数据集之前过滤掉事件
How to filter out events before joining datasets with stats
我有一些事件(2 个不同的源类型——process_events 和 socket_events)看起来像这样:
{
"action": "added",
"columns": {
"time": "1527895541",
"success": "1",
"action": "connect",
"auid": "1000",
"family": "2",
"local_address": "",
"local_port": "0",
"path": "/usr/bin/curl",
"pid": "30220",
"remote_address": "127.0.0.2",
"remote_port": "80"
},
"unixTime": 1527895545,
"hostIdentifier": "HOST_ONE",
"name": "socket_events",
"numerics": false
}
{
"action": "added",
"columns": {
"time": "1527895541",
"success": "1",
"action": "connect",
"auid": "1000",
"family": "2",
"local_address": "",
"local_port": "0",
"path": "/usr/bin/curl",
"pid": "30220",
"remote_address": "10.10.10.10",
"remote_port": "12345"
},
"unixTime": 1527895545,
"hostIdentifier": "HOST_ONE",
"name": "socket_events",
"numerics": false
}
{
"action": "added",
"columns": {
"uid": "0",
"time": "1527895541",
"pid": "30220",
"path": "/usr/bin/curl",
"auid": "1000",
"cmdline": "curl google.com",
"ctime": "1503452096",
"cwd": "",
"egid": "0",
"euid": "0",
"gid": "0",
"parent": ""
},
"unixTime": 1527895550,
"hostIdentifier": "HOST_ONE",
"name": "process_events",
"numerics": false
}
当前查询:
(name=socket_events OR name=process_Events) columns.path=*bin*
| stats values(*) as * by hostIdentifier, columns.path, columns.pid
结果
+-------------------------------------------------------------------------------------------+
| hostIdentifier | columns.path | columns.pid | cmdline | columns.remote_addr | columns.remote_p
+-------------------------------------------------------------------------------------------+
| HOST_ONE | /usr/bin/curl | 30220 | curl google.com | 127.0.0.2 | 80
| | | | | 10.10.10.10 | 12345
+-------------------------------------------------------------------------------------------+
有没有办法让我应用像这样的过滤逻辑
If columns.remote is multivalue AND one of the remote_address!=127.0.0.0/8 AND > remote_port>5000, then pipe it to stats
If columns.remote is not multivalue AND remote_address!=127.0.0.0/8
AND remote_port>5000, then pipe it to stats()
Else, ignore
我觉得我需要在 | stats ...
之前应用过滤器,因为我需要在 JOIN 与 process_events
之前排除所有不满足条件的 socket_events
事件.
任何帮助都会很棒!
此外,示例数据取自 https://osquery.readthedocs.io/en/stable/deployment/process-auditing/
无法过滤掉 stats
之前的多值字段,因为 stats
使它们成为多值。在加入活动之前尝试过滤掉不需要的 IP 地址。
(name=socket_events OR name=process_Events) columns.path=*bin*
| where (isnull(columns.remote_addr) OR NOT cidrmatch("127.0.0.0/8", columns.remote_addr))
| stats values(*) as * by hostIdentifier, columns.path, columns.pid
isnull
函数保留没有 remote_addr 字段的行。
我有一些事件(2 个不同的源类型——process_events 和 socket_events)看起来像这样:
{
"action": "added",
"columns": {
"time": "1527895541",
"success": "1",
"action": "connect",
"auid": "1000",
"family": "2",
"local_address": "",
"local_port": "0",
"path": "/usr/bin/curl",
"pid": "30220",
"remote_address": "127.0.0.2",
"remote_port": "80"
},
"unixTime": 1527895545,
"hostIdentifier": "HOST_ONE",
"name": "socket_events",
"numerics": false
}
{
"action": "added",
"columns": {
"time": "1527895541",
"success": "1",
"action": "connect",
"auid": "1000",
"family": "2",
"local_address": "",
"local_port": "0",
"path": "/usr/bin/curl",
"pid": "30220",
"remote_address": "10.10.10.10",
"remote_port": "12345"
},
"unixTime": 1527895545,
"hostIdentifier": "HOST_ONE",
"name": "socket_events",
"numerics": false
}
{
"action": "added",
"columns": {
"uid": "0",
"time": "1527895541",
"pid": "30220",
"path": "/usr/bin/curl",
"auid": "1000",
"cmdline": "curl google.com",
"ctime": "1503452096",
"cwd": "",
"egid": "0",
"euid": "0",
"gid": "0",
"parent": ""
},
"unixTime": 1527895550,
"hostIdentifier": "HOST_ONE",
"name": "process_events",
"numerics": false
}
当前查询:
(name=socket_events OR name=process_Events) columns.path=*bin*
| stats values(*) as * by hostIdentifier, columns.path, columns.pid
结果
+-------------------------------------------------------------------------------------------+
| hostIdentifier | columns.path | columns.pid | cmdline | columns.remote_addr | columns.remote_p
+-------------------------------------------------------------------------------------------+
| HOST_ONE | /usr/bin/curl | 30220 | curl google.com | 127.0.0.2 | 80
| | | | | 10.10.10.10 | 12345
+-------------------------------------------------------------------------------------------+
有没有办法让我应用像这样的过滤逻辑
If columns.remote is multivalue AND one of the remote_address!=127.0.0.0/8 AND > remote_port>5000, then pipe it to stats
If columns.remote is not multivalue AND remote_address!=127.0.0.0/8 AND remote_port>5000, then pipe it to stats()
Else, ignore
我觉得我需要在 | stats ...
之前应用过滤器,因为我需要在 JOIN 与 process_events
之前排除所有不满足条件的 socket_events
事件.
任何帮助都会很棒!
此外,示例数据取自 https://osquery.readthedocs.io/en/stable/deployment/process-auditing/
无法过滤掉 stats
之前的多值字段,因为 stats
使它们成为多值。在加入活动之前尝试过滤掉不需要的 IP 地址。
(name=socket_events OR name=process_Events) columns.path=*bin*
| where (isnull(columns.remote_addr) OR NOT cidrmatch("127.0.0.0/8", columns.remote_addr))
| stats values(*) as * by hostIdentifier, columns.path, columns.pid
isnull
函数保留没有 remote_addr 字段的行。