5 år 5 år
5
sept. 2013

### Aho-Corasick algorithm

Arkiv for denne måneden
###### Tags:
Python

As a part of the Snort lecture on intrusion detection systems, we learned about a multi-term string search algorithm called Aho-Corasick, named after its two creators. It's actually quite old, published in 1975, but it is still considered the best-suited algorithm for this purpose.

I tried implementing it in Python in order to get a grasp of what is going on. It has a phase of building a finite state machine based on the keywords, and this process is performed off-line for all signatures. The machine consists of a "go to" tree and "failure" + "output" look-up tables.

The search phase consists of walking the input string, typically network traffic or logs, character by character. The state changes continuously: if the current input symbol has a next state defined, the machine moves there; if none is found, it falls back to the state defined in the failure table. The output table is checked for every state visited, and its entries are output if it is not empty.

<implement deterministic conversion>

Just a simple reminder to myself, as this gave me quite a headache :)

```
def func(a, b):
    print a
func("a" is False)
# output: False
```

The python code with examples:

```# coding=utf8import osos.system('clear')  # empty the console

class GoTo:
    """Goto function of the Aho-Corasick machine (the keyword trie).

    ``storage[state]`` holds the list of ``(char, next_state)`` pairs that
    leave ``state``. Lookups report "no transition" with the ``False``
    sentinel, so callers must test the result with ``is False`` rather than
    by truthiness.
    """

    def __init__(self):
        self.storage = []  # storage[state] -> [(char, next_state), ...]
        self.length = 0  # number of allocated states; mirrors len(self.storage)

    def _has_state(self, state):
        """Return True when ``state`` indexes an allocated row of ``storage``."""
        # Explicit bounds check: a negative index must not wrap around to the
        # last state the way plain list indexing would.
        return 0 <= state < len(self.storage)

    def list(self, state):
        """Return the ``(char, next_state)`` pairs leaving ``state``.

        A new list is built on every call. An unknown state yields an empty
        tuple (kept from the original interface).
        """
        if not self._has_state(state):
            return ()
        return [(pair[0], pair[1]) for pair in self.storage[state]]

    def goto_get(self, state, char):
        """Return the state reached from ``state`` on ``char``, else ``False``.

        This is the raw trie lookup used while the goto structure is being
        constructed. ``False`` is returned both when the state has no edge
        for ``char`` and when the state does not exist.
        """
        if self._has_state(state):
            for symbol, target in self.storage[state]:
                if symbol == char:
                    return target
        return False

    def get(self, state, char):
        """Return the next state, making state 0 loop back on unknown symbols.

        Identical to ``goto_get`` except that a failed lookup from state 0
        returns 0 instead of ``False``.
        """
        target = self.goto_get(state, char)
        if state == 0 and target is False:
            return 0
        return target

    def set(self, state, char, next):
        """Record the transition ``state --char--> next``.

        The state number is used as the list index, so any skipped states
        below ``state`` are created with no transitions.

        Raises:
            ValueError: if ``state`` is negative.
        """
        # NOTE: the parameter name ``next`` shadows the builtin; it is kept
        # so that keyword callers keep working.
        if state < 0:
            raise ValueError("state must be non-negative, got %r" % (state,))
        while len(self.storage) <= state:
            self.storage.append([])
        self.storage[state].append((char, next))
        self.length = len(self.storage)

    def dump(self):
        """Return the underlying transition table (not a copy)."""
        return self.storage

class OutPut:
    """Output function of the Aho-Corasick machine.

    ``storage[state]`` is the list of keywords registered for ``state``.
    """

    def __init__(self):
        self.storage = []  # storage[state] -> [keyword, ...]
        self.length = 0  # number of allocated states; mirrors len(self.storage)

    def set(self, state, term):
        """Append every keyword in ``term`` to the output of ``state``.

        ``term`` must be a list (or other iterable) of one or more keywords.
        Skipped states below ``state`` are created with empty outputs.

        Raises:
            ValueError: if ``state`` is negative.
        """
        # Snapshot the terms first. Storing the caller's list would alias it
        # with the row (later edits would leak in), and extending a row with
        # itself, e.g. o.set(s, o.get(s)), must terminate.
        terms = list(term)
        if state < 0:
            raise ValueError("state must be non-negative, got %r" % (state,))
        while len(self.storage) <= state:
            self.storage.append([])
        self.storage[state].extend(terms)
        self.length = len(self.storage)

    def get(self, state):
        """Return the keyword list stored for ``state``.

        For an existing state this is the stored list itself, not a copy.
        An unknown state yields a new empty list.
        """
        if 0 <= state < len(self.storage):
            return self.storage[state]
        return []

    def dump(self):
        """Return the underlying output table (not a copy)."""
        return self.storage

def construct_goto(patterns, g, o):
    """Build the goto structure and the first round of the output function.

    Each keyword in ``patterns`` is inserted into the trie held by ``g``:
    the prefix already present is followed, then a new state is allocated
    for every remaining character. The keyword is registered in ``o`` at the
    state where it ends.

    Empty keywords are skipped (indexing their first character used to raise
    IndexError), and a keyword already registered at its end state is not
    registered a second time.

    Args:
        patterns: iterable of keyword strings.
        g: goto structure providing ``goto_get(state, char)`` and
            ``set(state, char, next)``.
        o: output structure providing ``get(state)`` and ``set(state, terms)``.

    Returns:
        The highest state number allocated (0 when no state was created).
    """
    last_state = 0
    for term in patterns:
        if not term:
            continue
        state = 0
        depth = 0
        # Follow the part of the keyword that is already in the trie.
        while depth < len(term):
            target = g.goto_get(state, term[depth])
            if target is False:
                break
            state = target
            depth += 1
        # Allocate a fresh state for every character that is still missing.
        for char in term[depth:]:
            last_state += 1
            g.set(state, char, last_state)
            state = last_state
        if term not in o.get(state):
            o.set(state, [term])
    # Setting g[0, alpha] = 0 for all other symbols is skipped on purpose:
    # g.get() takes care of it.
    return last_state

def construct_failure(patterns, f, g, o):
    """Fill in the failure table and complete the output function.

    The goto structure is walked breadth-first from state 0. For every state
    the failure target is found by following the parent's failure links
    until a state with a transition on the same character is reached. The
    output of the failure target is then appended to the state's own output.

    Args:
        patterns: unused; kept so existing callers keep working.
        f: pre-sized list that is filled in place. The failure state of
            state ``s`` is stored at ``f[s - 1]`` (state 0 has no entry).
        g: goto structure providing ``list(state)`` and ``get(state, char)``.
        o: output structure providing ``get(state)`` and ``set(state, terms)``.
    """
    from collections import deque

    # deque gives O(1) pops from the left; list.pop(0) is O(n) per pop.
    pending = deque()
    # States directly below state 0 always fail back to state 0.
    for _char, child in g.list(0):
        pending.append(child)
        f[child - 1] = 0
    while pending:
        parent = pending.popleft()
        for char, child in g.list(parent):
            pending.append(child)
            state = f[parent - 1]
            target = g.get(state, char)
            # g.get() never returns False for state 0, so this loop stops
            # there at the latest and ``state`` is >= 1 whenever it runs.
            while target is False:
                state = f[state - 1]
                target = g.get(state, char)
            f[child - 1] = target
            # Keywords recognised at the failure target also end here.
            o.set(child, o.get(target))

def build_ahocorasick(keywords, nocase):
    """Compile ``keywords`` into an Aho-Corasick machine.

    When ``nocase`` is true the keywords are lower-cased first. The machine
    is returned as the list ``[goto, output table, failure table, keywords]``,
    where ``keywords`` is the (possibly lower-cased) keyword collection.
    """
    if nocase:
        keywords = [word.lower() for word in keywords]
    goto = GoTo()
    output = OutPut()
    # construct_goto() returns the number of states created below state 0,
    # which is the size the failure table needs.
    failure = [0] * construct_goto(keywords, goto, output)
    construct_failure(keywords, failure, goto, output)
    return [goto, output.dump(), failure, keywords]

def run_ahocorasick(machine, input_string, nocase):
    """Search ``input_string`` with a compiled machine and print every match.

    Args:
        machine: ``[goto, output table, failure table, keywords]`` as
            returned by ``build_ahocorasick``. The failure state of state
            ``s`` is read from ``failure table[s - 1]``, which is the layout
            ``construct_failure`` writes.
        input_string: the text to scan.
        nocase: when true, the text is lower-cased before scanning.

    Returns:
        A list of ``(end_pos, terms)`` tuples, one per reported position.
        ``end_pos`` is the 1-based position of the last matched character
        and ``terms`` is a copy of the keywords ending there.
    """
    if nocase:
        input_string = input_string.lower()
    goto_matrix = machine[0]
    output_matrix = machine[1]
    failure_matrix = machine[2]
    keywords = machine[3]
    # The formatting happens inside the print call so that the statements
    # work both as Python 2 print statements and Python 3 print() calls.
    print("Looking for %s in '%s'\n" % (keywords, input_string))
    print("State machine:\ngoto matrix: %s\nfailture matrix: %s\noutput matrix: %s"
          % (goto_matrix.dump(), failure_matrix, output_matrix))
    print("\n\tLooking for matches:")
    matches = []
    state = 0
    for index, char in enumerate(input_string):
        target = goto_matrix.get(state, char)
        # Follow failure links until some state accepts ``char``. The goto
        # lookup never fails for state 0, so ``state`` is >= 1 in this loop.
        while target is False:
            state = failure_matrix[state - 1]
            target = goto_matrix.get(state, char)
        state = target
        # A machine built from no keywords has an empty output table.
        found = output_matrix[state] if state < len(output_matrix) else []
        if found:
            print("end pos %s - %s" % (index + 1, found))
            matches.append((index + 1, list(found)))
    print("\tComplete\n")
    return matches

nocase = True  # False or Truemachine = build_ahocorasick(("snort", "or", "snow"), nocase)run_ahocorasick(machine, "snort on snow", nocase)

machine = build_ahocorasick(("a", "ab", "abb", "abba", "b", "bb", "bba", "ba"), nocase)run_ahocorasick(machine, "abba", nocase)

machine = build_ahocorasick(("t o", "n s", "ab", "cd"), nocase)run_ahocorasick(machine, "rt on sn abcd", nocase)```