收集网络流量统计信息时出错
Error in Gathering Statistics on the network traffic
我正在创建一个函数来使用 Python 和 pcap 计算数据包捕获延迟,但是发生了这个错误。由于我是 Python 的新手,所以我从 C 示例中获取了这个函数。这是错误:
Traceback (most recent call last):
File "prototype_sniffer_v1.py", line 266, in processing_pkts
Bps = ((pkt + 8) * 8 * 1000000) / (delay)
TypeError: unsupported operand type(s) for +: 'LP_c_ubyte' and 'int'
Exception ignored on calling ctypes callback function: <function processing_pkts at 0x7fbed299df70>
此链接“Gathering Statistics on the network traffic”包含 C 代码示例。
这是 C 示例:
#include <stdlib.h>
#include <stdio.h>
#include <pcap.h>
#include <tchar.h>
/*
 * Build the path "<system directory>\Npcap" and register it with
 * SetDllDirectory().
 *
 * Returns TRUE on success; on failure prints the Win32 error code to
 * stderr and returns FALSE.
 */
BOOL LoadNpcapDlls()
{
    _TCHAR npcap_dir[512];
    UINT len;

    /* Only 480 characters are requested so that the suffix appended below
     * still fits into the 512-character buffer. */
    len = GetSystemDirectory(npcap_dir, 480);
    if (!len) {
        fprintf(stderr, "Error in GetSystemDirectory: %x", GetLastError());
        return FALSE;
    }

    /* The backslash has to be escaped; "\N" is not a valid escape sequence. */
    _tcscat_s(npcap_dir, 512, _T("\\Npcap"));
    if (SetDllDirectory(npcap_dir) == 0) {
        fprintf(stderr, "Error in SetDllDirectory: %x", GetLastError());
        return FALSE;
    }
    return TRUE;
}
void usage();
void dispatcher_handler(u_char *, const struct pcap_pkthdr *, const u_char *);
void main(int argc, char **argv)
{
pcap_t *fp;
char errbuf[PCAP_ERRBUF_SIZE];
struct timeval st_ts;
u_int netmask;
struct bpf_program fcode;
/* Load Npcap and its functions. */
if (!LoadNpcapDlls())
{
fprintf(stderr, "Couldn't load Npcap\n");
exit(1);
}
/* Check the validity of the command line */
if (argc != 2)
{
usage();
return;
}
/* Open the output adapter */
if ( (fp= pcap_open(argv[1], 100, PCAP_OPENFLAG_PROMISCUOUS,
1000, NULL, errbuf) ) == NULL)
{
fprintf(stderr,"\nUnable to open adapter %s.\n", errbuf);
return;
}
/* Don't care about netmask, it won't be used for this filter */
netmask=0xffffff;
//compile the filter
if (pcap_compile(fp, &fcode, "tcp", 1, netmask) <0 )
{
fprintf(stderr,"\nUnable to compile the packet filter. Check the syntax.\n");
/* Free the device list */
return;
}
//set the filter
if (pcap_setfilter(fp, &fcode)<0)
{
fprintf(stderr,"\nError setting the filter.\n");
pcap_close(fp);
/* Free the device list */
return;
}
/* Put the interface in statstics mode */
if (pcap_setmode(fp, MODE_STAT)<0)
{
fprintf(stderr,"\nError setting the mode.\n");
pcap_close(fp);
/* Free the device list */
return;
}
printf("TCP traffic summary:\n");
/* Start the main loop */
pcap_loop(fp, 0, dispatcher_handler, (PUCHAR)&st_ts);
pcap_close(fp);
return;
}
/*
 * pcap_loop callback that prints one "time BPS PPS" line per sample.
 *
 * state    - user pointer supplied to pcap_loop; used as a struct timeval
 *            holding the timestamp of the previous sample (updated here).
 * header   - capture header; only its timestamp is used.
 * pkt_data - read as two consecutive 64-bit values: the first feeds the
 *            packets-per-second figure, the second (offset 8) the
 *            bits-per-second figure.
 */
void dispatcher_handler(u_char *state,
                        const struct pcap_pkthdr *header,
                        const u_char *pkt_data)
{
    struct timeval *old_ts = (struct timeval *)state;
    u_int delay;
    LARGE_INTEGER Bps,Pps;
    struct tm ltime;
    char timestr[16];
    time_t local_tv_sec;

    /* Calculate the delay in microseconds from the last sample. This value
     * is obtained from the timestamp associated with the sample. */
    delay = (header->ts.tv_sec - old_ts->tv_sec) * 1000000
            - old_ts->tv_usec + header->ts.tv_usec;

    /* No time elapsed: a rate cannot be computed and the divisions below
     * would fault. */
    if (delay == 0)
        return;

    /* Bits per second: "* 8" converts bytes to bits, "* 1000000" accounts
     * for delay being expressed in microseconds. */
    Bps.QuadPart=(((*(LONGLONG*)(pkt_data + 8)) * 8 * 1000000) / (delay));

    /* Packets per second */
    Pps.QuadPart=(((*(LONGLONG*)(pkt_data)) * 1000000) / (delay));

    /* Convert the timestamp to readable format */
    local_tv_sec = header->ts.tv_sec;
    localtime_s(&ltime, &local_tv_sec);
    strftime( timestr, sizeof timestr, "%H:%M:%S", &ltime);

    /* Print timestamp */
    printf("%s ", timestr);

    /* Print the samples */
    printf("BPS=%I64u ", Bps.QuadPart);
    printf("PPS=%I64u\n", Pps.QuadPart);

    /* Store current timestamp for the next invocation */
    old_ts->tv_sec=header->ts.tv_sec;
    old_ts->tv_usec=header->ts.tv_usec;
}
/* Print the program description and command-line syntax, then exit(0). */
void usage()
{
    /* None of the text contains a '%', so it can be written verbatim. */
    static const char help_text[] =
        "\nShows the TCP traffic load, in bits per second and packets per second."
        "\nCopyright (C) 2002 Loris Degioanni.\n"
        "\nUsage:\n"
        "\t tcptop adapter\n"
        "\t You can use \"WinDump -D\" if you don't know the name of your adapters.\n";

    fputs(help_text, stdout);
    exit(0);
}
这是我的 Python 函数:
import libpcap as pcap
import ctypes as ct
import socket
import getopt
import os
import time
import funcoes_auxiliares
from struct import *
from libpcap._platform import sockaddr, sockaddr_in, sockaddr_in6
from pcaptestutils import *
# Capture handle shared by main() and the signal handler; created here as a
# NULL pointer and replaced in main().
pd = ct.POINTER(pcap.pcap_t)()

if not is_windows:
    # Switched on by the -b command-line option in main().
    breaksigint = False

    # static void sigint_handler(int signum _U_)
    def sigint_handler(signum):
        """Ask libpcap to leave the capture loop when ``breaksigint`` is set."""
        if not breaksigint:
            return
        pcap.breakloop(pd)
def main(argv=sys.argv[1:]):
    """Capture packets on one device and feed them to ``processing_pkts``.

    Options parsed from *argv* with getopt: ``-i`` device, ``-m`` immediate
    mode, ``-n`` non-blocking mode, ``-t`` timeout; on non-Windows also
    ``-b``, ``-r`` and ``-s``.  The remaining arguments are joined into the
    filter expression.  When no device is given, the available devices are
    listed and the first one is used.

    Returns:
        1 if the last ``pcap.dispatch`` status was -1, otherwise 0.
    """
    global program_name
    global pd
    global breaksigint
    global ip_afinet
    global ip_afinet6

    program_name = os.path.basename(sys.argv[0])
    # Defined up front so processing_pkts can compare against them even when
    # the device comes from -i and no address lookup takes place.
    ip_afinet = None
    ip_afinet6 = None

    # Reject unknown options
    try:
        opts, args = getopt.getopt(argv, "i:mnt:" if is_windows else "bi:mnrst:")
    except getopt.GetoptError:
        usage()

    device = None
    timeout = 1000
    nonblock = 0
    immediate = False
    if not is_windows:
        # Parsed for -r/-s; the sigaction-based SIGINT setup that would use
        # them is not wired up in this script.
        sigrestart = False
        catchsigint = False

    for opt, optarg in opts:
        if not is_windows and opt == '-b':
            breaksigint = True
        elif opt == '-i':
            device = optarg.encode("utf-8")
        elif opt == '-m':
            immediate = True
        elif opt == '-n':
            nonblock = 1
        elif not is_windows and opt == '-r':
            sigrestart = True
        elif not is_windows and opt == '-s':
            catchsigint = True
        elif opt == '-t':
            try:
                timeout = int(optarg)
            except ValueError:
                error('Timeout value "{}" is not a number', optarg)
            if timeout < 0:
                error("Timeout value {:d} is negative", timeout)
            if timeout > INT_MAX:
                error("Timeout value {:d} is too large (> {:d})",
                      timeout, INT_MAX)
        else:
            usage()

    expression = args

    errbuf = ct.create_string_buffer(pcap.PCAP_ERRBUF_SIZE)

    # ----------------------------------------------------------
    # List the available devices and pick the first one
    if device is None:
        deviceList = ct.POINTER(pcap.pcap_if_t)()
        if pcap.findalldevs(ct.byref(deviceList), errbuf) == -1:
            error("{}", ebuf2str(errbuf))
        if not deviceList:
            error("Não há interfaces disponíveis para captura")

        print('\nAvailable network devices:')
        devices = deviceList
        while devices:
            entry = devices.contents
            print("\t[*] {}".format(entry.name.decode("utf-8")))
            devices = entry.next

        device = deviceList[0].name
        ip_afinet, ip_afinet6 = funcoes_auxiliares.getStdIp(deviceList[0])
        pcap.freealldevs(deviceList)
    # ----------------------------------------------------------

    # Reset the error buffer to an empty string (a single NUL byte; one
    # element of the buffer only accepts a one-byte value).
    errbuf[0] = b"\0"

    pd = pcap.create(device, errbuf)
    if not pd:
        error("{}", ebuf2str(errbuf))

    # Snapshot length to use once the handle is activated
    status = pcap.set_snaplen(pd, 65535)
    if status != 0:
        error("{}: pcap.set_snaplen failed: {}",
              device2str(device), status2str(status))

    if immediate:
        try:
            status = pcap.set_immediate_mode(pd, 1)
        except AttributeError:
            error("pcap.set_immediate_mode is not available on this platform")
        if status != 0:
            error("{}: pcap.set_immediate_mode failed: {}",
                  device2str(device), status2str(status))

    status = pcap.set_timeout(pd, timeout)
    if status != 0:
        error("{}: pcap.set_timeout failed: {}",
              device2str(device), status2str(status))

    status = pcap.activate(pd)
    if status < 0:
        # pcap.activate() failed.
        error("{}: {}\n({})",
              device2str(device), status2str(status), geterr2str(pd))
    elif status > 0:
        # pcap.activate() succeeded, but it's warning us
        # of a problem it had.
        warning("{}: {}\n({})",
                device2str(device), status2str(status), geterr2str(pd))

    localnet = pcap.bpf_u_int32()
    netmask = pcap.bpf_u_int32()
    if pcap.lookupnet(device, ct.byref(localnet), ct.byref(netmask), errbuf) < 0:
        localnet = pcap.bpf_u_int32(0)
        netmask = pcap.bpf_u_int32(0)
        warning("{}", ebuf2str(errbuf))

    fcode = pcap.bpf_program()
    cmdbuf = " ".join(expression).encode("utf-8")
    if pcap.compile(pd, ct.byref(fcode), cmdbuf, 1, netmask) < 0:
        error("{}", geterr2str(pd))
    if pcap.setfilter(pd, ct.byref(fcode)) < 0:
        error("{}", geterr2str(pd))
    if pcap.setnonblock(pd, nonblock, errbuf) == -1:
        error("pcap.setnonblock failed: {}", ebuf2str(errbuf))

    # -----------------------------------------
    # Open the save file that packets are appended to
    savefile = 'testsavefile'.encode('utf-8')
    pdd = pcap.dump_open_append(pd, savefile)
    if not pdd:
        error("{}", geterr2str(pd))
    # -----------------------------------------

    print("\nSniffing on device: \n\t*{}*".format(device2str(device)))

    if os.path.exists("qtdPktpTime.txt"):
        os.remove("qtdPktpTime.txt")

    # NOTE(review): the increment is assumed to sit at loop level (11
    # dispatch rounds); the original indentation was lost - confirm.
    cont_i = 0
    while cont_i <= 10:
        packet_count = ct.c_int(0)
        status = pcap.dispatch(pd, -1, processing_pkts,
                               ct.cast(ct.pointer(packet_count),
                                       ct.POINTER(ct.c_ubyte)))
        # Separate dispatch round whose packets are written to the save file
        pcap.dispatch(pd, -1, pcap.dump,
                      ct.cast(pdd, ct.POINTER(ct.c_ubyte)))
        if status < 0:
            break
        if status != 0:
            print("\n{:d} packets seen, {:d} packets counted after "
                  "pcap.dispatch returns".format(status, packet_count.value))
            ps = pcap.stat()
            pcap.stats(pd, ct.byref(ps))
            print("{:d} ps_recv, {:d} ps_drop, {:d} ps_ifdrop".format(
                  ps.ps_recv, ps.ps_drop, ps.ps_ifdrop))
            print("\n")
        cont_i += 1

    if status == pcap.PCAP_ERROR_BREAK:
        # We got interrupted, so perhaps we didn't manage to finish a
        # line we were printing. Print an extra newline, just in case.
        print()
        print("Broken out of loop from SIGINT handler")
    sys.stdout.flush()
    if status == pcap.PCAP_ERROR:
        # Error. Report it.
        print("{}: pcap.dispatch: {}".format(program_name, geterr2str(pd)),
              file=sys.stderr)

    # Close the save file before the capture handle it was opened from
    pcap.dump_close(pdd)
    pcap.freecode(ct.byref(fcode))
    pcap.close(pd)

    funcoes_auxiliares.plotGraf1()

    return 1 if status == -1 else 0
# --------------------------------------------------------------------------------------
# AUXILIAR FUNCTIONS
# --------------------------------------------------------------------------------------
# Convert the 6 bytes of an ethernet address into a colon separated hex string
def eth_addr(a):
    """Return the first six items of *a* as ``xx:xx:xx:xx:xx:xx``.

    *a* may be ``bytes``/``bytearray`` (whose items are already integers on
    Python 3) or a ``str`` whose characters carry the byte values.

    Raises:
        IndexError: if *a* holds fewer than six items.
    """
    octets = []
    for i in range(6):
        item = a[i]
        # Indexing bytes yields an int; indexing str yields a 1-char string.
        octets.append(item if isinstance(item, int) else ord(item))
    return "%.2x:%.2x:%.2x:%.2x:%.2x:%.2x" % tuple(octets)
# Timestamp of the previously handled packet.  Kept at module level because
# locals of the callback are re-created on every invocation.
_prev_pkt_ts = {"sec": 0, "usec": 0}


# Callback used to process the captured packets
@pcap.pcap_handler
def processing_pkts(arg, hdr, pkt):
    """Per-packet callback passed to ``pcap.dispatch``.

    Args:
        arg: user pointer; cast to ``int*`` and incremented as packet counter.
        hdr: pointer to the capture header (``ts``, ``len`` and ``caplen``
            are read).
        pkt: byte pointer to the captured data.

    Prints rate figures and, for IPv4 packets, a summary line followed by
    the TCP or UDP ports.  When the source address equals the global
    ``ip_afinet`` a ``time,count`` line is appended to ``qtdPktpTime.txt``.
    """
    eth_length = 14
    ip_min_length = 20

    counterp = ct.cast(arg, ct.POINTER(ct.c_int))
    counterp[0] += 1

    header = hdr.contents
    pkt_time_cap = header.ts
    pkt_len = header.len
    pkt_caplen = header.caplen

    # Microseconds elapsed since the previous packet
    delay = ((pkt_time_cap.tv_sec - _prev_pkt_ts["sec"]) * 1000000
             - _prev_pkt_ts["usec"] + pkt_time_cap.tv_usec)
    _prev_pkt_ts["sec"] = pkt_time_cap.tv_sec
    _prev_pkt_ts["usec"] = pkt_time_cap.tv_usec

    # NOTE(review): reading two 64-bit counters from the packet data mirrors
    # the C sample, which first puts the handle into statistics mode
    # (pcap_setmode MODE_STAT).  main() in this script never sets that mode,
    # so these figures are presumably derived from ordinary packet bytes -
    # confirm.
    # Skipped when no time has elapsed (division by zero) or fewer than 16
    # bytes were captured.
    if delay > 0 and pkt_caplen >= 16:
        counters = ct.cast(pkt, ct.POINTER(ct.c_uint64))
        Bps = (counters[1] * 8 * 1000000) / delay
        Pps = (counters[0] * 1000000) / delay
        print('Bps: ' + str(int(Bps)))
        print('Pps: ' + str(int(Pps)))

    # Nothing below applies unless a full ethernet + minimal IP header was
    # captured.
    if pkt_caplen < eth_length + ip_min_length:
        return

    eth = unpack('!6s6sH', bytes(pkt[:eth_length]))
    # '!' already converts to host order, so compare with the IPv4
    # EtherType directly instead of byte-swapping a second time.
    if eth[2] != 0x0800:
        return

    # Parse the IP header
    iph = unpack('!BBHHHBBH4s4s',
                 bytes(pkt[eth_length:eth_length + ip_min_length]))
    version_ihl = iph[0]
    version = version_ihl >> 4
    ihl = version_ihl & 0xF
    iph_length = ihl * 4
    ttl = iph[5]
    protocol = iph[6]
    s_addr = socket.inet_ntoa(iph[8])
    d_addr = socket.inet_ntoa(iph[9])
    print('\nVersion: ' + str(version) + '| IP Header Length: ' + str(ihl) + ' | ' + 'TTL: ' + str(ttl) + ' | ' + 'Protocol: ' + str(protocol) + ' | ' +'Source Address: ' + str(s_addr) + ' | ' +'Destination Address: ' + str(d_addr) + ' | ' + 'length: ' + str(pkt_len))

    if s_addr == ip_afinet:
        time_cap = time.ctime(pkt_time_cap.tv_sec)
        # The context manager closes the file after every write
        with open("qtdPktpTime.txt", "a") as arq_qtdPktpTime:
            arq_qtdPktpTime.write(time_cap + ',' + str(counterp[0]) + '\n')

    l4_offset = eth_length + iph_length

    # TCP protocol
    if protocol == 6 and pkt_caplen >= l4_offset + 20:
        tcph = unpack('!HHLLBBHHH', bytes(pkt[l4_offset:l4_offset + 20]))
        source_port = tcph[0]
        dest_port = tcph[1]
        sequence = tcph[2]
        acknowledgement = tcph[3]
        doff_reserved = tcph[4]
        tcph_length = doff_reserved >> 4
        print('Source Port: ' + str(source_port) + ' >> Dest Port: ' + str(dest_port) + ' >> Sequence Number: ' + str(sequence) + ' >> Acknowledgement: ' + str(acknowledgement) + ' >> TCP header length: ' + str(tcph_length))
    # UDP protocol
    elif protocol == 17 and pkt_caplen >= l4_offset + 8:
        udph = unpack('!HHHH', bytes(pkt[l4_offset:l4_offset + 8]))
        source_port = udph[0]
        dest_port = udph[1]
        checksum = udph[3]
        print('Source Port: ' + str(source_port) + ' >> Dest Port: ' + str(dest_port) + ' >> Checksum: ' + str(checksum))
# Show how the script is invoked from the terminal, then stop
def usage():
    """Write the command-line synopsis to stderr and exit with status 1."""
    flags = "-mn" if is_windows else "-bmnrs"
    synopsis = ("Usage: {} [ {} ] [ -i interface ] [ -t timeout] "
                "[ expression ]").format(program_name, flags)
    print(synopsis, file=sys.stderr)
    sys.exit(1)
# --------------------------------------------------------------------------------------
# Run main() when executed as a script; the rpartition also matches a
# dotted name ending in "__main__".
if __name__.rpartition(".")[-1] == "__main__":
    raise SystemExit(main())
可以帮我解决这个问题吗?非常感谢任何提示。
OP 在 Python 中移植并产生错误的 C 代码是:
/* Calculate the delay in microseconds from the last sample. This value
* is obtained from the timestamp that the associated with the sample. */
delay = (header->ts.tv_sec - old_ts->tv_sec) * 1000000
- old_ts->tv_usec + header->ts.tv_usec;
/* Get the number of Bits per second */
Bps.QuadPart=(((*(LONGLONG*)(pkt_data + 8)) * 8 * 1000000) / (delay));
/* ^ ^
| |
| |
| |
converts bytes in bits -- |
|
delay is expressed in microseconds --
*/
/* Get the number of Packets per second */
Pps.QuadPart=(((*(LONGLONG*)(pkt_data)) * 1000000) / (delay));
Python 代码缺少对 pkt_data + 8 这一字节指针运算结果的类型转换和解引用。由于 *(LONGLONG*)(pkt_data + 8) == ((LONGLONG*)(pkt_data))[1],下面的 Python 代码采用后一种写法来完成转换和解引用:
delay = (pkt_time_cap.tv_sec - old_ts_tv_sec) * 1000000 - old_ts_tv_usec + pkt_time_cap.tv_usec
Bps = ct.cast(pkt,ct.POINTER(ct.c_uint64))[1] * 8 * 1000000 / delay
Pps = ct.cast(pkt,ct.POINTER(ct.c_uint64))[0] * 1000000 / delay
由于没有可运行的示例,我将构造一个指向两个 64 位值的字节指针来演示并验证这段代码是否正确。原始 C 代码先将字节指针加 8,再转换为 64 位指针并解引用;这等价于直接转换为 64 位指针后取下标为 1 的第二个值,即 *(LONGLONG*)(pkt_data + 8) == ((LONGLONG*)(pkt_data))[1]:
>>> import ctypes as ct
>>> data = (ct.c_uint64 * 2)(100,200)
>>> pkt = ct.cast(ct.byref(data), ct.POINTER(ct.c_ubyte))
>>> pkt
<__main__.LP_c_ubyte object at 0x000002D38C9BA5C0>
>>> pkt + 8
Traceback (most recent call last):
File "<interactive input>", line 1, in <module>
TypeError: unsupported operand type(s) for +: 'LP_c_ubyte' and 'int'
由于 pkt 是一个字节指针,上面的操作产生了与 OP 相同的错误。下面将该指针转换回 64 位指针并访问第二个值:
>>> ct.cast(pkt,ct.POINTER(ct.c_uint64))[1]
200
我正在创建一个函数来使用 Python 和 pcap 计算数据包捕获延迟,但是发生了这个错误。由于我是 Python 的新手,所以我从 C 示例中获取了这个函数。这是错误:
Traceback (most recent call last):
File "prototype_sniffer_v1.py", line 266, in processing_pkts
Bps = ((pkt + 8) * 8 * 1000000) / (delay)
TypeError: unsupported operand type(s) for +: 'LP_c_ubyte' and 'int'
Exception ignored on calling ctypes callback function: <function processing_pkts at 0x7fbed299df70>
此链接“Gathering Statistics on the network traffic”包含 C 代码示例。
这是 C 示例:
#include <stdlib.h>
#include <stdio.h>
#include <pcap.h>
#include <tchar.h>
/*
 * Build the path "<system directory>\Npcap" and register it with
 * SetDllDirectory().
 *
 * Returns TRUE on success; on failure prints the Win32 error code to
 * stderr and returns FALSE.
 */
BOOL LoadNpcapDlls()
{
    _TCHAR npcap_dir[512];
    UINT len;

    /* Only 480 characters are requested so that the suffix appended below
     * still fits into the 512-character buffer. */
    len = GetSystemDirectory(npcap_dir, 480);
    if (!len) {
        fprintf(stderr, "Error in GetSystemDirectory: %x", GetLastError());
        return FALSE;
    }

    /* The backslash has to be escaped; "\N" is not a valid escape sequence. */
    _tcscat_s(npcap_dir, 512, _T("\\Npcap"));
    if (SetDllDirectory(npcap_dir) == 0) {
        fprintf(stderr, "Error in SetDllDirectory: %x", GetLastError());
        return FALSE;
    }
    return TRUE;
}
void usage();
void dispatcher_handler(u_char *, const struct pcap_pkthdr *, const u_char *);
void main(int argc, char **argv)
{
pcap_t *fp;
char errbuf[PCAP_ERRBUF_SIZE];
struct timeval st_ts;
u_int netmask;
struct bpf_program fcode;
/* Load Npcap and its functions. */
if (!LoadNpcapDlls())
{
fprintf(stderr, "Couldn't load Npcap\n");
exit(1);
}
/* Check the validity of the command line */
if (argc != 2)
{
usage();
return;
}
/* Open the output adapter */
if ( (fp= pcap_open(argv[1], 100, PCAP_OPENFLAG_PROMISCUOUS,
1000, NULL, errbuf) ) == NULL)
{
fprintf(stderr,"\nUnable to open adapter %s.\n", errbuf);
return;
}
/* Don't care about netmask, it won't be used for this filter */
netmask=0xffffff;
//compile the filter
if (pcap_compile(fp, &fcode, "tcp", 1, netmask) <0 )
{
fprintf(stderr,"\nUnable to compile the packet filter. Check the syntax.\n");
/* Free the device list */
return;
}
//set the filter
if (pcap_setfilter(fp, &fcode)<0)
{
fprintf(stderr,"\nError setting the filter.\n");
pcap_close(fp);
/* Free the device list */
return;
}
/* Put the interface in statstics mode */
if (pcap_setmode(fp, MODE_STAT)<0)
{
fprintf(stderr,"\nError setting the mode.\n");
pcap_close(fp);
/* Free the device list */
return;
}
printf("TCP traffic summary:\n");
/* Start the main loop */
pcap_loop(fp, 0, dispatcher_handler, (PUCHAR)&st_ts);
pcap_close(fp);
return;
}
/*
 * pcap_loop callback that prints one "time BPS PPS" line per sample.
 *
 * state    - user pointer supplied to pcap_loop; used as a struct timeval
 *            holding the timestamp of the previous sample (updated here).
 * header   - capture header; only its timestamp is used.
 * pkt_data - read as two consecutive 64-bit values: the first feeds the
 *            packets-per-second figure, the second (offset 8) the
 *            bits-per-second figure.
 */
void dispatcher_handler(u_char *state,
                        const struct pcap_pkthdr *header,
                        const u_char *pkt_data)
{
    struct timeval *old_ts = (struct timeval *)state;
    u_int delay;
    LARGE_INTEGER Bps,Pps;
    struct tm ltime;
    char timestr[16];
    time_t local_tv_sec;

    /* Calculate the delay in microseconds from the last sample. This value
     * is obtained from the timestamp associated with the sample. */
    delay = (header->ts.tv_sec - old_ts->tv_sec) * 1000000
            - old_ts->tv_usec + header->ts.tv_usec;

    /* No time elapsed: a rate cannot be computed and the divisions below
     * would fault. */
    if (delay == 0)
        return;

    /* Bits per second: "* 8" converts bytes to bits, "* 1000000" accounts
     * for delay being expressed in microseconds. */
    Bps.QuadPart=(((*(LONGLONG*)(pkt_data + 8)) * 8 * 1000000) / (delay));

    /* Packets per second */
    Pps.QuadPart=(((*(LONGLONG*)(pkt_data)) * 1000000) / (delay));

    /* Convert the timestamp to readable format */
    local_tv_sec = header->ts.tv_sec;
    localtime_s(&ltime, &local_tv_sec);
    strftime( timestr, sizeof timestr, "%H:%M:%S", &ltime);

    /* Print timestamp */
    printf("%s ", timestr);

    /* Print the samples */
    printf("BPS=%I64u ", Bps.QuadPart);
    printf("PPS=%I64u\n", Pps.QuadPart);

    /* Store current timestamp for the next invocation */
    old_ts->tv_sec=header->ts.tv_sec;
    old_ts->tv_usec=header->ts.tv_usec;
}
/* Print the program description and command-line syntax, then exit(0). */
void usage()
{
    /* None of the text contains a '%', so it can be written verbatim. */
    static const char help_text[] =
        "\nShows the TCP traffic load, in bits per second and packets per second."
        "\nCopyright (C) 2002 Loris Degioanni.\n"
        "\nUsage:\n"
        "\t tcptop adapter\n"
        "\t You can use \"WinDump -D\" if you don't know the name of your adapters.\n";

    fputs(help_text, stdout);
    exit(0);
}
这是我的 Python 函数:
import libpcap as pcap
import ctypes as ct
import socket
import getopt
import os
import time
import funcoes_auxiliares
from struct import *
from libpcap._platform import sockaddr, sockaddr_in, sockaddr_in6
from pcaptestutils import *
# Capture handle shared by main() and the signal handler; created here as a
# NULL pointer and replaced in main().
pd = ct.POINTER(pcap.pcap_t)()

if not is_windows:
    # Switched on by the -b command-line option in main().
    breaksigint = False

    # static void sigint_handler(int signum _U_)
    def sigint_handler(signum):
        """Ask libpcap to leave the capture loop when ``breaksigint`` is set."""
        if not breaksigint:
            return
        pcap.breakloop(pd)
def main(argv=sys.argv[1:]):
    """Capture packets on one device and feed them to ``processing_pkts``.

    Options parsed from *argv* with getopt: ``-i`` device, ``-m`` immediate
    mode, ``-n`` non-blocking mode, ``-t`` timeout; on non-Windows also
    ``-b``, ``-r`` and ``-s``.  The remaining arguments are joined into the
    filter expression.  When no device is given, the available devices are
    listed and the first one is used.

    Returns:
        1 if the last ``pcap.dispatch`` status was -1, otherwise 0.
    """
    global program_name
    global pd
    global breaksigint
    global ip_afinet
    global ip_afinet6

    program_name = os.path.basename(sys.argv[0])
    # Defined up front so processing_pkts can compare against them even when
    # the device comes from -i and no address lookup takes place.
    ip_afinet = None
    ip_afinet6 = None

    # Reject unknown options
    try:
        opts, args = getopt.getopt(argv, "i:mnt:" if is_windows else "bi:mnrst:")
    except getopt.GetoptError:
        usage()

    device = None
    timeout = 1000
    nonblock = 0
    immediate = False
    if not is_windows:
        # Parsed for -r/-s; the sigaction-based SIGINT setup that would use
        # them is not wired up in this script.
        sigrestart = False
        catchsigint = False

    for opt, optarg in opts:
        if not is_windows and opt == '-b':
            breaksigint = True
        elif opt == '-i':
            device = optarg.encode("utf-8")
        elif opt == '-m':
            immediate = True
        elif opt == '-n':
            nonblock = 1
        elif not is_windows and opt == '-r':
            sigrestart = True
        elif not is_windows and opt == '-s':
            catchsigint = True
        elif opt == '-t':
            try:
                timeout = int(optarg)
            except ValueError:
                error('Timeout value "{}" is not a number', optarg)
            if timeout < 0:
                error("Timeout value {:d} is negative", timeout)
            if timeout > INT_MAX:
                error("Timeout value {:d} is too large (> {:d})",
                      timeout, INT_MAX)
        else:
            usage()

    expression = args

    errbuf = ct.create_string_buffer(pcap.PCAP_ERRBUF_SIZE)

    # ----------------------------------------------------------
    # List the available devices and pick the first one
    if device is None:
        deviceList = ct.POINTER(pcap.pcap_if_t)()
        if pcap.findalldevs(ct.byref(deviceList), errbuf) == -1:
            error("{}", ebuf2str(errbuf))
        if not deviceList:
            error("Não há interfaces disponíveis para captura")

        print('\nAvailable network devices:')
        devices = deviceList
        while devices:
            entry = devices.contents
            print("\t[*] {}".format(entry.name.decode("utf-8")))
            devices = entry.next

        device = deviceList[0].name
        ip_afinet, ip_afinet6 = funcoes_auxiliares.getStdIp(deviceList[0])
        pcap.freealldevs(deviceList)
    # ----------------------------------------------------------

    # Reset the error buffer to an empty string (a single NUL byte; one
    # element of the buffer only accepts a one-byte value).
    errbuf[0] = b"\0"

    pd = pcap.create(device, errbuf)
    if not pd:
        error("{}", ebuf2str(errbuf))

    # Snapshot length to use once the handle is activated
    status = pcap.set_snaplen(pd, 65535)
    if status != 0:
        error("{}: pcap.set_snaplen failed: {}",
              device2str(device), status2str(status))

    if immediate:
        try:
            status = pcap.set_immediate_mode(pd, 1)
        except AttributeError:
            error("pcap.set_immediate_mode is not available on this platform")
        if status != 0:
            error("{}: pcap.set_immediate_mode failed: {}",
                  device2str(device), status2str(status))

    status = pcap.set_timeout(pd, timeout)
    if status != 0:
        error("{}: pcap.set_timeout failed: {}",
              device2str(device), status2str(status))

    status = pcap.activate(pd)
    if status < 0:
        # pcap.activate() failed.
        error("{}: {}\n({})",
              device2str(device), status2str(status), geterr2str(pd))
    elif status > 0:
        # pcap.activate() succeeded, but it's warning us
        # of a problem it had.
        warning("{}: {}\n({})",
                device2str(device), status2str(status), geterr2str(pd))

    localnet = pcap.bpf_u_int32()
    netmask = pcap.bpf_u_int32()
    if pcap.lookupnet(device, ct.byref(localnet), ct.byref(netmask), errbuf) < 0:
        localnet = pcap.bpf_u_int32(0)
        netmask = pcap.bpf_u_int32(0)
        warning("{}", ebuf2str(errbuf))

    fcode = pcap.bpf_program()
    cmdbuf = " ".join(expression).encode("utf-8")
    if pcap.compile(pd, ct.byref(fcode), cmdbuf, 1, netmask) < 0:
        error("{}", geterr2str(pd))
    if pcap.setfilter(pd, ct.byref(fcode)) < 0:
        error("{}", geterr2str(pd))
    if pcap.setnonblock(pd, nonblock, errbuf) == -1:
        error("pcap.setnonblock failed: {}", ebuf2str(errbuf))

    # -----------------------------------------
    # Open the save file that packets are appended to
    savefile = 'testsavefile'.encode('utf-8')
    pdd = pcap.dump_open_append(pd, savefile)
    if not pdd:
        error("{}", geterr2str(pd))
    # -----------------------------------------

    print("\nSniffing on device: \n\t*{}*".format(device2str(device)))

    if os.path.exists("qtdPktpTime.txt"):
        os.remove("qtdPktpTime.txt")

    # NOTE(review): the increment is assumed to sit at loop level (11
    # dispatch rounds); the original indentation was lost - confirm.
    cont_i = 0
    while cont_i <= 10:
        packet_count = ct.c_int(0)
        status = pcap.dispatch(pd, -1, processing_pkts,
                               ct.cast(ct.pointer(packet_count),
                                       ct.POINTER(ct.c_ubyte)))
        # Separate dispatch round whose packets are written to the save file
        pcap.dispatch(pd, -1, pcap.dump,
                      ct.cast(pdd, ct.POINTER(ct.c_ubyte)))
        if status < 0:
            break
        if status != 0:
            print("\n{:d} packets seen, {:d} packets counted after "
                  "pcap.dispatch returns".format(status, packet_count.value))
            ps = pcap.stat()
            pcap.stats(pd, ct.byref(ps))
            print("{:d} ps_recv, {:d} ps_drop, {:d} ps_ifdrop".format(
                  ps.ps_recv, ps.ps_drop, ps.ps_ifdrop))
            print("\n")
        cont_i += 1

    if status == pcap.PCAP_ERROR_BREAK:
        # We got interrupted, so perhaps we didn't manage to finish a
        # line we were printing. Print an extra newline, just in case.
        print()
        print("Broken out of loop from SIGINT handler")
    sys.stdout.flush()
    if status == pcap.PCAP_ERROR:
        # Error. Report it.
        print("{}: pcap.dispatch: {}".format(program_name, geterr2str(pd)),
              file=sys.stderr)

    # Close the save file before the capture handle it was opened from
    pcap.dump_close(pdd)
    pcap.freecode(ct.byref(fcode))
    pcap.close(pd)

    funcoes_auxiliares.plotGraf1()

    return 1 if status == -1 else 0
# --------------------------------------------------------------------------------------
# AUXILIAR FUNCTIONS
# --------------------------------------------------------------------------------------
# Convert the 6 bytes of an ethernet address into a colon separated hex string
def eth_addr(a):
    """Return the first six items of *a* as ``xx:xx:xx:xx:xx:xx``.

    *a* may be ``bytes``/``bytearray`` (whose items are already integers on
    Python 3) or a ``str`` whose characters carry the byte values.

    Raises:
        IndexError: if *a* holds fewer than six items.
    """
    octets = []
    for i in range(6):
        item = a[i]
        # Indexing bytes yields an int; indexing str yields a 1-char string.
        octets.append(item if isinstance(item, int) else ord(item))
    return "%.2x:%.2x:%.2x:%.2x:%.2x:%.2x" % tuple(octets)
# Timestamp of the previously handled packet.  Kept at module level because
# locals of the callback are re-created on every invocation.
_prev_pkt_ts = {"sec": 0, "usec": 0}


# Callback used to process the captured packets
@pcap.pcap_handler
def processing_pkts(arg, hdr, pkt):
    """Per-packet callback passed to ``pcap.dispatch``.

    Args:
        arg: user pointer; cast to ``int*`` and incremented as packet counter.
        hdr: pointer to the capture header (``ts``, ``len`` and ``caplen``
            are read).
        pkt: byte pointer to the captured data.

    Prints rate figures and, for IPv4 packets, a summary line followed by
    the TCP or UDP ports.  When the source address equals the global
    ``ip_afinet`` a ``time,count`` line is appended to ``qtdPktpTime.txt``.
    """
    eth_length = 14
    ip_min_length = 20

    counterp = ct.cast(arg, ct.POINTER(ct.c_int))
    counterp[0] += 1

    header = hdr.contents
    pkt_time_cap = header.ts
    pkt_len = header.len
    pkt_caplen = header.caplen

    # Microseconds elapsed since the previous packet
    delay = ((pkt_time_cap.tv_sec - _prev_pkt_ts["sec"]) * 1000000
             - _prev_pkt_ts["usec"] + pkt_time_cap.tv_usec)
    _prev_pkt_ts["sec"] = pkt_time_cap.tv_sec
    _prev_pkt_ts["usec"] = pkt_time_cap.tv_usec

    # NOTE(review): reading two 64-bit counters from the packet data mirrors
    # the C sample, which first puts the handle into statistics mode
    # (pcap_setmode MODE_STAT).  main() in this script never sets that mode,
    # so these figures are presumably derived from ordinary packet bytes -
    # confirm.
    # Skipped when no time has elapsed (division by zero) or fewer than 16
    # bytes were captured.
    if delay > 0 and pkt_caplen >= 16:
        counters = ct.cast(pkt, ct.POINTER(ct.c_uint64))
        Bps = (counters[1] * 8 * 1000000) / delay
        Pps = (counters[0] * 1000000) / delay
        print('Bps: ' + str(int(Bps)))
        print('Pps: ' + str(int(Pps)))

    # Nothing below applies unless a full ethernet + minimal IP header was
    # captured.
    if pkt_caplen < eth_length + ip_min_length:
        return

    eth = unpack('!6s6sH', bytes(pkt[:eth_length]))
    # '!' already converts to host order, so compare with the IPv4
    # EtherType directly instead of byte-swapping a second time.
    if eth[2] != 0x0800:
        return

    # Parse the IP header
    iph = unpack('!BBHHHBBH4s4s',
                 bytes(pkt[eth_length:eth_length + ip_min_length]))
    version_ihl = iph[0]
    version = version_ihl >> 4
    ihl = version_ihl & 0xF
    iph_length = ihl * 4
    ttl = iph[5]
    protocol = iph[6]
    s_addr = socket.inet_ntoa(iph[8])
    d_addr = socket.inet_ntoa(iph[9])
    print('\nVersion: ' + str(version) + '| IP Header Length: ' + str(ihl) + ' | ' + 'TTL: ' + str(ttl) + ' | ' + 'Protocol: ' + str(protocol) + ' | ' +'Source Address: ' + str(s_addr) + ' | ' +'Destination Address: ' + str(d_addr) + ' | ' + 'length: ' + str(pkt_len))

    if s_addr == ip_afinet:
        time_cap = time.ctime(pkt_time_cap.tv_sec)
        # The context manager closes the file after every write
        with open("qtdPktpTime.txt", "a") as arq_qtdPktpTime:
            arq_qtdPktpTime.write(time_cap + ',' + str(counterp[0]) + '\n')

    l4_offset = eth_length + iph_length

    # TCP protocol
    if protocol == 6 and pkt_caplen >= l4_offset + 20:
        tcph = unpack('!HHLLBBHHH', bytes(pkt[l4_offset:l4_offset + 20]))
        source_port = tcph[0]
        dest_port = tcph[1]
        sequence = tcph[2]
        acknowledgement = tcph[3]
        doff_reserved = tcph[4]
        tcph_length = doff_reserved >> 4
        print('Source Port: ' + str(source_port) + ' >> Dest Port: ' + str(dest_port) + ' >> Sequence Number: ' + str(sequence) + ' >> Acknowledgement: ' + str(acknowledgement) + ' >> TCP header length: ' + str(tcph_length))
    # UDP protocol
    elif protocol == 17 and pkt_caplen >= l4_offset + 8:
        udph = unpack('!HHHH', bytes(pkt[l4_offset:l4_offset + 8]))
        source_port = udph[0]
        dest_port = udph[1]
        checksum = udph[3]
        print('Source Port: ' + str(source_port) + ' >> Dest Port: ' + str(dest_port) + ' >> Checksum: ' + str(checksum))
# Show how the script is invoked from the terminal, then stop
def usage():
    """Write the command-line synopsis to stderr and exit with status 1."""
    flags = "-mn" if is_windows else "-bmnrs"
    synopsis = ("Usage: {} [ {} ] [ -i interface ] [ -t timeout] "
                "[ expression ]").format(program_name, flags)
    print(synopsis, file=sys.stderr)
    sys.exit(1)
# --------------------------------------------------------------------------------------
# Run main() when executed as a script; the rpartition also matches a
# dotted name ending in "__main__".
if __name__.rpartition(".")[-1] == "__main__":
    raise SystemExit(main())
可以帮我解决这个问题吗?非常感谢任何提示。
OP 在 Python 中移植并产生错误的 C 代码是:
/* Calculate the delay in microseconds from the last sample. This value
* is obtained from the timestamp that the associated with the sample. */
delay = (header->ts.tv_sec - old_ts->tv_sec) * 1000000
- old_ts->tv_usec + header->ts.tv_usec;
/* Get the number of Bits per second */
Bps.QuadPart=(((*(LONGLONG*)(pkt_data + 8)) * 8 * 1000000) / (delay));
/* ^ ^
| |
| |
| |
converts bytes in bits -- |
|
delay is expressed in microseconds --
*/
/* Get the number of Packets per second */
Pps.QuadPart=(((*(LONGLONG*)(pkt_data)) * 1000000) / (delay));
Python 代码缺少对 pkt_data + 8 这一字节指针运算结果的类型转换和解引用。由于 *(LONGLONG*)(pkt_data + 8) == ((LONGLONG*)(pkt_data))[1],下面的 Python 代码采用后一种写法来完成转换和解引用:
delay = (pkt_time_cap.tv_sec - old_ts_tv_sec) * 1000000 - old_ts_tv_usec + pkt_time_cap.tv_usec
Bps = ct.cast(pkt,ct.POINTER(ct.c_uint64))[1] * 8 * 1000000 / delay
Pps = ct.cast(pkt,ct.POINTER(ct.c_uint64))[0] * 1000000 / delay
由于没有可运行的示例,我将构造一个指向两个 64 位值的字节指针来演示并验证这段代码是否正确。原始 C 代码先将字节指针加 8,再转换为 64 位指针并解引用;这等价于直接转换为 64 位指针后取下标为 1 的第二个值,即 *(LONGLONG*)(pkt_data + 8) == ((LONGLONG*)(pkt_data))[1]:
>>> import ctypes as ct
>>> data = (ct.c_uint64 * 2)(100,200)
>>> pkt = ct.cast(ct.byref(data), ct.POINTER(ct.c_ubyte))
>>> pkt
<__main__.LP_c_ubyte object at 0x000002D38C9BA5C0>
>>> pkt + 8
Traceback (most recent call last):
File "<interactive input>", line 1, in <module>
TypeError: unsupported operand type(s) for +: 'LP_c_ubyte' and 'int'
由于 pkt 是一个字节指针,上面的操作产生了与 OP 相同的错误。下面将该指针转换回 64 位指针并访问第二个值:
>>> ct.cast(pkt,ct.POINTER(ct.c_uint64))[1]
200