python3 - ctypes

ctypes

If you want to use a native library in python, but there is no binding, you can 'try' to interface the library with ctypes.

As I wanted to play with bpf, which is part of libpcap, which lacks a python3 binding, I decided to try ctypes.

What I wanted to do:

  • compile a bpf filter like dst port 445 and src net 127.0.0.0/8
  • match the bpf filter on a buffer

First, lets define the filter, we will need it later on:

filter = 'src port 445 and src net 127.0.0.0/8'
buffer = b'E\x00\x00[~\x1f@\x00@\x06\xbe{\x7f\x00\x00\x01\x7f\x00\x00\x01\x01\xbd\xb0~\xe5#Q\xd3\xe5g\xce\xcf\x80\x18\x01\x82\xfeO\x00\x00' # IPv4/TCP

Start with ctypes, import them, load the pcap library

import ctypes
 
pcap = ctypes.cdll.LoadLibrary("libpcap.so")

Now, we get the signature for pcap_compile_nopcap which is used to compile the bpf pattern to a bpf program:

int     pcap_compile_nopcap(int snaplen, int linktype, struct bpf_program *program,
            const char *buf, int optimize, bpf_u_int32 mask);

We create ctype arguments for all arguments, and call the library, the program argument requires special treatment, as we have to pass it by reference, but ctypes.byref makes this work.

snaplen = ctypes.c_int(40)
linktype = ctypes.c_int(12) # DLT_RAW on linux
program = ctypes.c_void_p()
buf = ctypes.c_char_p(filter)
optimize = ctypes.c_int(0)
mask = ctypes.c_int(0)
pcap.pcap_compile_nopcap(snaplen,linktype,ctypes.byref(program),buf,optimize,mask)
#= 0

The 0 indicates the filter was compiled successfully.

Now, the bpf_filter signature:

u_int bpf_filter(const struct bpf_insn * insn, const u_char *data, u_int len, u_int len2);

Problem, we have to pass an argument of type struct bpf_insn *, this argument is within bpf_program:

/*
 * Structure for "pcap_compile()", "pcap_setfilter()", etc..
 */
struct bpf_program {
        u_int bf_len;
        struct bpf_insn *bf_insns;
};

So, we have to create a ctypes Structure for the bpf_program struct, so we can access the bf_insns within:

# define the wrapping ctypes Structure for the bpf_program struct
class bpf_program(ctypes.Structure):
	_fields_ = [("bf_len", ctypes.c_int),("bf_insns", ctypes.c_void_p)]
 
program = bpf_program()
 
# rebuild the filter with the new defined program
pcap.pcap_compile_nopcap(snaplen,linktype,ctypes.byref(program),buf,optimize,mask)
#= 0

Now we have everything what it takes to call bpf_filter:

pcap.bpf_filter(program.bf_insns, buffer,len(buffer),len(buffer))
#= 40

bpf_filter returning 40 (> 0) indicates the match.

Once you are done, you have to free the compiled bpf_program again:

pcap.pcap_freecode(ctypes.byref(program))

summary

import ctypes
 
filter = 'src port 445 and src net 127.0.0.0/8'
buffer = b'E\x00\x00[~\x1f@\x00@\x06\xbe{\x7f\x00\x00\x01\x7f\x00\x00\x01\x01\xbd\xb0~\xe5#Q\xd3\xe5g\xce\xcf\x80\x18\x01\x82\xfeO\x00\x00' # IPv4/TCP
 
 
pcap = ctypes.cdll.LoadLibrary("libpcap.so")
 
# required so we can access bpf_program->bf_insns
class bpf_program(ctypes.Structure):
	_fields_ = [("bf_len", ctypes.c_int),("bf_insns", ctypes.c_void_p)]
 
# prepare args
snaplen = ctypes.c_int(40)
linktype = ctypes.c_int(12) # DLT_RAW on linux
program = bpf_program()
buf = ctypes.c_char_p(filter)
optimize = ctypes.c_int(0)
mask = ctypes.c_int(0)
 
# compile bpf_program
pcap.pcap_compile_nopcap(snaplen,linktype,ctypes.byref(program),buf,optimize,mask)
 
# call bpf_filter
pcap.bpf_filter(program.bf_insns, buffer,len(buffer),len(buffer))
 
# free bpf_program
pcap.pcap_freecode(ctypes.byref(program))

sqlite bpf_filter

As usual, there is an sqlite usecase for this, create a user defined function which matches src/dst host/port tuples with bpf filters.

Imagine you want to:

SELECT 
	* 
FROM 
	connections 
WHERE 
	bpf_filter('dst port 445 and src net 0.0.0.0/2', 
	connection_type, 
	local_host, 
	remote_host, 
	local_port, 
	remote_port, 
	connection_transport)

Possible solution:

  • sqlite
    • create the sqlite function bpf_filter
  • bpf_filter
    • create a fake IPv4/TCP header for the src/dst host/port pairs
      • created a class for it, bpfconnection, avoids scapy/…/… dependency
    • access pcap via ctypes
      • compile the bpf_program with pcap.pcap_compile_nopcap
      • match using pcap.bpf_filter
      • free the bpf_program with pcap.pcap_compile_nopcap
#!/opt/dionaea/bin/python3
 
# example how to add bpf_filter to sqlite, given dionaea logsql.sqlite as data source
# I was lazy, so it does not work with native IPv6 
# udp does not work too
# use a decent packet framework to create the IP/TCP headers - I did not to avoid a dependency
 
import ctypes
import struct
import socket
import sqlite3
 
 
def bpf_filter(pattern, contype, localhost, remotehost, localport, remoteport, proto):
	"""sqlite user provided function to match connections with bpf expressions"""
	pcap = ctypes.cdll.LoadLibrary("libpcap.so")	
 
	class bpf_program(ctypes.Structure):
		_fields_ = [("bf_len", ctypes.c_int),("bf_insns", ctypes.c_void_p)]
 
	class bpfconnection:
		"""build IPv4/TCP header for connections according to direction and src/dst host/port"""
		def __init__(self, contype, localhost, remotehost, localport, remoteport, proto, length=40):
			self.__len = 40 # which size to fake
			if contype == 'accept' or 'reject':
				srchost = remotehost
				dsthost = localhost
				srcport = remoteport
				dstport = localport
			elif contype == 'connect':
				dsthost = remotehost
				srchost = localhost
				dstport = remoteport
				srcport = localport
 
			if not srchost or srchost == '':
				srchost = '0.0.0.0'
			if not dsthost or dsthost == '':
				dsthost = '0.0.0.0'
 
			if srchost.startswith("::ffff:"):
				srchost = srchost[7:]
			if dsthost.startswith("::ffff:"):
				dsthost = dsthost[7:]
 
			try:
				self.__srchost = socket.inet_pton(socket.AF_INET6,srchost)
				self.__dsthost = socket.inet_pton(socket.AF_INET6,dsthost)
				self.__version = 0x06
			except socket.error:
				try:
					self.__srchost = socket.inet_pton(socket.AF_INET,srchost)
					self.__dsthost = socket.inet_pton(socket.AF_INET,dsthost)
					self.__version = 0x04
				except socket.error:
					print(" pattern '%s' conntype '%s' lhost '%s' rhost '%s' lport '%s' rport '%s' proto '%s'" % (pattern, contype, localhost, remotehost, localport, remoteport, proto))
					raise Exception("src dst family mismatch")
 
			self.__srcport = srcport
			self.__dstport = dstport
			if proto == 'tcp' or proto == 'tls':
				self.__protocol = 0x06
			elif proto == 'udp':
				self.__protocol = 0x17
 
		def build(self):
			ip = sub = b" " * 20
			if self.__version == 0x04: # IPv4
				ip = struct.pack('!BBHHHBBH4s4s',
					((self.__version << 4) | 5),# Version & Header length: 4bits each Version
					0, # TOS
					512, # Total Length
					0x4711, # Id
					0, # Fragment offset
					64, # TTL
					self.__protocol, # Protocol
					0, # Checksum
					self.__srchost, # src host
					self.__dsthost # remote host
				)
 
			if self.__protocol == 0x06: # TCP
				sub = struct.pack('HHIIB',
					socket.htons(self.__srcport), # source port
					socket.htons(self.__dstport), # dst port
					123, # seq nr
					234, # ack nr
					32 # header length
				)
			return ip + sub + b' ' * (40-(len(ip) + len(sub)))
 
	if contype == 'listen': # listen is not valid to match
		return False
 
	try:
		payload = bpfconnection(contype, localhost,remotehost,localport,remoteport,proto).build()
	except Exception as e:
		print(e)
		return False
	snaplen = ctypes.c_int(len(payload))
	linktype = ctypes.c_int(12)
	program = bpf_program()
	buf = ctypes.c_char_p(pattern)
	optimize = ctypes.c_int(0)
	mask = ctypes.c_int(0)
	if pcap.pcap_compile_nopcap(snaplen,linktype,ctypes.byref(program),buf,optimize,mask) != 0:
		print("could not compile pattern")
		return False
	r = pcap.bpf_filter(program.bf_insns, payload,len(payload),len(payload))
	pcap.pcap_freecode(ctypes.byref(program))
	return r != 0
 
dbh = sqlite3.connect('/opt/dionaea/var/dionaea/logsql.sqlite')
dbh.create_function('bpf_filter', 7, bpf_filter)
cursor = dbh.cursor()
r = cursor.execute("""SELECT * FROM connections WHERE bpf_filter('dst port 445 and src net 0.0.0.0/2', connection_type, local_host, remote_host, local_port, remote_port, connection_transport)""")
 
for i in r:
	print(i)

Due to sqlite function limitations, the bpf_program has to be compiled and free'd for every match, which is rather inefficient.
An alternative approach would be to create an aggregate, and compare this aggregate with the bpf_filter function, like:

SELECT 
	* 
FROM 
	connections 
WHERE 
	bpf_filter('dst port 445 and src net 0.0.0.0/2') = 
	bpf_connection(connection_type, 
	local_host, 
	remote_host, 
	local_port, 
	remote_port, 
	connection_transport)

But … I have no idea if that would provide better performance in regards of bpf_program lifetime.

Thats it.

Comments



2010/06/12/python3_-_ctypes.txt · Last modified: 2010/06/15 03:13 by common
chimeric.de = chi`s home Creative Commons License Valid CSS Driven by DokuWiki do yourself a favour and use a real browser - get firefox!! Recent changes RSS feed Valid XHTML 1.0