If you want to use a native library in python, but there is no binding, you can 'try' to interface the library with ctypes.
As I wanted to play with bpf, which is part of libpcap, which lacks a python3 binding, I decided to try ctypes.
What I wanted to do:
First, lets define the filter, we will need it later on:
filter = 'src port 445 and src net 127.0.0.0/8'
buffer = b'E\x00\x00[~\x1f@\x00@\x06\xbe{\x7f\x00\x00\x01\x7f\x00\x00\x01\x01\xbd\xb0~\xe5#Q\xd3\xe5g\xce\xcf\x80\x18\x01\x82\xfeO\x00\x00' # IPv4/TCP
Start with ctypes, import them, load the pcap library
import ctypes
pcap = ctypes.cdll.LoadLibrary("libpcap.so")
Now, we get the signature for pcap_compile_nopcap which is used to compile the bpf pattern to a bpf program:
int pcap_compile_nopcap(int snaplen, int linktype, struct bpf_program *program,
const char *buf, int optimize, bpf_u_int32 mask);
We create ctype arguments for all arguments, and call the library, the program argument requires special treatment, as we have to pass it by reference, but ctypes.byref makes this work.
snaplen = ctypes.c_int(40)
linktype = ctypes.c_int(12) # DLT_RAW on linux
program = ctypes.c_void_p()
buf = ctypes.c_char_p(filter)
optimize = ctypes.c_int(0)
mask = ctypes.c_int(0)
pcap.pcap_compile_nopcap(snaplen,linktype,ctypes.byref(program),buf,optimize,mask)
#= 0
The 0 indicates the filter was compiled successfully.
Now, the bpf_filter signature:
u_int bpf_filter(const struct bpf_insn * insn, const u_char *data, u_int len, u_int len2);
Problem, we have to pass an argument of type struct bpf_insn *, this argument is within bpf_program:
/*
* Structure for "pcap_compile()", "pcap_setfilter()", etc..
*/
struct bpf_program {
u_int bf_len;
struct bpf_insn *bf_insns;
};
So, we have to create a ctypes Structure for the bpf_program struct, so we can access the bf_insns within:
# define the wrapping ctypes Structure for the bpf_program struct
class bpf_program(ctypes.Structure):
_fields_ = [("bf_len", ctypes.c_int),("bf_insns", ctypes.c_void_p)]
program = bpf_program()
# rebuild the filter with the new defined program
pcap.pcap_compile_nopcap(snaplen,linktype,ctypes.byref(program),buf,optimize,mask)
#= 0
Now we have everything what it takes to call bpf_filter:
pcap.bpf_filter(program.bf_insns, buffer,len(buffer),len(buffer))
#= 40
bpf_filter returning 40 (> 0) indicates the match.
Once you are done, you have to free the compiled bpf_program again:
pcap.pcap_freecode(ctypes.byref(program))
import ctypes
filter = 'src port 445 and src net 127.0.0.0/8'
buffer = b'E\x00\x00[~\x1f@\x00@\x06\xbe{\x7f\x00\x00\x01\x7f\x00\x00\x01\x01\xbd\xb0~\xe5#Q\xd3\xe5g\xce\xcf\x80\x18\x01\x82\xfeO\x00\x00' # IPv4/TCP
pcap = ctypes.cdll.LoadLibrary("libpcap.so")
# required so we can access bpf_program->bf_insns
class bpf_program(ctypes.Structure):
_fields_ = [("bf_len", ctypes.c_int),("bf_insns", ctypes.c_void_p)]
# prepare args
snaplen = ctypes.c_int(40)
linktype = ctypes.c_int(12) # DLT_RAW on linux
program = bpf_program()
buf = ctypes.c_char_p(filter)
optimize = ctypes.c_int(0)
mask = ctypes.c_int(0)
# compile bpf_program
pcap.pcap_compile_nopcap(snaplen,linktype,ctypes.byref(program),buf,optimize,mask)
# call bpf_filter
pcap.bpf_filter(program.bf_insns, buffer,len(buffer),len(buffer))
# free bpf_program
pcap.pcap_freecode(ctypes.byref(program))
As usual, there is an sqlite usecase for this, create a user defined function which matches src/dst host/port tuples with bpf filters.
Imagine you want to:
SELECT
*
FROM
connections
WHERE
bpf_filter('dst port 445 and src net 0.0.0.0/2',
connection_type,
local_host,
remote_host,
local_port,
remote_port,
connection_transport)
Possible solution:
#!/opt/dionaea/bin/python3
# example how to add bpf_filter to sqlite, given dionaea logsql.sqlite as data source
# I was lazy, so it does not work with native IPv6
# udp does not work too
# use a decent packet framework to create the IP/TCP headers - I did not to avoid a dependency
import ctypes
import struct
import socket
import sqlite3
def bpf_filter(pattern, contype, localhost, remotehost, localport, remoteport, proto):
"""sqlite user provided function to match connections with bpf expressions"""
pcap = ctypes.cdll.LoadLibrary("libpcap.so")
class bpf_program(ctypes.Structure):
_fields_ = [("bf_len", ctypes.c_int),("bf_insns", ctypes.c_void_p)]
class bpfconnection:
"""build IPv4/TCP header for connections according to direction and src/dst host/port"""
def __init__(self, contype, localhost, remotehost, localport, remoteport, proto, length=40):
self.__len = 40 # which size to fake
if contype == 'accept' or 'reject':
srchost = remotehost
dsthost = localhost
srcport = remoteport
dstport = localport
elif contype == 'connect':
dsthost = remotehost
srchost = localhost
dstport = remoteport
srcport = localport
if not srchost or srchost == '':
srchost = '0.0.0.0'
if not dsthost or dsthost == '':
dsthost = '0.0.0.0'
if srchost.startswith("::ffff:"):
srchost = srchost[7:]
if dsthost.startswith("::ffff:"):
dsthost = dsthost[7:]
try:
self.__srchost = socket.inet_pton(socket.AF_INET6,srchost)
self.__dsthost = socket.inet_pton(socket.AF_INET6,dsthost)
self.__version = 0x06
except socket.error:
try:
self.__srchost = socket.inet_pton(socket.AF_INET,srchost)
self.__dsthost = socket.inet_pton(socket.AF_INET,dsthost)
self.__version = 0x04
except socket.error:
print(" pattern '%s' conntype '%s' lhost '%s' rhost '%s' lport '%s' rport '%s' proto '%s'" % (pattern, contype, localhost, remotehost, localport, remoteport, proto))
raise Exception("src dst family mismatch")
self.__srcport = srcport
self.__dstport = dstport
if proto == 'tcp' or proto == 'tls':
self.__protocol = 0x06
elif proto == 'udp':
self.__protocol = 0x17
def build(self):
ip = sub = b" " * 20
if self.__version == 0x04: # IPv4
ip = struct.pack('!BBHHHBBH4s4s',
((self.__version << 4) | 5),# Version & Header length: 4bits each Version
0, # TOS
512, # Total Length
0x4711, # Id
0, # Fragment offset
64, # TTL
self.__protocol, # Protocol
0, # Checksum
self.__srchost, # src host
self.__dsthost # remote host
)
if self.__protocol == 0x06: # TCP
sub = struct.pack('HHIIB',
socket.htons(self.__srcport), # source port
socket.htons(self.__dstport), # dst port
123, # seq nr
234, # ack nr
32 # header length
)
return ip + sub + b' ' * (40-(len(ip) + len(sub)))
if contype == 'listen': # listen is not valid to match
return False
try:
payload = bpfconnection(contype, localhost,remotehost,localport,remoteport,proto).build()
except Exception as e:
print(e)
return False
snaplen = ctypes.c_int(len(payload))
linktype = ctypes.c_int(12)
program = bpf_program()
buf = ctypes.c_char_p(pattern)
optimize = ctypes.c_int(0)
mask = ctypes.c_int(0)
if pcap.pcap_compile_nopcap(snaplen,linktype,ctypes.byref(program),buf,optimize,mask) != 0:
print("could not compile pattern")
return False
r = pcap.bpf_filter(program.bf_insns, payload,len(payload),len(payload))
pcap.pcap_freecode(ctypes.byref(program))
return r != 0
dbh = sqlite3.connect('/opt/dionaea/var/dionaea/logsql.sqlite')
dbh.create_function('bpf_filter', 7, bpf_filter)
cursor = dbh.cursor()
r = cursor.execute("""SELECT * FROM connections WHERE bpf_filter('dst port 445 and src net 0.0.0.0/2', connection_type, local_host, remote_host, local_port, remote_port, connection_transport)""")
for i in r:
print(i)
Due to sqlite function limitations, the bpf_program has to be compiled and free'd for every match, which is rather inefficient.
An alternative approach would be to create an aggregate, and compare this aggregate with the bpf_filter function, like:
SELECT
*
FROM
connections
WHERE
bpf_filter('dst port 445 and src net 0.0.0.0/2') =
bpf_connection(connection_type,
local_host,
remote_host,
local_port,
remote_port,
connection_transport)
But … I have no idea if that would provide better performance in regards of bpf_program lifetime.
Thats it.