<p>Vadim Yanitskiy <strong>merged</strong> this change.</p><p><a href="https://gerrit.osmocom.org/12190">View Change</a></p><div style="white-space:pre-wrap">Approvals:
Jenkins Builder: Verified
Pau Espin Pedrol: Looks good to me, approved
</div><pre style="font-family: monospace,monospace; white-space: pre-wrap;">trx_toolkit/trx_sniff.py: migrate from getopt to argparse<br><br>Change-Id: Id1dacaa32134bfa68344e6c48310390cdd85cdc9<br>---<br>M src/target/trx_toolkit/trx_sniff.py<br>1 file changed, 60 insertions(+), 116 deletions(-)<br><br></pre><pre style="font-family: monospace,monospace; white-space: pre-wrap;"><span>diff --git a/src/target/trx_toolkit/trx_sniff.py b/src/target/trx_toolkit/trx_sniff.py</span><br><span>index 9850983..9fb567e 100755</span><br><span>--- a/src/target/trx_toolkit/trx_sniff.py</span><br><span>+++ b/src/target/trx_toolkit/trx_sniff.py</span><br><span>@@ -27,7 +27,7 @@</span><br><span> </span><br><span> import logging as log</span><br><span> import signal</span><br><span style="color: hsl(0, 100%, 40%);">-import getopt</span><br><span style="color: hsl(120, 100%, 40%);">+import argparse</span><br><span> import sys</span><br><span> </span><br><span> import scapy.all</span><br><span>@@ -36,55 +36,37 @@</span><br><span> from data_msg import *</span><br><span> </span><br><span> class Application:</span><br><span style="color: hsl(0, 100%, 40%);">- # Application variables</span><br><span style="color: hsl(0, 100%, 40%);">- sniff_interface = "lo"</span><br><span style="color: hsl(0, 100%, 40%);">- sniff_base_port = 5700</span><br><span style="color: hsl(0, 100%, 40%);">- print_bursts = False</span><br><span style="color: hsl(0, 100%, 40%);">- output_file = None</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span> # Counters</span><br><span> cnt_burst_dropped_num = 0</span><br><span style="color: hsl(0, 100%, 40%);">- cnt_burst_break = None</span><br><span> cnt_burst_num = 0</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- cnt_frame_break = None</span><br><span> cnt_frame_last = None</span><br><span> cnt_frame_num = 0</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- # Burst direction fliter</span><br><span style="color: 
hsl(0, 100%, 40%);">- bf_dir_l12trx = None</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Timeslot number filter</span><br><span style="color: hsl(0, 100%, 40%);">- bf_tn_val = None</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Frame number fliter</span><br><span style="color: hsl(0, 100%, 40%);">- bf_fn_lt = None</span><br><span style="color: hsl(0, 100%, 40%);">- bf_fn_gt = None</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span> # Internal variables</span><br><span> lo_trigger = False</span><br><span> </span><br><span> def __init__(self):</span><br><span> print_copyright(CR_HOLDERS)</span><br><span style="color: hsl(0, 100%, 40%);">- self.parse_argv()</span><br><span style="color: hsl(120, 100%, 40%);">+ self.argv = self.parse_argv()</span><br><span> </span><br><span> # Configure logging</span><br><span> log.basicConfig(level = log.DEBUG,</span><br><span> format = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s")</span><br><span> </span><br><span> # Open requested capture file</span><br><span style="color: hsl(0, 100%, 40%);">- if self.output_file is not None:</span><br><span style="color: hsl(0, 100%, 40%);">- self.ddf = DATADumpFile(self.output_file)</span><br><span style="color: hsl(120, 100%, 40%);">+ if self.argv.output_file is not None:</span><br><span style="color: hsl(120, 100%, 40%);">+ self.ddf = DATADumpFile(self.argv.output_file)</span><br><span> </span><br><span> def run(self):</span><br><span> # Compose a packet filter</span><br><span> pkt_filter = "udp and (port %d or port %d)" \</span><br><span style="color: hsl(0, 100%, 40%);">- % (self.sniff_base_port + 2, self.sniff_base_port + 102)</span><br><span style="color: hsl(120, 100%, 40%);">+ % (self.argv.base_port + 2, self.argv.base_port + 102)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- log.info("Listening on interface '%s'..." 
% self.sniff_interface)</span><br><span style="color: hsl(120, 100%, 40%);">+ log.info("Listening on interface '%s'..." % self.argv.sniff_if)</span><br><span> </span><br><span> # Start sniffing...</span><br><span style="color: hsl(0, 100%, 40%);">- scapy.all.sniff(iface = self.sniff_interface, store = 0,</span><br><span style="color: hsl(120, 100%, 40%);">+ scapy.all.sniff(iface = self.argv.sniff_if, store = 0,</span><br><span> filter = pkt_filter, prn = self.pkt_handler)</span><br><span> </span><br><span> # Scapy registers its own signal handler</span><br><span>@@ -92,7 +74,7 @@</span><br><span> </span><br><span> def pkt_handler(self, ether):</span><br><span> # Prevent loopback packet duplication</span><br><span style="color: hsl(0, 100%, 40%);">- if self.sniff_interface == "lo":</span><br><span style="color: hsl(120, 100%, 40%);">+ if self.argv.sniff_if == "lo":</span><br><span> self.lo_trigger = not self.lo_trigger</span><br><span> if not self.lo_trigger:</span><br><span> return</span><br><span>@@ -139,32 +121,34 @@</span><br><span> </span><br><span> def burst_pass_filter(self, l12trx, fn, tn):</span><br><span> # Direction filter</span><br><span style="color: hsl(0, 100%, 40%);">- if self.bf_dir_l12trx is not None:</span><br><span style="color: hsl(0, 100%, 40%);">- if l12trx != self.bf_dir_l12trx:</span><br><span style="color: hsl(120, 100%, 40%);">+ if self.argv.direction is not None:</span><br><span style="color: hsl(120, 100%, 40%);">+ if self.argv.direction == "TRX" and not l12trx:</span><br><span style="color: hsl(120, 100%, 40%);">+ return False</span><br><span style="color: hsl(120, 100%, 40%);">+ elif self.argv.direction == "L1" and l12trx:</span><br><span> return False</span><br><span> </span><br><span> # Timeslot filter</span><br><span style="color: hsl(0, 100%, 40%);">- if self.bf_tn_val is not None:</span><br><span style="color: hsl(0, 100%, 40%);">- if tn != self.bf_tn_val:</span><br><span style="color: hsl(120, 100%, 40%);">+ if self.argv.pf_tn is 
not None:</span><br><span style="color: hsl(120, 100%, 40%);">+ if tn != self.argv.pf_tn:</span><br><span> return False</span><br><span> </span><br><span> # Frame number filter</span><br><span style="color: hsl(0, 100%, 40%);">- if self.bf_fn_lt is not None:</span><br><span style="color: hsl(0, 100%, 40%);">- if fn > self.bf_fn_lt:</span><br><span style="color: hsl(120, 100%, 40%);">+ if self.argv.pf_fn_lt is not None:</span><br><span style="color: hsl(120, 100%, 40%);">+ if fn > self.argv.pf_fn_lt:</span><br><span> return False</span><br><span style="color: hsl(0, 100%, 40%);">- if self.bf_fn_gt is not None:</span><br><span style="color: hsl(0, 100%, 40%);">- if fn < self.bf_fn_gt:</span><br><span style="color: hsl(120, 100%, 40%);">+ if self.argv.pf_fn_gt is not None:</span><br><span style="color: hsl(120, 100%, 40%);">+ if fn < self.argv.pf_fn_gt:</span><br><span> return False</span><br><span> </span><br><span> # Burst passed ;)</span><br><span> return True</span><br><span> </span><br><span> def msg_handle(self, msg):</span><br><span style="color: hsl(0, 100%, 40%);">- if self.print_bursts:</span><br><span style="color: hsl(120, 100%, 40%);">+ if self.argv.verbose:</span><br><span> print(msg.burst)</span><br><span> </span><br><span> # Append a new message to the capture</span><br><span style="color: hsl(0, 100%, 40%);">- if self.output_file is not None:</span><br><span style="color: hsl(120, 100%, 40%);">+ if self.argv.output_file is not None:</span><br><span> self.ddf.append_msg(msg)</span><br><span> </span><br><span> def burst_count(self, fn, tn):</span><br><span>@@ -180,14 +164,14 @@</span><br><span> self.cnt_burst_num += 1</span><br><span> </span><br><span> # Stop sniffing after N bursts</span><br><span style="color: hsl(0, 100%, 40%);">- if self.cnt_burst_break is not None:</span><br><span style="color: hsl(0, 100%, 40%);">- if self.cnt_burst_num == self.cnt_burst_break:</span><br><span style="color: hsl(120, 100%, 40%);">+ if self.argv.burst_count is not 
None:</span><br><span style="color: hsl(120, 100%, 40%);">+ if self.cnt_burst_num == self.argv.burst_count:</span><br><span> log.info("Collected required amount of bursts")</span><br><span> return True</span><br><span> </span><br><span> # Stop sniffing after N frames</span><br><span style="color: hsl(0, 100%, 40%);">- if self.cnt_frame_break is not None:</span><br><span style="color: hsl(0, 100%, 40%);">- if self.cnt_frame_num == self.cnt_frame_break:</span><br><span style="color: hsl(120, 100%, 40%);">+ if self.argv.frame_count is not None:</span><br><span style="color: hsl(120, 100%, 40%);">+ if self.cnt_frame_num == self.argv.frame_count:</span><br><span> log.info("Collected required amount of frames")</span><br><span> return True</span><br><span> </span><br><span>@@ -203,88 +187,48 @@</span><br><span> # Exit</span><br><span> sys.exit(0)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- def print_help(self, msg = None):</span><br><span style="color: hsl(0, 100%, 40%);">- s = " Usage: " + sys.argv[0] + " [options]\n\n" \</span><br><span style="color: hsl(0, 100%, 40%);">- " Some help...\n" \</span><br><span style="color: hsl(0, 100%, 40%);">- " -h --help this text\n\n"</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- s += " Sniffing options\n" \</span><br><span style="color: hsl(0, 100%, 40%);">- " -i --sniff-interface Set network interface (default '%s')\n" \</span><br><span style="color: hsl(0, 100%, 40%);">- " -p --sniff-base-port Set base port number (default %d)\n\n"</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- s += " Processing (no processing by default)\n" \</span><br><span style="color: hsl(0, 100%, 40%);">- " -o --output-file Write bursts to file\n" \</span><br><span style="color: hsl(0, 100%, 40%);">- " -v --print-bits Print burst bits to stdout\n\n" \</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span 
style="color: hsl(0, 100%, 40%);">- s += " Count limitations (disabled by default)\n" \</span><br><span style="color: hsl(0, 100%, 40%);">- " --frame-count NUM Stop after sniffing NUM frames\n" \</span><br><span style="color: hsl(0, 100%, 40%);">- " --burst-count NUM Stop after sniffing NUM bursts\n\n"</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- s += " Filtering (disabled by default)\n" \</span><br><span style="color: hsl(0, 100%, 40%);">- " --direction DIR Burst direction: L12TRX or TRX2L1\n" \</span><br><span style="color: hsl(0, 100%, 40%);">- " --timeslot NUM TDMA timeslot number [0..7]\n" \</span><br><span style="color: hsl(0, 100%, 40%);">- " --frame-num-lt NUM TDMA frame number lower than NUM\n" \</span><br><span style="color: hsl(0, 100%, 40%);">- " --burst-num-gt NUM TDMA frame number greater than NUM\n"</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- print(s % (self.sniff_interface, self.sniff_base_port))</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- if msg is not None:</span><br><span style="color: hsl(0, 100%, 40%);">- print(msg)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span> def parse_argv(self):</span><br><span style="color: hsl(0, 100%, 40%);">- try:</span><br><span style="color: hsl(0, 100%, 40%);">- opts, args = getopt.getopt(sys.argv[1:],</span><br><span style="color: hsl(0, 100%, 40%);">- "i:p:o:v:h", ["help", "sniff-interface=", "sniff-base-port=",</span><br><span style="color: hsl(0, 100%, 40%);">- "frame-count=", "burst-count=", "direction=",</span><br><span style="color: hsl(0, 100%, 40%);">- "timeslot=", "frame-num-lt=", "frame-num-gt=",</span><br><span style="color: hsl(0, 100%, 40%);">- "output-file=", "print-bits"])</span><br><span style="color: hsl(0, 100%, 40%);">- except getopt.GetoptError as err:</span><br><span style="color: hsl(0, 100%, 
40%);">- self.print_help("[!] " + str(err))</span><br><span style="color: hsl(0, 100%, 40%);">- sys.exit(2)</span><br><span style="color: hsl(120, 100%, 40%);">+ parser = argparse.ArgumentParser(prog = "trx_sniff",</span><br><span style="color: hsl(120, 100%, 40%);">+ description = "Scapy-based TRX interface sniffer")</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- for o, v in opts:</span><br><span style="color: hsl(0, 100%, 40%);">- if o in ("-h", "--help"):</span><br><span style="color: hsl(0, 100%, 40%);">- self.print_help()</span><br><span style="color: hsl(0, 100%, 40%);">- sys.exit(2)</span><br><span style="color: hsl(120, 100%, 40%);">+ parser.add_argument("-v", "--verbose",</span><br><span style="color: hsl(120, 100%, 40%);">+ dest = "verbose", action = "store_true",</span><br><span style="color: hsl(120, 100%, 40%);">+ help = "Print burst bits to stdout")</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- elif o in ("-i", "--sniff-interface"):</span><br><span style="color: hsl(0, 100%, 40%);">- self.sniff_interface = v</span><br><span style="color: hsl(0, 100%, 40%);">- elif o in ("-p", "--sniff-base-port"):</span><br><span style="color: hsl(0, 100%, 40%);">- self.sniff_base_port = int(v)</span><br><span style="color: hsl(120, 100%, 40%);">+ trx_group = parser.add_argument_group("TRX interface")</span><br><span style="color: hsl(120, 100%, 40%);">+ trx_group.add_argument("-i", "--sniff-interface",</span><br><span style="color: hsl(120, 100%, 40%);">+ dest = "sniff_if", type = str, default = "lo", metavar = "IF",</span><br><span style="color: hsl(120, 100%, 40%);">+ help = "Set network interface (default '%(default)s')")</span><br><span style="color: hsl(120, 100%, 40%);">+ trx_group.add_argument("-p", "--base-port",</span><br><span style="color: hsl(120, 100%, 40%);">+ dest = "base_port", type = int, default = 6700,</span><br><span style="color: hsl(120, 100%, 40%);">+ help = "Set base port number (default 
%(default)s)")</span><br><span style="color: hsl(120, 100%, 40%);">+ trx_group.add_argument("-o", "--output-file", metavar = "FILE",</span><br><span style="color: hsl(120, 100%, 40%);">+ dest = "output_file", type = str,</span><br><span style="color: hsl(120, 100%, 40%);">+ help = "Write bursts to a capture file")</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- elif o in ("-o", "--output-file"):</span><br><span style="color: hsl(0, 100%, 40%);">- self.output_file = v</span><br><span style="color: hsl(0, 100%, 40%);">- elif o in ("-v", "--print-bits"):</span><br><span style="color: hsl(0, 100%, 40%);">- self.print_bursts = True</span><br><span style="color: hsl(120, 100%, 40%);">+ cnt_group = parser.add_argument_group("Count limitations (optional)")</span><br><span style="color: hsl(120, 100%, 40%);">+ cnt_group.add_argument("--frame-count", metavar = "N",</span><br><span style="color: hsl(120, 100%, 40%);">+ dest = "frame_count", type = int,</span><br><span style="color: hsl(120, 100%, 40%);">+ help = "Stop after sniffing N frames")</span><br><span style="color: hsl(120, 100%, 40%);">+ cnt_group.add_argument("--burst-count", metavar = "N",</span><br><span style="color: hsl(120, 100%, 40%);">+ dest = "burst_count", type = int,</span><br><span style="color: hsl(120, 100%, 40%);">+ help = "Stop after sniffing N bursts")</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- # Break counters</span><br><span style="color: hsl(0, 100%, 40%);">- elif o == "--frame-count":</span><br><span style="color: hsl(0, 100%, 40%);">- self.cnt_frame_break = int(v)</span><br><span style="color: hsl(0, 100%, 40%);">- elif o == "--burst-count":</span><br><span style="color: hsl(0, 100%, 40%);">- self.cnt_burst_break = int(v)</span><br><span style="color: hsl(120, 100%, 40%);">+ pf_group = parser.add_argument_group("Filtering (optional)")</span><br><span style="color: hsl(120, 100%, 40%);">+ pf_group.add_argument("--direction",</span><br><span 
style="color: hsl(120, 100%, 40%);">+ dest = "direction", type = str, choices = ["TRX", "L1"],</span><br><span style="color: hsl(120, 100%, 40%);">+ help = "Burst direction")</span><br><span style="color: hsl(120, 100%, 40%);">+ pf_group.add_argument("--timeslot", metavar = "TN",</span><br><span style="color: hsl(120, 100%, 40%);">+ dest = "pf_tn", type = int, choices = range(0, 8),</span><br><span style="color: hsl(120, 100%, 40%);">+ help = "TDMA timeslot number (equal TN)")</span><br><span style="color: hsl(120, 100%, 40%);">+ pf_group.add_argument("--frame-num-lt", metavar = "FN",</span><br><span style="color: hsl(120, 100%, 40%);">+ dest = "pf_fn_lt", type = int,</span><br><span style="color: hsl(120, 100%, 40%);">+ help = "TDMA frame number (lower than FN)")</span><br><span style="color: hsl(120, 100%, 40%);">+ pf_group.add_argument("--frame-num-gt", metavar = "FN",</span><br><span style="color: hsl(120, 100%, 40%);">+ dest = "pf_fn_gt", type = int,</span><br><span style="color: hsl(120, 100%, 40%);">+ help = "TDMA frame number (greater than FN)")</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- # Direction filter</span><br><span style="color: hsl(0, 100%, 40%);">- elif o == "--direction":</span><br><span style="color: hsl(0, 100%, 40%);">- if v == "L12TRX":</span><br><span style="color: hsl(0, 100%, 40%);">- self.bf_dir_l12trx = True</span><br><span style="color: hsl(0, 100%, 40%);">- elif v == "TRX2L1":</span><br><span style="color: hsl(0, 100%, 40%);">- self.bf_dir_l12trx = False</span><br><span style="color: hsl(0, 100%, 40%);">- else:</span><br><span style="color: hsl(0, 100%, 40%);">- self.print_help("[!] 
Wrong direction argument")</span><br><span style="color: hsl(0, 100%, 40%);">- sys.exit(2)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Timeslot pass filter</span><br><span style="color: hsl(0, 100%, 40%);">- elif o == "--timeslot":</span><br><span style="color: hsl(0, 100%, 40%);">- self.bf_tn_val = int(v)</span><br><span style="color: hsl(0, 100%, 40%);">- if self.bf_tn_val < 0 or self.bf_tn_val > 7:</span><br><span style="color: hsl(0, 100%, 40%);">- self.print_help("[!] Wrong timeslot value")</span><br><span style="color: hsl(0, 100%, 40%);">- sys.exit(2)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Frame number pass filter</span><br><span style="color: hsl(0, 100%, 40%);">- elif o == "--frame-num-lt":</span><br><span style="color: hsl(0, 100%, 40%);">- self.bf_fn_lt = int(v)</span><br><span style="color: hsl(0, 100%, 40%);">- elif o == "--frame-num-gt":</span><br><span style="color: hsl(0, 100%, 40%);">- self.bf_fn_gt = int(v)</span><br><span style="color: hsl(120, 100%, 40%);">+ return parser.parse_args()</span><br><span> </span><br><span> if __name__ == '__main__':</span><br><span> app = Application()</span><br><span></span><br></pre><p>To view, visit <a href="https://gerrit.osmocom.org/12190">change 12190</a>. To unsubscribe, or for help writing mail filters, visit <a href="https://gerrit.osmocom.org/settings">settings</a>.</p><div itemscope itemtype="http://schema.org/EmailMessage"><div itemscope itemprop="action" itemtype="http://schema.org/ViewAction"><link itemprop="url" href="https://gerrit.osmocom.org/12190"/><meta itemprop="name" content="View Change"/></div></div>
<div style="display:none"> Gerrit-Project: osmocom-bb </div>
<div style="display:none"> Gerrit-Branch: master </div>
<div style="display:none"> Gerrit-MessageType: merged </div>
<div style="display:none"> Gerrit-Change-Id: Id1dacaa32134bfa68344e6c48310390cdd85cdc9 </div>
<div style="display:none"> Gerrit-Change-Number: 12190 </div>
<div style="display:none"> Gerrit-PatchSet: 1 </div>
<div style="display:none"> Gerrit-Owner: Vadim Yanitskiy &lt;axilirator@gmail.com&gt; </div>
<div style="display:none"> Gerrit-Reviewer: Jenkins Builder (1000002) </div>
<div style="display:none"> Gerrit-Reviewer: Pau Espin Pedrol &lt;pespin@sysmocom.de&gt; </div>
<div style="display:none"> Gerrit-Reviewer: Vadim Yanitskiy &lt;axilirator@gmail.com&gt; </div>