git-daemon logfile processing

dionaea git usage
The git-daemon activity for the dionaea.git repository: pulls and unique hosts per day. Basically, 5-10 users update their software daily.

Often the most complex part in data visualization is the processing before you can provide the data in a format your visualization software understands.
I chose the git-daemon logs as an example of such a case.
One could have used sshd logs as an example too. I chose git-daemon because I'm pretty sure there is no parser for git-daemon logfiles yet. If in doubt, I'm pretty confident one could adapt this git-daemon parser to deal with sshd logs too.

basics

messages

Parsing git-daemon logs is special. The entries for a single client span multiple lines, and they are tied together by the pid in brackets:

2010-07-02_19:23:24.86055 [31206] Connection from 216.34.181.151:10534
2010-07-02_19:23:24.86064 [31206] Extended attributes (23 bytes) exist <host=git.carnivore.it>
2010-07-02_19:23:24.93354 [31206] Request upload-pack for '/honeytrap.git'
2010-07-02_19:23:26.14593 [31206] Disconnected
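Here is a minimal sketch of what "bound via the pid" means in practice. It groups the sample lines above into per-client sessions. It assumes every line carries a [pid] right after the timestamp. The simplified LINE pattern and group_by_pid are illustrative names only; they are not part of the parser developed below.

```python
import re
from collections import defaultdict

# timestamp, [pid], message (assumes every line has a pid)
LINE = re.compile(r"(?P<timestamp>\S+) \[(?P<pid>\d+)\] (?P<message>.*)")

def group_by_pid(lines):
    """Collect the messages of each client session, keyed by pid."""
    sessions = defaultdict(list)
    for line in lines:
        m = LINE.match(line)
        if m is not None:
            sessions[m.group("pid")].append(m.group("message"))
    return dict(sessions)

sample = [
    "2010-07-02_19:23:24.86055 [31206] Connection from 216.34.181.151:10534",
    "2010-07-02_19:23:24.86064 [31206] Extended attributes (23 bytes) exist <host=git.carnivore.it>",
    "2010-07-02_19:23:24.93354 [31206] Request upload-pack for '/honeytrap.git'",
    "2010-07-02_19:23:26.14593 [31206] Disconnected",
]

sessions = group_by_pid(sample)
print(sessions["31206"][-1])  # Disconnected
```

When several clients connect at the same time, their lines interleave. A line-by-line parser therefore has to keep state per pid.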

message format

So, basically, we have a header followed by different message types, each with different fields of interest. For each header, the format map lists the message types that may follow it:

		self.format = { 'head' : ['connection','request','disconnect','extended'] }

message type

We define regular expressions for each format type:

		# raw strings, so the backslashes reach the regex engine unchanged
		self.regex = { 
			'head': re.compile(r'(?P<timestamp>\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2}\.\d*) \[(?P<pid>\d*)\] '),
			'connection' : re.compile(r'Connection from (?P<host>\[[\d.:\w]*\]|[\d.]*)'),
			'disconnect' : re.compile(r'Disconnected'),
			'request' : re.compile(r"Request upload-pack for '(?P<repository>.*)'"),
			'extended': re.compile(r'Extended attributes \((\d*) bytes\) exist <(?P<attributes>.*)>'),
		}
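You can check the two-level matching by hand: run the header regex, then run a message regex on the rest of the line. This sketch uses the connection pattern from the src section at the end, which also accepts bracketed IPv6 addresses. The IPv6 sample line is made up for illustration, and match_connection is a hypothetical helper.

```python
import re

HEAD = re.compile(r"(?P<timestamp>\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2}\.\d*) \[(?P<pid>\d*)\] ")
CONNECTION = re.compile(r"Connection from (?P<host>\[[\d.:\w]*\]|[\d.]*)")

def match_connection(line):
    """Return the merged named groups of header and connection match, or None."""
    m = HEAD.match(line)                     # first level: the header
    if m is None:
        return None
    ms = CONNECTION.match(line[m.end():])    # second level: the remainder
    if ms is None:
        return None
    fields = m.groupdict()
    fields.update(ms.groupdict())
    return fields

print(match_connection("2010-07-02_19:23:24.86055 [31206] Connection from 216.34.181.151:10534"))
# {'timestamp': '2010-07-02_19:23:24.86055', 'pid': '31206', 'host': '216.34.181.151'}
print(match_connection("2010-07-02_19:23:24.86055 [31207] Connection from [2001:db8::1]:4711")["host"])
# [2001:db8::1]
```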

substring aliasing

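Both the header groups (timestamp, pid) and the message groups of a match are prefixed with the name of the message type, e.g. connection_timestamp or disconnect_pid. The alias map renames these prefixed keys on the fly. This way we extract only the data we really want: a field without an alias mapping does not get stored.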

		self.alias = { 
			'connection_timestamp' : 'start',
			'connection_host': 'host',
			'connection_pid': 'pid',
 
			'extended_pid' : 'pid',
			'extended_attributes' : 'attributes',
 
			'request_repository': 'repo',
			'request_pid': 'pid',
 
			'disconnect_timestamp' : 'stop',
			'disconnect_pid' : 'pid'
		}
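The renaming itself boils down to one dict comprehension: prefix each key with the message type, and keep the key only if the prefixed name appears in the alias map. In this sketch, the fields dict is hard-coded as if it came from a connection match. Its port key is hypothetical (the regex above does not capture it) and is there only to show that fields without an alias are dropped.

```python
alias = {
    'connection_timestamp': 'start',
    'connection_host': 'host',
    'connection_pid': 'pid',
    'disconnect_timestamp': 'stop',
    'disconnect_pid': 'pid',
}

def rename(msgtype, fields):
    """Rename keys via alias[msgtype + '_' + key]; drop keys without an alias."""
    return {alias[msgtype + '_' + k]: v
            for k, v in fields.items()
            if msgtype + '_' + k in alias}

fields = {'timestamp': '2010-07-02_19:23:24.86055', 'pid': '31206',
          'host': '216.34.181.151', 'port': '10534'}
print(rename('connection', fields))
# {'start': '2010-07-02_19:23:24.86055', 'pid': '31206', 'host': '216.34.181.151'}
```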

data conversions

Some datatypes are special and require conversion before we store them. Here, these are timestamps and hosts.

		self.convert = {
			'start' : self.convert_timestamp,
			'stop' : self.convert_timestamp,
			'host' : self.convert_host,
		}

Timestamps are converted to Unix epoch seconds. For hosts, we strip the surrounding [] from bracketed (IPv6) addresses.

	def convert_timestamp(self, t):
		return time.mktime(time.strptime(t[:19],'%Y-%m-%d_%H:%M:%S')) + float('0'+t[19:])
 
	def convert_host(self,h):
		if h[0] == '[':
			return h[1:-1]
		return h
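Note that time.mktime() interprets the parsed time as local time. If your log timestamps are in UTC, the result is off by your UTC offset; runit's svlogd -tt, for instance, writes UTC timestamps in exactly this format. This sketch of a UTC variant assumes the same YYYY-MM-DD_HH:MM:SS.fraction format. It swaps in calendar.timegm(), which treats the parsed struct as UTC. convert_timestamp_utc is an illustrative name.

```python
import calendar
import time

def convert_timestamp_utc(t):
    """'2010-07-02_19:23:24.86055' -> epoch seconds, reading t as UTC."""
    # whole seconds: the first 19 characters, interpreted as UTC
    seconds = calendar.timegm(time.strptime(t[:19], '%Y-%m-%d_%H:%M:%S'))
    # fractional part: the remaining '.86055', if present
    return seconds + float('0' + t[19:])

print(convert_timestamp_utc('1970-01-01_00:00:01.5'))  # 1.5
```

Later, the sqlite query converts the values back with 'unixepoch','localtime'. That round trip is only correct if start and stop hold true epoch seconds.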

actions

Next, we define which action to run when a given message type comes up:

		self.actions = {
			'connection': self.session_new, 
			'disconnect': self.session_close,
			'request': self.session_update,
			'extended': self.session_update,
			'error': self.session_update,
			'any': self.session_update,
		}

For

  • a new session, we store the aliased fields of the match in a dict, using the pid as key
  • all other messages except disconnect, we add the new keys to that dict
  • a disconnect, we add the final keys and store the complete session as a row in an sqlite table

	def session_new(self, a):
		self.processing[a['pid']] = a
 
	def session_update(self, a):
		# ignore lines of sessions whose connect line we never saw
		if a['pid'] in self.processing:
			self.processing[a['pid']].update(a)
 
	def session_close(self, a):
		if a['pid'] in self.processing:
			self.session_update(a)
			# the session is complete: remove it from processing
			a = self.processing.pop(a['pid'])
			# one value per column, None (NULL) for fields never seen
			args = [a.get(c) for c in self.colums]
			self.cursor.execute(self.insert_stmt, args)
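You can try the session life cycle on its own against an in-memory sqlite database. In this stripped-down sketch, the four columns and the table name sessions are hard-coded for illustration; the real class derives both from the alias map. SessionStore is a hypothetical name.

```python
import sqlite3

class SessionStore:
    """Minimal per-pid session tracking, writing one row per closed session."""
    columns = ['start', 'stop', 'host', 'repo']

    def __init__(self):
        self.dbh = sqlite3.connect(':memory:')
        self.dbh.execute('CREATE TABLE sessions (start, stop, host, repo)')
        self.processing = {}

    def session_new(self, a):
        self.processing[a['pid']] = a

    def session_update(self, a):
        # ignore pids we never saw connect
        if a['pid'] in self.processing:
            self.processing[a['pid']].update(a)

    def session_close(self, a):
        session = self.processing.pop(a['pid'], None)
        if session is None:
            return
        session.update(a)
        # fields the session never saw become NULL
        self.dbh.execute('INSERT INTO sessions VALUES (?, ?, ?, ?)',
                         [session.get(c) for c in self.columns])

store = SessionStore()
store.session_new({'pid': '31206', 'start': 1.0, 'host': '216.34.181.151'})
store.session_update({'pid': '31206', 'repo': '/honeytrap.git'})
store.session_close({'pid': '31206', 'stop': 2.5})
print(store.dbh.execute('SELECT * FROM sessions').fetchall())
# [(1.0, 2.5, '216.34.181.151', '/honeytrap.git')]
```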

sqlite table layout

Next, we create the sqlite table. We only insert keys that have an alias, so we can use the alias names as column names. Since we will insert data into the table later on, we prepare the INSERT statement as well.

	def create(self, tablename=None):
 
		if tablename is not None:
			self.tablename = tablename
 
		self.dbh = sqlite3.connect(self.dbname)
		self.cursor = self.dbh.cursor()
 
		# one column per distinct alias name, in first-seen order
		self.colums = list(dict.fromkeys(self.alias.values()))
 
		# the primary key column is named after the class, e.g. gitdaemonlog
		table = "CREATE TABLE IF NOT EXISTS \n%s (\n\t%s INTEGER PRIMARY KEY,\n%s\n)" % (
			self.tablename, self.__class__.__name__,
			',\n'.join('\t' + c for c in self.colums))
		self.cursor.execute(table)
 
		# one ? placeholder per column
		self.insert_stmt = "INSERT INTO %s (%s) VALUES (%s)" % (
			self.tablename, ','.join(self.colums), ','.join('?' * len(self.colums)))
		print(self.insert_stmt)

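With the complete alias map from the src section, this will create a table like the following. The column order follows the dict ordering of your Python version.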

CREATE TABLE IF NOT EXISTS 
gitdaemonlogs (
	gitdaemonlog INTEGER PRIMARY KEY,
	stop,
	pid,
	repo,
	start,
	host,
	warning,
	error,
	attributes,
	type
)

And an INSERT string like:

INSERT INTO gitdaemonlogs (stop,pid,repo,start,host,warning,error,attributes,type) VALUES (?,?,?,?,?,?,?,?,?)

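The table layout and the INSERT statement follow the aliases, so there is no need to adjust the table layout by hand when you change them. Note that CREATE TABLE IF NOT EXISTS does not alter an existing table. After changing the aliases, start with a fresh database or table name.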
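To see how the layout follows the aliases, here is a sketch that derives the column list and both statements from an alias map. It uses dict.fromkeys() to deduplicate the alias values while keeping their first-seen order, which dicts guarantee since Python 3.7. build_statements is an illustrative name, and the CREATE statement is written on one line rather than in the pretty-printed form above.

```python
def build_statements(alias, tablename, keyname):
    """Derive columns, CREATE TABLE and INSERT statements from an alias map."""
    columns = list(dict.fromkeys(alias.values()))  # unique, first-seen order
    create = 'CREATE TABLE IF NOT EXISTS %s (%s INTEGER PRIMARY KEY, %s)' % (
        tablename, keyname, ', '.join(columns))
    insert = 'INSERT INTO %s (%s) VALUES (%s)' % (
        tablename, ','.join(columns), ','.join('?' * len(columns)))
    return columns, create, insert

alias = {'connection_timestamp': 'start', 'connection_pid': 'pid',
         'disconnect_timestamp': 'stop', 'disconnect_pid': 'pid'}
columns, create, insert = build_statements(alias, 'gitdaemonlogs', 'gitdaemonlog')
print(insert)  # INSERT INTO gitdaemonlogs (start,pid,stop) VALUES (?,?,?)

# adding an alias adds a column, with no hand-editing of the layout
alias['request_repository'] = 'repo'
print(build_statements(alias, 'gitdaemonlogs', 'gitdaemonlog')[2])
# INSERT INTO gitdaemonlogs (start,pid,stop,repo) VALUES (?,?,?,?)
```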

processing

Now, we need to

  • match the first level (the header: timestamp and, if present, pid)
  • match the second level (the message type) against the rest of the line
  • dissect the substrings
  • rename the substrings based on the aliases
  • run the appropriate action for the match

For convenience, we provide a callback for unmatched messages. It is a classmethod, so a subclass can override it to act on those messages:

	@classmethod
	def unmatched_cb(cls,data):
		print("%s could not match '%s'" % (cls.__name__, data) )
 
	def feed(self,data):
		for f in self.format:
			# first level matching
			r = self.regex[f]
			m = r.match(data)
			if m is not None:
				# success matching first level
				# the rest of the line after the header (use data, not the global line)
				remain = data[m.end():]
				for s in self.format[f]:
					# second level match
					rs = self.regex[s]
					ms = rs.match(remain)
					if ms is not None:
						# second level match success
 
						a = m.groupdict()
 
						# merge dicts for both formats
						a.update(ms.groupdict())
 
						# rename the keys according to self.alias
						# filter all keys not in self.alias
						keys = [x for x in filter(lambda x: s+'_'+x in self.alias, a.keys())]
						b = {self.alias[s+'_'+i]:a[i] for i in keys}
 
						# convert values if required
						for i in b:
							if i in self.convert:
								b[i] = self.convert[i](b[i])
 
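						# hack for lines without pid: borrow the pid of the
						# previously matched line (see special cases below)
						if 'pid' not in b:
							if self.prev is None:
								# nothing to borrow from yet
								self.unmatched_cb(data)
								return
							b['pid'] = self.prev['pid']
						else:
							self.prev = b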
 
						# act
						if s in self.actions:
							self.actions[s](b)
						return
				self.unmatched_cb(remain)
				return
		self.unmatched_cb(data)			

feeding

The last thing left is feeding in the data:

m = gitdaemonlog()
 
# iterate lazily; the with block closes the logfile when done
with open("git-daemon-log", "r") as f:
	for line in f:
		line = line.rstrip('\r\n')
		m.feed(line)
 
m.close()

special cases

To make things worse, there are some lines without a pid:

2010-04-29_07:54:29.55718 fatal: The remote end hung up unexpectedly

Therefore, we store the previously matched element in self.prev and reuse its pid for follow-up lines without a pid.
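Here is a sketch of that special case. The fatal line only matches a timestamp-only pattern, and its pid is taken from the last line that had one. The first sample line is made up for illustration. PidTracker and its patterns are hypothetical simplifications of the class in the src section. Following the src alias map, the fatal text is stored as warning.

```python
import re

HEAD = re.compile(r"(?P<timestamp>\S+) \[(?P<pid>\d+)\] (?P<message>.*)")
FATAL = re.compile(r"(?P<timestamp>\S+) fatal: (?P<string>.*)")

class PidTracker:
    """Attach the most recently seen pid to lines that lack one."""

    def __init__(self):
        self.prev_pid = None

    def parse(self, line):
        m = HEAD.match(line)
        if m is not None:
            self.prev_pid = m.group('pid')
            return {'pid': m.group('pid'), 'message': m.group('message')}
        m = FATAL.match(line)
        if m is not None and self.prev_pid is not None:
            # no pid on this line: borrow it from the previous one
            return {'pid': self.prev_pid, 'warning': m.group('string')}
        return None  # unmatched, or a fatal line before any pid was seen

t = PidTracker()
t.parse("2010-04-29_07:54:29.50000 [4711] Request upload-pack for '/dionaea.git'")
print(t.parse("2010-04-29_07:54:29.55718 fatal: The remote end hung up unexpectedly"))
# {'pid': '4711', 'warning': 'The remote end hung up unexpectedly'}
```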

more features

This is very basic. I just wanted to point out a place where a well-structured framework is needed: logfile parsing with data extraction, manipulation and storage.
One could add

  • incremental updates of a logfile via cron, like awstats does
  • tailing a file or pipe
  • indexes on the columns
  • committing once for every 100 entries INSERTed into the db

Either way, you end up with an sqlite database.
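Two of these items, committing once every 100 INSERTs and adding an index, could look roughly like the sketch below. It uses an in-memory database, a reduced column set and made-up rows. BATCH, insert_rows and the index name idx_repo_start are illustrative.

```python
import sqlite3

BATCH = 100  # commit once per 100 inserted rows

def insert_rows(dbh, rows):
    """Insert (start, host, repo) rows, committing every BATCH rows."""
    pending = 0
    for row in rows:
        dbh.execute('INSERT INTO gitdaemonlogs (start, host, repo) VALUES (?, ?, ?)', row)
        pending += 1
        if pending == BATCH:
            dbh.commit()
            pending = 0
    dbh.commit()  # flush the remainder

dbh = sqlite3.connect(':memory:')
dbh.execute('CREATE TABLE gitdaemonlogs (start, host, repo)')
# an index on the columns the daily query filters on
dbh.execute('CREATE INDEX IF NOT EXISTS idx_repo_start ON gitdaemonlogs (repo, start)')

rows = [(1278072000.0 + i, '192.0.2.%d' % (i % 10), '/dionaea.git') for i in range(250)]
insert_rows(dbh, rows)
print(dbh.execute('SELECT COUNT(*) FROM gitdaemonlogs').fetchone()[0])  # 250
```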

visualisation

sqlite

SELECT
	strftime('%Y-%m-%d',start,'unixepoch','localtime') AS date, 
	repo,
	COUNT(host) AS pulls, 
	COUNT(DISTINCT host) AS hosts 
FROM
	gitdaemonlogs 
WHERE
	repo = '/dionaea.git' 
GROUP BY
	strftime('%Y-%m-%d',start,'unixepoch','localtime'),repo;
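You can also run the same query from Python's sqlite3 module. This sketch runs it against an in-memory table reduced to the columns the query uses. The table holds four made-up sessions: three for /dionaea.git from two hosts, and one for another repository that the WHERE clause filters out. The printed date depends on your local timezone.

```python
import sqlite3

QUERY = """
SELECT
    strftime('%Y-%m-%d', start, 'unixepoch', 'localtime') AS date,
    repo,
    COUNT(host) AS pulls,
    COUNT(DISTINCT host) AS hosts
FROM gitdaemonlogs
WHERE repo = '/dionaea.git'
GROUP BY strftime('%Y-%m-%d', start, 'unixepoch', 'localtime'), repo
"""

dbh = sqlite3.connect(':memory:')
dbh.execute('CREATE TABLE gitdaemonlogs (start, host, repo)')
noon = 1278072000  # 2010-07-02 12:00:00 UTC
dbh.executemany('INSERT INTO gitdaemonlogs VALUES (?, ?, ?)', [
    (noon, '192.0.2.1', '/dionaea.git'),
    (noon + 60, '192.0.2.1', '/dionaea.git'),
    (noon + 120, '192.0.2.2', '/dionaea.git'),
    (noon, '192.0.2.3', '/honeytrap.git'),  # filtered out by WHERE
])
for date, repo, pulls, hosts in dbh.execute(QUERY):
    print(date, repo, pulls, hosts)
```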

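Save the query above as dionaea-git.sql and run it from the sqlite3 shell, which writes the result to dionaea-git.txt: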

sqlite3 gitdaemonlog.sqlite
.timer on
.output dionaea-git.txt
.read dionaea-git.sql
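If you would rather skip the sqlite3 shell, the sketch below writes query results in the shell's default list mode: fields separated by |, NULL as an empty field. That is the format gnuplot's datafile separator setting expects. write_plot_data is an illustrative name. The demo uses an in-memory table and an in-memory output buffer. For the real data, pass the contents of dionaea-git.sql as the query and a file opened for writing as out.

```python
import io
import sqlite3

def write_plot_data(dbh, query, out):
    """Write each result row as 'a|b|c', like the sqlite3 shell's list mode."""
    for row in dbh.execute(query):
        # NULL becomes an empty field
        out.write('|'.join('' if v is None else str(v) for v in row) + '\n')

# demo with an in-memory table and an in-memory output file
dbh = sqlite3.connect(':memory:')
dbh.execute('CREATE TABLE t (date, repo, pulls, hosts)')
dbh.execute("INSERT INTO t VALUES ('2010-07-02', '/dionaea.git', 3, 2)")
buf = io.StringIO()
write_plot_data(dbh, 'SELECT * FROM t', buf)
print(buf.getvalue(), end='')  # 2010-07-02|/dionaea.git|3|2
```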

gnuplot

set terminal png size 600,240 nocrop butt font "/usr/share/fonts/truetype/ttf-liberation/LiberationSans-Regular.ttf" 8
set output "dionaea-git.png"
set xdata time
set timefmt "%Y-%m-%d"
set format x "%b %d"
set ylabel "pulls"
unset y2label
unset y2tics
set datafile separator "|"
plot 'dionaea-git.txt' using 1:3 title "pulls" with lines, "" using 1:4 title "hosts" with lines axes x1y1

better visualisation?

It would be great if graphing software such as gnuplot could use a database as a data source directly. All you'd have to do is write a query that retrieves the data you are looking for, and one could create views on the tables, each showing a different aspect of the data …
But I'm not aware of any software that fills the gap between logfiles and visualisation software.
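SQLite can at least cover the views part. This sketch wraps the daily query in a view, so a plotting script only needs a trivial SELECT. The view name dionaea_daily is made up, and the rows are made-up sessions in an in-memory database.

```python
import sqlite3

dbh = sqlite3.connect(':memory:')
dbh.execute('CREATE TABLE gitdaemonlogs (start, host, repo)')
dbh.execute("""
CREATE VIEW IF NOT EXISTS dionaea_daily AS
SELECT
    strftime('%Y-%m-%d', start, 'unixepoch', 'localtime') AS date,
    COUNT(host) AS pulls,
    COUNT(DISTINCT host) AS hosts
FROM gitdaemonlogs
WHERE repo = '/dionaea.git'
GROUP BY strftime('%Y-%m-%d', start, 'unixepoch', 'localtime')
""")
dbh.executemany('INSERT INTO gitdaemonlogs VALUES (?, ?, ?)', [
    (1278072000, '192.0.2.1', '/dionaea.git'),
    (1278072000 + 86400, '192.0.2.1', '/dionaea.git'),
    (1278072000 + 86400, '192.0.2.2', '/dionaea.git'),
])
# one row per day: (date, pulls, hosts)
for row in dbh.execute('SELECT * FROM dionaea_daily ORDER BY date'):
    print(row)
```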

src

This snippet handles a few more message types than outlined above: the request type, errors, and fatal lines without a pid. It is meant to work with Python 3.

#!/usr/bin/env python3
 
import re
import sqlite3
import time
 
class gitdaemonlog:
	def __init__(self):
		self.format = { 'head' : [
							# good
							'connection','request','disconnect','extended', 
 
							# bad - errors
							'error'],
 
						# ugly
						'fatal' : [
							'any'] 
			}
		# raw strings, so the backslashes reach the regex engine unchanged
		self.regex = { 
			# regular git-daemon lines with a [pid] header
			'head': re.compile(r'(?P<timestamp>\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2}\.\d*) \[(?P<pid>\d*)\] '),
			'connection' : re.compile(r'^Connection from (?P<host>\[([\d.:\w]*)\]|([\d.]*))'),
			'disconnect' : re.compile(r'^Disconnected'),
			'request' : re.compile(r"^Request (?P<type>\w*)-pack for '(?P<repository>.*)'"),
			'extended': re.compile(r'^Extended attributes \((\d*) bytes\) exist <(?P<attributes>.*)>'),
 
			'error' : re.compile(r"^'(.*)': (?P<string>.*)"),
 
			# errors without pid
			'fatal': re.compile(r'(?P<timestamp>\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2}\.\d*) fatal: '),
			'any': re.compile(r'(?P<string>.*)')
		}
 
		self.actions = {
			'connection': self.session_new, 
			'disconnect': self.session_close,
			'request': self.session_update,
			'extended': self.session_update,
			'error': self.session_update,
			'any': self.session_update,
		}
 
		self.alias = { 
			'connection_timestamp' : 'start',
			'connection_pid': 'pid',
			'connection_host': 'host',
 
			'extended_pid' : 'pid',
			'extended_attributes' : 'attributes',
 
			'request_pid': 'pid',			
			'request_type': 'type',
			'request_repository': 'repo',
 
			'disconnect_pid' : 'pid',
			'disconnect_timestamp' : 'stop',
 
			'error_pid': 'pid',
			'error_string' : 'error',
 
#			fatal has no pid, we borrow from prev element
			'any_string': 'warning',
		}
 
		self.convert = {
			'start' : self.convert_timestamp,
			'stop' : self.convert_timestamp,
			'host' : self.convert_host,
		}
 
		self.processing = {}
		self.parsed = []
		self.tablename = self.__class__.__name__ + 's'
		self.dbname = self.__class__.__name__ + '.sqlite'
 
		self.create()
		self.prev = None
 
	def create(self, tablename=None):
 
		if tablename is not None:
			self.tablename = tablename
 
		self.dbh = sqlite3.connect(self.dbname)
		self.cursor = self.dbh.cursor()
 
		# one column per distinct alias name, in first-seen order
		self.colums = list(dict.fromkeys(self.alias.values()))
 
		# the primary key column is named after the class, e.g. gitdaemonlog
		table = "CREATE TABLE IF NOT EXISTS \n%s (\n\t%s INTEGER PRIMARY KEY,\n%s\n)" % (
			self.tablename, self.__class__.__name__,
			',\n'.join('\t' + c for c in self.colums))
		print(table)
		self.cursor.execute(table)
 
		# one ? placeholder per column
		self.insert_stmt = "INSERT INTO %s (%s) VALUES (%s)" % (
			self.tablename, ','.join(self.colums), ','.join('?' * len(self.colums)))
		print(self.insert_stmt)
 
	def close(self):
		self.dbh.commit()
		self.dbh.close()
 
	def session_new(self, a):
		self.processing[a['pid']] = a
 
	def session_update(self, a):
		# ignore lines of sessions whose connect line we never saw
		if a['pid'] in self.processing:
			self.processing[a['pid']].update(a)
 
	def session_close(self, a):
		if a['pid'] in self.processing:
			self.session_update(a)
			# the session is complete: remove it from processing
			a = self.processing.pop(a['pid'])
			# one value per column, None (NULL) for fields never seen
			args = [a.get(c) for c in self.colums]
			self.cursor.execute(self.insert_stmt, args)
 
	def convert_timestamp(self, t):
		# 2010-07-02_20:46:19.26154
		return time.mktime(time.strptime(t[:19],'%Y-%m-%d_%H:%M:%S')) + float('0'+t[19:])
 
	def convert_host(self,h):
		if h[0] == '[':
			return h[1:-1]
		return h
 
	@classmethod
	def unmatched_cb(cls,data):
		print("%s could not match '%s'" % (cls.__name__, data) )
 
	def feed(self,data):
		for f in self.format:
			r = self.regex[f]
			m = r.match(data)
			if m is not None:
				remain = data[m.end():]
				for s in self.format[f]:
					rs = self.regex[s]
					ms = rs.match(remain)
					if ms is not None:
						a = m.groupdict()
						a.update(ms.groupdict())
						keys = [x for x in filter(lambda x: s+'_'+x in self.alias, a.keys())]
						b = {self.alias[s+'_'+i]:a[i] for i in keys}
						for i in b:
							if i in self.convert:
								b[i] = self.convert[i](b[i])
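						if 'pid' not in b:
							# no pid on this line: borrow the previous one, if any
							if self.prev is None:
								self.unmatched_cb(data)
								return
							b['pid'] = self.prev['pid']
						else:
							self.prev = b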
						if s in self.actions:
							self.actions[s](b)
						return
				self.unmatched_cb(remain)
				return
		self.unmatched_cb(data)			
 
m = gitdaemonlog()
 
with open("git-daemon-log", "r") as f:
	for line in f:
		line = line.rstrip('\r\n')
		m.feed(line)
 
m.close()


2010/07/03/git-daemon_logfile_processing.txt · Last modified: 2010/07/03 15:15 by common