10 år 10 år

As a part of the Snort lecture in intrusion detection systems we learned about a multi-term string search algorithm called Aho-Corasick named by it's two creators. It's actually quite old, published in 1975, but still considered the most fit algorithm for this purpose.

I tried implementing it in python in order to get a grasp of what it's going. It has a phase of building a finite state machine based on the keywords, and this process is being performed for all signatures off-line. It consists of a "go to" tree and "failure" + "output" look-up tables.

The search phase consists of walking the input string, typically network traffic or logs, character by character. The state is continuously changing depending on whether the current input symbol has a next state defined, or if none found moved to a state defined in the failure table. The output table is checked for every state visited and outputted from if not empty.

<implement deterministic conversion>

Just a simple reminder to myself, as this gave me a lot of headacke :)

def func(a, b):
print a
func("a" is False)
# output: False

The python code with examples:

# coding=utf8
import os
os.system('clear') # empty the console

class GoTo:
# store state as index, containing list of (char, next state) pairs
def __init__(self):
self.storage = [] # store "g[state, char] = next"
self.length = 0

def list(self, state):
try:
next = []
for i in self.storage[state]:
next.append((i[0], i[1])) # (char, next)
return next
except:
return ()

def goto_get(self, state, char):
# used by the goto construction phase
try:
state = self.storage[state]
for pair in state:
if (pair[0] == char):
return pair[1] # next state
return False # no match found
except:
return False # no state exists

def get(self, state, char):
next_state = self.goto_get(state, char)
if (state == 0 and next_state is False):
return 0 # override: because all failed 0 states should return to 0 state
else:
return next_state

def set(self, state, char, next): # state will be saved as the list-index
try:
self.storage[state].append((char, next)) # try appending
except:
while state > self.length: # add skipped "empty" states
self.storage.append([])
self.length += 1
self.storage.append([(char, next)]) # add new state
self.length += 1

def dump(self):
return self.storage

class OutPut:
def __init__(self):
self.storage = []
self.length = 0

def set(self, state, term): # term must be a list of one or many terms to be added
try:
state = self.storage[state]
for t in term:
state.append(t) # try to append if exists
except:
while state > self.length: # add skipped states
self.storage.append([])
self.length += 1
self.storage.append(term) # add new state output
self.length += 1

def get(self, state):
try:
return self.storage[state]
except:
return []

def dump(self):
return self.storage

def construct_goto(patterns, g, o):
#construct the goto structure and first round of output function
newState = 0
for term in patterns:
m = len(term)
state = 0
j = 0
while g.goto_get(state, term[j]) is not False:
state = g.goto_get(state, term[j])
j += 1
if j >= m: # avoid accessing out of bound chars in string
break
for k in range(j, m):
newState += 1
g.set(state, term[k], newState)
state = newState
o.set(state, [term])
# skipping setting all g[0, alpha] = 0 because g.get() takes care of it
return newState

def construct_failure(patterns, f, g, o):
# where to go in the goto structure on failure and build the rest of the output function
queue = []
alphas = g.list(0)
for char, next in alphas:
queue.append(next)
f[next - 1] = 0
while (len(queue)) > 0:
r = queue.pop(0)
alphas = g.list(r)
for char, next in alphas:
queue.append(next)
state = f[r - 1]
while g.get(state, char) is False:
state = f[state - 1]
f[next - 1] = g.get(state, char)
string = o.get(f[next - 1])
o.set(next, string)

def build_ahocorasick(keywords, nocase):
if(nocase):
keywords_upper = []
for word in keywords:
keywords_upper.append(word.lower())
keywords = keywords_upper
g = GoTo()
o = OutPut()
num_states = construct_goto(keywords, g, o)
f = [0] * num_states
construct_failure(keywords, f, g, o)
return [g, o.dump(), f, keywords]

def run_ahocorasick(machine, input_string, nocase):
if(nocase):
input_string = input_string.lower()
goto_matrix = machine[0]
output_matrix = machine[1]
failure_matrix = machine[2]
keywords = machine[3]
print("Looking for %s in '%s'\n") % (keywords, input_string)
print("State machine:\ngoto matrix: %s\nfailture matrix: %s\noutput matrix: %s") % (goto_matrix.dump(), failure_matrix, output_matrix)
print("\n\tLooking for matches:")
state = 0
for i in range(0, len(input_string)):
while goto_matrix.get(state, input_string[i]) is False:
state = failure_matrix[state]
state = goto_matrix.get(state, input_string[i])
if output_matrix[state] != []:
print("end pos %s - %s") % (i + 1, output_matrix[state])
print("\tComplete\n")

nocase = True # False or True
machine = build_ahocorasick(("snort", "or", "snow"), nocase)
run_ahocorasick(machine, "snort on snow", nocase)

machine = build_ahocorasick(("a", "ab", "abb", "abba", "b", "bb", "bba", "ba"), nocase)
run_ahocorasick(machine, "abba", nocase)

machine = build_ahocorasick(("t o", "n s", "ab", "cd"), nocase)
run_ahocorasick(machine, "rt on sn abcd", nocase)