在 Snort 警报上执行脚本
Execute script on Snort alert
我目前正在试验 Raspberry Pi。我是运行 Snort,一款包检测软件。在 Snort 发出警报的情况下,我想执行 (Python) 脚本。
Snort 在 raspberry pi 上执行如下:
sudo snort -q -A console -i eth0 -c /etc/snort/snort.conf
我创建了一个 python 脚本,该脚本在调用时控制 raspberry pi 的 GPIO 引脚。把它更多地放在上下文中;当 raspberry pi 收到 ping/ICMP 数据包时,红色警报灯点亮并由同一设备控制。
snort 规则当前有效,当 ICMP 数据包到达时,将向控制台输出警报。但是我不知道如何让 snort 执行 python 脚本
您可以改为将警报记录到一个文件中,然后执行类似 的操作,但您只需调用 python 脚本而不是通知发送。
snort 中的实验性东西可能很难弄清楚,因为当出现问题时没有太多的支持。
以下是 3 个选项,希望其中一个可行:
- "Strict"
subprocess
使用子流程的方法 PIPE
s
- 使用
pexpect
的方法 -- "Pexpect is a pure Python module for spawning child applications; controlling them; and responding to expected patterns in their output." -- 并不是说这是你必须从默认 python 安装中单独获取的唯一非标准包。
- 使用伪终端和老式
select
读取文件描述符的方法
每种方法都捆绑在一个 try_[SOME APPROACH]
函数中。您应该能够更新顶部的 3 个参数,然后 comment/uncomment 底部的一种方法来试一试。
可能值得独立测试两半。换句话说,snort + my rpi.py
(下)。然后,如果可行,我的 timed_printer.py
(下图)和您的 python 脚本将切换 RPi GPIO。如果它们都独立工作,那么您可以确信不需要做太多工作就能使整个工作流程正常运行。
代码
import subprocess
_cmd_lst = ['python', '-u', 'timed_printer.py'] # sudo snort -q -A console -i eth0 -c /etc/snort/snort.conf
_rpi_lst = ['python', '-u', 'rpi.py'] # python script that toggles RPi
_alert = 'TIME' # The keyword you're looking for
# in snort output
#===============================================================================
# Simple helper function that calls the RPi toggle script
def toggle_rpi():
subprocess.call(_rpi_lst)
def try_subprocess(cmd_lst, alert, rpi_lst):
p = subprocess.Popen(' '.join(cmd_lst), shell=True, stdout=subprocess.PIPE, bufsize=1)
try:
while True:
for line in iter(p.stdout.readline, b''):
print("try_subprocess() read: %s" % line.strip())
if alert in line:
print("try_subprocess() found alert: %s" % alert)
toggle_rpi()
except KeyboardInterrupt: print(" Caught Ctrl+C -- killing subprocess...")
except Exception as ex: print ex
finally:
print("Cleaning up...")
p.kill()
print("Goodbye.")
def try_pexpect(cmd_lst, alert, rpi_lst):
import pexpect # http://pexpect.sourceforge.net/pexpect.html
p = pexpect.spawn(' '.join(cmd_lst))
try:
while True:
p.expect(alert) # This blocks until <alert> is found in the output of cmd_str
print("try_pexpect() found alert: %s" % alert)
toggle_rpi()
except KeyboardInterrupt: print(" Caught Ctrl+C -- killing subprocess...")
except Exception as ex: print ex
finally:
print("Cleaning up...")
p.close(force=True)
print("Goodbye.")
def try_pty(cmd_lst, alert, rpi_lst, MAX_READ=2048):
import pty, os, select
mfd, sfd = pty.openpty()
p = subprocess.Popen(' '.join(cmd_lst), shell=True, stdout=sfd, bufsize=1)
try:
while True:
rlist, _, _, = select.select([mfd], [], [])
if rlist:
data = os.read(mfd, MAX_READ)
print("try_pty() read: %s" % data.strip())
if not data:
print("try_pty() got EOF -- exiting")
break
if alert in data:
print("try_pty() found alert: %s" % alert)
toggle_rpi()
elif p.poll() is not None:
print("try_pty() had subprocess end -- exiting")
break
except KeyboardInterrupt: print(" Caught Ctrl+C -- killing subprocess...")
except Exception as ex: print ex
finally:
print("Cleaning up...")
os.close(sfd)
os.close(mfd)
p.kill()
print("Goodbye.")
#===============================================================================
try_subprocess(_cmd_lst, _alert, _rpi_lst)
#try_pexpect(_cmd_lst, _alert, _rpi_lst)
#try_pty(_cmd_lst, _alert, _rpi_lst)
测试笔记
为了模拟您的 snort 脚本(一个 "hangs" 然后打印一些东西,然后返回挂起等的脚本),我写了这个简单的 python 脚本,我称之为 timed_printer.py
:
import time
while True:
print("TIME: %s" % time.time())
time.sleep(5)
我的 rpi.py
文件很简单:
print("TOGGLING OUTPUT PIN")
这里没有明确的输出刷新,试图最好地模拟正常输出。
最后的考虑
第一种方法将一次读取整行。因此,如果您希望 alert
包含在一行中,您会没事的。
第二种方法 (pexpect
) 将阻塞,直到遇到 alert
。
第三种方法将读作 as soon as data is available,我应该指出,这不一定是整行。如果您看到 try_pty() read:
带有 snort 输出行的片段,导致您错过警报,则需要添加某种缓冲解决方案。
文档
参考文献: 1, 2
如果管道输出延迟接收警报,直到刷新 snort 的标准输出缓冲区:
#!/usr/bin/env python
from __future__ import print_function
from subprocess import Popen, PIPE, STDOUT
snort_process = Popen(['snort', '-A', 'console', '-c', 'snort.conf'],
stdout=PIPE, stderr=STDOUT, bufsize=1,
universal_newlines=True, close_fds=True)
with snort_process.stdout:
for line in iter(snort_process.stdout.readline, ''):
#XXX run python script here:
# subprocess.call([sys.executable or 'python', '-m', 'your_module'])
print(line, end='')
rc = snort_process.wait()
那么你可以try a pseudo-tty to enable line-buffereing on snort's side.
或运行 snort -A unsock
命令并在使用 Unix 域套接字生成警报后立即打印每个警报:
#!/usr/bin/env python
import ctypes
import os
import socket
from subprocess import Popen
from snort import Alertpkt
# listen for alerts using unix domain sockets (UDS)
snort_log_dir = os.getcwd()
server_address = os.path.join(snort_log_dir, 'snort_alert')
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
try:
os.remove(server_address)
except OSError:
pass
sock.bind(server_address)
# start snort process
snort_process = Popen(['snort', '-A', 'unsock', '-l', snort_log_dir,
'-c', 'snort.conf'], close_fds=True)
# receive alerts
alert = Alertpkt()
try:
while 1:
if sock.recv_into(alert) != ctypes.sizeof(alert):
break # EOF
#XXX run python script here `subprocess.call([sys.executable or 'python', '-m', 'your_module'])`
print("{:03d} {}".format(alert.val, alert.data))
except KeyboardInterrupt:
pass
finally:
sock.close()
os.remove(server_address)
if snort_process.poll() is None: # the process is still running
snort_process.kill()
snort_process.wait() # wait for snort process to exit
在您的情况下,您可以 运行 每个警报的脚本而不是打印。
snort.Alertpkt
is a ctypes's defition of C struct Alertpkt
.
要试用,您可以下载 the gist that contains a dummy snort
script in addition to all the python modules 和 运行 run-script-on-alert-unsock.py
(或 run-script-on-alert-pty.py
)。
我目前正在试验 Raspberry Pi。我是运行 Snort,一款包检测软件。在 Snort 发出警报的情况下,我想执行 (Python) 脚本。
Snort 在 raspberry pi 上执行如下:
sudo snort -q -A console -i eth0 -c /etc/snort/snort.conf
我创建了一个 python 脚本,该脚本在调用时控制 raspberry pi 的 GPIO 引脚。把它更多地放在上下文中;当 raspberry pi 收到 ping/ICMP 数据包时,红色警报灯点亮并由同一设备控制。
snort 规则当前有效,当 ICMP 数据包到达时,将向控制台输出警报。但是我不知道如何让 snort 执行 python 脚本
您可以改为将警报记录到一个文件中,然后执行类似
snort 中的实验性东西可能很难弄清楚,因为当出现问题时没有太多的支持。
以下是 3 个选项,希望其中一个可行:
- "Strict"
subprocess
使用子流程的方法PIPE
s - 使用
pexpect
的方法 -- "Pexpect is a pure Python module for spawning child applications; controlling them; and responding to expected patterns in their output." -- 并不是说这是你必须从默认 python 安装中单独获取的唯一非标准包。 - 使用伪终端和老式
select
读取文件描述符的方法
每种方法都捆绑在一个 try_[SOME APPROACH]
函数中。您应该能够更新顶部的 3 个参数,然后 comment/uncomment 底部的一种方法来试一试。
可能值得独立测试两半。换句话说,snort + my rpi.py
(下)。然后,如果可行,我的 timed_printer.py
(下图)和您的 python 脚本将切换 RPi GPIO。如果它们都独立工作,那么您可以确信不需要做太多工作就能使整个工作流程正常运行。
代码
import subprocess
_cmd_lst = ['python', '-u', 'timed_printer.py'] # sudo snort -q -A console -i eth0 -c /etc/snort/snort.conf
_rpi_lst = ['python', '-u', 'rpi.py'] # python script that toggles RPi
_alert = 'TIME' # The keyword you're looking for
# in snort output
#===============================================================================
# Simple helper function that calls the RPi toggle script
def toggle_rpi():
subprocess.call(_rpi_lst)
def try_subprocess(cmd_lst, alert, rpi_lst):
p = subprocess.Popen(' '.join(cmd_lst), shell=True, stdout=subprocess.PIPE, bufsize=1)
try:
while True:
for line in iter(p.stdout.readline, b''):
print("try_subprocess() read: %s" % line.strip())
if alert in line:
print("try_subprocess() found alert: %s" % alert)
toggle_rpi()
except KeyboardInterrupt: print(" Caught Ctrl+C -- killing subprocess...")
except Exception as ex: print ex
finally:
print("Cleaning up...")
p.kill()
print("Goodbye.")
def try_pexpect(cmd_lst, alert, rpi_lst):
import pexpect # http://pexpect.sourceforge.net/pexpect.html
p = pexpect.spawn(' '.join(cmd_lst))
try:
while True:
p.expect(alert) # This blocks until <alert> is found in the output of cmd_str
print("try_pexpect() found alert: %s" % alert)
toggle_rpi()
except KeyboardInterrupt: print(" Caught Ctrl+C -- killing subprocess...")
except Exception as ex: print ex
finally:
print("Cleaning up...")
p.close(force=True)
print("Goodbye.")
def try_pty(cmd_lst, alert, rpi_lst, MAX_READ=2048):
import pty, os, select
mfd, sfd = pty.openpty()
p = subprocess.Popen(' '.join(cmd_lst), shell=True, stdout=sfd, bufsize=1)
try:
while True:
rlist, _, _, = select.select([mfd], [], [])
if rlist:
data = os.read(mfd, MAX_READ)
print("try_pty() read: %s" % data.strip())
if not data:
print("try_pty() got EOF -- exiting")
break
if alert in data:
print("try_pty() found alert: %s" % alert)
toggle_rpi()
elif p.poll() is not None:
print("try_pty() had subprocess end -- exiting")
break
except KeyboardInterrupt: print(" Caught Ctrl+C -- killing subprocess...")
except Exception as ex: print ex
finally:
print("Cleaning up...")
os.close(sfd)
os.close(mfd)
p.kill()
print("Goodbye.")
#===============================================================================
try_subprocess(_cmd_lst, _alert, _rpi_lst)
#try_pexpect(_cmd_lst, _alert, _rpi_lst)
#try_pty(_cmd_lst, _alert, _rpi_lst)
测试笔记
为了模拟您的 snort 脚本(一个 "hangs" 然后打印一些东西,然后返回挂起等的脚本),我写了这个简单的 python 脚本,我称之为 timed_printer.py
:
import time
while True:
print("TIME: %s" % time.time())
time.sleep(5)
我的 rpi.py
文件很简单:
print("TOGGLING OUTPUT PIN")
这里没有明确的输出刷新,试图最好地模拟正常输出。
最后的考虑
第一种方法将一次读取整行。因此,如果您希望 alert
包含在一行中,您会没事的。
第二种方法 (pexpect
) 将阻塞,直到遇到 alert
。
第三种方法将读作 as soon as data is available,我应该指出,这不一定是整行。如果您看到 try_pty() read:
带有 snort 输出行的片段,导致您错过警报,则需要添加某种缓冲解决方案。
文档
参考文献: 1, 2
如果管道输出延迟接收警报,直到刷新 snort 的标准输出缓冲区:
#!/usr/bin/env python
from __future__ import print_function
from subprocess import Popen, PIPE, STDOUT
snort_process = Popen(['snort', '-A', 'console', '-c', 'snort.conf'],
stdout=PIPE, stderr=STDOUT, bufsize=1,
universal_newlines=True, close_fds=True)
with snort_process.stdout:
for line in iter(snort_process.stdout.readline, ''):
#XXX run python script here:
# subprocess.call([sys.executable or 'python', '-m', 'your_module'])
print(line, end='')
rc = snort_process.wait()
那么你可以try a pseudo-tty to enable line-buffereing on snort's side.
或运行 snort -A unsock
命令并在使用 Unix 域套接字生成警报后立即打印每个警报:
#!/usr/bin/env python
import ctypes
import os
import socket
from subprocess import Popen
from snort import Alertpkt
# listen for alerts using unix domain sockets (UDS)
snort_log_dir = os.getcwd()
server_address = os.path.join(snort_log_dir, 'snort_alert')
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
try:
os.remove(server_address)
except OSError:
pass
sock.bind(server_address)
# start snort process
snort_process = Popen(['snort', '-A', 'unsock', '-l', snort_log_dir,
'-c', 'snort.conf'], close_fds=True)
# receive alerts
alert = Alertpkt()
try:
while 1:
if sock.recv_into(alert) != ctypes.sizeof(alert):
break # EOF
#XXX run python script here `subprocess.call([sys.executable or 'python', '-m', 'your_module'])`
print("{:03d} {}".format(alert.val, alert.data))
except KeyboardInterrupt:
pass
finally:
sock.close()
os.remove(server_address)
if snort_process.poll() is None: # the process is still running
snort_process.kill()
snort_process.wait() # wait for snort process to exit
在您的情况下,您可以 运行 每个警报的脚本而不是打印。
snort.Alertpkt
is a ctypes's defition of C struct Alertpkt
.
要试用,您可以下载 the gist that contains a dummy snort
script in addition to all the python modules 和 运行 run-script-on-alert-unsock.py
(或 run-script-on-alert-pty.py
)。