daniel has submitted this change. ( https://gerrit.osmocom.org/c/libosmocore/+/32536 )
Change subject: osmo_io: Add io_uring backend ......................................................................
osmo_io: Add io_uring backend
Change-Id: I5152129eb84b31ccc9e02bc2a5c5bdb046d331bc --- M configure.ac M contrib/libosmocore.spec.in M include/osmocom/core/osmo_io.h M src/core/Makefile.am M src/core/osmo_io.c M src/core/osmo_io_internal.h A src/core/osmo_io_uring.c M tests/osmo_io/osmo_io_test.c 8 files changed, 485 insertions(+), 3 deletions(-)
Approvals: Jenkins Builder: Verified pespin: Looks good to me, approved
diff --git a/configure.ac b/configure.ac index ef15f22..2fc9895 100644 --- a/configure.ac +++ b/configure.ac @@ -175,6 +175,20 @@
PKG_CHECK_MODULES(TALLOC, [talloc >= 2.1.0])
+AC_ARG_ENABLE([uring], [AS_HELP_STRING([--disable-uring], [Build without io_uring support])], + [ + ENABLE_URING=$enableval + ], + [ + ENABLE_URING="yes" + ]) +AS_IF([test "x$ENABLE_URING" = "xyes"], [ + PKG_CHECK_MODULES(URING, [liburing >= 0.7]) + AC_DEFINE([HAVE_URING],[1],[Build with io_uring support]) +]) +AM_CONDITIONAL(ENABLE_URING, test "x$ENABLE_URING" = "xyes") +AC_SUBST(ENABLE_URING) + AC_ARG_ENABLE([pcsc], [AS_HELP_STRING([--disable-pcsc], [Build without PC/SC support])], [ ENABLE_PCSC=$enableval diff --git a/contrib/libosmocore.spec.in b/contrib/libosmocore.spec.in index fdd1a65..3fde143 100644 --- a/contrib/libosmocore.spec.in +++ b/contrib/libosmocore.spec.in @@ -32,6 +32,7 @@ BuildRequires: pkgconfig(talloc) >= 2.1.0 BuildRequires: pkgconfig(libmnl) BuildRequires: pkgconfig(libsystemd) +BuildRequires: pkgconfig(liburing)
%description libosmocore is a package with various utility functions that were diff --git a/include/osmocom/core/osmo_io.h b/include/osmocom/core/osmo_io.h index 8f3c060..b780d9a 100644 --- a/include/osmocom/core/osmo_io.h +++ b/include/osmocom/core/osmo_io.h @@ -27,6 +27,7 @@
enum osmo_io_backend { OSMO_IO_BACKEND_POLL, + OSMO_IO_BACKEND_IO_URING, };
extern const struct value_string osmo_io_backend_names[]; diff --git a/src/core/Makefile.am b/src/core/Makefile.am index 80ee458..2f2fc19 100644 --- a/src/core/Makefile.am +++ b/src/core/Makefile.am @@ -4,7 +4,7 @@ LIBVERSION=20:0:0
AM_CPPFLAGS = -I$(top_srcdir)/include -I$(top_builddir)/include -I$(top_builddir) -AM_CFLAGS = -Wall $(TALLOC_CFLAGS) $(PTHREAD_CFLAGS) $(LIBSCTP_CFLAGS) $(LIBMNL_CFLAGS) +AM_CFLAGS = -Wall $(TALLOC_CFLAGS) $(PTHREAD_CFLAGS) $(LIBSCTP_CFLAGS) $(LIBMNL_CFLAGS) $(URING_CFLAGS)
if ENABLE_PSEUDOTALLOC AM_CPPFLAGS += -I$(top_srcdir)/src/pseudotalloc @@ -18,6 +18,7 @@ $(LIBRARY_RT) \ $(PTHREAD_LIBS) \ $(LIBSCTP_LIBS) \ + $(URING_LIBS) \ $(NULL)
libosmocore_la_SOURCES = \ @@ -156,5 +157,9 @@ libosmocore_la_LIBADD += probes.lo endif
+if ENABLE_URING +libosmocore_la_SOURCES += osmo_io_uring.c +endif + crc%gen.c: crcXXgen.c.tpl $(AM_V_GEN)sed -e's/XX/$*/g' $< > $@ diff --git a/src/core/osmo_io.c b/src/core/osmo_io.c index bccf7af..e176099 100644 --- a/src/core/osmo_io.c +++ b/src/core/osmo_io.c @@ -47,6 +47,7 @@
const struct value_string osmo_io_backend_names[] = { { OSMO_IO_BACKEND_POLL, "poll" }, + { OSMO_IO_BACKEND_IO_URING, "io_uring" }, { 0, NULL } };
@@ -55,12 +56,21 @@ /* Used by some tests, can't be static */ struct iofd_backend_ops osmo_iofd_ops;
+#if defined(HAVE_URING) +void osmo_iofd_uring_init(void); +#endif + /*! initialize osmo_io for the current thread */ void osmo_iofd_init(void) { switch (g_io_backend) { case OSMO_IO_BACKEND_POLL: break; +#if defined(HAVE_URING) + case OSMO_IO_BACKEND_IO_URING: + osmo_iofd_uring_init(); + break; +#endif default: OSMO_ASSERT(0); break; @@ -78,6 +88,11 @@ if (!strcmp("POLL", backend)) { g_io_backend = OSMO_IO_BACKEND_POLL; osmo_iofd_ops = iofd_poll_ops; +#if defined(HAVE_URING) + } else if (!strcmp("IO_URING", backend)) { + g_io_backend = OSMO_IO_BACKEND_IO_URING; + osmo_iofd_ops = iofd_uring_ops; +#endif } else { fprintf(stderr, "Invalid osmo_io backend requested: \"%s\"\nCheck the environment variable %s\n", backend, OSMO_IO_BACKEND_ENV); exit(1); diff --git a/src/core/osmo_io_internal.h b/src/core/osmo_io_internal.h index 0f0465d..5b7ab90 100644 --- a/src/core/osmo_io_internal.h +++ b/src/core/osmo_io_internal.h @@ -19,6 +19,10 @@ extern const struct iofd_backend_ops iofd_poll_ops; #define OSMO_IO_BACKEND_DEFAULT "POLL"
+#if defined(HAVE_URING) +extern const struct iofd_backend_ops iofd_uring_ops; +#endif + struct iofd_backend_ops { int (*register_fd)(struct osmo_io_fd *iofd); int (*unregister_fd)(struct osmo_io_fd *iofd); @@ -90,9 +94,9 @@ } poll; struct { bool read_enabled; - bool read_pending; - bool write_pending; bool write_enabled; + void *read_msghdr; + void *write_msghdr; /* TODO: index into array of registered fd's? */ } uring; } u; diff --git a/src/core/osmo_io_uring.c b/src/core/osmo_io_uring.c new file mode 100644 index 0000000..84b7b4c --- /dev/null +++ b/src/core/osmo_io_uring.c @@ -0,0 +1,427 @@ +/*! \file osmo_io_uring.c + * io_uring backend for osmo_io. + * + * (C) 2022-2023 by sysmocom s.f.m.c. + * Author: Daniel Willmann daniel@sysmocom.de + * + * All Rights Reserved. + * + * SPDX-License-Identifier: GPL-2.0+ + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. 
+ */ + +/* TODO: + * Parameters: + * - number of simultaneous read/write in uring for given fd + * + */ + +#include "../config.h" +#if defined(__linux__) + +#include <stdio.h> +#include <talloc.h> +#include <unistd.h> +#include <string.h> +#include <stdbool.h> +#include <errno.h> + +#include <sys/eventfd.h> +#include <liburing.h> + +#include <osmocom/core/osmo_io.h> +#include <osmocom/core/linuxlist.h> +#include <osmocom/core/logging.h> +#include <osmocom/core/msgb.h> +#include <osmocom/core/select.h> +#include <osmocom/core/talloc.h> +#include <osmocom/core/utils.h> +#include <osmocom/core/socket.h> + +#include "osmo_io_internal.h" + +#define IOFD_URING_ENTRIES 4096 + +struct osmo_io_uring { + struct osmo_fd event_ofd; + struct io_uring ring; +}; + +static __thread struct osmo_io_uring g_ring; + +static void iofd_uring_cqe(struct io_uring *ring); +static int iofd_uring_poll_cb(struct osmo_fd *ofd, unsigned int what) +{ + struct io_uring *ring = ofd->data; + eventfd_t val; + int rc; + + if (what & OSMO_FD_READ) { + rc = eventfd_read(ofd->fd, &val); + if (rc < 0) { + LOGP(DLIO, LOGL_ERROR, "eventfd_read() returned error\n"); + return rc; + } + + iofd_uring_cqe(ring); + } + if (what & OSMO_FD_WRITE) + OSMO_ASSERT(0); + + return 0; +} + +/*! 
initialize the uring and tie it into our event loop */ +void osmo_iofd_uring_init(void) +{ + int rc; + rc = io_uring_queue_init(IOFD_URING_ENTRIES, &g_ring.ring, 0); + if (rc < 0) + OSMO_ASSERT(0); + + rc = eventfd(0, 0); + if (rc < 0) { + io_uring_queue_exit(&g_ring.ring); + OSMO_ASSERT(0); + } + + osmo_fd_setup(&g_ring.event_ofd, rc, OSMO_FD_READ, iofd_uring_poll_cb, &g_ring.ring, 0); + osmo_fd_register(&g_ring.event_ofd); + io_uring_register_eventfd(&g_ring.ring, rc); +} + + +static void iofd_uring_submit_recv(struct osmo_io_fd *iofd, enum iofd_msg_action action) +{ + struct msgb *msg; + struct iofd_msghdr *msghdr; + struct io_uring_sqe *sqe; + + msg = iofd_msgb_pending_or_alloc(iofd); + if (!msg) { + LOGPIO(iofd, LOGL_ERROR, "Could not allocate msgb for reading\n"); + OSMO_ASSERT(0); + } + + msghdr = iofd_msghdr_alloc(iofd, action, msg); + if (!msghdr) { + LOGPIO(iofd, LOGL_ERROR, "Could not allocate msghdr for reading\n"); + OSMO_ASSERT(0); + } + + msghdr->iov[0].iov_base = msg->tail; + msghdr->iov[0].iov_len = msgb_tailroom(msg); + + switch (action) { + case IOFD_ACT_READ: + break; + case IOFD_ACT_RECVFROM: + msghdr->hdr.msg_iov = &msghdr->iov[0]; + msghdr->hdr.msg_iovlen = 1; + msghdr->hdr.msg_name = &msghdr->osa.u.sa; + msghdr->hdr.msg_namelen = osmo_sockaddr_size(&msghdr->osa); + break; + default: + OSMO_ASSERT(0); + } + + sqe = io_uring_get_sqe(&g_ring.ring); + if (!sqe) { + LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n"); + OSMO_ASSERT(0); + } + + switch (action) { + case IOFD_ACT_READ: + io_uring_prep_readv(sqe, iofd->fd, msghdr->iov, 1, 0); + break; + case IOFD_ACT_RECVFROM: + io_uring_prep_recvmsg(sqe, iofd->fd, &msghdr->hdr, msghdr->flags); + break; + default: + OSMO_ASSERT(0); + } + io_uring_sqe_set_data(sqe, msghdr); + + io_uring_submit(&g_ring.ring); + /* NOTE: This only works if we have one read per fd */ + iofd->u.uring.read_msghdr = msghdr; +} + +static void iofd_uring_handle_recv(struct iofd_msghdr *msghdr, int rc) +{ + struct 
osmo_io_fd *iofd = msghdr->iofd; + struct msgb *msg = msghdr->msg; + + if (rc > 0) + msgb_put(msg, rc); + + if (!IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED)) + iofd_handle_recv(iofd, msg, rc, msghdr); + + if (iofd->u.uring.read_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED)) + iofd_uring_submit_recv(iofd, msghdr->action); + else + iofd->u.uring.read_msghdr = NULL; + + + iofd_msghdr_free(msghdr); +} + +static int iofd_uring_submit_tx(struct osmo_io_fd *iofd); + +static void iofd_uring_handle_tx(struct iofd_msghdr *msghdr, int rc) +{ + struct osmo_io_fd *iofd = msghdr->iofd; + + if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED)) + goto out_free; + + /* Error during write */ + if (rc < 0) { + if (msghdr->action == IOFD_ACT_WRITE) + iofd->io_ops.write_cb(iofd, rc, msghdr->msg); + else if (msghdr->action == IOFD_ACT_SENDTO) + iofd->io_ops.sendto_cb(iofd, rc, msghdr->msg, &msghdr->osa); + else + OSMO_ASSERT(0); + goto out_free; + } + + /* Incomplete write */ + if (rc < msgb_length(msghdr->msg)) { + /* Re-enqueue remaining data */ + msgb_pull(msghdr->msg, rc); + msghdr->iov[0].iov_len = msgb_length(msghdr->msg); + iofd_txqueue_enqueue_front(iofd, msghdr); + goto out; + } + + if (msghdr->action == IOFD_ACT_WRITE) + iofd->io_ops.write_cb(iofd, rc, msghdr->msg); + else if (msghdr->action == IOFD_ACT_SENDTO) + iofd->io_ops.sendto_cb(iofd, rc, msghdr->msg, &msghdr->osa); + else + OSMO_ASSERT(0); + +out_free: + msgb_free(msghdr->msg); + iofd_msghdr_free(msghdr); + +out: + iofd->u.uring.write_msghdr = NULL; + if (iofd->u.uring.write_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED)) + iofd_uring_submit_tx(iofd); +} + +static void iofd_uring_handle_completion(struct iofd_msghdr *msghdr, int res) +{ + struct osmo_io_fd *iofd = msghdr->iofd; + + IOFD_FLAG_SET(iofd, IOFD_FLAG_IN_CALLBACK); + + switch (msghdr->action) { + case IOFD_ACT_READ: + case IOFD_ACT_RECVFROM: + iofd_uring_handle_recv(msghdr, res); + break; + case IOFD_ACT_WRITE: + case IOFD_ACT_SENDTO: + 
iofd_uring_handle_tx(msghdr, res); + break; + default: + OSMO_ASSERT(0) + } + + if (!iofd->u.uring.read_msghdr && !iofd->u.uring.write_msghdr) + IOFD_FLAG_UNSET(iofd, IOFD_FLAG_IN_CALLBACK); + + if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) && !iofd->u.uring.read_msghdr && !iofd->u.uring.write_msghdr) + talloc_free(iofd); +} + +static void iofd_uring_cqe(struct io_uring *ring) +{ + int rc; + struct io_uring_cqe *cqe; + struct iofd_msghdr *msghdr; + + while (io_uring_peek_cqe(ring, &cqe) == 0) { + + msghdr = io_uring_cqe_get_data(cqe); + if (!msghdr) { + LOGP(DLIO, LOGL_DEBUG, "Cancellation returned\n"); + io_uring_cqe_seen(ring, cqe); + continue; + } + + rc = cqe->res; + /* Hand the entry back to the kernel before */ + io_uring_cqe_seen(ring, cqe); + + iofd_uring_handle_completion(msghdr, rc); + + } +} + +static int iofd_uring_submit_tx(struct osmo_io_fd *iofd) +{ + struct io_uring_sqe *sqe; + struct iofd_msghdr *msghdr; + + msghdr = iofd_txqueue_dequeue(iofd); + if (!msghdr) + return -ENODATA; + + sqe = io_uring_get_sqe(&g_ring.ring); + if (!sqe) { + LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n"); + OSMO_ASSERT(0); + } + + io_uring_sqe_set_data(sqe, msghdr); + + switch (msghdr->action) { + case IOFD_ACT_WRITE: + case IOFD_ACT_SENDTO: + io_uring_prep_sendmsg(sqe, msghdr->iofd->fd, &msghdr->hdr, msghdr->flags); + break; + default: + OSMO_ASSERT(0); + } + + io_uring_submit(&g_ring.ring); + iofd->u.uring.write_msghdr = msghdr; + + return 0; +} + +static void iofd_uring_write_enable(struct osmo_io_fd *iofd); +static void iofd_uring_read_enable(struct osmo_io_fd *iofd); + +static int iofd_uring_register(struct osmo_io_fd *iofd) +{ + return 0; +} + +static int iofd_uring_unregister(struct osmo_io_fd *iofd) +{ + struct io_uring_sqe *sqe; + if (iofd->u.uring.read_msghdr) { + sqe = io_uring_get_sqe(&g_ring.ring); + OSMO_ASSERT(sqe != NULL); + io_uring_sqe_set_data(sqe, NULL); + LOGPIO(iofd, LOGL_DEBUG, "Cancelling read\n"); + io_uring_prep_cancel(sqe, 
iofd->u.uring.read_msghdr, 0); + } + + if (iofd->u.uring.write_msghdr) { + sqe = io_uring_get_sqe(&g_ring.ring); + OSMO_ASSERT(sqe != NULL); + io_uring_sqe_set_data(sqe, NULL); + LOGPIO(iofd, LOGL_DEBUG, "Cancelling write\n"); + io_uring_prep_cancel(sqe, iofd->u.uring.write_msghdr, 0); + } + io_uring_submit(&g_ring.ring); + + return 0; +} + +static void iofd_uring_write_enable(struct osmo_io_fd *iofd) +{ + iofd->u.uring.write_enabled = true; + + if (iofd->u.uring.write_msghdr) + return; + + if (osmo_iofd_txqueue_len(iofd) > 0) + iofd_uring_submit_tx(iofd); + else if (iofd->mode == OSMO_IO_FD_MODE_READ_WRITE) { + /* Empty write request to check when the socket is connected */ + struct iofd_msghdr *msghdr; + struct io_uring_sqe *sqe; + struct msgb *msg = msgb_alloc_headroom(0, 0, "io_uring write dummy"); + if (!msg) { + LOGPIO(iofd, LOGL_ERROR, "Could not allocate msgb for writing\n"); + OSMO_ASSERT(0); + } + msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, msg); + if (!msghdr) { + LOGPIO(iofd, LOGL_ERROR, "Could not allocate msghdr for writing\n"); + OSMO_ASSERT(0); + } + + msghdr->iov[0].iov_base = msgb_data(msg); + msghdr->iov[0].iov_len = msgb_tailroom(msg); + + sqe = io_uring_get_sqe(&g_ring.ring); + if (!sqe) { + LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n"); + OSMO_ASSERT(0); + } + // Prep msgb/iov + io_uring_prep_writev(sqe, iofd->fd, msghdr->iov, 1, 0); + io_uring_sqe_set_data(sqe, msghdr); + + io_uring_submit(&g_ring.ring); + iofd->u.uring.write_msghdr = msghdr; + } +} + +static void iofd_uring_write_disable(struct osmo_io_fd *iofd) +{ + iofd->u.uring.write_enabled = false; +} + +static void iofd_uring_read_enable(struct osmo_io_fd *iofd) +{ + iofd->u.uring.read_enabled = true; + + if (iofd->u.uring.read_msghdr) + return; + + switch (iofd->mode) { + case OSMO_IO_FD_MODE_READ_WRITE: + iofd_uring_submit_recv(iofd, IOFD_ACT_READ); + break; + case OSMO_IO_FD_MODE_RECVFROM_SENDTO: + iofd_uring_submit_recv(iofd, IOFD_ACT_RECVFROM); + break; + default: 
+ OSMO_ASSERT(0); + } +} + +static void iofd_uring_read_disable(struct osmo_io_fd *iofd) +{ + iofd->u.uring.read_enabled = false; +} + +static int iofd_uring_close(struct osmo_io_fd *iofd) +{ + iofd_uring_read_disable(iofd); + iofd_uring_write_disable(iofd); + iofd_uring_unregister(iofd); + return close(iofd->fd); +} + +const struct iofd_backend_ops iofd_uring_ops = { + .register_fd = iofd_uring_register, + .unregister_fd = iofd_uring_unregister, + .close = iofd_uring_close, + .write_enable = iofd_uring_write_enable, + .write_disable = iofd_uring_write_disable, + .read_enable = iofd_uring_read_enable, + .read_disable = iofd_uring_read_disable, +}; + +#endif /* defined(__linux__) */ diff --git a/tests/osmo_io/osmo_io_test.c b/tests/osmo_io/osmo_io_test.c index cff594b..93beef4 100644 --- a/tests/osmo_io/osmo_io_test.c +++ b/tests/osmo_io/osmo_io_test.c @@ -95,6 +95,9 @@
osmo_iofd_free(iofd1); osmo_iofd_free(iofd2); + + for (int i = 0; i < 128; i++) + osmo_select_main(1); }
static void recvfrom_cb(struct osmo_io_fd *iofd, int rc, struct msgb *msg, @@ -147,6 +150,9 @@
osmo_iofd_free(iofd1); osmo_iofd_free(iofd2); + + for (int i = 0; i < 128; i++) + osmo_select_main(1); } static const struct log_info_cat default_categories[] = { };