<p>Vadim Yanitskiy has uploaded this change for <strong>review</strong>.</p><p><a href="https://gerrit.osmocom.org/12190">View Change</a></p><pre style="font-family: monospace,monospace; white-space: pre-wrap;">trx_toolkit/trx_sniff.py: migrate from getopt to argparse<br><br>Change-Id: Id1dacaa32134bfa68344e6c48310390cdd85cdc9<br>---<br>M src/target/trx_toolkit/trx_sniff.py<br>1 file changed, 60 insertions(+), 116 deletions(-)<br><br></pre><pre style="font-family: monospace,monospace; white-space: pre-wrap;">git pull ssh://gerrit.osmocom.org:29418/osmocom-bb refs/changes/90/12190/1</pre><pre style="font-family: monospace,monospace; white-space: pre-wrap;"><span>diff --git a/src/target/trx_toolkit/trx_sniff.py b/src/target/trx_toolkit/trx_sniff.py</span><br><span>index 9850983..9fb567e 100755</span><br><span>--- a/src/target/trx_toolkit/trx_sniff.py</span><br><span>+++ b/src/target/trx_toolkit/trx_sniff.py</span><br><span>@@ -27,7 +27,7 @@</span><br><span> </span><br><span> import logging as log</span><br><span> import signal</span><br><span style="color: hsl(0, 100%, 40%);">-import getopt</span><br><span style="color: hsl(120, 100%, 40%);">+import argparse</span><br><span> import sys</span><br><span> </span><br><span> import scapy.all</span><br><span>@@ -36,55 +36,37 @@</span><br><span> from data_msg import *</span><br><span> </span><br><span> class Application:</span><br><span style="color: hsl(0, 100%, 40%);">-      # Application variables</span><br><span style="color: hsl(0, 100%, 40%);">- sniff_interface = "lo"</span><br><span style="color: hsl(0, 100%, 40%);">-        sniff_base_port = 5700</span><br><span style="color: hsl(0, 100%, 40%);">-  print_bursts = False</span><br><span style="color: hsl(0, 100%, 40%);">-    output_file = None</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span>   # Counters</span><br><span>   cnt_burst_dropped_num = 0</span><br><span style="color: hsl(0, 100%, 40%);">-       cnt_burst_break = None</span><br><span>       cnt_burst_num = 0</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-   cnt_frame_break = None</span><br><span>       cnt_frame_last = None</span><br><span>        cnt_frame_num = 0</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-   # Burst direction fliter</span><br><span style="color: hsl(0, 100%, 40%);">-        bf_dir_l12trx = None</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-    # Timeslot number filter</span><br><span style="color: hsl(0, 100%, 40%);">-        bf_tn_val = None</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-        # Frame number fliter</span><br><span style="color: hsl(0, 100%, 40%);">-   bf_fn_lt = None</span><br><span style="color: hsl(0, 100%, 40%);">- bf_fn_gt = None</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span>      # Internal variables</span><br><span>         lo_trigger = False</span><br><span> </span><br><span>       def __init__(self):</span><br><span>          print_copyright(CR_HOLDERS)</span><br><span style="color: hsl(0, 100%, 40%);">-             self.parse_argv()</span><br><span style="color: hsl(120, 100%, 40%);">+             self.argv = self.parse_argv()</span><br><span> </span><br><span>            # Configure logging</span><br><span>          log.basicConfig(level = log.DEBUG,</span><br><span>                   format = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s")</span><br><span> </span><br><span>                # Open requested capture file</span><br><span style="color: hsl(0, 100%, 40%);">-           if self.output_file is not None:</span><br><span style="color: hsl(0, 100%, 40%);">-                        self.ddf = DATADumpFile(self.output_file)</span><br><span style="color: hsl(120, 100%, 40%);">+             if self.argv.output_file is not None:</span><br><span style="color: hsl(120, 100%, 40%);">+                 self.ddf = DATADumpFile(self.argv.output_file)</span><br><span> </span><br><span>   def run(self):</span><br><span>               # Compose a packet filter</span><br><span>            pkt_filter = "udp and (port %d or port %d)" \</span><br><span style="color: hsl(0, 100%, 40%);">-                 % (self.sniff_base_port + 2, self.sniff_base_port + 102)</span><br><span style="color: hsl(120, 100%, 40%);">+                      % (self.argv.base_port + 2, self.argv.base_port + 102)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-              log.info("Listening on interface '%s'..." % self.sniff_interface)</span><br><span style="color: hsl(120, 100%, 40%);">+           log.info("Listening on interface '%s'..." % self.argv.sniff_if)</span><br><span> </span><br><span>                # Start sniffing...</span><br><span style="color: hsl(0, 100%, 40%);">-             scapy.all.sniff(iface = self.sniff_interface, store = 0,</span><br><span style="color: hsl(120, 100%, 40%);">+              scapy.all.sniff(iface = self.argv.sniff_if, store = 0,</span><br><span>                       filter = pkt_filter, prn = self.pkt_handler)</span><br><span> </span><br><span>             # Scapy registers its own signal handler</span><br><span>@@ -92,7 +74,7 @@</span><br><span> </span><br><span>     def pkt_handler(self, ether):</span><br><span>                # Prevent loopback packet duplication</span><br><span style="color: hsl(0, 100%, 40%);">-           if self.sniff_interface == "lo":</span><br><span style="color: hsl(120, 100%, 40%);">+            if self.argv.sniff_if == "lo":</span><br><span>                     self.lo_trigger = not self.lo_trigger</span><br><span>                        if not self.lo_trigger:</span><br><span>                              return</span><br><span>@@ -139,32 +121,34 @@</span><br><span> </span><br><span>   def burst_pass_filter(self, l12trx, fn, tn):</span><br><span>                 # Direction filter</span><br><span style="color: hsl(0, 100%, 40%);">-              if self.bf_dir_l12trx is not None:</span><br><span style="color: hsl(0, 100%, 40%);">-                      if l12trx != self.bf_dir_l12trx:</span><br><span style="color: hsl(120, 100%, 40%);">+              if self.argv.direction is not None:</span><br><span style="color: hsl(120, 100%, 40%);">+                   if self.argv.direction == "TRX" and not l12trx:</span><br><span style="color: hsl(120, 100%, 40%);">+                             return False</span><br><span style="color: hsl(120, 100%, 40%);">+                  elif self.argv.direction == "L1" and l12trx:</span><br><span>                               return False</span><br><span> </span><br><span>             # Timeslot filter</span><br><span style="color: hsl(0, 100%, 40%);">-               if self.bf_tn_val is not None:</span><br><span style="color: hsl(0, 100%, 40%);">-                  if tn != self.bf_tn_val:</span><br><span style="color: hsl(120, 100%, 40%);">+              if self.argv.pf_tn is not None:</span><br><span style="color: hsl(120, 100%, 40%);">+                       if tn != self.argv.pf_tn:</span><br><span>                            return False</span><br><span> </span><br><span>             # Frame number filter</span><br><span style="color: hsl(0, 100%, 40%);">-           if self.bf_fn_lt is not None:</span><br><span style="color: hsl(0, 100%, 40%);">-                   if fn > self.bf_fn_lt:</span><br><span style="color: hsl(120, 100%, 40%);">+             if self.argv.pf_fn_lt is not None:</span><br><span style="color: hsl(120, 100%, 40%);">+                    if fn > self.argv.pf_fn_lt:</span><br><span>                               return False</span><br><span style="color: hsl(0, 100%, 40%);">-            if self.bf_fn_gt is not None:</span><br><span style="color: hsl(0, 100%, 40%);">-                   if fn < self.bf_fn_gt:</span><br><span style="color: hsl(120, 100%, 40%);">+             if self.argv.pf_fn_gt is not None:</span><br><span style="color: hsl(120, 100%, 40%);">+                    if fn < self.argv.pf_fn_gt:</span><br><span>                               return False</span><br><span> </span><br><span>             # Burst passed ;)</span><br><span>            return True</span><br><span> </span><br><span>      def msg_handle(self, msg):</span><br><span style="color: hsl(0, 100%, 40%);">-              if self.print_bursts:</span><br><span style="color: hsl(120, 100%, 40%);">+         if self.argv.verbose:</span><br><span>                        print(msg.burst)</span><br><span> </span><br><span>                 # Append a new message to the capture</span><br><span style="color: hsl(0, 100%, 40%);">-           if self.output_file is not None:</span><br><span style="color: hsl(120, 100%, 40%);">+              if self.argv.output_file is not None:</span><br><span>                        self.ddf.append_msg(msg)</span><br><span> </span><br><span>         def burst_count(self, fn, tn):</span><br><span>@@ -180,14 +164,14 @@</span><br><span>               self.cnt_burst_num += 1</span><br><span> </span><br><span>          # Stop sniffing after N bursts</span><br><span style="color: hsl(0, 100%, 40%);">-          if self.cnt_burst_break is not None:</span><br><span style="color: hsl(0, 100%, 40%);">-                    if self.cnt_burst_num == self.cnt_burst_break:</span><br><span style="color: hsl(120, 100%, 40%);">+                if self.argv.burst_count is not None:</span><br><span style="color: hsl(120, 100%, 40%);">+                 if self.cnt_burst_num == self.argv.burst_count:</span><br><span>                              log.info("Collected required amount of bursts")</span><br><span>                            return True</span><br><span> </span><br><span>              # Stop sniffing after N frames</span><br><span style="color: hsl(0, 100%, 40%);">-          if self.cnt_frame_break is not None:</span><br><span style="color: hsl(0, 100%, 40%);">-                    if self.cnt_frame_num == self.cnt_frame_break:</span><br><span style="color: hsl(120, 100%, 40%);">+                if self.argv.frame_count is not None:</span><br><span style="color: hsl(120, 100%, 40%);">+                 if self.cnt_frame_num == self.argv.frame_count:</span><br><span>                              log.info("Collected required amount of frames")</span><br><span>                            return True</span><br><span> </span><br><span>@@ -203,88 +187,48 @@</span><br><span>              # Exit</span><br><span>               sys.exit(0)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- def print_help(self, msg = None):</span><br><span style="color: hsl(0, 100%, 40%);">-               s  = " Usage: " + sys.argv[0] + " [options]\n\n" \</span><br><span style="color: hsl(0, 100%, 40%);">-                   " Some help...\n" \</span><br><span style="color: hsl(0, 100%, 40%);">-                   "  -h --help              this text\n\n"</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-             s += " Sniffing options\n" \</span><br><span style="color: hsl(0, 100%, 40%);">-                   "  -i --sniff-interface   Set network interface (default '%s')\n"  \</span><br><span style="color: hsl(0, 100%, 40%);">-                  "  -p --sniff-base-port   Set base port number (default %d)\n\n"</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-             s += " Processing (no processing by default)\n" \</span><br><span style="color: hsl(0, 100%, 40%);">-                      "  -o --output-file       Write bursts to file\n"          \</span><br><span style="color: hsl(0, 100%, 40%);">-                  "  -v --print-bits        Print burst bits to stdout\n\n"  \</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-         s += " Count limitations (disabled by default)\n" \</span><br><span style="color: hsl(0, 100%, 40%);">-                    "  --frame-count   NUM    Stop after sniffing NUM frames\n"  \</span><br><span style="color: hsl(0, 100%, 40%);">-                        "  --burst-count   NUM    Stop after sniffing NUM bursts\n\n"</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-                s += " Filtering (disabled by default)\n" \</span><br><span style="color: hsl(0, 100%, 40%);">-                    "  --direction     DIR    Burst direction: L12TRX or TRX2L1\n"  \</span><br><span style="color: hsl(0, 100%, 40%);">-                     "  --timeslot      NUM    TDMA timeslot number [0..7]\n"        \</span><br><span style="color: hsl(0, 100%, 40%);">-                     "  --frame-num-lt  NUM    TDMA frame number lower than NUM\n"   \</span><br><span style="color: hsl(0, 100%, 40%);">-                     "  --burst-num-gt  NUM    TDMA frame number greater than NUM\n"</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-              print(s % (self.sniff_interface, self.sniff_base_port))</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-         if msg is not None:</span><br><span style="color: hsl(0, 100%, 40%);">-                     print(msg)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span>   def parse_argv(self):</span><br><span style="color: hsl(0, 100%, 40%);">-           try:</span><br><span style="color: hsl(0, 100%, 40%);">-                    opts, args = getopt.getopt(sys.argv[1:],</span><br><span style="color: hsl(0, 100%, 40%);">-                                "i:p:o:v:h", ["help", "sniff-interface=", "sniff-base-port=",</span><br><span style="color: hsl(0, 100%, 40%);">-                                   "frame-count=", "burst-count=", "direction=",</span><br><span style="color: hsl(0, 100%, 40%);">-                                     "timeslot=", "frame-num-lt=", "frame-num-gt=",</span><br><span style="color: hsl(0, 100%, 40%);">-                                    "output-file=", "print-bits"])</span><br><span style="color: hsl(0, 100%, 40%);">-              except getopt.GetoptError as err:</span><br><span style="color: hsl(0, 100%, 40%);">-                       self.print_help("[!] " + str(err))</span><br><span style="color: hsl(0, 100%, 40%);">-                    sys.exit(2)</span><br><span style="color: hsl(120, 100%, 40%);">+           parser = argparse.ArgumentParser(prog = "trx_sniff",</span><br><span style="color: hsl(120, 100%, 40%);">+                        description = "Scapy-based TRX interface sniffer")</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-                for o, v in opts:</span><br><span style="color: hsl(0, 100%, 40%);">-                       if o in ("-h", "--help"):</span><br><span style="color: hsl(0, 100%, 40%);">-                           self.print_help()</span><br><span style="color: hsl(0, 100%, 40%);">-                               sys.exit(2)</span><br><span style="color: hsl(120, 100%, 40%);">+           parser.add_argument("-v", "--verbose",</span><br><span style="color: hsl(120, 100%, 40%);">+                    dest = "verbose", action = "store_true",</span><br><span style="color: hsl(120, 100%, 40%);">+                  help = "Print burst bits to stdout")</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-                      elif o in ("-i", "--sniff-interface"):</span><br><span style="color: hsl(0, 100%, 40%);">-                              self.sniff_interface = v</span><br><span style="color: hsl(0, 100%, 40%);">-                        elif o in ("-p", "--sniff-base-port"):</span><br><span style="color: hsl(0, 100%, 40%);">-                              self.sniff_base_port = int(v)</span><br><span style="color: hsl(120, 100%, 40%);">+         trx_group = parser.add_argument_group("TRX interface")</span><br><span style="color: hsl(120, 100%, 40%);">+              trx_group.add_argument("-i", "--sniff-interface",</span><br><span style="color: hsl(120, 100%, 40%);">+                 dest = "sniff_if", type = str, default = "lo", metavar = "IF",</span><br><span style="color: hsl(120, 100%, 40%);">+                  help = "Set network interface (default '%(default)s')")</span><br><span style="color: hsl(120, 100%, 40%);">+             trx_group.add_argument("-p", "--base-port",</span><br><span style="color: hsl(120, 100%, 40%);">+                       dest = "base_port", type = int, default = 6700,</span><br><span style="color: hsl(120, 100%, 40%);">+                     help = "Set base port number (default %(default)s)")</span><br><span style="color: hsl(120, 100%, 40%);">+                trx_group.add_argument("-o", "--output-file", metavar = "FILE",</span><br><span style="color: hsl(120, 100%, 40%);">+                 dest = "output_file", type = str,</span><br><span style="color: hsl(120, 100%, 40%);">+                   help = "Write bursts to a capture file")</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-                  elif o in ("-o", "--output-file"):</span><br><span style="color: hsl(0, 100%, 40%);">-                          self.output_file = v</span><br><span style="color: hsl(0, 100%, 40%);">-                    elif o in ("-v", "--print-bits"):</span><br><span style="color: hsl(0, 100%, 40%);">-                           self.print_bursts = True</span><br><span style="color: hsl(120, 100%, 40%);">+              cnt_group = parser.add_argument_group("Count limitations (optional)")</span><br><span style="color: hsl(120, 100%, 40%);">+               cnt_group.add_argument("--frame-count", metavar = "N",</span><br><span style="color: hsl(120, 100%, 40%);">+                    dest = "frame_count", type = int,</span><br><span style="color: hsl(120, 100%, 40%);">+                   help = "Stop after sniffing N frames")</span><br><span style="color: hsl(120, 100%, 40%);">+              cnt_group.add_argument("--burst-count", metavar = "N",</span><br><span style="color: hsl(120, 100%, 40%);">+                    dest = "burst_count", type = int,</span><br><span style="color: hsl(120, 100%, 40%);">+                   help = "Stop after sniffing N bursts")</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-                    # Break counters</span><br><span style="color: hsl(0, 100%, 40%);">-                        elif o == "--frame-count":</span><br><span style="color: hsl(0, 100%, 40%);">-                            self.cnt_frame_break = int(v)</span><br><span style="color: hsl(0, 100%, 40%);">-                   elif o == "--burst-count":</span><br><span style="color: hsl(0, 100%, 40%);">-                            self.cnt_burst_break = int(v)</span><br><span style="color: hsl(120, 100%, 40%);">+         pf_group = parser.add_argument_group("Filtering (optional)")</span><br><span style="color: hsl(120, 100%, 40%);">+                pf_group.add_argument("--direction",</span><br><span style="color: hsl(120, 100%, 40%);">+                        dest = "direction", type = str, choices = ["TRX", "L1"],</span><br><span style="color: hsl(120, 100%, 40%);">+                        help = "Burst direction")</span><br><span style="color: hsl(120, 100%, 40%);">+           pf_group.add_argument("--timeslot", metavar = "TN",</span><br><span style="color: hsl(120, 100%, 40%);">+                       dest = "pf_tn", type = int, choices = range(0, 8),</span><br><span style="color: hsl(120, 100%, 40%);">+                  help = "TDMA timeslot number (equal TN)")</span><br><span style="color: hsl(120, 100%, 40%);">+           pf_group.add_argument("--frame-num-lt", metavar = "FN",</span><br><span style="color: hsl(120, 100%, 40%);">+                   dest = "pf_fn_lt", type = int,</span><br><span style="color: hsl(120, 100%, 40%);">+                      help = "TDMA frame number (lower than FN)")</span><br><span style="color: hsl(120, 100%, 40%);">+         pf_group.add_argument("--frame-num-gt", metavar = "FN",</span><br><span style="color: hsl(120, 100%, 40%);">+                   dest = "pf_fn_gt", type = int,</span><br><span style="color: hsl(120, 100%, 40%);">+                      help = "TDMA frame number (greater than FN)")</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-                     # Direction filter</span><br><span style="color: hsl(0, 100%, 40%);">-                      elif o == "--direction":</span><br><span style="color: hsl(0, 100%, 40%);">-                              if v == "L12TRX":</span><br><span style="color: hsl(0, 100%, 40%);">-                                     self.bf_dir_l12trx = True</span><br><span style="color: hsl(0, 100%, 40%);">-                               elif v == "TRX2L1":</span><br><span style="color: hsl(0, 100%, 40%);">-                                   self.bf_dir_l12trx = False</span><br><span style="color: hsl(0, 100%, 40%);">-                              else:</span><br><span style="color: hsl(0, 100%, 40%);">-                                   self.print_help("[!] Wrong direction argument")</span><br><span style="color: hsl(0, 100%, 40%);">-                                       sys.exit(2)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-                     # Timeslot pass filter</span><br><span style="color: hsl(0, 100%, 40%);">-                  elif o == "--timeslot":</span><br><span style="color: hsl(0, 100%, 40%);">-                               self.bf_tn_val = int(v)</span><br><span style="color: hsl(0, 100%, 40%);">-                         if self.bf_tn_val < 0 or self.bf_tn_val > 7:</span><br><span style="color: hsl(0, 100%, 40%);">-                                      self.print_help("[!] Wrong timeslot value")</span><br><span style="color: hsl(0, 100%, 40%);">-                                   sys.exit(2)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-                     # Frame number pass filter</span><br><span style="color: hsl(0, 100%, 40%);">-                      elif o == "--frame-num-lt":</span><br><span style="color: hsl(0, 100%, 40%);">-                           self.bf_fn_lt = int(v)</span><br><span style="color: hsl(0, 100%, 40%);">-                  elif o == "--frame-num-gt":</span><br><span style="color: hsl(0, 100%, 40%);">-                           self.bf_fn_gt = int(v)</span><br><span style="color: hsl(120, 100%, 40%);">+                return parser.parse_args()</span><br><span> </span><br><span> if __name__ == '__main__':</span><br><span>         app = Application()</span><br><span></span><br></pre><p>To view, visit <a href="https://gerrit.osmocom.org/12190">change 12190</a>. To unsubscribe, or for help writing mail filters, visit <a href="https://gerrit.osmocom.org/settings">settings</a>.</p><div itemscope itemtype="http://schema.org/EmailMessage"><div itemscope itemprop="action" itemtype="http://schema.org/ViewAction"><link itemprop="url" href="https://gerrit.osmocom.org/12190"/><meta itemprop="name" content="View Change"/></div></div>

<div style="display:none"> Gerrit-Project: osmocom-bb </div>
<div style="display:none"> Gerrit-Branch: master </div>
<div style="display:none"> Gerrit-MessageType: newchange </div>
<div style="display:none"> Gerrit-Change-Id: Id1dacaa32134bfa68344e6c48310390cdd85cdc9 </div>
<div style="display:none"> Gerrit-Change-Number: 12190 </div>
<div style="display:none"> Gerrit-PatchSet: 1 </div>
<div style="display:none"> Gerrit-Owner: Vadim Yanitskiy <axilirator@gmail.com> </div>