[PATCH] libosmocore[master]: contrib: add fsm-to-dot.py to draw osmo_fsm dotty graphs

This is merely a historical archive of years 2008-2021, before the migration to mailman3.

A maintained and still updated list archive can be found at https://lists.osmocom.org/hyperkitty/list/gerrit-log@lists.osmocom.org/.

Neels Hofmeyr gerrit-no-reply at lists.osmocom.org
Wed Dec 7 00:07:41 UTC 2016


Review at  https://gerrit.osmocom.org/1367

contrib: add fsm-to-dot.py to draw osmo_fsm dotty graphs

Add a first version of a python script that tries to analyze .c source files to
draw graphs of osmo_fsm implementations. So far it uses quick-and-dirty
regexes.

Change-Id: I155f57a608d600f59aedfd27ef66eb9772c124e7
---
A contrib/fsm-to-dot.py
1 file changed, 710 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.osmocom.org:29418/libosmocore refs/changes/67/1367/1

diff --git a/contrib/fsm-to-dot.py b/contrib/fsm-to-dot.py
new file mode 100755
index 0000000..3549d1e
--- /dev/null
+++ b/contrib/fsm-to-dot.py
@@ -0,0 +1,710 @@
+#!/usr/bin/env python
+
+__doc__ = '''
+fsm-to-dot: convert FSM definitons to graph images
+
+Usage:
+  ./fsm-to-dot.py ~/openbsc/openbsc/src/libvlr/*.c
+  for f in *.dot ; do dot -Tpng $f > $f.png; done
+  # dot comes from 'apt-get install graphviz'
+
+Looks for osmo_fsm finite state machine definitions and madly parses .c files
+to draw graphs of them. This uses wild regexes that rely on coding style etc..
+No proper C parsing is done here (pycparser sucked, unfortunately).
+'''
+
+import sys, re
+
+def err(msg):
+  sys.stderr.write(msg + '\n')
+
+class listdict(object):
+  def __getattr__(ld, name):
+    if name == 'add':
+      return ld.__getattribute__(name)
+    return ld.__dict__.__getattribute__(name)
+
+  def _have(ld, name):
+    l = ld.__dict__.get(name)
+    if not l:
+      l = []
+      ld.__dict__[name] = l
+    return l
+
+  def add(ld, name, item):
+    l = ld._have(name)
+    l.append(item)
+    return ld
+
+  def add_dict(ld, d):
+    for k,v in d.items():
+      ld.add(k, v)
+
+  def __setitem__(ld, name, val):
+    return ld.__dict__.__setitem__(name, val)
+
+  def __getitem__(ld, name):
+    return ld.__dict__.__getitem__(name)
+
+  def __str__(ld):
+    return ld.__dict__.__str__()
+
+  def __repr__(ld):
+    return ld.__dict__.__repr__()
+
+  def update(ld, other_ld):
+    for name, items in other_ld.items():
+      ld.extend(name, items)
+    return ld
+
+  def extend(ld, name, vals):
+    l = ld._have(name)
+    l.extend(vals)
+    return ld
+
+re_state_start = re.compile(r'\[([A-Z_][A-Z_0-9]*)\]')
+re_event = re.compile(r'S\(([A-Z_][A-Z_0-9]*)\)')
+re_action = re.compile(r'.action *= *([a-z_][a-z_0-9]*)')
+
+def state_starts(line):
+  m = re_state_start.search(line)
+  if m:
+    return m.group(1)
+  return None
+
+def in_event_starts(line):
+  return line.find('in_event_mask') >= 0
+
+def out_state_starts(line):
+  return line.find('out_state_mask') >= 0
+
+def states_or_events(line):
+  return re_event.findall(line)
+
+def parse_action(line):
+  a = re_action.findall(line)
+  if a:
+    return a[0]
+  return None
+
+def _common_prefix(a, b):
+  for l in reversed(range(1,len(a))):
+    aa = a[:l+1]
+    if b.startswith(aa):
+      return aa
+  return ''
+
+def common_prefix(strs):
+  if not strs:
+    return ''
+  p = None
+  for s in strs:
+    if p is None:
+      p = s
+      continue
+    p = _common_prefix(p, s)
+    if not p:
+      return ''
+  return p
+
+KIND_STATE = 'KIND_STATE'
+KIND_FUNC = 'KIND_FUNC'
+KIND_FSM = 'KIND_FSM'
+BOX_SHAPES = {
+  KIND_STATE : None,
+  KIND_FUNC : 'box',
+  KIND_FSM : 'box3d',
+}
+
+class Event:
+  def __init__(event, name):
+    event.name = name
+    event.short_name = name
+
+  def __cmp__(event, other):
+    return cmp(event.name, other.name)
+
+class Edge:
+  def __init__(edge, to_state, event_name=None, style=None, action=None):
+    edge.to_state = to_state
+    edge.style = style
+    edge.events = []
+    edge.actions = []
+    edge.add_event_name(event_name)
+    edge.add_action(action)
+
+  def add_event_name(edge, event_name):
+    if not event_name:
+      return
+    edge.add_event(Event(event_name))
+
+  def add_event(edge, event):
+    if not event:
+      return
+    if event in edge.events:
+      return
+    edge.events.append(event)
+
+  def add_events(edge, events):
+    for event in events:
+      edge.add_event(event)
+
+  def add_action(edge, action):
+    if not action or action in edge.actions:
+      return
+    edge.actions.append(action)
+
+  def add_actions(edge, actions):
+    for action in actions:
+      edge.add_action(action)
+
+  def event_names(edge):
+    return sorted([event.name for event in edge.events])
+
+  def event_labels(edge):
+    return sorted([event.short_name for event in edge.events])
+
+  def action_labels(edge):
+    return sorted([action + '()' for action in edge.actions])
+
+  def has_event_name(edge, event_name):
+    return event_name in edge.event_names()
+
+class State:
+  name = None
+  short_name = None
+  action = None
+  label = None
+  in_event_names = None
+  out_state_names = None
+  out_edges = None
+  kind = None
+
+  def __init__(state):
+    state.in_event_names = []
+    state.out_state_names = []
+    state.out_edges = []
+    state.kind = KIND_STATE
+
+  def add_out_edge(state, edge):
+    for out_edge in state.out_edges:
+      if out_edge.to_state is edge.to_state:
+        if out_edge.style == edge.style:
+          out_edge.add_events(edge.events)
+          out_edge.add_actions(edge.actions)
+          return
+      # sanity
+      if out_edge.to_state.get_label() == edge.to_state.get_label():
+        raise Exception('Two distinct states exist with identical labels.')
+    state.out_edges.append(edge)
+
+  def get_label(state):
+    if state.label:
+      return state.label
+    l = [state.short_name]
+    if state.action:
+      if state.short_name == state.action:
+        l = []
+      l.append(state.action + '()')
+    return r'\n'.join(l)
+
+  def event_names(state):
+    event_names = []
+    for out_edge in state.out_edges:
+      event_names.extend(out_edge.event_names())
+    return event_names
+
+  def shape_str(state):
+    shape = BOX_SHAPES.get(state.kind, None)
+    if not shape:
+      return ''
+    return ',shape=%s' % shape
+
+  def __repr__(state):
+    return 'State(name=%r,short_name=%r,out=%d)' % (state.name, state.short_name, len(state.out_edges))
+
+class Fsm:
+  def __init__(fsm, struct_name, states_struct_name, from_file=None):
+    fsm.states = []
+    fsm.struct_name = struct_name
+    fsm.states_struct_name = states_struct_name
+    fsm.from_file = from_file
+    fsm.action_funcs = set()
+    fsm.event_names = set()
+
+  def parse_states(fsm, src):
+    state = None
+    started = None
+
+    IN_EVENTS = 'events'
+    OUT_STATES = 'states'
+
+    lines = src.splitlines()
+
+    for line in lines:
+      state_name = state_starts(line)
+      if state_name:
+        state = State()
+        fsm.states.append(state)
+        started = None
+        state.name = state_name
+
+      if in_event_starts(line):
+        started = IN_EVENTS
+      if out_state_starts(line):
+        started = OUT_STATES
+
+      if not state or not started:
+        continue
+
+      tokens = states_or_events(line)
+      if started == IN_EVENTS:
+        state.in_event_names.extend(tokens)
+      elif started == OUT_STATES:
+        state.out_state_names.extend(tokens)
+      else:
+        err('ignoring: %r' % tokens)
+
+      a = parse_action(line)
+      if a:
+        state.action = a
+
+
+    for state in fsm.states:
+      if state.action:
+        fsm.action_funcs.add(state.action)
+      if state.in_event_names:
+        fsm.event_names.update(state.in_event_names)
+
+    fsm.make_states_short_names()
+    fsm.ref_out_states()
+
+  def make_states_short_names(fsm):
+    p = common_prefix([s.name for s in fsm.states])
+    for s in fsm.states:
+      s.short_name = s.name[len(p):]
+    return p
+
+  def make_events_short_names(fsm):
+    p = common_prefix(fsm.event_names)
+    for state in fsm.states:
+      for edge in state.out_edges:
+        for event in edge.events:
+          event.short_name = event.name[len(p):]
+
+  def ref_out_states(fsm):
+    for state in fsm.states:
+      for e in [Edge(fsm.find_state_by_name(n, True)) for n in state.out_state_names]:
+        state.add_out_edge(e)
+
+  def find_state_by_name(fsm, name, strict=False):
+    for state in fsm.states:
+      if state.name == name:
+        return state
+    if strict:
+      raise Exception("State not found: %r" % name);
+    return None
+
+  def find_state_by_action(fsm, action):
+    for state in fsm.states:
+      if state.action == action:
+        return state
+    return None
+
+  def add_special_state(fsm, additional_states, name, in_state=None,
+                        out_state=None, event_name=None, kind=KIND_FUNC,
+                        state_action=None, label=None, edge_action=None):
+    additional_state = None
+    for s in additional_states:
+      if s.short_name == name:
+        additional_state = s
+        break;
+
+    if not additional_state:
+      for s in fsm.states:
+        if s.short_name == name:
+          additional_state = s
+          break;
+
+    if kind == KIND_FUNC and not state_action:
+      state_action = name
+
+    if not additional_state:
+      additional_state = State()
+      additional_state.short_name = name
+      additional_state.action = state_action
+      additional_state.kind = kind
+      additional_state.label = label
+      additional_states.append(additional_state)
+
+    if out_state:
+      additional_state.out_state_names.append(out_state.name)
+      additional_state.add_out_edge(Edge(out_state, event_name, 'dotted',
+                                         action=edge_action))
+
+    if in_state:
+      in_state.out_state_names.append(additional_state.name)
+      in_state.add_out_edge(Edge(additional_state, event_name, 'dotted',
+                                 action=edge_action))
+
+
+  def find_event_edges(fsm, c_files):
+    # enrich state transitions between the states with event labels
+    func_to_state_transitions = listdict()
+    for c_file in c_files:
+      func_to_state_transitions.update( c_file.find_state_transitions(fsm.event_names) )
+
+    # edges between explicit states
+    for state in fsm.states:
+      transitions = func_to_state_transitions.get(state.action)
+      if not transitions:
+        continue
+
+      for to_state_name, event_name in transitions:
+        if not event_name:
+          continue
+        for out_edge in state.out_edges:
+          if out_edge.to_state.name == to_state_name:
+            out_edge.add_event_name(event_name)
+
+    additional_states = []
+
+
+    # functions that aren't state actions but still effect state transitions
+    for func_name, transitions in func_to_state_transitions.items():
+      if func_name in fsm.action_funcs:
+        continue
+      for to_state_name, event_name in transitions:
+        to_state = fsm.find_state_by_name(to_state_name)
+        if not to_state:
+          continue
+        fsm.add_special_state(additional_states, func_name, None, to_state, event_name)
+
+
+    event_sources = c_files.find_event_sources(fsm.event_names)
+
+    for state in fsm.states:
+
+      for in_event_name in state.in_event_names:
+        funcs_for_in_event = event_sources.get(in_event_name)
+        if not funcs_for_in_event:
+          continue
+
+        found = False
+        for out_edge in state.out_edges:
+          if out_edge.has_event_name(in_event_name):
+            out_edge.action = r'\n'.join([(f + '()') for f in funcs_for_in_event
+                                          if f != state.action])
+
+        # if any functions that don't belong to a state trigger events, add
+        # them to the graph as well
+        additional_funcs = [f for f in funcs_for_in_event if f not in fsm.action_funcs]
+        for af in additional_funcs:
+          fsm.add_special_state(additional_states, af, None, state, in_event_name)
+
+    fsm.states.extend(additional_states)
+
+    # do any existing action functions by chance call other action functions?
+    for state in fsm.states:
+      if not state.action:
+        continue
+      callers = c_files.find_callers(state.action)
+      if not callers:
+        continue
+      for other_state in fsm.states:
+        if other_state.action in callers:
+          other_state.add_out_edge(Edge(state, None, 'dotted'))
+
+  def add_fsm_alloc(fsm, c_files):
+
+    allocating_funcs = []
+    for c_file in c_files:
+      allocating_funcs.extend(c_file.fsm_allocators.get(fsm.struct_name, []))
+
+    starting_state = None
+    if fsm.states:
+      # assume the first state starts
+      starting_state = fsm.states[0]
+
+    additional_states = []
+    for func_name in allocating_funcs:
+      fsm.add_special_state(additional_states, func_name, None, starting_state)
+
+    fsm.states.extend(additional_states)
+
+  def add_cross_fsm_links(fsm, fsms, c_files, fsm_meta):
+    for state in fsm.states:
+      if not state.action:
+        continue
+      if state.kind == KIND_FSM:
+        continue
+      callers = c_files.find_callers(state.action)
+
+      if state.kind == KIND_FUNC:
+        callers.append(state.action)
+
+      if not callers:
+        continue
+
+      for caller in callers:
+        for calling_fsm in fsms:
+          if calling_fsm is fsm:
+            continue
+          calling_state = calling_fsm.find_state_by_action(caller)
+          if not calling_state:
+            continue
+          if calling_state.kind == KIND_FSM:
+            continue
+
+          label = None
+          if state.kind == KIND_STATE:
+            label=fsm.struct_name + ': ' + state.short_name
+          edge_action = caller
+          if calling_state.action == edge_action:
+            edge_action = None
+          calling_fsm.add_special_state(calling_fsm.states, fsm.struct_name,
+            calling_state, kind=KIND_FSM, edge_action=edge_action, label=label)
+
+          label = None
+          if calling_state.kind == KIND_STATE:
+            label=calling_fsm.struct_name + ': ' + calling_state.short_name
+          edge_action = caller
+          if state.action == edge_action:
+            edge_action = None
+          fsm.add_special_state(fsm.states, calling_fsm.struct_name, None,
+            state, kind=KIND_FSM, edge_action=edge_action,
+            label=label)
+
+          # meta overview
+          meta_called_fsm = fsm_meta.have_state(fsm.struct_name, KIND_FSM)
+          meta_calling_fsm = fsm_meta.have_state(calling_fsm.struct_name, KIND_FSM)
+          meta_calling_fsm.add_out_edge(Edge(meta_called_fsm))
+
+
+  def have_state(fsm, name, kind=KIND_STATE):
+    state = fsm.find_state_by_name(name)
+    if not state:
+      state = State()
+      state.name = name
+      state.short_name = name
+      state.kind = kind
+      fsm.states.append(state)
+    return state
+
+  def to_dot(fsm):
+    out = ['digraph G {', 'rankdir=LR;']
+
+    for state in fsm.states:
+      out.append('%s [label="%s"%s]' % (state.short_name, state.get_label(),
+                  state.shape_str()))
+
+    for state in fsm.states:
+      for out_edge in state.out_edges:
+        attrs = []
+        labels = []
+        if out_edge.events:
+          labels.extend(out_edge.event_labels())
+        if out_edge.actions:
+          labels.extend(out_edge.action_labels())
+        if labels:
+          attrs.append('label="%s"' % (r'\n'.join(labels)))
+        if out_edge.style:
+          attrs.append('style=%s'% out_edge.style)
+        attrs_str = ''
+        if attrs:
+          attrs_str = ' [%s]' % (','.join(attrs))
+        out.append('%s->%s%s' % (state.short_name, out_edge.to_state.short_name, attrs_str))
+
+    out.append('}\n')
+
+    return '\n'.join(out)
+
+  def write_dot_file(fsm):
+    dot_path = '%s.dot' % fsm.struct_name
+    f = open(dot_path, 'w')
+    f.write(fsm.to_dot())
+    f.close()
+    print(dot_path)
+
+
+re_fsm = re.compile(r'struct osmo_fsm ([a-z_][a-z_0-9]*) =')
+re_fsm_states_struct_name = re.compile(r'\bstates = ([a-z_][a-z_0-9]*)\W*,')
+re_fsm_states = re.compile(r'struct osmo_fsm_state ([a-z_][a-z_0-9]*)\[\] =')
+re_func = re.compile(r'(\b[a-z_][a-z_0-9]*\b)\([^)]*\)\W*^{', re.MULTILINE)
+re_state_trigger = re.compile(r'osmo_fsm_inst_state_chg\([^,]+,\W*([A-Z_][A-Z_0-9]*)\W*,', re.M)
+re_fsm_alloc = re.compile(r'osmo_fsm_inst_alloc[_child]*\(\W*&([a-z_][a-z_0-9]*),', re.M)
+re_fsm_event_dispatch = re.compile(r'osmo_fsm_inst_dispatch\(\W*[^,]+,\W*([A-Z_][A-Z_0-9]*)\W*,', re.M)
+
+class CFile():
+  def __init__(c_file, path):
+    c_file.path = path
+    c_file.src = open(path).read()
+    c_file.funcs = {}
+    c_file.fsm_allocators = listdict()
+
+  def extract_block(c_file, brace_open, brace_close, start):
+    pos = 0
+    try:
+      src = c_file.src
+      block_start = src.find(brace_open, start)
+
+      pos = block_start
+      level = 1
+      while level > 0:
+        pos += 1
+        if src[pos] == brace_open:
+          level += 1
+        elif src[pos] == brace_close:
+          level -= 1
+
+      return src[block_start+1:pos]
+    except:
+      print("Error while trying to extract a code block from %r char pos %d" % (c_file.path, pos))
+      print("Block start at char pos %d" % block_start)
+      try:
+        print(src[block_start - 20 : block_start + 20])
+        print('...')
+        print(src[pos - 20 : pos + 20])
+      except:
+        pass
+      return ''
+
+
+  def find_fsms(c_file):
+    fsms = []
+    for m in re_fsm.finditer(c_file.src):
+      struct_name = m.group(1)
+      struct_def = c_file.extract_block('{', '}', m.start())
+      states_struct_name = re_fsm_states_struct_name.findall(struct_def)[0]
+      fsm = Fsm(struct_name, states_struct_name, c_file)
+      fsms.append(fsm)
+    return fsms
+
+  def find_fsm_states(c_file, fsms):
+    for m in re_fsm_states.finditer(c_file.src):
+      states_struct_name = m.group(1)
+      for fsm in fsms:
+        if states_struct_name == fsm.states_struct_name:
+          fsm.parse_states(c_file.extract_block('{', '}', m.start()))
+
+  def parse_functions(c_file):
+    funcs = {}
+    for m in re_func.finditer(c_file.src):
+      name = m.group(1)
+      func_src = c_file.extract_block('{', '}', m.start())
+      funcs[name] = func_src
+    c_file.funcs = funcs
+    c_file.find_fsm_allocators()
+
+  def find_callers(c_file, func_name):
+    func_call = func_name + '('
+    callers = []
+    for func_name, src in c_file.funcs.items():
+      if src.find(func_call) >= 0:
+        callers.append(func_name)
+    return callers
+
+  def find_fsm_allocators(c_file):
+    c_file.fsm_allocators = listdict()
+    for func_name, src in c_file.funcs.items():
+      for m in re_fsm_alloc.finditer(src):
+        fsm_struct_name = m.group(1)
+        c_file.fsm_allocators.add(fsm_struct_name, func_name)
+
+  def find_state_transitions(c_file, event_names):
+    TO_STATE = 'TO_STATE'
+    EVENT = 'EVENT'
+    func_to_state_transitions = listdict()
+
+    for func_name, src in c_file.funcs.items():
+      found_tokens = []
+
+      for m in re_state_trigger.finditer(src):
+        to_state = m.group(1)
+        found_tokens.append((m.start(), TO_STATE, to_state))
+
+      for event in event_names:
+        re_event = re.compile(r'\b(' + event + r')\b')
+        for m in re_event.finditer(src):
+          event = m.group(1)
+          found_tokens.append((m.start(), EVENT, event))
+
+      found_tokens = sorted(found_tokens)
+
+      last_event = None
+      for start, kind, name in found_tokens:
+        if kind == EVENT:
+          last_event = name
+        else:
+          func_to_state_transitions.add(func_name, (name, last_event))
+
+    return func_to_state_transitions
+
+
+  def find_event_sources(c_file, event_names):
+    c_file.event_sources = listdict()
+    for func_name, src in c_file.funcs.items():
+      for m in re_fsm_event_dispatch.finditer(src):
+        event_name = m.group(1)
+        c_file.event_sources.add(event_name, func_name)
+
+class CFiles(list):
+
+  def find_callers(c_files, func_name):
+    callers = []
+    for c_file in c_files:
+      callers.extend(c_file.find_callers(func_name))
+    return callers
+
+  def find_func_to_state_transitions(c_files):
+    func_to_state_transitions = listdict()
+    for c_file in c_files:
+      func_to_state_transitions.update( c_file.find_state_transitions(fsm.event_names) )
+    return func_to_state_transitions
+
+  def find_event_sources(c_files, event_names):
+    event_sources = listdict()
+    for c_file in c_files:
+      for event, sources in c_file.event_sources.items():
+        if event in event_names:
+          event_sources.extend(event, sources)
+    return event_sources
+
+c_files = CFiles()
+paths_seen = set()
+for path in sys.argv[1:]:
+  if path in paths_seen:
+    continue
+  paths_seen.add(path)
+  c_file = CFile(path)
+  c_files.append(c_file)
+
+for c_file in c_files:
+  c_file.parse_functions()
+
+fsms = []
+for c_file in c_files:
+  fsms.extend(c_file.find_fsms())
+
+for c_file in c_files:
+  c_file.find_fsm_states(fsms)
+  c_file.find_event_sources(fsms)
+
+for fsm in fsms:
+  fsm.find_event_edges(c_files)
+  fsm.add_fsm_alloc(c_files)
+
+fsm_meta = Fsm("meta", "meta")
+for fsm in fsms:
+  fsm.add_cross_fsm_links(fsms, c_files, fsm_meta)
+
+for fsm in fsms:
+  fsm.make_events_short_names()
+
+for fsm in fsms:
+  fsm.write_dot_file()
+
+fsm_meta.write_dot_file()
+
+
+# vim: tabstop=2 shiftwidth=2 expandtab

-- 
To view, visit https://gerrit.osmocom.org/1367
To unsubscribe, visit https://gerrit.osmocom.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I155f57a608d600f59aedfd27ef66eb9772c124e7
Gerrit-PatchSet: 1
Gerrit-Project: libosmocore
Gerrit-Branch: master
Gerrit-Owner: Neels Hofmeyr <nhofmeyr at sysmocom.de>



More information about the gerrit-log mailing list