falconia has uploaded this change for review. (
https://gerrit.osmocom.org/c/libosmo-netif/+/39280?usp=email )
Change subject: bring twjit into libosmo-netif
......................................................................
bring twjit into libosmo-netif
twjit is the jitter buffer portion of Themyscira Wireless RTP
endpoint implementation, an alternative to Belledonne libortp.
Unlike Belledonne software, ThemWi RTP library was developed
specifically for use in Osmocom-based GSM, ISDN and IP-PSTN
network elements, and is built on top of libosmocore primitives -
thus it can function more natively in Osmocom universe than ortp.
This ThemWi library was initially developed externally to Osmocom,
but is now being brought into libosmo-netif so it can be used by
native Osmocom projects, particularly OsmoBTS.
Related: OS#6474
Change-Id: Ia3be5834571ca18b68939abbcf1ce3a879156658
---
M configure.ac
M include/osmocom/netif/Makefile.am
A include/osmocom/netif/twjit.h
A include/osmocom/netif/twjit_private.h
M src/Makefile.am
A src/twjit.c
A src/twjit_in.c
A src/twjit_out.c
A src/twjit_vty.c
9 files changed, 955 insertions(+), 2 deletions(-)
git pull ssh://gerrit.osmocom.org:29418/libosmo-netif refs/changes/80/39280/1
diff --git a/configure.ac b/configure.ac
index 9513270..9b52901 100644
--- a/configure.ac
+++ b/configure.ac
@@ -92,6 +92,7 @@
PKG_CHECK_MODULES(LIBOSMOCORE, libosmocore >= 1.10.0)
PKG_CHECK_MODULES(LIBOSMOGSM, libosmogsm >= 1.10.0)
PKG_CHECK_MODULES(LIBOSMOCODEC, libosmocodec >= 1.10.0)
+PKG_CHECK_MODULES(LIBOSMOVTY, libosmovty >= 1.10.0)
AC_ARG_ENABLE([libsctp], [AS_HELP_STRING([--disable-libsctp], [Do not enable socket multiaddr APIs requiring libsctp])],
[ENABLE_LIBSCTP=$enableval], [ENABLE_LIBSCTP="yes"])
diff --git a/include/osmocom/netif/Makefile.am b/include/osmocom/netif/Makefile.am
index d518edb..d06adda 100644
--- a/include/osmocom/netif/Makefile.am
+++ b/include/osmocom/netif/Makefile.am
@@ -1,5 +1,6 @@
noinst_HEADERS = \
stream_private.h \
+ twjit_private.h \
$(NULL)
osmonetif_HEADERS = amr.h \
@@ -12,6 +13,7 @@
rs232.h \
rtp.h \
stream.h \
+ twjit.h \
$(NULL)
if ENABLE_LIBSCTP
diff --git a/include/osmocom/netif/twjit.h b/include/osmocom/netif/twjit.h
new file mode 100644
index 0000000..905f113
--- /dev/null
+++ b/include/osmocom/netif/twjit.h
@@ -0,0 +1,135 @@
+/*
+ * Themyscira Wireless RTP jitter buffer implementation:
+ * public API definition for Osmocom-integrated version.
+ *
+ * This code was contributed to Osmocom Cellular Network Infrastructure
+ * project by Mother Mychaela N. Falconia of Themyscira Wireless.
+ * Mother Mychaela's contributions are NOT subject to copyright:
+ * no rights reserved, all rights relinquished.
+ */
+
+#pragma once
+
+#include <stdint.h>
+#include <stdbool.h>
+
+/*
+ * Each instance of twjit in the present version exists as struct osmo_twjit.
+ * This structure is opaque, and always constitutes a talloc context.
+ */
+struct osmo_twjit;
+
+/*
+ * twjit configuration tunings, usually set via vty. This config structure
+ * always has to be provided in order to create a twjit instance.
+ * The twjit instance only keeps a pointer to it; the values are copied
+ * into a sub-buffer each time a new sub-buffer is started.
+ */
+struct osmo_twjit_config {
+ /* buffer depth, counted in quanta: starting minimum (fill level
+ * required before output flow starts) and high watermark (fill level
+ * above which the thinning mechanism acts); vty accepts 1-65535 */
+ uint16_t bd_start;
+ uint16_t bd_hiwat;
+ /* interval for thinning of too-deep standing queue: every Nth packet
+ * is dropped while the queue is above bd_hiwat; vty accepts 2-65535 */
+ uint16_t thinning_int;
+ /* guard against time traveler RTP packets: how many seconds into the
+ * future a timestamp may point before it is treated as a new stream */
+ uint16_t max_future_sec;
+ /* min and max time-of-arrival delta in starting state, in ms,
+ * 0 means not set */
+ uint16_t start_min_delta;
+ uint16_t start_max_delta;
+};
+
+/*
+ * Stats collected during the lifetime of a twjit instance.
+ * These counters are not cleared by osmo_twjit_reset().
+ */
+struct osmo_twjit_stats {
+ /* normal operation */
+ uint32_t rx_packets; /* packets accepted by osmo_twjit_input() */
+ uint32_t delivered_pkt; /* packets handed out by osmo_twjit_output() */
+ uint32_t handovers_in; /* new sub-buffer started while flowing */
+ uint32_t handovers_out; /* output switched over to the new sub-buffer */
+ /* undesirable, but not totally unexpected */
+ uint32_t too_old; /* packet older than read position while flowing */
+ uint32_t underruns; /* input arrived into a buffer that had run empty */
+ uint32_t ho_underruns; /* old sub-buffer ran dry during handover */
+ uint32_t output_gaps; /* output quantum with no packet available */
+ uint32_t thinning_drops; /* quanta removed by standing queue thinning */
+ /* unusual error events */
+ uint32_t bad_packets; /* input without a usable RTP header */
+ uint32_t duplicate_ts; /* second packet for an already-held timestamp */
+ /* independent analysis of Rx packet stream */
+ uint32_t ssrc_changes;
+ uint32_t seq_skips; /* sequence number advanced by more than 1 */
+ uint32_t seq_backwards; /* sequence number went backwards */
+ uint32_t seq_repeats; /* same sequence number as previous packet */
+ uint32_t intentional_gaps; /* seq +1, ts advanced by several quanta */
+ uint32_t ts_resets; /* seq +1, ts moved by a non-quantum amount */
+ uint32_t jitter_max; /* largest single jitter sample, in RTP ts units */
+};
+
+/*
+ * Info collected from the incoming RTP data stream
+ * for the purpose of generating RTCP reception report blocks.
+ * Key point: unlike the counters in struct osmo_twjit_stats,
+ * all RR info is reset to initial whenever incoming SSRC changes,
+ * as necessitated by RTCP data model being organized per SSRC.
+ */
+struct osmo_twjit_rr_info {
+ /* SSRC of the stream currently being analyzed */
+ uint32_t ssrc;
+ /* count of packets received with this SSRC */
+ uint32_t rx_packets;
+ /* sequence number of the first packet received with this SSRC */
+ uint32_t base_seq;
+ /* highest sequence number seen: low 16 bits hold the RTP sequence
+ * number, high 16 bits count wraparounds */
+ uint32_t max_seq_ext;
+ /* max_seq_ext - base_seq + 1 */
+ uint32_t expected_pkt;
+ /* running jitter accumulator in RTP timestamp units; each sample is
+ * folded in as accum += sample - ((accum + 8) >> 4) */
+ uint32_t jitter_accum;
+};
+
+/* twjit module API functions */
+
+struct osmo_twjit *osmo_twjit_create(void *ctx, uint16_t clock_khz,
+ uint16_t quantum_ms,
+ const struct osmo_twjit_config *config);
+void osmo_twjit_destroy(struct osmo_twjit *twjit);
+
+void osmo_twjit_new_config(struct osmo_twjit *twjit,
+ const struct osmo_twjit_config *config);
+void osmo_twjit_reset(struct osmo_twjit *twjit);
+
+struct msgb;
+
+/* RTP input, takes ownership of msgb */
+void osmo_twjit_input(struct osmo_twjit *twjit, struct msgb *msg);
+
+/* output function, to be called by TDM/GSM/etc fixed-timing side */
+struct msgb *osmo_twjit_output(struct osmo_twjit *twjit);
+
+/* Stats and RR info structures are contained inside opaque struct osmo_twjit.
+ * We need to provide access to these stats and RR info structures to API
+ * users, but we don't want to make the whole twjit instance struct public.
+ * Also we would like to have fast external access to these stats, hence an API
+ * that copies our stats to caller-provided storage would be very inefficient.
+ * Compromise: we allow direct external access to just these selected parts
+ * of the full internal state structure by providing API functions that
+ * return pointers to these selected parts.
+ */
+const struct osmo_twjit_stats *
+osmo_twjit_get_stats(struct osmo_twjit *twjit);
+
+const struct osmo_twjit_rr_info *
+osmo_twjit_get_rr_info(struct osmo_twjit *twjit);
+
+/* When we compose outgoing RTCP packets in the upper layer of twrtp,
+ * we need to know whether or not we have received at least one valid
+ * RTP data packet so far. If we haven't received any RTP yet, then
+ * we have no Rx SSRC, all data in struct osmo_twjit_rr_info are invalid,
+ * and we cannot send RTCP reception reports.
+ */
+bool osmo_twjit_got_any_input(struct osmo_twjit *twjit);
+
+/* vty configuration functions */
+
+void osmo_twjit_init_defaults(struct osmo_twjit_config *config);
+
+void osmo_twjit_vty_init(int twjit_node);
+
+struct vty;
+
+int osmo_twjit_config_write(struct vty *vty,
+ const struct osmo_twjit_config *conf,
+ const char *name, const char *prefix);
diff --git a/include/osmocom/netif/twjit_private.h b/include/osmocom/netif/twjit_private.h
new file mode 100644
index 0000000..0312af4
--- /dev/null
+++ b/include/osmocom/netif/twjit_private.h
@@ -0,0 +1,78 @@
+/*
+ * Themyscira Wireless RTP jitter buffer implementation:
+ * internal definitions confined to twjit code inside libosmo-netif.
+ *
+ * This code was contributed to Osmocom Cellular Network Infrastructure
+ * project by Mother Mychaela N. Falconia of Themyscira Wireless.
+ * Mother Mychaela's contributions are NOT subject to copyright:
+ * no rights reserved, all rights relinquished.
+ */
+
+#pragma once
+
+#include <stdint.h>
+#include <stdbool.h>
+
+#include <osmocom/core/linuxlist.h>
+#include <osmocom/core/timer.h>
+
+#include <osmocom/netif/twjit.h>
+
+/*
+ * Each twjit instance has two sub-buffers; each subbuf is a queue of
+ * received RTP packets that have the same SSRC and whose timestamps
+ * increment in the expected cadence, with each ts delta being an
+ * integral multiple of the samples-per-quantum constant.
+ */
+struct twjit_subbuf {
+ /* SSRC shared by all packets in this sub-buffer */
+ uint32_t ssrc;
+ /* RTP timestamp of the head (next to be output) quantum position */
+ uint32_t head_ts;
+ /* queued msgbs ordered by timestamp; each msgb carries its RTP
+ * timestamp in cb[0] */
+ struct llist_head queue;
+ /* number of quantum positions from head_ts up to and including
+ * the newest queued packet; gaps count toward depth */
+ uint32_t depth;
+ /* time-of-arrival delta of the latest checked packet, in ms;
+ * set to UINT32_MAX when the sub-buffer is started */
+ uint32_t delta_ms; /* used only in starting state */
+ /* thinning mechanism: output calls left before the next drop
+ * may happen */
+ uint16_t drop_int_count;
+ /* running config for this subbuf, copied from the external config
+ * when the sub-buffer is started */
+ struct osmo_twjit_config conf;
+};
+
+/*
+ * Each twjit instance is in one of 4 fundamental states at any moment,
+ * as enumerated here.
+ */
+enum twjit_state {
+ /* nothing queued; output returns NULL until input arrives */
+ TWJIT_STATE_EMPTY,
+ /* write sub-buffer is filling up, output has not started reading */
+ TWJIT_STATE_HUNT,
+ /* output reads from the same sub-buffer that input writes into */
+ TWJIT_STATE_FLOWING,
+ /* output still drains the old sub-buffer while input fills the
+ * other one with a new stream */
+ TWJIT_STATE_HANDOVER,
+};
+
+/* Main structure for one instance of twjit */
+struct osmo_twjit {
+ /* pointer to config structure given to osmo_twjit_create(),
+ * memory must remain valid, but content can change at any time. */
+ const struct osmo_twjit_config *ext_config;
+ /* count of RTP timestamp units per quantum
+ * (quantum_ms * clock_khz) */
+ uint32_t ts_quantum;
+ /* quanta per second, used to scale max_future_sec
+ * (1000 / quantum_ms) */
+ uint16_t quanta_per_sec;
+ /* scaling factors for time delta conversions */
+ uint16_t ts_units_per_ms;
+ uint32_t ts_units_per_sec;
+ /* despite the name, this is the number of nanoseconds per timestamp
+ * unit (1000000 / clock_khz); it is used as a divisor when converting
+ * tv_nsec into timestamp units */
+ uint32_t ns_to_ts_units;
+ /* operational state */
+ enum twjit_state state;
+ struct twjit_subbuf sb[2];
+ uint8_t read_sb; /* 0 or 1 */
+ uint8_t write_sb; /* ditto */
+ /* info about the most recent Rx packet */
+ uint32_t last_ts;
+ uint16_t last_seq;
+ /* false until the first valid RTP packet after create or reset */
+ bool got_first_packet;
+ /* CLOCK_MONOTONIC time of the most recent Rx packet */
+ struct timespec last_arrival;
+ /* arrival time difference between the two most recent packets,
+ * in RTP timestamp units, clamped to 1 hour */
+ uint32_t last_arrival_delta;
+ /* analytics for RTCP RR, also remembers last SSRC */
+ struct osmo_twjit_rr_info rr_info;
+ /* stats over lifetime of this instance */
+ struct osmo_twjit_stats stats;
+};
diff --git a/src/Makefile.am b/src/Makefile.am
index e3634be..171b879 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -3,12 +3,14 @@
LIBVERSION=13:1:2
AM_CPPFLAGS = -I$(top_srcdir)/include -I$(top_builddir)
-AM_CFLAGS= -fPIC -Wall $(LIBOSMOCORE_CFLAGS) $(LIBOSMOGSM_CFLAGS) $(COVERAGE_CFLAGS) $(LIBSCTP_CFLAGS)
+AM_CFLAGS= -fPIC -Wall $(LIBOSMOCORE_CFLAGS) $(LIBOSMOGSM_CFLAGS) \
+ $(LIBOSMOVTY_CFLAGS) $(COVERAGE_CFLAGS) $(LIBSCTP_CFLAGS)
AM_LDFLAGS = $(COVERAGE_LDFLAGS)
lib_LTLIBRARIES = libosmonetif.la
-libosmonetif_la_LIBADD = $(LIBOSMOCORE_LIBS) $(LIBOSMOGSM_LIBS) $(LIBSCTP_LIBS)
+libosmonetif_la_LIBADD = $(LIBOSMOCORE_LIBS) $(LIBOSMOGSM_LIBS) \
+ $(LIBOSMOVTY_LIBS) $(LIBSCTP_LIBS)
libosmonetif_la_LDFLAGS = $(AM_LDFLAGS) -version-info $(LIBVERSION) -no-undefined
libosmonetif_la_SOURCES = amr.c \
@@ -26,6 +28,10 @@
stream.c \
stream_cli.c \
stream_srv.c \
+ twjit.c \
+ twjit_in.c \
+ twjit_out.c \
+ twjit_vty.c \
$(NULL)
if ENABLE_LIBSCTP
diff --git a/src/twjit.c b/src/twjit.c
new file mode 100644
index 0000000..4c77f53
--- /dev/null
+++ b/src/twjit.c
@@ -0,0 +1,171 @@
+/*
+ * Themyscira Wireless RTP jitter buffer implementation: basic functions,
+ * everything that isn't factored out into input handling, output handling
+ * or vty config modules.
+ *
+ * This code was contributed to Osmocom Cellular Network Infrastructure
+ * project by Mother Mychaela N. Falconia of Themyscira Wireless.
+ * Mother Mychaela's contributions are NOT subject to copyright:
+ * no rights reserved, all rights relinquished.
+ */
+
+#include <stdint.h>
+#include <stdbool.h>
+#include <string.h>
+
+#include <osmocom/core/linuxlist.h>
+#include <osmocom/core/msgb.h>
+#include <osmocom/core/talloc.h>
+#include <osmocom/core/utils.h>
+
+#include <osmocom/netif/twjit.h>
+#include <osmocom/netif/twjit_private.h>
+
+/*
+ * Fill a caller-provided config structure with default tunings.
+ * The structure is zeroed first, so start_min_delta and start_max_delta
+ * end up as 0, which means "not set".
+ */
+void osmo_twjit_init_defaults(struct osmo_twjit_config *config)
+{
+ memset(config, 0, sizeof(struct osmo_twjit_config));
+
+ /* While the theoretical minimum starting fill level is 1, the
+ * practically useful minimum (achieving lowest latency, but not
+ * incurring underruns in normal healthy operation) is 2 for typical
+ * network configurations that combine elements with "perfect" 20 ms
+ * timing (T1/E1 interfaces, external IP-PSTN links, software
+ * transcoders timed by system clock etc) and GSM-to-IP OsmoBTS
+ * whose 20 ms timing contains the small inherent jitter of TDMA. */
+ config->bd_start = 2;
+
+ /* The high water mark setting determines when the standing queue
+ * thinning mechanism kicks in. A standing queue that is longer
+ * than the starting fill level will occur when the flow starts
+ * during a network latency spike, but then the network latency
+ * goes down. If this setting is too high, deep standing queues
+ * will persist, adding needless latency to speech or CSD.
+ * If this setting is too low, the thinning mechanism will be
+ * too invasive, needlessly and perhaps frequently deleting a quantum
+ * of speech or data from the stream and incurring a phase shift.
+ * Starting fill level plus 2 seems like a good default. */
+ config->bd_hiwat = 4;
+
+ /* When the standing queue thinning mechanism does kick in,
+ * it drops every Nth packet, where N is the thinning interval.
+ * Given that this mechanism forcibly deletes a quantum of speech
+ * or data from the stream, these induced disruptions should be
+ * spaced out, and the managing operator should also keep in mind
+ * that the incurred phase shift may be a problem for some
+ * applications, particularly CSD. Our current default is
+ * a prime number, reducing the probability that the thinning
+ * mechanism will interfere badly with intrinsic features of the
+ * stream being thinned. 17 quantum units at 20 ms per quantum
+ * is 340 ms, which should be sufficiently long spacing to make
+ * speech quantum deletions tolerable. */
+ config->thinning_int = 17;
+
+ /* With RTP timestamps being 32 bits and with the usual RTP
+ * clock rate of 8000 timestamp units per second, a packet may
+ * arrive that claims to be as far as 3 days into the future.
+ * Such aberrant RTP packets are jocularly referred to as
+ * time travelers. Assuming that actual time travel either
+ * does not exist at all or at least does not happen in the
+ * present context, we reason that when such "time traveler" RTP
+ * packets do arrive, we must be dealing with the effect of a
+ * software bug or misdesign or misconfiguration in whatever
+ * foreign network element is sending us RTP. In any case,
+ * irrespective of the cause, we must be prepared for the
+ * possibility of seeming "time travel" in the incoming RTP stream.
+ * We implement an arbitrary threshold: if the received RTP ts
+ * is too far into the future, we treat that packet as the
+ * beginning of a new stream, same as SSRC change or non-quantum
+ * ts increment. This threshold has 1 s granularity, which is
+ * sufficient for its intended purpose of catching gross errors.
+ * The minimum setting of this threshold is 1 s, but let's
+ * default to 10 s, being generous to networks with really bad
+ * latency. */
+ config->max_future_sec = 10;
+}
+
+/* create and destroy functions */
+
+/*
+ * Allocate and initialize a new twjit instance as a talloc child of ctx.
+ *
+ * clock_khz is the RTP timestamp clock rate in kHz and quantum_ms is the
+ * duration of one quantum in ms. Both are used as divisors when the
+ * scaling factors are precomputed, hence neither may be 0.
+ * config is remembered by pointer only; the memory must stay valid for
+ * as long as the instance uses it.
+ *
+ * Returns the new instance in the EMPTY state, or NULL if clock_khz or
+ * quantum_ms is 0 or if memory allocation fails.
+ */
+struct osmo_twjit *osmo_twjit_create(void *ctx, uint16_t clock_khz,
+				     uint16_t quantum_ms,
+				     const struct osmo_twjit_config *config)
+{
+	struct osmo_twjit *twjit;
+
+	/* refuse parameters that would make us divide by zero below */
+	if (clock_khz == 0 || quantum_ms == 0)
+		return NULL;
+
+	twjit = talloc_zero(ctx, struct osmo_twjit);
+	if (!twjit)
+		return NULL;
+
+	twjit->ext_config = config;
+	twjit->state = TWJIT_STATE_EMPTY;
+	INIT_LLIST_HEAD(&twjit->sb[0].queue);
+	INIT_LLIST_HEAD(&twjit->sb[1].queue);
+	/* widen before multiplying: the product can exceed 16 bits */
+	twjit->ts_quantum = (uint32_t) quantum_ms * clock_khz;
+	twjit->quanta_per_sec = 1000 / quantum_ms;
+	twjit->ts_units_per_ms = clock_khz;
+	twjit->ts_units_per_sec = (uint32_t) clock_khz * 1000;
+	/* nanoseconds per timestamp unit, later used as a divisor */
+	twjit->ns_to_ts_units = 1000000 / clock_khz;
+
+	return twjit;
+}
+
+/* Release every msgb still held in either sub-buffer, then free the
+ * twjit instance itself. */
+void osmo_twjit_destroy(struct osmo_twjit *twjit)
+{
+	unsigned int i;
+
+	for (i = 0; i < ARRAY_SIZE(twjit->sb); i++)
+		msgb_queue_free(&twjit->sb[i].queue);
+	talloc_free(twjit);
+}
+
+/* Here is how twjit config works: every twjit instance remembers
+ * a pointer to struct osmo_twjit_config, either the initial one
+ * given to osmo_twjit_create() or an updated one set with
+ * osmo_twjit_new_config(). However, the memory holding this
+ * config structure remains owned by the application, and all
+ * config settings therein may be freely changed by vty at any time.
+ * In the case of changes to twjit config after the call to
+ * osmo_twjit_create(), whether these changes are done by feeding
+ * a new config structure to osmo_twjit_new_config() or by changing
+ * values in the previously-supplied structure, all changes take
+ * effect atomically whenever a new sub-buffer is initialized,
+ * upon receiving the first RTP packet into a completely empty
+ * buffer or upon receiving a packet that constitutes handover.
+ *
+ * This function itself only replaces the remembered pointer;
+ * it does not copy the structure and does not touch any sub-buffer.
+ */
+void osmo_twjit_new_config(struct osmo_twjit *twjit,
+ const struct osmo_twjit_config *config)
+{
+ twjit->ext_config = config;
+}
+
+/* Reset is meant for the case where the application pauses its regular
+ * once-per-quantum reads from the jitter buffer and may pick them up
+ * again later. Both sub-buffer queues are emptied and the packet Rx
+ * state returns to its initial condition; the "lifetime" statistical
+ * counters are deliberately left alone.
+ */
+void osmo_twjit_reset(struct osmo_twjit *twjit)
+{
+	unsigned int i;
+
+	for (i = 0; i < ARRAY_SIZE(twjit->sb); i++) {
+		msgb_queue_free(&twjit->sb[i].queue);
+		twjit->sb[i].depth = 0;
+	}
+	twjit->got_first_packet = false;
+	twjit->state = TWJIT_STATE_EMPTY;
+}
+
+/* simple information retrieval functions */
+
+/* Return a read-only pointer to the lifetime stats embedded in the
+ * twjit instance; nothing is copied. */
+const struct osmo_twjit_stats *
+osmo_twjit_get_stats(struct osmo_twjit *twjit)
+{
+ return &twjit->stats;
+}
+
+/* Return a read-only pointer to the RTCP RR info embedded in the
+ * twjit instance; nothing is copied. */
+const struct osmo_twjit_rr_info *
+osmo_twjit_get_rr_info(struct osmo_twjit *twjit)
+{
+ return &twjit->rr_info;
+}
+
+/* Report whether at least one valid RTP packet has been received since
+ * the instance was created or last reset. */
+bool osmo_twjit_got_any_input(struct osmo_twjit *twjit)
+{
+ return twjit->got_first_packet;
+}
diff --git a/src/twjit_in.c b/src/twjit_in.c
new file mode 100644
index 0000000..6f0583d
--- /dev/null
+++ b/src/twjit_in.c
@@ -0,0 +1,304 @@
+/*
+ * Themyscira Wireless RTP jitter buffer implementation: input processing
+ * of received RTP packets.
+ *
+ * This code was contributed to Osmocom Cellular Network Infrastructure
+ * project by Mother Mychaela N. Falconia of Themyscira Wireless.
+ * Mother Mychaela's contributions are NOT subject to copyright:
+ * no rights reserved, all rights relinquished.
+ */
+
+#include <stdint.h>
+#include <stdbool.h>
+#include <string.h>
+#include <arpa/inet.h> /* for network byte order functions */
+
+#include <osmocom/core/linuxlist.h>
+#include <osmocom/core/msgb.h>
+#include <osmocom/core/timer.h>
+#include <osmocom/core/utils.h>
+
+#include <osmocom/netif/twjit.h>
+#include <osmocom/netif/twjit_private.h>
+#include <osmocom/netif/rtp.h>
+
+/* raw analytics on the Rx packet stream */
+
+/* Start RTCP RR bookkeeping afresh for a stream identified by rx_ssrc,
+ * with the packet carrying rx_seq counted as its first packet. */
+static void analytics_init(struct osmo_twjit *twjit, uint32_t rx_ssrc,
+			   uint16_t rx_seq)
+{
+	twjit->rr_info = (struct osmo_twjit_rr_info) {
+		.ssrc = rx_ssrc,
+		.rx_packets = 1,
+		.base_seq = rx_seq,
+		.max_seq_ext = rx_seq,
+		.expected_pkt = 1,
+		.jitter_accum = 0,
+	};
+}
+
+/*
+ * Update stream analytics for a packet whose SSRC matches the previous one.
+ * Reads twjit->last_seq, last_ts and last_arrival as they were left by the
+ * previous packet, so it has to run before those fields are overwritten
+ * with the values of the current packet.
+ */
+static void analytics_cont(struct osmo_twjit *twjit, uint16_t rx_seq,
+ uint32_t rx_ts, const struct timespec *now)
+{
+ struct osmo_twjit_rr_info *rri = &twjit->rr_info;
+ uint16_t seq_ext_lo = rri->max_seq_ext;
+ uint16_t seq_ext_hi = rri->max_seq_ext >> 16;
+ /* seq_delta: relative to the immediately preceding packet;
+ * seq_delta2: relative to the highest sequence number seen so far.
+ * The int16_t casts make both comparisons wraparound-aware. */
+ int16_t seq_delta = (int16_t)(rx_seq - twjit->last_seq);
+ int16_t seq_delta2 = (int16_t)(rx_seq - seq_ext_lo);
+ int32_t ts_delta = (int32_t)(rx_ts - twjit->last_ts);
+ struct timespec time_delta;
+ uint32_t time_delta_tsu;
+ int32_t jitter_new, ts_delta_clamp;
+
+ /* analytics for our own stats */
+ if (seq_delta < 0)
+ twjit->stats.seq_backwards++;
+ else if (seq_delta == 0)
+ twjit->stats.seq_repeats++;
+ else if (seq_delta == 1) {
+ /* consecutive sequence number: a ts step other than one quantum
+ * is either a whole number of quanta forward (sender skipped
+ * output on purpose) or something irregular */
+ if (ts_delta != twjit->ts_quantum) {
+ if (ts_delta > 0 && (ts_delta % twjit->ts_quantum) == 0)
+ twjit->stats.intentional_gaps++;
+ else
+ twjit->stats.ts_resets++;
+ }
+ } else
+ twjit->stats.seq_skips++;
+
+ /* analytics for RTCP RR: packet counts */
+ rri->rx_packets++;
+ if (seq_delta2 > 0) {
+ /* new highest sequence number; a numerically smaller value
+ * here means the 16-bit counter wrapped around */
+ if (rx_seq < seq_ext_lo)
+ seq_ext_hi++;
+ seq_ext_lo = rx_seq;
+ rri->max_seq_ext = ((uint32_t) seq_ext_hi << 16) | seq_ext_lo;
+ rri->expected_pkt = rri->max_seq_ext - rri->base_seq + 1;
+ }
+
+ /* time-of-arrival analytics */
+ time_delta.tv_sec = now->tv_sec - twjit->last_arrival.tv_sec;
+ time_delta.tv_nsec = now->tv_nsec - twjit->last_arrival.tv_nsec;
+ if (time_delta.tv_nsec < 0) {
+ time_delta.tv_sec--;
+ time_delta.tv_nsec += 1000000000;
+ }
+ /* to avoid overflows in downstream math, clamp to 1 hour */
+ if (time_delta.tv_sec >= 3600) {
+ time_delta.tv_sec = 3600;
+ time_delta.tv_nsec = 0;
+ }
+ /* convert to RTP timestamp units */
+ time_delta_tsu = time_delta.tv_sec * twjit->ts_units_per_sec +
+ time_delta.tv_nsec / twjit->ns_to_ts_units;
+ twjit->last_arrival_delta = time_delta_tsu;
+ /* jitter calculation for RTCP RR */
+ /* NOTE(review): ts_units_per_sec * 3600 is 28800000 at 8 kHz, but it
+ * no longer fits into int32_t once the clock rate exceeds ~596 kHz */
+ ts_delta_clamp = twjit->ts_units_per_sec * 3600;
+ if (ts_delta > ts_delta_clamp)
+ ts_delta = ts_delta_clamp;
+ else if (ts_delta < -ts_delta_clamp)
+ ts_delta = -ts_delta_clamp;
+ /* sample = |arrival time difference - timestamp difference| */
+ jitter_new = time_delta_tsu - ts_delta;
+ if (jitter_new < 0)
+ jitter_new = -jitter_new;
+ /* fold the sample into the running accumulator with rounding */
+ rri->jitter_accum += jitter_new - ((rri->jitter_accum + 8) >> 4);
+ if (jitter_new > twjit->stats.jitter_max)
+ twjit->stats.jitter_max = jitter_new;
+}
+
+/* actual twjit input logic */
+
+/*
+ * Start the current write sub-buffer with msg as its one and only packet.
+ * The sub-buffer must be empty on entry. The external config is
+ * snapshotted here, which is the point where config changes take effect.
+ */
+static void
+init_subbuf_first_packet(struct osmo_twjit *twjit, struct msgb *msg,
+			 uint32_t rx_ssrc, uint32_t rx_ts)
+{
+	struct twjit_subbuf *wsb = &twjit->sb[twjit->write_sb];
+
+	OSMO_ASSERT(llist_empty(&wsb->queue));
+	OSMO_ASSERT(wsb->depth == 0);
+
+	/* take a private copy of the tunings for this sub-buffer */
+	wsb->conf = *twjit->ext_config;
+	wsb->drop_int_count = 0;
+	/* With bd_start=1 the readiness check in
+	 * twjit_out.c:starting_sb_is_ready() runs before any second packet
+	 * has produced a real arrival delta. Presetting the maximum value
+	 * lets that check pass, which is simpler than making vty enforce
+	 * that start_min_delta stays unset whenever bd_start=1. */
+	wsb->delta_ms = UINT32_MAX;
+
+	wsb->ssrc = rx_ssrc;
+	wsb->head_ts = rx_ts;
+	msgb_enqueue(&wsb->queue, msg);
+	wsb->depth = 1;
+}
+
+/* Verdict of check_input_for_subbuf() on a received packet, relative to
+ * the current write sub-buffer */
+enum input_decision {
+ /* packet fits the sub-buffer and is to be inserted into it */
+ INPUT_CONTINUE,
+ /* packet timestamp lies before the sub-buffer head; to be dropped */
+ INPUT_TOO_OLD,
+ /* packet cannot belong to this sub-buffer (different SSRC,
+ * non-quantum timestamp offset, or delta/future limits exceeded);
+ * a new sub-buffer is to be started with it */
+ INPUT_RESET,
+};
+
+/*
+ * Decide what to do with a received packet relative to the current write
+ * sub-buffer. 'starting' selects the rule set: true while the sub-buffer
+ * is still filling (start_max_delta is applied), false while it is being
+ * read (max_future_sec is applied).
+ * Side effect: for a matching SSRC, sb->delta_ms is refreshed from the
+ * latest arrival delta regardless of the verdict that follows.
+ */
+static enum input_decision
+check_input_for_subbuf(struct osmo_twjit *twjit, bool starting,
+		       uint32_t rx_ssrc, uint32_t rx_ts)
+{
+	struct twjit_subbuf *wsb = &twjit->sb[twjit->write_sb];
+	int32_t ahead;
+
+	if (wsb->ssrc != rx_ssrc)
+		return INPUT_RESET;
+
+	wsb->delta_ms = twjit->last_arrival_delta / twjit->ts_units_per_ms;
+
+	/* signed distance from the head of the sub-buffer */
+	ahead = (int32_t)(rx_ts - wsb->head_ts);
+	if (ahead < 0)
+		return INPUT_TOO_OLD;
+	/* must land exactly on a quantum boundary */
+	if ((ahead % twjit->ts_quantum) != 0)
+		return INPUT_RESET;
+
+	if (starting) {
+		if (wsb->conf.start_max_delta != 0 &&
+		    wsb->delta_ms > wsb->conf.start_max_delta)
+			return INPUT_RESET;
+		return INPUT_CONTINUE;
+	}
+
+	/* flowing: reject packets too many quanta into the future */
+	if (ahead / twjit->ts_quantum >=
+	    (uint32_t) wsb->conf.max_future_sec * twjit->quanta_per_sec)
+		return INPUT_RESET;
+	return INPUT_CONTINUE;
+}
+
+/* Discard everything queued in the current write sub-buffer, leaving it
+ * empty so that it can be started anew. */
+static void toss_write_queue(struct osmo_twjit *twjit)
+{
+ struct twjit_subbuf *sb = &twjit->sb[twjit->write_sb];
+
+ msgb_queue_free(&sb->queue);
+ sb->depth = 0;
+}
+
+/*
+ * Insert an accepted packet into the write sub-buffer, keeping the queue
+ * ordered by timestamp. The msgb is either linked into the queue or, if a
+ * packet with the same timestamp is already held, freed and counted as
+ * duplicate_ts. The caller has already verified that rx_ts is not before
+ * head_ts and sits on a quantum boundary.
+ */
+static void insert_pkt_write_sb(struct osmo_twjit *twjit, struct msgb *new_msg,
+ uint32_t rx_ts)
+{
+ struct twjit_subbuf *sb = &twjit->sb[twjit->write_sb];
+ uint32_t ts_delta = rx_ts - sb->head_ts;
+ uint32_t ins_depth = ts_delta / twjit->ts_quantum;
+ struct msgb *old_msg;
+ uint32_t old_ts_delta;
+
+ /* are we increasing total depth, and can we do simple tail append? */
+ if (ins_depth >= sb->depth) {
+ msgb_enqueue(&sb->queue, new_msg);
+ sb->depth = ins_depth + 1;
+ return;
+ }
+ /* nope - do it the hard way */
+ /* find the first queued packet that is later than the new one;
+ * total depth does not change because we insert before the tail */
+ llist_for_each_entry(old_msg, &sb->queue, list) {
+ old_ts_delta = old_msg->cb[0] - sb->head_ts;
+ if (old_ts_delta == ts_delta) {
+ /* two packets with the same timestamp! */
+ twjit->stats.duplicate_ts++;
+ msgb_free(new_msg);
+ return;
+ }
+ if (old_ts_delta > ts_delta)
+ break;
+ }
+ /* link the new packet in right before the entry the loop stopped at */
+ llist_add_tail(&new_msg->list, &old_msg->list);
+}
+
+/*
+ * While a sub-buffer is still in its starting phase, keep it from growing
+ * beyond the configured starting depth: drop packets from the head and
+ * advance head_ts to the next queued packet until depth fits again.
+ *
+ * The sub-buffer is never trimmed below one packet. A bd_start of 0 cannot
+ * be entered via vty, but it can reach us in a config structure filled in
+ * directly by the application; it is treated like 1 here, as otherwise
+ * the loop would empty the queue and trip the assertion below.
+ */
+static void trim_starting_sb(struct osmo_twjit *twjit)
+{
+	struct twjit_subbuf *sb = &twjit->sb[twjit->write_sb];
+	uint32_t min_depth = sb->conf.bd_start ? sb->conf.bd_start : 1;
+	struct msgb *msg;
+	uint32_t msg_ts, ts_adv, quantum_adv;
+
+	while (sb->depth > min_depth) {
+		/* drop the oldest packet */
+		msg = msgb_dequeue(&sb->queue);
+		OSMO_ASSERT(msg);
+		msgb_free(msg);
+		/* the next queued packet becomes the new head; there may
+		 * have been a gap of several quanta in between */
+		OSMO_ASSERT(!llist_empty(&sb->queue));
+		msg = llist_entry(sb->queue.next, struct msgb, list);
+		msg_ts = msg->cb[0];
+		ts_adv = msg_ts - sb->head_ts;
+		quantum_adv = ts_adv / twjit->ts_quantum;
+		OSMO_ASSERT(sb->depth > quantum_adv);
+		sb->head_ts = msg_ts;
+		sb->depth -= quantum_adv;
+	}
+}
+
+/*
+ * RTP input entry point. Takes ownership of msg: the msgb is either
+ * queued in a sub-buffer or freed before return.
+ * Stream analytics are updated first, then the packet is handled
+ * according to the current state of the jitter buffer.
+ */
+void osmo_twjit_input(struct osmo_twjit *twjit, struct msgb *msg)
+{
+ bool got_previous_input = twjit->got_first_packet;
+ struct rtp_hdr *rtph;
+ uint32_t rx_ssrc, rx_ts;
+ uint16_t rx_seq;
+ struct timespec now;
+ enum input_decision id;
+
+ rtph = osmo_rtp_get_hdr(msg);
+ if (!rtph) {
+ twjit->stats.bad_packets++;
+ msgb_free(msg);
+ return;
+ }
+ rx_ssrc = ntohl(rtph->ssrc);
+ rx_ts = ntohl(rtph->timestamp);
+ rx_seq = ntohs(rtph->sequence);
+ osmo_clock_gettime(CLOCK_MONOTONIC, &now);
+ /* analytics: start over on the very first packet and on every SSRC
+ * change, otherwise continue the running analysis */
+ if (!got_previous_input) {
+ analytics_init(twjit, rx_ssrc, rx_seq);
+ twjit->got_first_packet = true;
+ } else if (rx_ssrc != twjit->rr_info.ssrc) {
+ twjit->stats.ssrc_changes++;
+ analytics_init(twjit, rx_ssrc, rx_seq);
+ } else
+ analytics_cont(twjit, rx_seq, rx_ts, &now);
+ /* remember this packet as "previous" for the next call */
+ twjit->last_seq = rx_seq;
+ twjit->last_ts = rx_ts;
+ memcpy(&twjit->last_arrival, &now, sizeof(struct timespec));
+ twjit->stats.rx_packets++;
+ /* stash the host-order timestamp in the msgb for queue handling */
+ msg->cb[0] = rx_ts;
+
+ switch (twjit->state) {
+ case TWJIT_STATE_EMPTY:
+ /* first packet into totally empty buffer */
+ /* if there was earlier input, the buffer has run dry since */
+ if (got_previous_input)
+ twjit->stats.underruns++;
+ twjit->state = TWJIT_STATE_HUNT;
+ twjit->write_sb = 0;
+ init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts);
+ return;
+ case TWJIT_STATE_HUNT:
+ case TWJIT_STATE_HANDOVER:
+ /* write sub-buffer is in its starting phase */
+ id = check_input_for_subbuf(twjit, true, rx_ssrc, rx_ts);
+ if (id == INPUT_TOO_OLD) {
+ msgb_free(msg);
+ return;
+ }
+ if (id == INPUT_RESET) {
+ /* restart the same sub-buffer with this packet */
+ toss_write_queue(twjit);
+ init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts);
+ return;
+ }
+ insert_pkt_write_sb(twjit, msg, rx_ts);
+ trim_starting_sb(twjit);
+ return;
+ case TWJIT_STATE_FLOWING:
+ id = check_input_for_subbuf(twjit, false, rx_ssrc, rx_ts);
+ if (id == INPUT_TOO_OLD) {
+ twjit->stats.too_old++;
+ msgb_free(msg);
+ return;
+ }
+ if (id == INPUT_RESET) {
+ /* begin handover: start the other sub-buffer with this
+ * packet; the read sub-buffer is left untouched here */
+ twjit->state = TWJIT_STATE_HANDOVER;
+ twjit->write_sb = !twjit->write_sb;
+ init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts);
+ twjit->stats.handovers_in++;
+ return;
+ }
+ insert_pkt_write_sb(twjit, msg, rx_ts);
+ return;
+ default:
+ OSMO_ASSERT(0);
+ }
+}
diff --git a/src/twjit_out.c b/src/twjit_out.c
new file mode 100644
index 0000000..94a2a39
--- /dev/null
+++ b/src/twjit_out.c
@@ -0,0 +1,122 @@
+/*
+ * Themyscira Wireless RTP jitter buffer implementation:
+ * output to the fixed timing system.
+ *
+ * This code was contributed to Osmocom Cellular Network Infrastructure
+ * project by Mother Mychaela N. Falconia of Themyscira Wireless.
+ * Mother Mychaela's contributions are NOT subject to copyright:
+ * no rights reserved, all rights relinquished.
+ */
+
+#include <stdint.h>
+#include <stdbool.h>
+#include <string.h>
+
+#include <osmocom/core/linuxlist.h>
+#include <osmocom/core/msgb.h>
+#include <osmocom/core/utils.h>
+
+#include <osmocom/netif/twjit.h>
+#include <osmocom/netif/twjit_private.h>
+
+/* A starting (write) sub-buffer may begin to flow once it has reached the
+ * configured starting depth and its latest arrival delta is not below
+ * start_min_delta. */
+static bool starting_sb_is_ready(struct osmo_twjit *twjit)
+{
+	const struct twjit_subbuf *wsb = &twjit->sb[twjit->write_sb];
+
+	return wsb->depth >= wsb->conf.bd_start &&
+	       wsb->delta_ms >= wsb->conf.start_min_delta;
+}
+
+/* True when the sub-buffer currently being read has no depth left. */
+static bool read_sb_is_empty(struct osmo_twjit *twjit)
+{
+	return twjit->sb[twjit->read_sb].depth == 0;
+}
+
+/*
+ * Consume one quantum position from the read sub-buffer.
+ * Returns the packet sitting at the head position, or NULL when the head
+ * position is a gap (the first queued packet belongs to a later quantum).
+ * In both cases the head advances by one quantum and depth shrinks by 1.
+ * The sub-buffer must not be empty.
+ */
+static struct msgb *pull_from_read_sb(struct osmo_twjit *twjit)
+{
+	struct twjit_subbuf *rsb = &twjit->sb[twjit->read_sb];
+	struct msgb *first;
+	struct msgb *out = NULL;
+
+	OSMO_ASSERT(!llist_empty(&rsb->queue));
+	OSMO_ASSERT(rsb->depth > 0);
+
+	first = llist_entry(rsb->queue.next, struct msgb, list);
+	if (first->cb[0] != rsb->head_ts) {
+		twjit->stats.output_gaps++;
+	} else {
+		llist_del(&first->list);
+		twjit->stats.delivered_pkt++;
+		out = first;
+	}
+
+	rsb->depth--;
+	rsb->head_ts += twjit->ts_quantum;
+	return out;
+}
+
+/*
+ * Standing queue thinning, run before every regular pull from the read
+ * sub-buffer. When the sub-buffer is deeper than the high water mark, one
+ * extra quantum is pulled and discarded, and a countdown is armed so that
+ * the next drop cannot happen before the thinning interval has elapsed.
+ *
+ * Two guards protect against config values that vty would reject but that
+ * an application may supply directly:
+ * - the last remaining quantum is never dropped, because the caller pulls
+ *   one quantum right after us and needs a non-empty sub-buffer;
+ * - a thinning interval below 2 arms a zero countdown rather than letting
+ *   the 16-bit subtraction wrap around.
+ */
+static void read_sb_thinning(struct osmo_twjit *twjit)
+{
+	struct twjit_subbuf *sb = &twjit->sb[twjit->read_sb];
+	struct msgb *msg;
+
+	if (sb->drop_int_count) {
+		sb->drop_int_count--;
+		return;
+	}
+	if (sb->depth <= sb->conf.bd_hiwat)
+		return;
+	if (sb->depth < 2)
+		return;
+	twjit->stats.thinning_drops++;
+	msg = pull_from_read_sb(twjit);
+	if (msg)
+		msgb_free(msg);
+	/* This call consumes one quantum and the caller's pull consumes
+	 * another, hence interval minus 2 further calls are skipped
+	 * before the next drop. */
+	if (sb->conf.thinning_int > 2)
+		sb->drop_int_count = sb->conf.thinning_int - 2;
+	else
+		sb->drop_int_count = 0;
+}
+
+/* Discard everything still queued in the read sub-buffer and mark it
+ * empty. */
+static void toss_read_queue(struct osmo_twjit *twjit)
+{
+ struct twjit_subbuf *sb = &twjit->sb[twjit->read_sb];
+
+ msgb_queue_free(&sb->queue);
+ sb->depth = 0;
+}
+
+/*
+ * Output entry point, to be called once per quantum by the fixed timing
+ * side. Returns the packet for the current quantum, or NULL when there is
+ * nothing to deliver (buffer empty, still filling, or a gap in the stream).
+ * A returned msgb has been unlinked from the queue.
+ */
+struct msgb *osmo_twjit_output(struct osmo_twjit *twjit)
+{
+ switch (twjit->state) {
+ case TWJIT_STATE_EMPTY:
+ return NULL;
+ case TWJIT_STATE_HUNT:
+ /* start flowing only once the write sub-buffer is ready */
+ if (!starting_sb_is_ready(twjit))
+ return NULL;
+ twjit->state = TWJIT_STATE_FLOWING;
+ twjit->read_sb = twjit->write_sb;
+ return pull_from_read_sb(twjit);
+ case TWJIT_STATE_FLOWING:
+ if (read_sb_is_empty(twjit)) {
+ twjit->state = TWJIT_STATE_EMPTY;
+ return NULL;
+ }
+ read_sb_thinning(twjit);
+ return pull_from_read_sb(twjit);
+ case TWJIT_STATE_HANDOVER:
+ /* new sub-buffer ready: drop what is left of the old one and
+ * switch reading over to the new one */
+ if (starting_sb_is_ready(twjit)) {
+ toss_read_queue(twjit);
+ twjit->stats.handovers_out++;
+ twjit->state = TWJIT_STATE_FLOWING;
+ twjit->read_sb = twjit->write_sb;
+ return pull_from_read_sb(twjit);
+ }
+ /* old sub-buffer ran dry before the new one became ready */
+ if (read_sb_is_empty(twjit)) {
+ twjit->state = TWJIT_STATE_HUNT;
+ twjit->stats.ho_underruns++;
+ return NULL;
+ }
+ /* otherwise keep draining the old sub-buffer */
+ read_sb_thinning(twjit);
+ return pull_from_read_sb(twjit);
+ default:
+ OSMO_ASSERT(0);
+ }
+}
diff --git a/src/twjit_vty.c b/src/twjit_vty.c
new file mode 100644
index 0000000..a16f9a1
--- /dev/null
+++ b/src/twjit_vty.c
@@ -0,0 +1,134 @@
+/*
+ * Themyscira Wireless RTP jitter buffer implementation: vty configuration.
+ *
+ * This code was contributed to Osmocom Cellular Network Infrastructure
+ * project by Mother Mychaela N. Falconia of Themyscira Wireless.
+ * Mother Mychaela's contributions are NOT subject to copyright:
+ * no rights reserved, all rights relinquished.
+ */
+
+#include <stdint.h>
+#include <stdbool.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <osmocom/core/utils.h>
+#include <osmocom/vty/vty.h>
+#include <osmocom/vty/command.h>
+
+#include <osmocom/netif/twjit.h>
+
+/*
+ * Emit the given twjit config to vty as a config node plus its settings.
+ * The node line uses 'name', or "twjit" when name is NULL; every line
+ * starts with 'prefix'. The two start-*-delta settings are written only
+ * when they are set (nonzero). Always returns CMD_SUCCESS.
+ */
+int osmo_twjit_config_write(struct vty *vty,
+			    const struct osmo_twjit_config *conf,
+			    const char *name, const char *prefix)
+{
+	const char *node_name = name ? name : "twjit";
+
+	vty_out(vty, "%s%s%s", prefix, node_name, VTY_NEWLINE);
+	vty_out(vty, "%s buffer-depth %u %u%s", prefix,
+		conf->bd_start, conf->bd_hiwat, VTY_NEWLINE);
+	vty_out(vty, "%s thinning-interval %u%s", prefix,
+		conf->thinning_int, VTY_NEWLINE);
+	vty_out(vty, "%s max-future-sec %u%s", prefix,
+		conf->max_future_sec, VTY_NEWLINE);
+
+	if (conf->start_min_delta != 0)
+		vty_out(vty, "%s start-min-delta %u%s", prefix,
+			conf->start_min_delta, VTY_NEWLINE);
+	if (conf->start_max_delta != 0)
+		vty_out(vty, "%s start-max-delta %u%s", prefix,
+			conf->start_max_delta, VTY_NEWLINE);
+
+	return CMD_SUCCESS;
+}
+
+/* vty command: set starting fill level and high water mark in the twjit
+ * config structure that vty->index points to. Both values are stored
+ * only if the high water mark is not below the starting level. */
+DEFUN(cfg_buffer_depth, cfg_buffer_depth_cmd,
+      "buffer-depth <1-65535> <1-65535>",
+      "Buffer depth configuration\n"
+      "Minimum fill required to start flow\n"
+      "High water mark fill level\n")
+{
+	struct osmo_twjit_config *conf = vty->index;
+	unsigned bd_start = atoi(argv[0]);
+	unsigned bd_hiwat = atoi(argv[1]);
+
+	if (bd_hiwat < bd_start) {
+		/* the message is a single string literal; it must not be
+		 * broken across source lines */
+		vty_out(vty, "%% Error: high water mark cannot be less than starting level%s",
+			VTY_NEWLINE);
+		return CMD_WARNING;
+	}
+
+	conf->bd_start = bd_start;
+	conf->bd_hiwat = bd_hiwat;
+
+	return CMD_SUCCESS;
+}
+
+/* vty command: store the thinning interval in the twjit config structure
+ * that vty->index points to. */
+DEFUN(cfg_thinning, cfg_thinning_cmd,
+ "thinning-interval <2-65535>",
+ "Standing queue thinning configuration\n"
+ "Drop every Nth packet\n")
+{
+ struct osmo_twjit_config *conf = vty->index;
+ conf->thinning_int = atoi(argv[0]);
+ return CMD_SUCCESS;
+}
+
+/* vty command: store the "time traveler" threshold, in seconds, in the
+ * twjit config structure that vty->index points to. */
+DEFUN(cfg_max_future, cfg_max_future_cmd,
+ "max-future-sec <1-65535>",
+ "Guard against time traveler packets\n"
+ "Maximum permissible number of seconds into the future\n")
+{
+ struct osmo_twjit_config *conf = vty->index;
+ conf->max_future_sec = atoi(argv[0]);
+ return CMD_SUCCESS;
+}
+
+/* vty command: set the minimum time-of-arrival delta (ms) required to
+ * start flow, in the twjit config structure that vty->index points to. */
+DEFUN(cfg_start_min_delta, cfg_start_min_delta_cmd,
+ "start-min-delta <1-65535>",
+ "Minimum required delta in time-of-arrival to start flow\n"
+ "Time delta value in ms\n")
+{
+ struct osmo_twjit_config *conf = vty->index;
+ conf->start_min_delta = atoi(argv[0]);
+ return CMD_SUCCESS;
+}
+
+/* vty command: unset start-min-delta; 0 is the "not set" value. */
+DEFUN(cfg_no_start_min_delta, cfg_no_start_min_delta_cmd,
+ "no start-min-delta",
+ NO_STR "Minimum required delta in time-of-arrival to start flow\n")
+{
+ struct osmo_twjit_config *conf = vty->index;
+ conf->start_min_delta = 0;
+ return CMD_SUCCESS;
+}
+
+/* vty command: set the maximum time-of-arrival gap (ms) permitted in
+ * starting state, in the twjit config structure that vty->index points to. */
+DEFUN(cfg_start_max_delta, cfg_start_max_delta_cmd,
+ "start-max-delta <1-65535>",
+ "Maximum permitted gap in time-of-arrival in starting state\n"
+ "Time delta value in ms\n")
+{
+ struct osmo_twjit_config *conf = vty->index;
+ conf->start_max_delta = atoi(argv[0]);
+ return CMD_SUCCESS;
+}
+
+/* vty command: unset start-max-delta; 0 is the "not set" value. */
+DEFUN(cfg_no_start_max_delta, cfg_no_start_max_delta_cmd,
+ "no start-max-delta",
+ NO_STR "Maximum permitted gap in time-of-arrival in starting state\n")
+{
+ struct osmo_twjit_config *conf = vty->index;
+ conf->start_max_delta = 0;
+ return CMD_SUCCESS;
+}
+
+/* Install all twjit configuration commands under the vty node chosen by
+ * the application. */
+void osmo_twjit_vty_init(int twjit_node)
+{
+	/* kept in the order in which the commands get installed */
+	static struct cmd_element *const twjit_cmds[] = {
+		&cfg_buffer_depth_cmd,
+		&cfg_thinning_cmd,
+		&cfg_max_future_cmd,
+		&cfg_start_min_delta_cmd,
+		&cfg_no_start_min_delta_cmd,
+		&cfg_start_max_delta_cmd,
+		&cfg_no_start_max_delta_cmd,
+	};
+	unsigned int i;
+
+	for (i = 0; i < ARRAY_SIZE(twjit_cmds); i++)
+		install_lib_element(twjit_node, twjit_cmds[i]);
+}
--
To view, visit
https://gerrit.osmocom.org/c/libosmo-netif/+/39280?usp=email
To unsubscribe, or for help writing mail filters, visit
https://gerrit.osmocom.org/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: libosmo-netif
Gerrit-Branch: master
Gerrit-Change-Id: Ia3be5834571ca18b68939abbcf1ce3a879156658
Gerrit-Change-Number: 39280
Gerrit-PatchSet: 1
Gerrit-Owner: falconia <falcon(a)freecalypso.org>