
The git-daemon activity for the dionaea.git repository: pulls and unique hosts per day. Basically, 5-10 users update their software daily.
Often the most complex part of data visualization is the processing required before you can provide the data in a format your visualization software understands.
I chose the git-daemon logs as an example of such a case.
One could have used sshd logs as an example too, but I chose these, as I'm pretty sure there is no existing parser for git-daemon log files.
If in doubt, I'm fairly confident one could adapt this git-daemon parser to handle sshd logs too.
Parsing git-daemon logs is special, as the entry for a single client spans multiple lines, which are tied together by the pid shown in brackets.
2010-07-02_19:23:24.86055 [31206] Connection from 216.34.181.151:10534
2010-07-02_19:23:24.86064 [31206] Extended attributes (23 bytes) exist <host=git.carnivore.it>
2010-07-02_19:23:24.93354 [31206] Request upload-pack for '/honeytrap.git'
2010-07-02_19:23:26.14593 [31206] Disconnected
So, basically, we have a header followed by different message types, each with different fields of interest:
# first-level format ('head') -> second-level message types, tried in this order
self.format = dict(
    head=['connection', 'request', 'disconnect', 'extended'],
)
We define regular expressions for each format type:
# One compiled pattern per format / message type.
# Raw strings keep '\d', '\[' ... intact (they are invalid escapes in plain literals).
self.regex = {
    'head': re.compile(r'(?P<timestamp>[\d]{4}-[\d]{2}-[\d]{2}_[\d]{2}:[\d]{2}:[\d]{2}\.[\d]*) \[(?P<pid>[\d]*)\] '),
    # host is either a bracketed address like "[::1]" or a dotted address like "216.34.181.151"
    'connection': re.compile(r'Connection from (?P<host>\[([\d\.:\w]*)\]|([\d\.]*))'),
    'disconnect': re.compile(r'Disconnected'),
    'request': re.compile(r"Request upload-pack for '(?P<repository>.*)'"),
    'extended': re.compile(r'Extended attributes \(([\d]*) bytes\) exist <(?P<attributes>.*)>'),
}
The named regex groups get prefixed with the name of the message type that matched (e.g. connection_host). We therefore provide an alias map, so we can rename them on the fly and extract only the data we really want: fields without an alias mapping are not stored.
# "<message type>_<regex group>" -> column name; groups without an entry are not stored
self.alias = dict(
    connection_timestamp='start',
    connection_host='host',
    connection_pid='pid',
    extended_pid='pid',
    extended_attributes='attributes',
    request_repository='repo',
    request_pid='pid',
    disconnect_timestamp='stop',
    disconnect_pid='pid',
)
Some data types are special and require conversion before they are stored; here, these are timestamps and hosts.
# column name -> converter applied to the value before it is stored
self.convert = dict(
    start=self.convert_timestamp,
    stop=self.convert_timestamp,
    host=self.convert_host,
)
The timestamp is converted to Unix epoch seconds; for the host, we strip the surrounding [] from bracketed addresses.
def convert_timestamp(self, t):
    """Turn a git-daemon timestamp such as "2010-07-02_19:23:24.86055" into epoch seconds.

    The first 19 characters are parsed as local time; the remainder (the
    fractional part, starting with '.') is added as a float.
    """
    whole_seconds = time.strptime(t[:19], '%Y-%m-%d_%H:%M:%S')
    fraction = float('0' + t[19:])
    return time.mktime(whole_seconds) + fraction
def convert_host(self, h):
    """Return host *h* without its surrounding square brackets.

    "[::1]" becomes "::1"; any other value (including an empty string, which
    the connection regex can produce) is returned unchanged.
    """
    if h.startswith('[') and h.endswith(']'):
        return h[1:-1]
    return h
We define the actions to take when a given message type comes up:
# message type -> handler that receives the aliased, converted fields
self.actions = dict(
    connection=self.session_new,
    disconnect=self.session_close,
    request=self.session_update,
    extended=self.session_update,
    error=self.session_update,
    any=self.session_update,
)
For
- a new session, we store the aliased fields of the match in a dict, using the pid as the key;
- all other messages except disconnect, we add the new keys to that dict;
- disconnect, we store the collected data in a sqlite table.
def session_new(self, a):
    """Begin tracking a session: remember its fields under its pid."""
    key = a['pid']
    self.processing[key] = a
def session_update(self, a):
    """Merge the fields of a follow-up message into its pid's session.

    A pid without a tracked session (e.g. the log starts in the middle of a
    session) gets a fresh entry instead of raising KeyError.
    """
    self.processing.setdefault(a['pid'], {}).update(a)
def session_close(self, a):
    """Finish a tracked session and write it as one row to the database.

    Disconnects for pids that are not being tracked are ignored. The row
    holds one value per entry of ``self.colums``, None where the session
    has no such field.
    """
    pid = a['pid']
    if pid not in self.processing:
        return
    self.session_update(a)
    session = self.processing.pop(pid)
    row = [session.get(column) for column in self.colums]
    self.cursor.execute(self.insert_stmt, row)
We have to create the sqlite table. As we only insert keys that have aliased names, we can use the alias names as column names.
Since we will insert data into the table later on, we prepare the INSERT statement too.
def create(self, tablename=None):
    """Open ``self.dbname`` and create the session table if it is missing.

    The columns are the distinct alias targets plus an INTEGER PRIMARY KEY
    named after the class. ``self.insert_stmt`` is prepared with one "?"
    placeholder per column and printed.

    :param tablename: replaces ``self.tablename`` when given.
    """
    if tablename is not None:
        self.tablename = tablename
    self.dbh = sqlite3.connect(self.dbname)
    self.cursor = self.dbh.cursor()
    # several aliases share one column (every *_pid -> pid); keep each name once
    self.colums = list(dict.fromkeys(self.alias.values()))
    coldefs = ['%s INTEGER PRIMARY KEY' % self.__class__.__name__] + self.colums
    table = "CREATE TABLE IF NOT EXISTS \n%s (\n%s\n)" % (
        self.tablename, ',\n'.join('\t' + d for d in coldefs))
    self.cursor.execute(table)
    self.insert_stmt = "INSERT INTO %s (%s) VALUES (%s)" % (
        self.tablename, ','.join(self.colums), ','.join('?' * len(self.colums)))
    print(self.insert_stmt)
This will create a table like:
-- example of the CREATE TABLE statement built by create(); the columns are the alias target names
CREATE TABLE IF NOT EXISTS
gitdaemonlogs (
gitdaemonlog INTEGER PRIMARY KEY,
stop,
pid,
repo,
start,
host,
warning,
error,
attributes,
type
)
And an INSERT string like:
-- matching INSERT statement: one ? placeholder per column
INSERT INTO gitdaemonlogs (stop,pid,repo,start,host,warning,error,attributes,type) VALUES (?,?,?,?,?,?,?,?,?)
The table and the INSERT string change if you change the aliases; there is no need to adjust the table layout by hand.
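Now we need to match each log line against these definitions; this is done by feed() below.
For convenience, we provide a class-level callback (a classmethod), which can be used to act on unmatched messages: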
@classmethod
def unmatched_cb(cls, data):
    """Report a log line (or line remainder) that none of the regexes matched."""
    message = "{0} could not match '{1}'".format(cls.__name__, data)
    print(message)
def feed(self, data):
    """Parse one log line and dispatch it to the matching action.

    The line is first matched against a first-level format (the keys of
    ``self.format``). The text after that match is then tried against the
    format's second-level message types, in order. Named groups from both
    matches are renamed via ``self.alias`` (groups without an alias are
    dropped), converted via ``self.convert``, and passed to
    ``self.actions[<message type>]``. Lines that match nothing go to
    ``self.unmatched_cb``.

    :param data: one log line without the trailing newline.
    """
    for f in self.format:
        # first level matching
        m = self.regex[f].match(data)
        if m is None:
            continue
        # only the text after the first-level match is examined further
        remain = data[m.end():]
        for s in self.format[f]:
            # second level matching
            ms = self.regex[s].match(remain)
            if ms is None:
                continue
            # merge the groups of both levels
            a = m.groupdict()
            a.update(ms.groupdict())
            # keep only groups that have an alias, renamed to that alias
            b = {self.alias[s + '_' + k]: v for k, v in a.items()
                 if s + '_' + k in self.alias}
            # convert values if required
            for i in b:
                if i in self.convert:
                    b[i] = self.convert[i](b[i])
            # hack for lines without pid: borrow the pid of the previous element
            if 'pid' not in b:
                prev = getattr(self, 'prev', None)
                if prev is None:
                    # nothing to borrow from yet
                    self.unmatched_cb(data)
                    return
                b['pid'] = prev['pid']
            else:
                self.prev = b
            # act
            if s in self.actions:
                self.actions[s](b)
            return
        self.unmatched_cb(remain)
        return
    self.unmatched_cb(data)
The last thing left is feeding the data:
# Parse the whole log file line by line, then commit and close the database.
m = gitdaemonlog()
# 'with' closes the log file; iterating the file object avoids loading it all at once
with open("git-daemon-log", "r") as f:
    for line in f:
        line = line.rstrip('\r\n')
        m.feed(line)
m.close()
To make things worse, there are some lines without a pid:
2010-04-29_07:54:29.55718 fatal: The remote end hung up unexpectedly
Therefore we store the previously matched element in self.prev and reuse its pid for follow-up lines without a pid.
This is very basic; I just wanted to point out a place where a well-structured framework is needed: log file parsing with data extraction, manipulation, and storage.
One could add more message types and actions in the same way.
In any case, you end up with a sqlite database.
-- Per-day pulls and distinct client hosts for /dionaea.git.
-- start holds Unix epoch seconds; rows are bucketed by local calendar day.
-- Output columns: date | repo | pulls | hosts
SELECT
    strftime('%Y-%m-%d', start, 'unixepoch', 'localtime') AS date,
    repo,
    COUNT(host) AS pulls,
    COUNT(DISTINCT host) AS hosts
FROM
    gitdaemonlogs
WHERE
    repo = '/dionaea.git'
GROUP BY
    strftime('%Y-%m-%d', start, 'unixepoch', 'localtime'), repo
-- plotting "with lines" needs chronological rows; GROUP BY alone does not guarantee an order
ORDER BY
    date;
Save the query above as dionaea-git.sql and run it from the sqlite3 shell:
sqlite3 gitdaemonlog.sqlite
.timer on
.output dionaea-git.txt
.read dionaea-git.sql
# render a 600x240 PNG using Liberation Sans 8pt (adjust the font path if it differs on your system)
set terminal png size 600,240 nocrop butt font "/usr/share/fonts/truetype/ttf-liberation/LiberationSans-Regular.ttf" 8
set output "dionaea-git.png"
# column 1 holds the dates produced by the SQL query, formatted as YYYY-MM-DD
set xdata time
set timefmt "%Y-%m-%d"
set format x "%b %d"
set ylabel "pulls"
unset y2label
unset y2tics
# the sqlite3 shell writes result columns separated by "|"
set datafile separator "|"
# columns of dionaea-git.txt: 1 date, 2 repo, 3 pulls, 4 distinct hosts
plot 'dionaea-git.txt' using 1:3 title "pulls" with lines, "" using 1:4 title "hosts" with lines axes x1y1
It would be great if graphing software such as gnuplot could use a database as a data source: all you'd have to do is write a query to retrieve the data you are looking for, and one could create views on the tables showing different aspects of it …
But I'm not aware of any software that fills the gap between log files and visualisation software.
The complete script below has some more matches than outlined above; it is meant to work with Python 3.
#!/usr/bin/python3.1
import re
import sqlite3
import time
class gitdaemonlog:
    """Parse git-daemon log lines into one SQLite row per client session.

    A session spans several lines tied together by the pid in brackets
    ("Connection from", "Extended attributes", "Request ...-pack",
    "Disconnected"). feed() matches each line in two levels (see
    ``self.format``) and renames the named regex groups through
    ``self.alias``; groups without an alias are dropped. It then converts
    values through ``self.convert`` and calls the handler in
    ``self.actions``. Sessions are accumulated per pid in
    ``self.processing`` and inserted into ``self.tablename`` on disconnect.
    """

    def __init__(self, dbname=None):
        """Build the parser tables and open/create the database.

        :param dbname: SQLite database path; defaults to "<classname>.sqlite"
            in the current directory. ":memory:" gives a throw-away database.
        """
        # first level format -> second level message types, tried in order
        self.format = {
            'head': [
                # good
                'connection', 'request', 'disconnect', 'extended',
                # bad - errors
                'error',
            ],
            # ugly
            'fatal': [
                'any',
            ],
        }
        # raw strings: '\d', '\[' ... are invalid escape sequences in plain
        # string literals (DeprecationWarning, SyntaxWarning since Python 3.12)
        self.regex = {
            # git logs
            'head': re.compile(r'(?P<timestamp>[\d]{4}-[\d]{2}-[\d]{2}_[\d]{2}:[\d]{2}:[\d]{2}\.[\d]*) \[(?P<pid>[\d]*)\] '),
            'connection': re.compile(r'^Connection from (?P<host>\[([\d\.:\w]*)\]|([\d\.]*))'),
            'disconnect': re.compile(r'^Disconnected'),
            'request': re.compile(r"^Request (?P<type>[\w]*)-pack for '(?P<repository>.*)'"),
            'extended': re.compile(r'^Extended attributes \(([\d]*) bytes\) exist <(?P<attributes>.*)>'),
            'error': re.compile(r"^'(.*)': (?P<string>.*)"),
            # errors without pid
            'fatal': re.compile(r'(?P<timestamp>[\d]{4}-[\d]{2}-[\d]{2}_[\d]{2}:[\d]{2}:[\d]{2}\.[\d]*) fatal: '),
            'any': re.compile(r'(?P<string>.*)'),
        }
        # message type -> handler receiving the aliased, converted fields
        self.actions = {
            'connection': self.session_new,
            'disconnect': self.session_close,
            'request': self.session_update,
            'extended': self.session_update,
            'error': self.session_update,
            'any': self.session_update,
        }
        # "<message type>_<regex group>" -> column name; unaliased groups are not stored
        self.alias = {
            'connection_timestamp': 'start',
            'connection_pid': 'pid',
            'connection_host': 'host',
            'extended_pid': 'pid',
            'extended_attributes': 'attributes',
            'request_pid': 'pid',
            'request_type': 'type',
            'request_repository': 'repo',
            'disconnect_pid': 'pid',
            'disconnect_timestamp': 'stop',
            'error_pid': 'pid',
            'error_string': 'error',
            # fatal has no pid, we borrow from prev element
            'any_string': 'warning',
        }
        # column name -> converter applied before storing
        self.convert = {
            'start': self.convert_timestamp,
            'stop': self.convert_timestamp,
            'host': self.convert_host,
        }
        # pid -> fields collected so far for that open session
        self.processing = {}
        # not used within this class
        self.parsed = []
        self.tablename = self.__class__.__name__ + 's'
        if dbname is None:
            dbname = self.__class__.__name__ + '.sqlite'
        self.dbname = dbname
        self.create()
        # last element that carried a pid; lends its pid to pid-less lines
        self.prev = None

    def create(self, tablename=None):
        """Open ``self.dbname`` and create the session table if it is missing.

        The columns are the distinct alias targets plus an INTEGER PRIMARY KEY
        named after the class. ``self.insert_stmt`` is prepared with one "?"
        placeholder per column. Both statements are printed.

        :param tablename: replaces ``self.tablename`` when given.
        """
        if tablename is not None:
            self.tablename = tablename
        self.dbh = sqlite3.connect(self.dbname)
        self.cursor = self.dbh.cursor()
        # several aliases share one column (every *_pid -> pid); keep each once.
        # The attribute name "colums" is kept as-is for existing callers.
        self.colums = list(dict.fromkeys(self.alias.values()))
        coldefs = ['%s INTEGER PRIMARY KEY' % self.__class__.__name__] + self.colums
        table = "CREATE TABLE IF NOT EXISTS \n%s (\n%s\n)" % (
            self.tablename, ',\n'.join('\t' + d for d in coldefs))
        print(table)
        self.cursor.execute(table)
        self.insert_stmt = "INSERT INTO %s (%s) VALUES (%s)" % (
            self.tablename, ','.join(self.colums), ','.join('?' * len(self.colums)))
        print(self.insert_stmt)

    def close(self):
        """Commit pending inserts and close the database connection."""
        self.dbh.commit()
        self.dbh.close()

    def session_new(self, a):
        """Start tracking a session under its pid."""
        self.processing[a['pid']] = a

    def session_update(self, a):
        """Merge the fields of a follow-up message into its pid's session.

        A pid without a tracked session (e.g. the log starts in the middle of
        a session) gets a fresh entry instead of raising KeyError.
        """
        self.processing.setdefault(a['pid'], {}).update(a)

    def session_close(self, a):
        """Finish a tracked session and insert it as one row.

        Disconnects for pids that are not being tracked are ignored.
        """
        if a['pid'] in self.processing:
            self.session_update(a)
            session = self.processing.pop(a['pid'])
            # one value per column, None for fields this session never saw
            args = [session.get(c) for c in self.colums]
            self.cursor.execute(self.insert_stmt, args)

    def convert_timestamp(self, t):
        """Convert a timestamp like "2010-07-02_20:46:19.26154" to epoch seconds (local time)."""
        return time.mktime(time.strptime(t[:19], '%Y-%m-%d_%H:%M:%S')) + float('0' + t[19:])

    def convert_host(self, h):
        """Strip surrounding square brackets from *h* ("[::1]" -> "::1").

        Anything else, including an empty string, is returned unchanged.
        """
        if h.startswith('[') and h.endswith(']'):
            return h[1:-1]
        return h

    @classmethod
    def unmatched_cb(cls, data):
        """Report a line (or line remainder) that no regex matched."""
        print("%s could not match '%s'" % (cls.__name__, data))

    def feed(self, data):
        """Parse one log line and dispatch it to the matching action.

        :param data: one log line without the trailing newline.
        """
        for f in self.format:
            # first level matching
            m = self.regex[f].match(data)
            if m is None:
                continue
            # only the text after the first-level match is examined further
            remain = data[m.end():]
            for s in self.format[f]:
                # second level matching
                ms = self.regex[s].match(remain)
                if ms is None:
                    continue
                # merge dicts for both levels
                a = m.groupdict()
                a.update(ms.groupdict())
                # keep only aliased keys, renamed according to self.alias
                b = {self.alias[s + '_' + k]: v for k, v in a.items()
                     if s + '_' + k in self.alias}
                # convert values if required
                for i in b:
                    if i in self.convert:
                        b[i] = self.convert[i](b[i])
                # hack for lines without pid
                if 'pid' not in b:
                    if self.prev is None:
                        # no earlier line to borrow a pid from
                        self.unmatched_cb(data)
                        return
                    b['pid'] = self.prev['pid']
                else:
                    self.prev = b
                # act
                if s in self.actions:
                    self.actions[s](b)
                return
            self.unmatched_cb(remain)
            return
        self.unmatched_cb(data)
# Parse the whole git-daemon log, then commit and close the database.
m = gitdaemonlog()
# 'with' closes the log file; iterating the file object avoids reading it all at once
with open("git-daemon-log", "r") as f:
    for line in f:
        line = line.rstrip('\r\n')
        m.feed(line)
m.close()