pespin has uploaded this change for review. ( https://gerrit.osmocom.org/c/upf-benchmark/+/38328?usp=email )
Change subject: Introduce osmo-udp-responder
......................................................................
Introduce osmo-udp-responder
Related: SYS#7096
Change-Id: I9edc2e3e1e41767673bcf96f1fe97fa4bf6d60f7
---
M configure.ac
M src/Makefile.am
A src/osmo-udp-responder/Makefile.am
A src/osmo-udp-responder/udp_responder.c
4 files changed, 823 insertions(+), 0 deletions(-)
git pull ssh://gerrit.osmocom.org:29418/upf-benchmark refs/changes/28/38328/1
diff --git a/configure.ac b/configure.ac index a10b07c..4bbc5f6 100644 --- a/configure.ac +++ b/configure.ac @@ -166,6 +166,7 @@ include/osmocom/pfcptool/Makefile src/Makefile src/osmo-pfcp-tool/Makefile + src/osmo-udp-responder/Makefile doc/Makefile doc/manuals/Makefile Makefile) diff --git a/src/Makefile.am b/src/Makefile.am index fecf898..731fc5e 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1,3 +1,4 @@ SUBDIRS = \ osmo-pfcp-tool \ + osmo-udp-responder \ $(NULL) diff --git a/src/osmo-udp-responder/Makefile.am b/src/osmo-udp-responder/Makefile.am new file mode 100644 index 0000000..329c5f2 --- /dev/null +++ b/src/osmo-udp-responder/Makefile.am @@ -0,0 +1,25 @@ +AM_CPPFLAGS = \ + $(all_includes) \ + -I$(top_srcdir)/include \ + -I$(top_builddir)/include \ + -I$(top_builddir) \ + $(NULL) + +AM_CFLAGS = \ + -Wall \ + $(LIBOSMOCORE_CFLAGS) \ + $(LIBURING_CFLAGS) \ + $(NULL) + +AM_LDFLAGS = \ + $(LIBOSMOCORE_LIBS) \ + $(LIBURING_LIBS) \ + $(NULL) + +bin_PROGRAMS = \ + osmo-udp-responder \ + $(NULL) + +osmo_udp_responder_SOURCES = \ + udp_responder.c \ + $(NULL) diff --git a/src/osmo-udp-responder/udp_responder.c b/src/osmo-udp-responder/udp_responder.c new file mode 100644 index 0000000..5ffe707 --- /dev/null +++ b/src/osmo-udp-responder/udp_responder.c @@ -0,0 +1,796 @@ +/* UDP responder: listen on a UDP port, and respond to each received UDP packet back to the sender. */ +/* + * (C) 2024 by sysmocom - s.f.m.c. GmbH info@sysmocom.de + * All Rights Reserved. + * + * Author: Neels Janosch Hofmeyr nhofmeyr@sysmocom.de + * Author: Pau Espin Pedrol pespin@sysmocom.de + * + * SPDX-License-Identifier: GPL-2.0+ + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + * + */ +#include "config.h" + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif + +#include <osmocom/core/application.h> +#include <osmocom/core/logging.h> +#include <osmocom/core/timer.h> + +#if HAVE_URING + +#include <getopt.h> +#include <string.h> +#include <limits.h> +#include <unistd.h> +#include <liburing.h> +#include <pthread.h> +#include <sched.h> +#include <netinet/in.h> + +#include <osmocom/core/utils.h> +#include <osmocom/core/socket.h> +#include <osmocom/core/sockaddr_str.h> + +#include <osmocom/pfcptool/gtp_flood.h> + +static volatile bool started; +static struct timespec ts_start; + +struct cmdline_cmd { + const char *short_option; + const char *long_option; + const char *arg_name; + const char *doc; + + const char *value; +}; + +#define cmdline_foreach(ITER, CMDS) \ + for (const struct cmdline_cmd *ITER = (CMDS); \ + ITER->short_option || ITER->long_option || ITER->arg_name; \ + ITER++) + +int cmdline_doc_str_buf(char *buf, size_t buflen, const struct cmdline_cmd *cmds) +{ + struct osmo_strbuf sb = { .buf = buf, .len = buflen }; + /* First find the longest options part */ + int w = 0; + cmdline_foreach(cmd, cmds) { + int cmd_w = 0; + if (cmd->short_option) + cmd_w += 2 + strlen(cmd->short_option); + if (cmd->long_option) + cmd_w += 3 + strlen(cmd->long_option); + if (cmd->arg_name) + cmd_w += 1 + strlen(cmd->arg_name); + w = OSMO_MAX(w, cmd_w); + } + /* vertical gap */ + w += 2; + + OSMO_STRBUF_PRINTF(sb, "Options:\n"); + cmdline_foreach(cmd, cmds) { + char *line_start = sb.pos; + if (cmd->short_option) + OSMO_STRBUF_PRINTF(sb, " -%s", cmd->short_option); + 
if (cmd->long_option) + OSMO_STRBUF_PRINTF(sb, " --%s", cmd->long_option); + if (cmd->arg_name) + OSMO_STRBUF_PRINTF(sb, " %s", cmd->arg_name); + if (cmd->doc) { + int have = sb.pos - line_start; + int spaces = OSMO_MAX(1, w - have); + OSMO_STRBUF_PRINTF(sb, "%*s", spaces, ""); + OSMO_STRBUF_PRINTF(sb, "%s", cmd->doc); + } + OSMO_STRBUF_PRINTF(sb, "\n"); + } + return sb.chars_needed; +} + +void cmdline_print_help(const struct cmdline_cmd *cmds) +{ + char buf[8192]; + cmdline_doc_str_buf(buf, sizeof(buf), cmds); + printf("%s", buf); +} + +void cmdline_cmd_store_optarg(struct cmdline_cmd *cmd) +{ + if (cmd->arg_name) + cmd->value = optarg; + else + cmd->value = (cmd->short_option ? : cmd->long_option); +} + +int cmdline_read(struct cmdline_cmd *cmds, int argc, char **argv) +{ + char short_options[256] = {}; + struct option long_options[128] = {}; + int long_options_i = 0; + int long_option_val = 0; + struct osmo_strbuf short_sb = { .buf = short_options, .len = sizeof(short_options) }; + + cmdline_foreach(cmd, cmds) { + if (cmd->short_option) { + OSMO_STRBUF_PRINTF(short_sb, "%s", cmd->short_option); + if (cmd->arg_name) + OSMO_STRBUF_PRINTF(short_sb, ":"); + } + if (cmd->long_option) { + long_options[long_options_i] = (struct option){ + cmd->long_option, + cmd->arg_name ? 1 : 0, + &long_option_val, + long_options_i, + }; + long_options_i++; + } + } + + while (1) { + int option_index = 0; + char c = getopt_long(argc, argv, short_options, long_options, &option_index); + if (c == -1) + break; + if (c == 0) { + struct cmdline_cmd *long_cmd = &cmds[long_option_val]; + cmdline_cmd_store_optarg(long_cmd); + } else { + bool found = false; + cmdline_foreach(cc, cmds) { + if (strchr(cc->short_option, c)) { + cmdline_cmd_store_optarg((struct cmdline_cmd *)cc); + found = true; + break; + } + } + if (!found) { + fprintf(stderr, "%s: Error in command line options. 
Exiting.\n", argv[0]); + return -1; + } + } + } + + /* positional args */ + cmdline_foreach(cmd, cmds) { + if (optind >= argc) + break; + if (cmd->short_option || cmd->long_option) + continue; + if (!cmd->arg_name) + continue; + ((struct cmdline_cmd *)cmd)->value = argv[optind]; + optind++; + } + + if (optind < argc) { + cmdline_print_help(cmds); + fprintf(stderr, "%s: Unsupported positional argument on command line\n", argv[optind]); + return -1; + } + return 0; +} + +const char *cmdline_get(const struct cmdline_cmd *cmds, const char *option_name, const char *default_val) +{ + cmdline_foreach(cmd, cmds) { + if (cmd->long_option && !strcmp(cmd->long_option, option_name)) + return cmd->value; + if (cmd->short_option && !strcmp(cmd->short_option, option_name)) + return cmd->value; + if (cmd->arg_name && !strcmp(cmd->arg_name, option_name)) + return cmd->value; + } + return default_val; +} + +bool cmdline_get_int(int *dst, int minval, int maxval, int default_val, + const struct cmdline_cmd *cmds, const char *option_name) +{ + const char *str = cmdline_get(cmds, option_name, NULL); + if (!str) { + *dst = default_val; + return true; + } + if (osmo_str_to_int(dst, str, 10, minval, maxval)) { + cmdline_print_help(cmds); + printf("ERROR: invalid integer number: %s\n", str); + return false; + } + if (*dst < minval || *dst > maxval) { + cmdline_print_help(cmds); + printf("ERROR: number out of range: %d <= %d <= %d\n", minval, *dst, maxval); + return false; + } + return true; +} + +/* returns number of configured CPUs in the system, or negative otherwise */ +static int get_num_cpus(void) +{ + static unsigned int num_cpus = 0; + long ln; + + if (num_cpus) + return num_cpus; + + /* This is expensive (goes across /sys, so let's do it only once. It is + * guaranteed it won't change during process span anyway). 
*/ + ln = sysconf(_SC_NPROCESSORS_CONF); + if (ln < 0) { + fprintf(stderr, "sysconf(_SC_NPROCESSORS_CONF) failed: %s\n", strerror(errno)); + return -1; + } + num_cpus = (unsigned int) ln; + return num_cpus; +} + +struct udp_port { + struct llist_head entry; + + /* IP address and UDP port from user input */ + struct osmo_sockaddr osa; + + /* locally bound socket */ + int fd; +}; + +enum data_io_type { + IO_UNUSED = 0, + IO_RECV, + IO_SEND, +}; + +struct data_io { + enum data_io_type type; + struct osmo_sockaddr osa; + struct iovec iov; + struct msghdr msgh; + uint8_t *data; + size_t data_size; + uint8_t *control; + size_t control_size; + int n; +}; + +struct io_queue { + size_t d_size; + struct data_io d[0]; +}; + +struct counter { + uint64_t count; + uint64_t last; +}; + +struct traffic_counter { + struct counter packets; + struct counter bytes; +}; + +uint64_t counter_get(struct counter *c) +{ + uint64_t cnt = c->count; + uint64_t val = cnt - c->last; + c->last = cnt; + return val; +} + +struct traffic_counter g_rx = {}; +struct traffic_counter g_tx = {}; + +struct worker { + int id; + struct io_queue *q; + struct io_uring ring; + pthread_t worker; + struct { + unsigned long long rx_packets; + unsigned long long rx_bytes; + unsigned long long tx_packets; + unsigned long long tx_bytes; + } ctr; +}; + +struct { + int port_nr; + const char *local_addr; + int queue_size; + int buf_size; + int control_size; + int response_size; + int response_n; + int workers_n; + struct udp_port port; + bool cpu_affinity; +} cfg = {}; + +static void data_io_prepare(struct worker *w, struct data_io *d) +{ + void *data_buf = talloc_size(w->q, cfg.buf_size); + void *control_buf = talloc_size(w->q, cfg.control_size); + *d = (struct data_io){ + .type = IO_RECV, + .iov = { + .iov_base = data_buf, + .iov_len = cfg.buf_size, + }, + .msgh = { + .msg_name = &d->osa, + .msg_namelen = sizeof(d->osa), + .msg_iov = &d->iov, + .msg_iovlen = 1, + .msg_control = control_buf, + .msg_controllen = 
cfg.control_size, + }, + .data = data_buf, + .data_size = cfg.buf_size, + .control = control_buf, + .control_size = cfg.control_size, + }; +} + +static void data_io_prep_recv(struct io_uring *ring, struct udp_port *port, struct data_io *d) +{ + struct io_uring_sqe *sqe; + d->type = IO_RECV; + d->iov.iov_len = d->data_size; + d->msgh.msg_controllen = d->control_size; + sqe = io_uring_get_sqe(ring); + OSMO_ASSERT(sqe); + io_uring_prep_recvmsg(sqe, port->fd, &d->msgh, 0); + io_uring_sqe_set_data(sqe, d); +} + +static void data_io_prep_send(struct io_uring *ring, struct udp_port *port, struct data_io *d) +{ + struct io_uring_sqe *sqe; + d->type = IO_SEND; + sqe = io_uring_get_sqe(ring); + OSMO_ASSERT(sqe); + io_uring_prep_sendmsg(sqe, port->fd, &d->msgh, 0); + io_uring_sqe_set_data(sqe, d); +} + +static bool get_payload_info(struct gtp_flood_payload_info *dst, struct data_io *d) +{ + uint8_t *pi; + uint8_t *len; + size_t copy_len; + + len = d->iov.iov_base + d->iov.iov_len - 1; + if ((*len) > d->iov.iov_len) + return false; + pi = len - (*len); + if (strncmp((void *)pi, "info", 4)) + return false; + copy_len = OSMO_MIN(sizeof(*dst), *len); + *dst = (struct gtp_flood_payload_info){}; + memcpy((void *)dst, pi, copy_len); + return true; +} + +static void data_io_handle_completion(struct worker *w, struct udp_port *port, struct io_uring_cqe *cqe, + int response_size, int response_n) +{ + struct data_io *d; + struct osmo_sockaddr *osa = NULL; + int rc; + struct gtp_flood_payload_info pi; + struct io_uring *ring = &w->ring; + + d = io_uring_cqe_get_data(cqe); + + osa = &d->osa; + rc = cqe->res; + + switch (d->type) { + case IO_RECV: + /* done reading */ + if (OSMO_UNLIKELY(!started)) { + started = true; + OSMO_ASSERT(clock_gettime(CLOCK_MONOTONIC, &ts_start) == 0); + } + if (rc <= 0) { + LOGP(DLGLOBAL, LOGL_ERROR, "%s -> rx error rc=%d flags=0x%x\n", + osa ? 
osmo_sockaddr_to_str(osa) : "NULL", + rc, cqe->flags); + return; + } +#if 0 + LOGP(DLGLOBAL, LOGL_DEBUG, "%s -> rx rc=%d flags=0x%x: %s\n", + osa ? osmo_sockaddr_to_str(osa) : "NULL", + rc, cqe->flags, + osmo_quote_str(d->iov.iov_base, rc)); +#endif + io_uring_cqe_seen(ring, cqe); + w->ctr.rx_packets++; + w->ctr.rx_bytes += rc; + + if (response_n < 1) { + data_io_prep_recv(ring, port, d); + break; + } + + d->iov.iov_len = rc; + if (get_payload_info(&pi, d)) { + /* set the return TEID */ + struct gtp1u_hdr *gtp_hdr = (void *)d->iov.iov_base; + gtp_hdr->tei = pi.return_teid; + } + + /* resubmit back to sender */ + + /* adjust size? */ + if (response_size > 0) + d->iov.iov_len = response_size; + + data_io_prep_send(ring, port, d); + break; + + case IO_SEND: + /* done writing. */ + if (rc <= 0) { + LOGP(DLGLOBAL, LOGL_ERROR, "%s -> tx error rc=%d flags=0x%x\n", + osa ? osmo_sockaddr_to_str(osa) : "NULL", + rc, cqe->flags); + return; + } +#if 0 + LOGP(DLGLOBAL, LOGL_DEBUG, "%s <- tx rc=%d flags=0x%x: %s\n", + osa ? osmo_sockaddr_to_str(osa) : "NULL", + rc, cqe->flags, + osmo_quote_str(d->iov.iov_base, rc)); +#endif + io_uring_cqe_seen(ring, cqe); + w->ctr.tx_packets++; + w->ctr.tx_bytes += rc; + + d->n++; + + /* Send again? If not, re-submit open slot for reading. 
*/ + if (d->n < response_n) + data_io_prep_send(ring, port, d); + else + data_io_prep_recv(ring, port, d); + break; + + default: + OSMO_ASSERT(0); + } +} + +struct cmdline_cmd cmds[] = { + { + .short_option = "h", + .long_option = "help", + .doc = "Show this help", + }, + { + .short_option = "l", + .long_option = "local-addr", + .arg_name = "IP-ADDR", + .doc = "Listen on local IP address (default is 0.0.0.0).", + }, + { + .short_option = "p", + .long_option = "port", + .arg_name = "UDP-PORT", + .doc = "Listen on local UDP port.", + }, + /* + { + .short_option = "P", + .long_option = "port-range-to", + .arg_name = "UDP-PORT-TO", + .doc = "Listen on a range of ports, from --port to --port-range-to, inclusive.", + }, + */ + { + .short_option = "s", + .long_option = "response-size", + .arg_name = "BYTES", + .doc = "When responding, enlarge or shorten the payload to this size.", + }, + { + .short_option = "n", + .long_option = "response-repeat", + .arg_name = "N", + .doc = "Respond N times, i.e. 
multiply the returned traffic.", + }, + { + .short_option = "a", + .long_option = "cpu-affinity", + .doc = "Pin each Nth worker to a Nth cpu.", + }, + { + .long_option = "io-uring-queue", + .arg_name = "SIZE", + .doc = "I/O tuning: queue size to use for io_uring, default is 4000.", + }, + { + .long_option = "io-uring-buf", + .arg_name = "SIZE", + .doc = "I/O tuning: maximum payload size, default is 1452.", + }, + { + .long_option = "workers", + .arg_name = "N", + .doc = "Number of rx threads to run", + }, + {} +}; + +static const struct log_info_cat categories[] = { +}; + +const struct log_info udp_responder_log_info = { + .cat = categories, + .num_cat = ARRAY_SIZE(categories), +}; + +static void start_rx_worker(struct worker *w); + +int main(int argc, char **argv) +{ + struct osmo_sockaddr_str addr = {}; + struct osmo_sockaddr osa = {}; + int val; + int rc; + + osmo_init_logging2(OTC_GLOBAL, &udp_responder_log_info); + log_set_log_level(osmo_stderr_target, LOGL_ERROR); + + if (cmdline_read(cmds, argc, argv) + || cmdline_get(cmds, "help", NULL)) { + cmdline_print_help(cmds); + return -1; + } + + if (!cmdline_get_int(&cfg.port_nr, 1, 65535, 23000, cmds, "port")) + return -1; + + cfg.local_addr = cmdline_get(cmds, "local-addr", "0.0.0.0"); + if (osmo_sockaddr_str_from_str(&addr, cfg.local_addr, cfg.port_nr) + || osmo_sockaddr_str_to_osa(&addr, &osa)) { + printf("ERROR: invalid interface or port number: %s:%d\n", cfg.local_addr, cfg.port_nr); + return -1; + } + + if (!cmdline_get_int(&cfg.queue_size, 1, 65535, 4000, cmds, "io-uring-queue")) + return -1; + if (!cmdline_get_int(&cfg.buf_size, 1, 65535, 1452, cmds, "io-uring-buf")) + return -1; + + if (!cmdline_get_int(&cfg.response_size, 0, cfg.buf_size, 0, cmds, "response-size")) + return -1; + + if (!cmdline_get_int(&cfg.response_n, 0, INT_MAX, 1, cmds, "response-repeat")) + return -1; + + if (!cmdline_get_int(&cfg.workers_n, 1, INT_MAX, 4, cmds, "workers")) + return -1; + + if (cmdline_get(cmds, "cpu-affinity", 
NULL)) + cfg.cpu_affinity = true; + + cfg.port.osa = osa; + cfg.control_size = 1024; + + /* create and bind socket */ + rc = osmo_sock_init_osa(SOCK_DGRAM, IPPROTO_UDP, &cfg.port.osa, NULL, OSMO_SOCK_F_BIND); + /* (logging of errors already happens in osmo_sock_init_osa() */ + if (rc < 0) + return -1; + cfg.port.fd = rc; + LOGP(DLGLOBAL, LOGL_NOTICE, "bound UDP %s fd=%d\n", osmo_sock_get_name2(cfg.port.fd), cfg.port.fd); + + /* Set Don't Fragment (DF) bit on IP packets transmitted by socket: */ + val = IP_PMTUDISC_DO; + rc = setsockopt(cfg.port.fd, IPPROTO_IP, IP_MTU_DISCOVER, &val, sizeof(val)); + if (rc == -1) { + fprintf(stderr, "ERROR: setsockopt(IPPROTO_IP, IP_DONTFRAG) failed errno=%d\n", errno); + return -1; + } + + val = 1; + rc = setsockopt(cfg.port.fd, IPPROTO_IP, IP_PKTINFO, &val, sizeof(val)); + /* TODO: IPv6: setsockopt(s, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one)); */ + if (rc == -1) { + fprintf(stderr, "ERROR: setsockopt(IPPROTO_IP, IP_PKTINFO) failed errno=%d\n", errno); + return -1; + } + + struct worker *workers = talloc_zero_array(OTC_GLOBAL, struct worker, cfg.workers_n); + for (int i = 0; i < cfg.workers_n; i++) { + printf("Starting worker %d\n", i); + workers[i].id = i; + start_rx_worker(&workers[i]); + } + + /* periodically log rx stats */ + while (1) { + static struct timespec last_info_log = {.tv_sec = 0, .tv_nsec = 0}; + struct timespec ts_now; + + if (OSMO_UNLIKELY(!started)) + continue; + + clock_gettime(CLOCK_MONOTONIC, &ts_now); + /* the resolution is in seconds, output stats once per second. 
*/ + if (OSMO_UNLIKELY(ts_now.tv_sec != last_info_log.tv_sec)) { + struct timespec ts_elapsed, ts_diff; + unsigned long long elapsed_usec, diff_usec; + uint64_t diff_rx_packets, diff_rx_bytes, diff_tx_packets, diff_tx_bytes; + uint64_t elapsed_rx_packets = 0, elapsed_rx_bytes = 0, elapsed_tx_packets = 0, elapsed_tx_bytes = 0; + + timespecsub(&ts_now, &ts_start, &ts_elapsed); + timespecsub(&ts_now, &last_info_log, &ts_diff); + elapsed_usec = (ts_elapsed.tv_sec * 1000 * 1000) + (ts_elapsed.tv_nsec / 1000); + diff_usec = (ts_diff.tv_sec * 1000 * 1000) + (ts_diff.tv_nsec / 1000); + if (elapsed_usec == 0) + elapsed_usec = 1; + if (diff_usec == 0) + diff_usec = 1; + last_info_log = ts_now; + + for (int i = 0; i < cfg.workers_n; i++) { + elapsed_rx_packets += workers[i].ctr.rx_packets; + elapsed_rx_bytes += workers[i].ctr.rx_bytes; + elapsed_tx_packets += workers[i].ctr.tx_packets; + elapsed_tx_bytes += workers[i].ctr.tx_bytes; + } + g_rx.packets.count = elapsed_rx_packets; + g_rx.bytes.count = elapsed_rx_bytes; + g_tx.packets.count = elapsed_tx_packets; + g_tx.bytes.count = elapsed_tx_bytes; + + diff_rx_packets = counter_get(&g_rx.packets); + diff_rx_bytes = counter_get(&g_rx.bytes); + diff_tx_packets = counter_get(&g_tx.packets); + diff_tx_bytes = counter_get(&g_tx.bytes); + + if (diff_rx_packets || diff_tx_packets) { + printf("DIFF: %12llu Rx: %8"PRIu64" packets (%6llu kPPS, %8llu Mbps) | Tx: %8"PRIu64" packets (%6llu kPPS, %8llu Mbps)\n", + diff_usec, + diff_rx_packets, + diff_rx_packets * 1000 / diff_usec, + diff_rx_bytes * 8 / diff_usec, + diff_tx_packets, + diff_tx_packets * 1000 / diff_usec, + diff_tx_bytes * 8 / diff_usec); + printf("TOTAL: %12llu Rx: %8"PRIu64" packets (%6llu kPPS, %8llu Mbps) | Tx: %8"PRIu64" packets (%6llu kPPS, %8llu Mbps)\n", + elapsed_usec, + elapsed_rx_packets, + elapsed_rx_packets * 1000 / elapsed_usec, + elapsed_rx_bytes * 8 / elapsed_usec, + elapsed_tx_packets, + elapsed_tx_packets * 1000 / elapsed_usec, + elapsed_tx_bytes * 8 / 
elapsed_usec); + fflush(stdout); + } + } + int rc = usleep(500 * 1000); + if (rc == -1) + fprintf(stderr, "ERROR: usleep() failed errno=%d\n", errno); + } +} + +static void *rx_worker_func(void *_worker); + +static void start_rx_worker(struct worker *w) +{ + w->q = talloc_zero_size(OTC_GLOBAL, sizeof(struct io_queue) + cfg.queue_size * sizeof(struct data_io)); + OSMO_ASSERT(w->q); + *w->q = (struct io_queue){ + .d_size = cfg.queue_size, + }; + for (int i = 0; i < w->q->d_size; i++) { + struct data_io *d = &w->q->d[i]; + data_io_prepare(w, d); + } + int rc = pthread_create(&w->worker, NULL, rx_worker_func, w); + OSMO_ASSERT(rc >= 0); +} + +static void *rx_worker_func(void *_worker) +{ + struct worker *w = _worker; + struct io_queue *q = w->q; + char thread_name[32]; + pthread_t phtread_id = pthread_self(); + + if (cfg.cpu_affinity) { + cpu_set_t *cpuset; + size_t cpuset_size; + int num_cpus = get_num_cpus(); + int core_id = w->id % num_cpus; + + cpuset = CPU_ALLOC(num_cpus); + cpuset_size = CPU_ALLOC_SIZE(num_cpus); + CPU_ZERO_S(cpuset_size, cpuset); + CPU_SET_S(core_id, cpuset_size, cpuset); + + fprintf(stderr, "Pinning worker %d to CPU %d\n", w->id, core_id); + int rc = pthread_setaffinity_np(pthread_self(), cpuset_size, cpuset) != 0; + if (rc != 0) { + fprintf(stderr, "ERROR: Pinning worker %d to CPU %d: error=%d\n", w->id, core_id, rc); + exit(1); + } + CPU_FREE(cpuset); + } + snprintf(thread_name, sizeof(thread_name), "UdpRspWrk%u", w->id); + if (pthread_setname_np(phtread_id, thread_name) != 0) { + char buf[256]; + int err = errno; + char *err_str = strerror_r(err, buf, sizeof(buf)); + fprintf(stderr, "worker %u: failed setting thread name: %s\n", + w->id, err_str); + } + + int rc = io_uring_queue_init(q->d_size, &w->ring, 0); + OSMO_ASSERT(rc >= 0); + unsigned int vals[2] = {1, 1}; + rc = io_uring_register_iowq_max_workers(&w->ring, &vals[0]); + OSMO_ASSERT(rc == 0); + + for (int i = 0; i < q->d_size; i++) { + struct data_io *d = &q->d[i]; + /* fill once with 
random printable data */ + for (int j = 0; j < d->data_size; j++) + d->data[j] = 32 + random() % (126 - 32 + 1); + } + + /* fill the queue to start receiving */ + for (int i = 0; i < q->d_size; i++) + data_io_prep_recv(&w->ring, &cfg.port, &q->d[i]); + + struct __kernel_timespec ts_zero = {}; + struct __kernel_timespec ts_1s = { .tv_sec = 1 }; + + while (1) { + uint32_t submitted; + uint32_t completed = 0; + struct io_uring_cqe *cqe; + + /* submit any requests from previous loop */ + submitted = io_uring_submit(&w->ring); + + /* process all pending completions */ + while (io_uring_wait_cqe_timeout(&w->ring, &cqe, &ts_zero) == 0) { + data_io_handle_completion(w, &cfg.port, cqe, cfg.response_size, cfg.response_n); + completed++; + } + + /* Wait a bit longer */ + if (!submitted && !completed) { + if (io_uring_wait_cqe_timeout(&w->ring, &cqe, &ts_1s) == 0) { + data_io_handle_completion(w, &cfg.port, cqe, cfg.response_size, cfg.response_n); + completed++; + } + } + } + + talloc_free(q); + return 0; +} + +#endif /* HAVE_URING */ +