pespin has uploaded this change for review.

View Change

Introduce osmo-udp-responder

Related: SYS#7096
Change-Id: I9edc2e3e1e41767673bcf96f1fe97fa4bf6d60f7
---
M configure.ac
M src/Makefile.am
A src/osmo-udp-responder/Makefile.am
A src/osmo-udp-responder/udp_responder.c
4 files changed, 823 insertions(+), 0 deletions(-)

git pull ssh://gerrit.osmocom.org:29418/upf-benchmark refs/changes/28/38328/1
diff --git a/configure.ac b/configure.ac
index a10b07c..4bbc5f6 100644
--- a/configure.ac
+++ b/configure.ac
@@ -166,6 +166,7 @@
include/osmocom/pfcptool/Makefile
src/Makefile
src/osmo-pfcp-tool/Makefile
+ src/osmo-udp-responder/Makefile
doc/Makefile
doc/manuals/Makefile
Makefile)
diff --git a/src/Makefile.am b/src/Makefile.am
index fecf898..731fc5e 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1,3 +1,4 @@
SUBDIRS = \
osmo-pfcp-tool \
+ osmo-udp-responder \
$(NULL)
diff --git a/src/osmo-udp-responder/Makefile.am b/src/osmo-udp-responder/Makefile.am
new file mode 100644
index 0000000..329c5f2
--- /dev/null
+++ b/src/osmo-udp-responder/Makefile.am
@@ -0,0 +1,25 @@
+AM_CPPFLAGS = \
+	$(all_includes) \
+	-I$(top_srcdir)/include \
+	-I$(top_builddir)/include \
+	-I$(top_builddir) \
+	$(NULL)
+
+AM_CFLAGS = \
+	-Wall \
+	$(LIBOSMOCORE_CFLAGS) \
+	$(LIBURING_CFLAGS) \
+	$(NULL)
+
+bin_PROGRAMS = \
+	osmo-udp-responder \
+	$(NULL)
+
+osmo_udp_responder_SOURCES = \
+	udp_responder.c \
+	$(NULL)
+
+# Libraries go into LDADD, not AM_LDFLAGS: automake puts LDFLAGS before the
+# objects on the link line, where --as-needed linkers drop the libraries.
+osmo_udp_responder_LDADD = \
+	$(LIBOSMOCORE_LIBS) \
+	$(LIBURING_LIBS) \
+	$(NULL)
diff --git a/src/osmo-udp-responder/udp_responder.c b/src/osmo-udp-responder/udp_responder.c
new file mode 100644
index 0000000..5ffe707
--- /dev/null
+++ b/src/osmo-udp-responder/udp_responder.c
@@ -0,0 +1,796 @@
+/* UDP responder: listen on a UDP port, and respond to each received UDP packet back to the sender. */
+/*
+ * (C) 2024 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
+ * All Rights Reserved.
+ *
+ * Author: Neels Janosch Hofmeyr <nhofmeyr@sysmocom.de>
+ * Author: Pau Espin Pedrol <pespin@sysmocom.de>
+ *
+ * SPDX-License-Identifier: GPL-2.0+
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+#include "config.h"
+
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+
+#include <osmocom/core/application.h>
+#include <osmocom/core/logging.h>
+#include <osmocom/core/timer.h>
+
+#if HAVE_URING
+
+#include <getopt.h>
+#include <string.h>
+#include <limits.h>
+#include <unistd.h>
+#include <liburing.h>
+#include <pthread.h>
+#include <sched.h>
+#include <netinet/in.h>
+
+#include <osmocom/core/utils.h>
+#include <osmocom/core/socket.h>
+#include <osmocom/core/sockaddr_str.h>
+
+#include <osmocom/pfcptool/gtp_flood.h>
+
+static volatile bool started; /* set by a worker on its first receive completion; polled by the stats loop in main() */
+static struct timespec ts_start; /* CLOCK_MONOTONIC time taken when 'started' is set; baseline for the TOTAL stats */
+
+struct cmdline_cmd { /* one entry of a command line option table; a table ends with an all-zero entry */
+ const char *short_option; /* option letter without the leading '-', or NULL */
+ const char *long_option; /* option name without the leading "--", or NULL */
+ const char *arg_name; /* placeholder shown in the help; non-NULL means the option takes an argument */
+ const char *doc; /* help text, or NULL */
+
+ const char *value; /* set by cmdline_read(): the argument, or the option's own name for a flag; NULL while not given */
+};
+
+#define cmdline_foreach(ITER, CMDS) \
+ for (const struct cmdline_cmd *ITER = (CMDS); \
+ ITER->short_option || ITER->long_option || ITER->arg_name; \
+ ITER++) /* walk ITER over CMDS, stopping at the first entry that has neither short_option, long_option nor arg_name */
+
+/* Compose the "Options:" help text for the option table cmds into buf.
+ * Each option gets one line: " -s --long ARG", padded so that all doc strings
+ * start in the same column.
+ * \param[out] buf  Destination buffer.
+ * \param[in] buflen  Size of buf in bytes.
+ * \param[in] cmds  Option table, terminated by an all-zero entry.
+ * \returns the number of characters the complete text needs (sb.chars_needed),
+ *          which may exceed buflen. */
+int cmdline_doc_str_buf(char *buf, size_t buflen, const struct cmdline_cmd *cmds)
+{
+	struct osmo_strbuf out = { .buf = buf, .len = buflen };
+	int doc_column = 0;
+
+	/* Pass 1: find the widest "-s --long ARG" part. */
+	cmdline_foreach(opt, cmds) {
+		int width = 0;
+
+		if (opt->short_option)
+			width += 2 + strlen(opt->short_option);
+		if (opt->long_option)
+			width += 3 + strlen(opt->long_option);
+		if (opt->arg_name)
+			width += 1 + strlen(opt->arg_name);
+		if (width > doc_column)
+			doc_column = width;
+	}
+	/* horizontal gap between the options part and the doc string */
+	doc_column += 2;
+
+	/* Pass 2: print one line per option. */
+	OSMO_STRBUF_PRINTF(out, "Options:\n");
+	cmdline_foreach(opt, cmds) {
+		char *row_start = out.pos;
+
+		if (opt->short_option)
+			OSMO_STRBUF_PRINTF(out, " -%s", opt->short_option);
+		if (opt->long_option)
+			OSMO_STRBUF_PRINTF(out, " --%s", opt->long_option);
+		if (opt->arg_name)
+			OSMO_STRBUF_PRINTF(out, " %s", opt->arg_name);
+		if (opt->doc) {
+			/* pad up to the doc column, always leaving at least one space */
+			int pad = doc_column - (int)(out.pos - row_start);
+
+			if (pad < 1)
+				pad = 1;
+			OSMO_STRBUF_PRINTF(out, "%*s%s", pad, "", opt->doc);
+		}
+		OSMO_STRBUF_PRINTF(out, "\n");
+	}
+	return out.chars_needed;
+}
+
+/* Print the help text for the option table cmds to stdout.
+ * The text is composed in a fixed 8192 byte buffer. */
+void cmdline_print_help(const struct cmdline_cmd *cmds)
+{
+	char help_text[8192];
+
+	cmdline_doc_str_buf(help_text, sizeof(help_text), cmds);
+	fputs(help_text, stdout);
+}
+
+/* Record that cmd was seen on the command line.
+ * An option that takes an argument stores getopt's current optarg; a flag
+ * stores its own name (short name preferred), so that its value is non-NULL. */
+void cmdline_cmd_store_optarg(struct cmdline_cmd *cmd)
+{
+	if (cmd->arg_name) {
+		cmd->value = optarg;
+		return;
+	}
+	cmd->value = cmd->short_option ? cmd->short_option : cmd->long_option;
+}
+
+/* Parse argc/argv according to the option table cmds and store each value
+ * found in the matching entry's 'value' member.
+ * Entries with neither short_option nor long_option but with an arg_name are
+ * positional arguments, filled in table order after all options.
+ * \param[in,out] cmds  Option table, terminated by an all-zero entry.
+ * \returns 0 on success, -1 on an unknown option, a surplus positional
+ *          argument or too many long options in the table. */
+int cmdline_read(struct cmdline_cmd *cmds, int argc, char **argv)
+{
+	char short_options[256] = {};
+	struct option long_options[128] = {};
+	int long_options_i = 0;
+	/* getopt_long() stores the matched long option's 'val' here */
+	int long_option_val = 0;
+	struct osmo_strbuf short_sb = { .buf = short_options, .len = sizeof(short_options) };
+
+	cmdline_foreach(cmd, cmds) {
+		if (cmd->short_option) {
+			OSMO_STRBUF_PRINTF(short_sb, "%s", cmd->short_option);
+			if (cmd->arg_name)
+				OSMO_STRBUF_PRINTF(short_sb, ":");
+		}
+		if (cmd->long_option) {
+			/* the last element must stay zeroed, it terminates the array for getopt_long() */
+			if (long_options_i >= (int)ARRAY_SIZE(long_options) - 1) {
+				fprintf(stderr, "%s: too many long options defined\n", argv[0]);
+				return -1;
+			}
+			long_options[long_options_i] = (struct option){
+				.name = cmd->long_option,
+				.has_arg = cmd->arg_name ? required_argument : no_argument,
+				.flag = &long_option_val,
+				/* index into cmds[], not into long_options[]: the two differ as soon
+				 * as an entry without a long option precedes this one */
+				.val = (int)(cmd - cmds),
+			};
+			long_options_i++;
+		}
+	}
+
+	while (1) {
+		int option_index = 0;
+		/* must be int: a plain char may be unsigned and then never compares equal to -1 */
+		int c = getopt_long(argc, argv, short_options, long_options, &option_index);
+		bool found = false;
+
+		if (c == -1)
+			break;
+		if (c == 0) {
+			/* a long option; long_option_val holds its cmds[] index */
+			cmdline_cmd_store_optarg(&cmds[long_option_val]);
+			continue;
+		}
+		cmdline_foreach(cc, cmds) {
+			/* entries without a short option cannot match, and strchr(NULL, ...) is invalid */
+			if (!cc->short_option)
+				continue;
+			if (strchr(cc->short_option, c)) {
+				cmdline_cmd_store_optarg((struct cmdline_cmd *)cc);
+				found = true;
+				break;
+			}
+		}
+		if (!found) {
+			/* includes '?', which getopt_long() returns for unknown options */
+			fprintf(stderr, "%s: Error in command line options. Exiting.\n", argv[0]);
+			return -1;
+		}
+	}
+
+	/* positional args */
+	cmdline_foreach(cmd, cmds) {
+		if (optind >= argc)
+			break;
+		if (cmd->short_option || cmd->long_option)
+			continue;
+		if (!cmd->arg_name)
+			continue;
+		((struct cmdline_cmd *)cmd)->value = argv[optind];
+		optind++;
+	}
+
+	if (optind < argc) {
+		cmdline_print_help(cmds);
+		fprintf(stderr, "%s: Unsupported positional argument on command line\n", argv[optind]);
+		return -1;
+	}
+	return 0;
+}
+
+/* Look up the value stored for an option.
+ * \param[in] cmds  Option table, terminated by an all-zero entry.
+ * \param[in] option_name  Compared against each entry's long option name,
+ *                         short option name and argument name, in that order.
+ * \param[in] default_val  Returned when no entry matches, and also when the
+ *                         matching entry has no value stored (option not given).
+ * \returns the stored value of the first matching entry, or default_val. */
+const char *cmdline_get(const struct cmdline_cmd *cmds, const char *option_name, const char *default_val)
+{
+	cmdline_foreach(cmd, cmds) {
+		bool match = (cmd->long_option && !strcmp(cmd->long_option, option_name))
+			  || (cmd->short_option && !strcmp(cmd->short_option, option_name))
+			  || (cmd->arg_name && !strcmp(cmd->arg_name, option_name));
+
+		if (!match)
+			continue;
+		/* an option that exists in the table but was not given has value == NULL */
+		return cmd->value ? cmd->value : default_val;
+	}
+	return default_val;
+}
+
+/* Fetch an option's value as a decimal integer within [minval, maxval].
+ * \param[out] dst  Receives the parsed number, or default_val when the option
+ *                  was not given.
+ * \returns true on success; false after printing the help and an error
+ *          message when the value is not a valid number in range. */
+bool cmdline_get_int(int *dst, int minval, int maxval, int default_val,
+		     const struct cmdline_cmd *cmds, const char *option_name)
+{
+	const char *text = cmdline_get(cmds, option_name, NULL);
+
+	if (text == NULL) {
+		*dst = default_val;
+		return true;
+	}
+
+	if (osmo_str_to_int(dst, text, 10, minval, maxval) != 0) {
+		cmdline_print_help(cmds);
+		printf("ERROR: invalid integer number: %s\n", text);
+		return false;
+	}
+
+	if (minval <= *dst && *dst <= maxval)
+		return true;
+
+	cmdline_print_help(cmds);
+	printf("ERROR: number out of range: %d <= %d <= %d\n", minval, *dst, maxval);
+	return false;
+}
+
+/* Return the number of CPUs configured in the system, or -1 if it cannot be
+ * determined. The result of the first successful query is cached in a
+ * function-local static. */
+static int get_num_cpus(void)
+{
+	static unsigned int cached_cpus = 0;
+
+	if (cached_cpus == 0) {
+		/* This is expensive (goes across /sys), so let's do it only once. It is
+		 * guaranteed it won't change during process span anyway. */
+		long n_conf = sysconf(_SC_NPROCESSORS_CONF);
+
+		if (n_conf < 0) {
+			fprintf(stderr, "sysconf(_SC_NPROCESSORS_CONF) failed: %s\n", strerror(errno));
+			return -1;
+		}
+		cached_cpus = (unsigned int) n_conf;
+	}
+	return cached_cpus;
+}
+
+struct udp_port { /* a local UDP endpoint: the requested address and the socket bound to it */
+ struct llist_head entry; /* list linkage; NOTE(review): no code visible in this file links it into a list */
+
+ /* IP address and UDP port from user input */
+ struct osmo_sockaddr osa;
+
+ /* locally bound socket */
+ int fd;
+};
+
+enum data_io_type { /* which operation a struct data_io slot has queued on the ring */
+ IO_UNUSED = 0, /* value of a zero-initialized slot; the completion handler asserts on it */
+ IO_RECV, /* a recvmsg is queued for this slot */
+ IO_SEND, /* a sendmsg is queued for this slot */
+};
+
+struct data_io { /* one I/O slot: buffers plus the msghdr used for both receiving and sending a packet */
+ enum data_io_type type; /* operation currently queued for this slot */
+ struct osmo_sockaddr osa; /* peer address: msgh.msg_name points here */
+ struct iovec iov; /* single payload vector referenced by msgh; iov_len is adjusted per operation */
+ struct msghdr msgh; /* message header passed to the recvmsg/sendmsg requests */
+ uint8_t *data; /* payload buffer (same pointer as iov.iov_base) */
+ size_t data_size; /* allocated size of data */
+ uint8_t *control; /* ancillary data buffer (same pointer as msgh.msg_control) */
+ size_t control_size; /* allocated size of control */
+ int n; /* number of completed sends, compared against the configured response repeat count */
+};
+
+struct io_queue { /* a worker's set of I/O slots, allocated as one chunk with d_size trailing entries */
+ size_t d_size; /* number of elements in d[] */
+ struct data_io d[0]; /* the slots; storage is allocated together with the struct in start_rx_worker() */
+};
+
+struct counter { /* a running total that can report its increase since the previous query */
+ uint64_t count; /* current total */
+ uint64_t last; /* total as of the previous counter_get() call */
+};
+
+struct traffic_counter { /* packet and byte counters for one traffic direction */
+ struct counter packets;
+ struct counter bytes;
+};
+
+/* Return by how much c->count grew since the previous call, and remember the
+ * current total as the baseline for the next call. */
+uint64_t counter_get(struct counter *c)
+{
+	const uint64_t total = c->count;
+	const uint64_t previous = c->last;
+
+	c->last = total;
+	return total - previous;
+}
+
+struct traffic_counter g_rx = {}; /* received traffic summed over all workers; updated by the stats loop in main() */
+struct traffic_counter g_tx = {}; /* transmitted traffic summed over all workers; updated by the stats loop in main() */
+
+struct worker { /* state of one worker thread */
+ int id; /* index assigned by main(); also selects the CPU when pinning is enabled */
+ struct io_queue *q; /* this worker's I/O slots, allocated in start_rx_worker() */
+ struct io_uring ring; /* ring initialized inside the worker thread */
+ pthread_t worker; /* thread handle filled in by pthread_create() */
+ struct { /* traffic totals, incremented by the worker and summed up by main() for the stats output */
+ unsigned long long rx_packets;
+ unsigned long long rx_bytes;
+ unsigned long long tx_packets;
+ unsigned long long tx_bytes;
+ } ctr;
+};
+
+struct { /* program configuration, filled in by main() from the command line */
+ int port_nr; /* --port, default 23000 */
+ const char *local_addr; /* --local-addr */
+ int queue_size; /* --io-uring-queue, default 4000: I/O slots per worker and io_uring size */
+ int buf_size; /* --io-uring-buf, default 1452: payload buffer size per slot */
+ int control_size; /* ancillary data buffer size per slot; main() sets it to 1024 */
+ int response_size; /* --response-size; 0 leaves the received length unchanged */
+ int response_n; /* --response-repeat, default 1; below 1 means receive only */
+ int workers_n; /* --workers, default 4 */
+ struct udp_port port; /* the one bound socket, used by all workers */
+ bool cpu_affinity; /* true when --cpu-affinity was given */
+} cfg = {};
+
+/* Initialize one I/O slot: allocate its payload and ancillary data buffers as
+ * talloc children of the worker's queue and wire them into the slot's msghdr.
+ * The slot starts out in receive mode. Aborts if an allocation fails, so that
+ * a NULL buffer is never queued to the kernel. */
+static void data_io_prepare(struct worker *w, struct data_io *d)
+{
+	void *data_buf = talloc_size(w->q, cfg.buf_size);
+	void *control_buf = talloc_size(w->q, cfg.control_size);
+
+	OSMO_ASSERT(data_buf);
+	OSMO_ASSERT(control_buf);
+
+	*d = (struct data_io){
+		.type = IO_RECV,
+		.iov = {
+			.iov_base = data_buf,
+			.iov_len = cfg.buf_size,
+		},
+		.msgh = {
+			/* the msghdr points back into the slot itself */
+			.msg_name = &d->osa,
+			.msg_namelen = sizeof(d->osa),
+			.msg_iov = &d->iov,
+			.msg_iovlen = 1,
+			.msg_control = control_buf,
+			.msg_controllen = cfg.control_size,
+		},
+		.data = data_buf,
+		.data_size = cfg.buf_size,
+		.control = control_buf,
+		.control_size = cfg.control_size,
+	};
+}
+
+/* Queue a recvmsg for slot d on the ring (it is submitted by the caller's next
+ * io_uring_submit()). The msghdr fields that a previous receive or send may
+ * have shrunk are restored to the full buffer sizes first. */
+static void data_io_prep_recv(struct io_uring *ring, struct udp_port *port, struct data_io *d)
+{
+	struct io_uring_sqe *sqe;
+
+	d->type = IO_RECV;
+	d->iov.iov_len = d->data_size;
+	d->msgh.msg_controllen = d->control_size;
+	/* msg_namelen is value-result like msg_controllen: a receive overwrites it with
+	 * the actual address length, so offer the whole address buffer again */
+	d->msgh.msg_namelen = sizeof(d->osa);
+
+	sqe = io_uring_get_sqe(ring);
+	OSMO_ASSERT(sqe);
+	io_uring_prep_recvmsg(sqe, port->fd, &d->msgh, 0);
+	io_uring_sqe_set_data(sqe, d);
+}
+
+/* Queue a sendmsg of slot d's current msghdr on the ring; the destination is
+ * whatever address msgh.msg_name holds. Submission happens in the caller's
+ * next io_uring_submit(). */
+static void data_io_prep_send(struct io_uring *ring, struct udp_port *port, struct data_io *d)
+{
+	struct io_uring_sqe *entry;
+
+	d->type = IO_SEND;
+
+	entry = io_uring_get_sqe(ring);
+	OSMO_ASSERT(entry);
+
+	io_uring_prep_sendmsg(entry, port->fd, &d->msgh, 0);
+	io_uring_sqe_set_data(entry, d);
+}
+
+/* Extract the payload info trailer from the packet in slot d, if present.
+ * As read by this function, the packet's last byte gives the trailer length
+ * L, the trailer is the L bytes directly before that byte, and it starts with
+ * the marker "info".
+ * \param[out] dst  Zeroed, then filled with at most sizeof(*dst) trailer bytes.
+ * \param[in] d  Slot whose iov describes the received packet (iov_len = packet length).
+ * \returns true if a trailer was found and copied, false otherwise. */
+static bool get_payload_info(struct gtp_flood_payload_info *dst, struct data_io *d)
+{
+	const uint8_t *pkt = d->iov.iov_base;
+	const size_t pkt_len = d->iov.iov_len;
+	const uint8_t *len_byte;
+	const uint8_t *pi;
+	size_t info_len;
+
+	if (pkt_len == 0)
+		return false;
+
+	len_byte = pkt + pkt_len - 1;
+	info_len = *len_byte;
+
+	/* trailer plus its length byte must fit into the packet; with info_len == pkt_len
+	 * the trailer would start one byte before the buffer */
+	if (info_len >= pkt_len)
+		return false;
+	/* too short to hold the marker: comparing would read past the packet */
+	if (info_len < 4)
+		return false;
+
+	pi = len_byte - info_len;
+	if (memcmp(pi, "info", 4) != 0)
+		return false;
+
+	*dst = (struct gtp_flood_payload_info){};
+	memcpy(dst, pi, OSMO_MIN(sizeof(*dst), info_len));
+	return true;
+}
+
+/* Handle one completion from the worker's ring and queue the slot's next operation.
+ *
+ * A completed receive is counted and, unless response_n < 1, sent back to its
+ * sender: if the packet carries a payload info trailer, the return TEID from
+ * it is written into the GTP header first, and response_size > 0 overrides the
+ * length to send. A completed send is counted and repeated until response_n
+ * sends are done, then the slot is re-armed for receiving.
+ *
+ * The completion is always marked as seen, also on errors: otherwise the ring
+ * keeps returning the same completion and the slot is never used again. After
+ * a failed operation the slot goes back to receiving.
+ *
+ * \param[in] w  Worker owning the ring and the counters.
+ * \param[in] port  Socket to receive from / send on.
+ * \param[in] cqe  Completion to handle; must not be used by the caller afterwards.
+ * \param[in] response_size  Length of each response, or 0 to echo the received length.
+ * \param[in] response_n  Number of responses to send per received packet. */
+static void data_io_handle_completion(struct worker *w, struct udp_port *port, struct io_uring_cqe *cqe,
+				      int response_size, int response_n)
+{
+	struct io_uring *ring = &w->ring;
+	struct data_io *d = io_uring_cqe_get_data(cqe);
+	struct osmo_sockaddr *osa = &d->osa;
+	struct gtp_flood_payload_info pi;
+	/* copy what is needed out of the completion, then hand it back to the ring */
+	const int rc = cqe->res;
+	const unsigned int flags = cqe->flags;
+
+	io_uring_cqe_seen(ring, cqe);
+
+	switch (d->type) {
+	case IO_RECV:
+		/* done reading */
+		if (OSMO_UNLIKELY(!started)) {
+			/* take the timestamp before publishing 'started', main() reads it afterwards */
+			int clk_rc = clock_gettime(CLOCK_MONOTONIC, &ts_start);
+			OSMO_ASSERT(clk_rc == 0);
+			started = true;
+		}
+		if (rc <= 0) {
+			LOGP(DLGLOBAL, LOGL_ERROR, "%s -> rx error rc=%d flags=0x%x\n",
+			     osmo_sockaddr_to_str(osa), rc, flags);
+			data_io_prep_recv(ring, port, d);
+			return;
+		}
+#if 0
+		LOGP(DLGLOBAL, LOGL_DEBUG, "%s -> rx rc=%d flags=0x%x: %s\n",
+		     osmo_sockaddr_to_str(osa), rc, flags,
+		     osmo_quote_str(d->iov.iov_base, rc));
+#endif
+		w->ctr.rx_packets++;
+		w->ctr.rx_bytes += rc;
+
+		if (response_n < 1) {
+			data_io_prep_recv(ring, port, d);
+			break;
+		}
+
+		d->iov.iov_len = rc;
+		/* only touch the header if the packet is long enough to contain one */
+		if (get_payload_info(&pi, d) && (size_t)rc >= sizeof(struct gtp1u_hdr)) {
+			/* set the return TEID */
+			struct gtp1u_hdr *gtp_hdr = (void *)d->iov.iov_base;
+			gtp_hdr->tei = pi.return_teid;
+		}
+
+		/* resubmit back to sender */
+
+		/* adjust size? */
+		if (response_size > 0)
+			d->iov.iov_len = response_size;
+
+		/* a new response cycle starts: forget the sends counted for the previous packet */
+		d->n = 0;
+		data_io_prep_send(ring, port, d);
+		break;
+
+	case IO_SEND:
+		/* done writing. */
+		if (rc <= 0) {
+			LOGP(DLGLOBAL, LOGL_ERROR, "%s -> tx error rc=%d flags=0x%x\n",
+			     osmo_sockaddr_to_str(osa), rc, flags);
+			/* give up on this response and make the slot available for receiving again */
+			data_io_prep_recv(ring, port, d);
+			return;
+		}
+#if 0
+		LOGP(DLGLOBAL, LOGL_DEBUG, "%s <- tx rc=%d flags=0x%x: %s\n",
+		     osmo_sockaddr_to_str(osa), rc, flags,
+		     osmo_quote_str(d->iov.iov_base, rc));
+#endif
+		w->ctr.tx_packets++;
+		w->ctr.tx_bytes += rc;
+
+		d->n++;
+
+		/* Send again? If not, re-submit open slot for reading. */
+		if (d->n < response_n)
+			data_io_prep_send(ring, port, d);
+		else
+			data_io_prep_recv(ring, port, d);
+		break;
+
+	default:
+		OSMO_ASSERT(0);
+	}
+}
+
+struct cmdline_cmd cmds[] = { /* the program's option table, read by cmdline_read() and queried via cmdline_get*() in main() */
+ {
+ .short_option = "h",
+ .long_option = "help",
+ .doc = "Show this help",
+ },
+ {
+ .short_option = "l",
+ .long_option = "local-addr",
+ .arg_name = "IP-ADDR",
+ .doc = "Listen on local IP address (default is 0.0.0.0).",
+ },
+ {
+ .short_option = "p",
+ .long_option = "port",
+ .arg_name = "UDP-PORT",
+ .doc = "Listen on local UDP port.",
+ },
+ /*
+ {
+ .short_option = "P",
+ .long_option = "port-range-to",
+ .arg_name = "UDP-PORT-TO",
+ .doc = "Listen on a range of ports, from --port to --port-range-to, inclusive.",
+ },
+ */
+ {
+ .short_option = "s",
+ .long_option = "response-size",
+ .arg_name = "BYTES",
+ .doc = "When responding, enlarge or shorten the payload to this size.",
+ },
+ {
+ .short_option = "n",
+ .long_option = "response-repeat",
+ .arg_name = "N",
+ .doc = "Respond N times, i.e. multiply the returned traffic.",
+ },
+ {
+ .short_option = "a",
+ .long_option = "cpu-affinity",
+ .doc = "Pin each Nth worker to a Nth cpu.",
+ },
+ {
+ .long_option = "io-uring-queue",
+ .arg_name = "SIZE",
+ .doc = "I/O tuning: queue size to use for io_uring, default is 4000.",
+ },
+ {
+ .long_option = "io-uring-buf",
+ .arg_name = "SIZE",
+ .doc = "I/O tuning: maximum payload size, default is 1452.",
+ },
+ {
+ .long_option = "workers",
+ .arg_name = "N",
+ .doc = "Number of rx threads to run",
+ },
+ {} /* all-zero entry terminates the table */
+};
+
+static const struct log_info_cat categories[] = { /* no program-specific log categories; the code logs to DLGLOBAL only */
+};
+
+const struct log_info udp_responder_log_info = { /* logging setup passed to osmo_init_logging2() in main() */
+ .cat = categories,
+ .num_cat = ARRAY_SIZE(categories),
+};
+
+static void start_rx_worker(struct worker *w);
+
+/* Print a DIFF line (traffic since the previous report) and a TOTAL line
+ * (traffic since the first received packet), at most once per second of
+ * CLOCK_MONOTONIC time and only if there was traffic since the last report.
+ * \param[in] workers  Array of cfg.workers_n workers whose counters are summed up.
+ * \param[in,out] last_info_log  Time of the previous report; all-zero before the first one. */
+static void print_traffic_stats(const struct worker *workers, struct timespec *last_info_log)
+{
+	struct timespec ts_now, ts_elapsed, ts_diff;
+	unsigned long long elapsed_usec, diff_usec;
+	uint64_t diff_rx_packets, diff_rx_bytes, diff_tx_packets, diff_tx_bytes;
+	uint64_t elapsed_rx_packets = 0, elapsed_rx_bytes = 0, elapsed_tx_packets = 0, elapsed_tx_bytes = 0;
+
+	clock_gettime(CLOCK_MONOTONIC, &ts_now);
+
+	/* Before the first report, measure the first interval from the start of traffic
+	 * instead of from time zero, which would make the first DIFF rates meaningless. */
+	if (last_info_log->tv_sec == 0 && last_info_log->tv_nsec == 0)
+		*last_info_log = ts_start;
+
+	/* the resolution is in seconds, output stats once per second. */
+	if (OSMO_LIKELY(ts_now.tv_sec == last_info_log->tv_sec))
+		return;
+
+	timespecsub(&ts_now, &ts_start, &ts_elapsed);
+	timespecsub(&ts_now, last_info_log, &ts_diff);
+	elapsed_usec = (ts_elapsed.tv_sec * 1000 * 1000) + (ts_elapsed.tv_nsec / 1000);
+	diff_usec = (ts_diff.tv_sec * 1000 * 1000) + (ts_diff.tv_nsec / 1000);
+	/* avoid dividing by zero below */
+	if (elapsed_usec == 0)
+		elapsed_usec = 1;
+	if (diff_usec == 0)
+		diff_usec = 1;
+	*last_info_log = ts_now;
+
+	for (int i = 0; i < cfg.workers_n; i++) {
+		elapsed_rx_packets += workers[i].ctr.rx_packets;
+		elapsed_rx_bytes += workers[i].ctr.rx_bytes;
+		elapsed_tx_packets += workers[i].ctr.tx_packets;
+		elapsed_tx_bytes += workers[i].ctr.tx_bytes;
+	}
+	g_rx.packets.count = elapsed_rx_packets;
+	g_rx.bytes.count = elapsed_rx_bytes;
+	g_tx.packets.count = elapsed_tx_packets;
+	g_tx.bytes.count = elapsed_tx_bytes;
+
+	diff_rx_packets = counter_get(&g_rx.packets);
+	diff_rx_bytes = counter_get(&g_rx.bytes);
+	diff_tx_packets = counter_get(&g_tx.packets);
+	diff_tx_bytes = counter_get(&g_tx.bytes);
+
+	if (!diff_rx_packets && !diff_tx_packets)
+		return;
+
+	/* packets * 1000 / usec = kPPS; bytes * 8 / usec = Mbps */
+	printf("DIFF: %12llu Rx: %8"PRIu64" packets (%6llu kPPS, %8llu Mbps) | Tx: %8"PRIu64" packets (%6llu kPPS, %8llu Mbps)\n",
+	       diff_usec,
+	       diff_rx_packets,
+	       diff_rx_packets * 1000 / diff_usec,
+	       diff_rx_bytes * 8 / diff_usec,
+	       diff_tx_packets,
+	       diff_tx_packets * 1000 / diff_usec,
+	       diff_tx_bytes * 8 / diff_usec);
+	printf("TOTAL: %12llu Rx: %8"PRIu64" packets (%6llu kPPS, %8llu Mbps) | Tx: %8"PRIu64" packets (%6llu kPPS, %8llu Mbps)\n",
+	       elapsed_usec,
+	       elapsed_rx_packets,
+	       elapsed_rx_packets * 1000 / elapsed_usec,
+	       elapsed_rx_bytes * 8 / elapsed_usec,
+	       elapsed_tx_packets,
+	       elapsed_tx_packets * 1000 / elapsed_usec,
+	       elapsed_tx_bytes * 8 / elapsed_usec);
+	fflush(stdout);
+}
+
+/* Program entry: read the configuration from the command line, bind the UDP
+ * socket, start the worker threads and then print traffic statistics forever.
+ * \returns -1 on invalid arguments, when help was requested, or when socket
+ *          setup fails; otherwise it does not return. */
+int main(int argc, char **argv)
+{
+	struct osmo_sockaddr_str addr = {};
+	struct osmo_sockaddr osa = {};
+	struct timespec last_info_log = {};
+	struct worker *workers;
+	int val;
+	int rc;
+
+	osmo_init_logging2(OTC_GLOBAL, &udp_responder_log_info);
+	log_set_log_level(osmo_stderr_target, LOGL_ERROR);
+
+	if (cmdline_read(cmds, argc, argv)
+	    || cmdline_get(cmds, "help", NULL)) {
+		cmdline_print_help(cmds);
+		return -1;
+	}
+
+	if (!cmdline_get_int(&cfg.port_nr, 1, 65535, 23000, cmds, "port"))
+		return -1;
+
+	cfg.local_addr = cmdline_get(cmds, "local-addr", "0.0.0.0");
+	if (osmo_sockaddr_str_from_str(&addr, cfg.local_addr, cfg.port_nr)
+	    || osmo_sockaddr_str_to_osa(&addr, &osa)) {
+		printf("ERROR: invalid interface or port number: %s:%d\n", cfg.local_addr, cfg.port_nr);
+		return -1;
+	}
+
+	if (!cmdline_get_int(&cfg.queue_size, 1, 65535, 4000, cmds, "io-uring-queue"))
+		return -1;
+	if (!cmdline_get_int(&cfg.buf_size, 1, 65535, 1452, cmds, "io-uring-buf"))
+		return -1;
+
+	/* a response can never be larger than the slot buffer it is sent from */
+	if (!cmdline_get_int(&cfg.response_size, 0, cfg.buf_size, 0, cmds, "response-size"))
+		return -1;
+
+	if (!cmdline_get_int(&cfg.response_n, 0, INT_MAX, 1, cmds, "response-repeat"))
+		return -1;
+
+	if (!cmdline_get_int(&cfg.workers_n, 1, INT_MAX, 4, cmds, "workers"))
+		return -1;
+
+	if (cmdline_get(cmds, "cpu-affinity", NULL))
+		cfg.cpu_affinity = true;
+
+	cfg.port.osa = osa;
+	cfg.control_size = 1024;
+
+	/* create and bind socket */
+	rc = osmo_sock_init_osa(SOCK_DGRAM, IPPROTO_UDP, &cfg.port.osa, NULL, OSMO_SOCK_F_BIND);
+	/* (logging of errors already happens in osmo_sock_init_osa()) */
+	if (rc < 0)
+		return -1;
+	cfg.port.fd = rc;
+	LOGP(DLGLOBAL, LOGL_NOTICE, "bound UDP %s fd=%d\n", osmo_sock_get_name2(cfg.port.fd), cfg.port.fd);
+
+	/* Set Don't Fragment (DF) bit on IP packets transmitted by socket: */
+	val = IP_PMTUDISC_DO;
+	rc = setsockopt(cfg.port.fd, IPPROTO_IP, IP_MTU_DISCOVER, &val, sizeof(val));
+	if (rc == -1) {
+		fprintf(stderr, "ERROR: setsockopt(IPPROTO_IP, IP_MTU_DISCOVER) failed errno=%d\n", errno);
+		return -1;
+	}
+
+	val = 1;
+	rc = setsockopt(cfg.port.fd, IPPROTO_IP, IP_PKTINFO, &val, sizeof(val));
+	/* TODO: IPv6: setsockopt(s, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one)); */
+	if (rc == -1) {
+		fprintf(stderr, "ERROR: setsockopt(IPPROTO_IP, IP_PKTINFO) failed errno=%d\n", errno);
+		return -1;
+	}
+
+	workers = talloc_zero_array(OTC_GLOBAL, struct worker, cfg.workers_n);
+	OSMO_ASSERT(workers);
+	for (int i = 0; i < cfg.workers_n; i++) {
+		printf("Starting worker %d\n", i);
+		workers[i].id = i;
+		start_rx_worker(&workers[i]);
+	}
+
+	/* periodically log rx stats */
+	while (1) {
+		/* Nothing to report before the first packet arrived. Either way sleep below,
+		 * so that waiting for traffic does not spin on a CPU. */
+		if (started)
+			print_traffic_stats(workers, &last_info_log);
+
+		rc = usleep(500 * 1000);
+		if (rc == -1)
+			fprintf(stderr, "ERROR: usleep() failed errno=%d\n", errno);
+	}
+}
+
+static void *rx_worker_func(void *_worker);
+
+/* Allocate and initialize worker w's I/O slots (cfg.queue_size of them), then
+ * start its thread running rx_worker_func(). Aborts if the allocation or the
+ * thread creation fails. */
+static void start_rx_worker(struct worker *w)
+{
+	int rc;
+
+	/* one zeroed chunk: the queue header followed by its slots */
+	w->q = talloc_zero_size(OTC_GLOBAL, sizeof(struct io_queue) + cfg.queue_size * sizeof(struct data_io));
+	OSMO_ASSERT(w->q);
+	w->q->d_size = cfg.queue_size;
+
+	for (size_t i = 0; i < w->q->d_size; i++)
+		data_io_prepare(w, &w->q->d[i]);
+
+	/* pthread_create() returns 0 on success and a positive error number on failure,
+	 * so checking for ">= 0" would never detect a failure */
+	rc = pthread_create(&w->worker, NULL, rx_worker_func, w);
+	OSMO_ASSERT(rc == 0);
+}
+
+/* Thread function of one worker.
+ * Optionally pins the thread to CPU (worker id modulo number of CPUs), names
+ * the thread, sets up the worker's io_uring, fills the payload buffers with
+ * random printable bytes, queues a receive for every slot and then processes
+ * completions forever.
+ * \param[in] _worker  The struct worker to run.
+ * \returns never; the thread only ends with the process. */
+static void *rx_worker_func(void *_worker)
+{
+	struct worker *w = _worker;
+	struct io_queue *q = w->q;
+	char thread_name[32];
+	int rc;
+
+	if (cfg.cpu_affinity) {
+		cpu_set_t *cpuset;
+		size_t cpuset_size;
+		int core_id;
+		int num_cpus = get_num_cpus();
+
+		/* get_num_cpus() returns -1 on failure; do not use that as a divisor or set size */
+		if (num_cpus <= 0) {
+			fprintf(stderr, "ERROR: Pinning worker %d: cannot determine number of CPUs\n", w->id);
+			exit(1);
+		}
+		core_id = w->id % num_cpus;
+
+		cpuset = CPU_ALLOC(num_cpus);
+		if (!cpuset) {
+			fprintf(stderr, "ERROR: Pinning worker %d to CPU %d: out of memory\n", w->id, core_id);
+			exit(1);
+		}
+		cpuset_size = CPU_ALLOC_SIZE(num_cpus);
+		CPU_ZERO_S(cpuset_size, cpuset);
+		CPU_SET_S(core_id, cpuset_size, cpuset);
+
+		fprintf(stderr, "Pinning worker %d to CPU %d\n", w->id, core_id);
+		/* keep the returned error number, it is what gets reported */
+		rc = pthread_setaffinity_np(pthread_self(), cpuset_size, cpuset);
+		CPU_FREE(cpuset);
+		if (rc != 0) {
+			fprintf(stderr, "ERROR: Pinning worker %d to CPU %d: error=%d\n", w->id, core_id, rc);
+			exit(1);
+		}
+	}
+
+	snprintf(thread_name, sizeof(thread_name), "UdpRspWrk%d", w->id);
+	/* pthread functions return the error number instead of setting errno */
+	rc = pthread_setname_np(pthread_self(), thread_name);
+	if (rc != 0) {
+		char buf[256];
+		char *err_str = strerror_r(rc, buf, sizeof(buf));
+		fprintf(stderr, "worker %d: failed setting thread name: %s\n",
+			w->id, err_str);
+	}
+
+	rc = io_uring_queue_init(q->d_size, &w->ring, 0);
+	OSMO_ASSERT(rc >= 0);
+	/* limit the ring's io-wq worker threads to one of each kind */
+	unsigned int vals[2] = {1, 1};
+	rc = io_uring_register_iowq_max_workers(&w->ring, &vals[0]);
+	OSMO_ASSERT(rc == 0);
+
+	for (size_t i = 0; i < q->d_size; i++) {
+		struct data_io *d = &q->d[i];
+		/* fill once with random printable data */
+		for (size_t j = 0; j < d->data_size; j++)
+			d->data[j] = 32 + random() % (126 - 32 + 1);
+	}
+
+	/* fill the queue to start receiving */
+	for (size_t i = 0; i < q->d_size; i++)
+		data_io_prep_recv(&w->ring, &cfg.port, &q->d[i]);
+
+	struct __kernel_timespec ts_zero = {};
+	struct __kernel_timespec ts_1s = { .tv_sec = 1 };
+
+	/* runs until the process exits, so there is no cleanup code after the loop */
+	while (1) {
+		struct io_uring_cqe *cqe;
+		unsigned int completed = 0;
+		/* submit any requests from previous loop */
+		int submitted = io_uring_submit(&w->ring);
+
+		if (submitted < 0) {
+			/* a negative result is an error, not a count: treat it as nothing
+			 * submitted, so that the loop waits below instead of spinning */
+			fprintf(stderr, "worker %d: io_uring_submit() failed: rc=%d\n", w->id, submitted);
+			submitted = 0;
+		}
+
+		/* process all pending completions */
+		while (io_uring_wait_cqe_timeout(&w->ring, &cqe, &ts_zero) == 0) {
+			data_io_handle_completion(w, &cfg.port, cqe, cfg.response_size, cfg.response_n);
+			completed++;
+		}
+
+		/* Wait a bit longer */
+		if (!submitted && !completed) {
+			if (io_uring_wait_cqe_timeout(&w->ring, &cqe, &ts_1s) == 0)
+				data_io_handle_completion(w, &cfg.port, cqe, cfg.response_size, cfg.response_n);
+		}
+	}
+}
+
+#endif /* HAVE_URING */
+

To view, visit change 38328. To unsubscribe, or for help writing mail filters, visit settings.

Gerrit-MessageType: newchange
Gerrit-Project: upf-benchmark
Gerrit-Branch: master
Gerrit-Change-Id: I9edc2e3e1e41767673bcf96f1fe97fa4bf6d60f7
Gerrit-Change-Number: 38328
Gerrit-PatchSet: 1
Gerrit-Owner: pespin <pespin@sysmocom.de>