pespin has uploaded this change for review. ( https://gerrit.osmocom.org/c/libosmocore/+/32536 )
Change subject: [WIP, BROKEN] osmo_io: Add io_uring backend ......................................................................
[WIP, BROKEN] osmo_io: Add io_uring backend
Change-Id: I5152129eb84b31ccc9e02bc2a5c5bdb046d331bc --- M configure.ac M include/osmocom/core/osmo_io.h M src/core/Makefile.am M src/core/osmo_io.c M src/core/osmo_io_internal.h A src/core/osmo_io_uring.c 6 files changed, 471 insertions(+), 1 deletion(-)
git pull ssh://gerrit.osmocom.org:29418/libosmocore refs/changes/36/32536/1
diff --git a/configure.ac b/configure.ac index 5e17c7a..3e5d8f6 100644 --- a/configure.ac +++ b/configure.ac @@ -174,6 +174,20 @@
PKG_CHECK_MODULES(TALLOC, [talloc >= 2.1.0])
+AC_ARG_ENABLE([uring], [AS_HELP_STRING([--disable-uring], [Build without io_uring support])], + [ + ENABLE_URING=$enableval + ], + [ + ENABLE_URING="yes" + ]) +AS_IF([test "x$ENABLE_URING" = "xyes"], [ + PKG_CHECK_MODULES(URING, [liburing >= 2]) + AC_DEFINE([HAVE_URING],[1],[Build with io_uring support]) +]) +AM_CONDITIONAL(ENABLE_URING, test "x$ENABLE_URING" = "xyes") +AC_SUBST(ENABLE_URING) + AC_ARG_ENABLE([pcsc], [AS_HELP_STRING([--disable-pcsc], [Build without PC/SC support])], [ ENABLE_PCSC=$enableval diff --git a/include/osmocom/core/osmo_io.h b/include/osmocom/core/osmo_io.h index fa7a745..01a7ada 100644 --- a/include/osmocom/core/osmo_io.h +++ b/include/osmocom/core/osmo_io.h @@ -22,6 +22,7 @@
enum osmo_io_backend { OSMO_IO_BACKEND_POLL, + OSMO_IO_BACKEND_URING, };
extern const struct value_string osmo_io_backend_names[]; diff --git a/src/core/Makefile.am b/src/core/Makefile.am index 80ee458..2f2fc19 100644 --- a/src/core/Makefile.am +++ b/src/core/Makefile.am @@ -4,7 +4,7 @@ LIBVERSION=20:0:0
AM_CPPFLAGS = -I$(top_srcdir)/include -I$(top_builddir)/include -I$(top_builddir) -AM_CFLAGS = -Wall $(TALLOC_CFLAGS) $(PTHREAD_CFLAGS) $(LIBSCTP_CFLAGS) $(LIBMNL_CFLAGS) +AM_CFLAGS = -Wall $(TALLOC_CFLAGS) $(PTHREAD_CFLAGS) $(LIBSCTP_CFLAGS) $(LIBMNL_CFLAGS) $(URING_CFLAGS)
if ENABLE_PSEUDOTALLOC AM_CPPFLAGS += -I$(top_srcdir)/src/pseudotalloc @@ -18,6 +18,7 @@ $(LIBRARY_RT) \ $(PTHREAD_LIBS) \ $(LIBSCTP_LIBS) \ + $(URING_LIBS) \ $(NULL)
libosmocore_la_SOURCES = \ @@ -156,5 +157,9 @@ libosmocore_la_LIBADD += probes.lo endif
+if ENABLE_URING +libosmocore_la_SOURCES += osmo_io_uring.c +endif + crc%gen.c: crcXXgen.c.tpl $(AM_V_GEN)sed -e's/XX/$*/g' $< > $@ diff --git a/src/core/osmo_io.c b/src/core/osmo_io.c index caf776a..3647deb 100644 --- a/src/core/osmo_io.c +++ b/src/core/osmo_io.c @@ -44,6 +44,7 @@
const struct value_string osmo_io_backend_names[] = { { OSMO_IO_BACKEND_POLL, "Poll" }, + { OSMO_IO_BACKEND_URING, "Uring" }, { 0, NULL } };
@@ -52,12 +53,21 @@ /* Used by some tests, can't be static */ struct iofd_backend_ops osmo_iofd_ops;
+#if defined(HAVE_URING) +void osmo_iofd_uring_init(void); +#endif + /*! initialize osmo_io for the current thread */ void osmo_iofd_init(void) { switch (g_io_backend) { case OSMO_IO_BACKEND_POLL: break; +#if defined(HAVE_URING) + case OSMO_IO_BACKEND_URING: + osmo_iofd_uring_init(); + break; +#endif default: OSMO_ASSERT(0); break; @@ -75,6 +85,11 @@ if (!strcmp("POLL", backend)) { g_io_backend = OSMO_IO_BACKEND_POLL; osmo_iofd_ops = iofd_poll_ops; +#if defined(HAVE_URING) + } else if (!strcmp("URING", backend)) { + g_io_backend = OSMO_IO_BACKEND_URING; + osmo_iofd_ops = iofd_uring_ops; +#endif } else { fprintf(stderr, "Invalid LIBOSMO_IO_BACKEND selected (got: %s)\n", backend); exit(1); diff --git a/src/core/osmo_io_internal.h b/src/core/osmo_io_internal.h index 0d61519..a396b53 100644 --- a/src/core/osmo_io_internal.h +++ b/src/core/osmo_io_internal.h @@ -19,6 +19,10 @@ extern struct iofd_backend_ops iofd_poll_ops; #define OSMO_IO_BACKEND_DEFAULT "POLL"
+#if defined(HAVE_URING) +extern struct iofd_backend_ops iofd_uring_ops; +#endif + struct iofd_backend_ops { int (*register_fd)(struct osmo_io_fd *iofd); int (*unregister_fd)(struct osmo_io_fd *iofd); diff --git a/src/core/osmo_io_uring.c b/src/core/osmo_io_uring.c new file mode 100644 index 0000000..b092faa --- /dev/null +++ b/src/core/osmo_io_uring.c @@ -0,0 +1,422 @@ +/*! \file osmo_io_uring.c + * io_uring backend for osmo_io. + * + * (C) 2022-2023 by sysmocom s.f.m.c. + * Author: Daniel Willmann daniel@sysmocom.de + * + * All Rights Reserved. + * + * SPDX-License-Identifier: GPL-2.0+ + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + */ + +/* TODO: + * Parameters: + * - number of simultaneous read/write in uring for given fd + * + */ + +#include <stdio.h> +#include <talloc.h> +#include <unistd.h> +#include <string.h> +#include <stdbool.h> +#include <errno.h> + +#include <sys/eventfd.h> +#include <liburing.h> + +#include <osmocom/core/osmo_io.h> +#include <osmocom/core/linuxlist.h> +#include <osmocom/core/logging.h> +#include <osmocom/core/msgb.h> +#include <osmocom/core/select.h> +#include <osmocom/core/talloc.h> +#include <osmocom/core/utils.h> +#include <osmocom/core/socket.h> + +#include "../config.h" +#include "osmo_io_internal.h" + +#define IOFD_URING_ENTRIES 4096 + +struct osmo_io_uring { + struct osmo_fd event_ofd; + struct io_uring ring; +}; + +static __thread struct osmo_io_uring g_ring; + +int iofd_uring_poll_cb(struct osmo_fd *ofd, unsigned int what); + +/*! 
initialize the uring of the current thread and hook its completion eventfd into the osmo_fd main loop.
+ *  Aborts via OSMO_ASSERT if the ring or the eventfd cannot be created or registered. */
+void osmo_iofd_uring_init(void)
+{
+	int evfd;
+	int rc;
+
+	rc = io_uring_queue_init(IOFD_URING_ENTRIES, &g_ring.ring, 0);
+	if (rc < 0)
+		OSMO_ASSERT(0);
+
+	evfd = eventfd(0, 0);
+	if (evfd < 0) {
+		io_uring_queue_exit(&g_ring.ring);
+		OSMO_ASSERT(0);
+	}
+
+	/* FIXME: This can't be done in _init because it depends on the osmo_fd constructor being run and order is unspecified */
+	osmo_fd_setup(&g_ring.event_ofd, evfd, OSMO_FD_READ, iofd_uring_poll_cb, &g_ring.ring, 1);
+
+	/* Without both registrations no completion would ever be reaped, so fail loudly instead of hanging. */
+	rc = osmo_fd_register(&g_ring.event_ofd);
+	if (rc < 0)
+		OSMO_ASSERT(0);
+	rc = io_uring_register_eventfd(&g_ring.ring, evfd);
+	if (rc < 0)
+		OSMO_ASSERT(0);
+}
+
+static int iofd_uring_cqe(struct io_uring *ring);
+
+/*! osmo_fd callback of the eventfd registered with the ring.
+ *  \param[in] ofd osmo_fd wrapping the eventfd; ofd->data is the struct io_uring
+ *  \param[in] what OSMO_FD_* event mask; only OSMO_FD_READ is expected
+ *  \returns 0 on success, the negative eventfd_read() result on failure */
+int iofd_uring_poll_cb(struct osmo_fd *ofd, unsigned int what)
+{
+	struct io_uring *ring = ofd->data;
+	eventfd_t val;
+	int rc;
+
+	if (what & OSMO_FD_READ) {
+		/* Reset the eventfd counter before draining the completion queue */
+		rc = eventfd_read(ofd->fd, &val);
+		// TODO: Logging
+		if (rc < 0)
+			return rc;
+
+		iofd_uring_cqe(ring);
+	}
+	if (what & OSMO_FD_WRITE) {
+		/* The eventfd is only ever registered for reading */
+		OSMO_ASSERT(0);
+	}
+
+	return 0;
+}
+
+/*! Queue one readv() for the given iofd and mark a read as pending.
+ *  If the buffers cannot be allocated nothing is queued and read_pending is cleared. */
+static void iofd_uring_submit_read(struct osmo_io_fd *iofd)
+{
+	struct msgb *msg;
+	struct iofd_msghdr *msghdr;
+	struct io_uring_sqe *sqe;
+
+	// TODO: This only works if we have one read per fd
+	msg = iofd_msgb_pending_or_alloc(iofd);
+	if (!msg) {
+		// FIXME: complain
+		iofd->u.uring.read_pending = false;
+		return;
+	}
+
+	msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_READ, msg);
+	if (!msghdr) {
+		/* Nothing references msg any more at this point, don't leak it */
+		msgb_free(msg);
+		iofd->u.uring.read_pending = false;
+		return;
+	}
+
+	/* Fetch the SQE only after all allocations succeeded: an SQE that was obtained but never
+	 * prepared would otherwise be handed to the kernel by the next io_uring_submit(). */
+	sqe = io_uring_get_sqe(&g_ring.ring);
+	if (!sqe)
+		// FIXME
+		OSMO_ASSERT(0);
+
+	msghdr->iov[0].iov_base = msgb_data(msg);
+	msghdr->iov[0].iov_len = msgb_tailroom(msg);
+
+	// Prep msgb/iov
+	io_uring_prep_readv(sqe, iofd->fd, msghdr->iov, 1, 0);
+	io_uring_sqe_set_data(sqe, msghdr);
+
+	io_uring_submit(&g_ring.ring);
+	iofd->u.uring.read_pending = true;
+}
+
+/*! Queue one recvmsg() for the given iofd and mark a read as pending.
+ *  If the buffers cannot be allocated nothing is queued and read_pending is cleared. */
+static void iofd_uring_submit_recvfrom(struct osmo_io_fd *iofd)
+{
+	struct msgb *msg;
+	struct iofd_msghdr *msghdr;
+	struct io_uring_sqe *sqe;
+
+	msg = iofd_msgb_pending_or_alloc(iofd);
+	if (!msg) {
+		// FIXME: complain
+		iofd->u.uring.read_pending = false;
+		return;
+	}
+
+	msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_RECVFROM, msg);
+	if (!msghdr) {
+		/* Nothing references msg any more at this point, don't leak it */
+		msgb_free(msg);
+		iofd->u.uring.read_pending = false;
+		return;
+	}
+
+	/* Fetch the SQE only after all allocations succeeded (see iofd_uring_submit_read()) */
+	sqe = io_uring_get_sqe(&g_ring.ring);
+	if (!sqe)
+		// FIXME
+		OSMO_ASSERT(0);
+
+	msghdr->iov[0].iov_base = msgb_data(msg);
+	msghdr->iov[0].iov_len = msgb_tailroom(msg);
+
+	msghdr->hdr.msg_iov = &msghdr->iov[0];
+	msghdr->hdr.msg_iovlen = 1;
+	msghdr->hdr.msg_name = &msghdr->osa.u.sa;
+	/* NOTE(review): osa has not been filled in by anything visible here before it is sized -- confirm
+	 * that osmo_sockaddr_size() yields a usable buffer length for the receive direction. */
+	msghdr->hdr.msg_namelen = osmo_sockaddr_size(&msghdr->osa);
+
+	// Prep msgb/iov
+	io_uring_prep_recvmsg(sqe, iofd->fd, &msghdr->hdr, msghdr->flags);
+	io_uring_sqe_set_data(sqe, msghdr);
+
+	io_uring_submit(&g_ring.ring);
+	iofd->u.uring.read_pending = true;
+}
+
+/*! Backend op: enable reading and queue a read unless one is already in flight. */
+void iofd_uring_read_enable(struct osmo_io_fd *iofd)
+{
+	iofd->u.uring.read_enabled = true;
+
+	if (iofd->u.uring.read_pending)
+		return;
+
+	switch (iofd->mode) {
+	case OSMO_IO_FD_MODE_READ_WRITE:
+		iofd_uring_submit_read(iofd);
+		break;
+	case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
+		iofd_uring_submit_recvfrom(iofd);
+		break;
+	default:
+		OSMO_ASSERT(0);
+	}
+}
+
+/*! Handle the completion of a readv(): pass the data on and re-arm the read if still enabled.
+ *  \param[in] msghdr the msghdr that was attached to the SQE; released here
+ *  \param[in] rc result of the operation as reported in the CQE */
+static void iofd_uring_handle_read(struct iofd_msghdr *msghdr, int rc)
+{
+	struct osmo_io_fd *iofd = msghdr->iofd;
+	struct msgb *msg = msghdr->msg;
+
+	if (rc > 0)
+		msgb_put(msg, rc);
+
+	/* NOTE(review): when the iofd is already closed, msg is neither passed on nor freed here -- confirm
+	 * who owns it in that case. */
+	if (!iofd->closed)
+		iofd_handle_segmented_read(iofd, msg, rc);
+
+	iofd_msghdr_free(msghdr);
+	if (iofd->u.uring.read_enabled && !iofd->closed) {
+		iofd_uring_submit_read(iofd);
+	} else {
+		iofd->u.uring.read_pending = false;
+	}
+}
+
+/*! Handle the completion of a recvmsg(): call recvfrom_cb and re-arm the read if still enabled.
+ *  \param[in] msghdr the msghdr that was attached to the SQE; released here
+ *  \param[in] rc result of the operation as reported in the CQE */
+static void iofd_uring_handle_recvfrom(struct iofd_msghdr *msghdr, int rc)
+{
+	struct osmo_io_fd *iofd = msghdr->iofd;
+	struct msgb *msg = msghdr->msg;
+
+	if (rc > 0)
+		msgb_put(msg, rc);
+
+	/* NOTE(review): when the iofd is already closed, msg is neither passed on nor freed here -- confirm
+	 * who owns it in that case. */
+	if (!iofd->closed)
+		// FIXME: Include flags
+		iofd->io_ops.recvfrom_cb(iofd, rc, msg, &msghdr->osa);
+
+	iofd_msghdr_free(msghdr);
+
+	if (iofd->u.uring.read_enabled && !iofd->closed) {
+		iofd_uring_submit_recvfrom(iofd);
+	} else {
+		iofd->u.uring.read_pending = false;
+	}
+}
+
+static int iofd_uring_submit_tx(struct osmo_io_fd *iofd);
+
+/*! Handle the completion of a writev()/sendmsg().
+ *  Errors and complete writes are reported through write_cb/sendto_cb and the message is freed;
+ *  a short write puts the unsent remainder back at the head of the tx queue.
+ *  \param[in] msghdr the msghdr that was attached to the SQE
+ *  \param[in] rc result of the operation as reported in the CQE */
+static void iofd_uring_handle_tx(struct iofd_msghdr *msghdr, int rc)
+{
+	struct osmo_io_fd *iofd = msghdr->iofd;
+
+	if (iofd->closed)
+		goto out_free;
+
+	if (rc < 0) {
+		if (msghdr->action == IOFD_ACT_WRITE)
+			iofd->io_ops.write_cb(iofd, rc, msghdr->msg);
+		else if (msghdr->action == IOFD_ACT_SENDTO)
+			iofd->io_ops.sendto_cb(iofd, rc, msghdr->msg, &msghdr->osa);
+		else
+			OSMO_ASSERT(0);
+		goto out_free;
+	}
+
+	if (rc < msgb_length(msghdr->msg)) {
+		/* Short write: strip the bytes that were already sent, so that the retry (which rebuilds
+		 * the iov from msgb_data()/msgb_length()) transmits only the remainder. */
+		msgb_pull(msghdr->msg, rc);
+		iofd_txqueue_enqueue_front(iofd, msghdr);
+		goto out;
+	}
+
+	if (msghdr->action == IOFD_ACT_WRITE)
+		iofd->io_ops.write_cb(iofd, rc, msghdr->msg);
+	else if (msghdr->action == IOFD_ACT_SENDTO)
+		iofd->io_ops.sendto_cb(iofd, rc, msghdr->msg, &msghdr->osa);
+	else
+		OSMO_ASSERT(0);
+
+out_free:
+	msgb_free(msghdr->msg);
+	iofd_msghdr_free(msghdr);
+
+out:
+	iofd->u.uring.write_pending = false;
+	if (iofd->u.uring.write_enabled && !iofd->closed) {
+		iofd_uring_submit_tx(iofd);
+	}
+}
+
+/*! Dispatch one completion to the handler matching the action stored in its msghdr.
+ *  Frees the iofd once it is closed and neither a read nor a write is pending any more.
+ *  \param[in] msghdr user data of the CQE
+ *  \param[in] res result of the operation as reported in the CQE */
+static void iofd_uring_handle_completion(struct iofd_msghdr *msghdr, int res)
+{
+	/* msghdr is released by the handlers, so remember the iofd beforehand */
+	struct osmo_io_fd *iofd = msghdr->iofd;
+
+	switch (msghdr->action) {
+	case IOFD_ACT_READ:
+		iofd_uring_handle_read(msghdr, res);
+		break;
+	case IOFD_ACT_RECVFROM:
+		iofd_uring_handle_recvfrom(msghdr, res);
+		break;
+	case IOFD_ACT_WRITE:
+		/* Fallthrough */
+	case IOFD_ACT_SENDTO:
+		iofd_uring_handle_tx(msghdr, res);
+		break;
+	default:
+		OSMO_ASSERT(0);
+	}
+
+	if (iofd->closed && !iofd->u.uring.read_pending && !iofd->u.uring.write_pending)
+		talloc_free(iofd);
+}
+
+/*! Drain the completion queue, handling every CQE that is currently available.
+ *  \param[in] ring the ring to reap completions from
+ *  \returns always 0 */
+static int iofd_uring_cqe(struct io_uring *ring)
+{
+	int rc;
+	struct io_uring_cqe *cqe;
+	struct iofd_msghdr *msghdr;
+
+	do {
+		/* Maybe use peek_batch? */
+		rc = io_uring_peek_cqe(ring, &cqe);
+		if (rc < 0)
+			break;
+
+		msghdr = io_uring_cqe_get_data(cqe);
+		OSMO_ASSERT(msghdr);
+
+		iofd_uring_handle_completion(msghdr, cqe->res);
+		// FIXME: Call seen inside the handlers?
+		/* Hand the entry back to the kernel */
+		io_uring_cqe_seen(ring, cqe);
+	} while (rc == 0);
+
+	return 0;
+}
+
+/*! Queue a writev() carrying the current content of msghdr->msg.
+ *  \returns always 0 */
+static int iofd_uring_submit_write(struct osmo_io_fd *iofd, struct iofd_msghdr *msghdr)
+{
+	struct io_uring_sqe *sqe = io_uring_get_sqe(&g_ring.ring);
+	if (!sqe)
+		// FIXME
+		OSMO_ASSERT(0);
+
+	msghdr->iov[0].iov_base = msgb_data(msghdr->msg);
+	msghdr->iov[0].iov_len = msgb_length(msghdr->msg);
+	msghdr->hdr.msg_iov = &msghdr->iov[0];
+	msghdr->hdr.msg_iovlen = 1;
+
+	// Prep msgb/iov
+	io_uring_prep_writev(sqe, msghdr->iofd->fd, msghdr->iov, 1, 0);
+	io_uring_sqe_set_data(sqe, msghdr);
+
+	io_uring_submit(&g_ring.ring);
+
+	return 0;
+}
+
+/*! Queue a sendmsg() carrying the current content of msghdr->msg towards msghdr->osa.
+ *  \returns always 0 */
+static int iofd_uring_submit_sendto(struct osmo_io_fd *iofd, struct iofd_msghdr *msghdr)
+{
+	struct io_uring_sqe *sqe = io_uring_get_sqe(&g_ring.ring);
+	if (!sqe)
+		// FIXME
+		OSMO_ASSERT(0);
+
+	msghdr->iov[0].iov_base = msgb_data(msghdr->msg);
+	msghdr->iov[0].iov_len = msgb_length(msghdr->msg);
+	msghdr->hdr.msg_iov = &msghdr->iov[0];
+	msghdr->hdr.msg_iovlen = 1;
+	msghdr->hdr.msg_name = &msghdr->osa.u.sa;
+	msghdr->hdr.msg_namelen = osmo_sockaddr_size(&msghdr->osa);
+
+	// Prep msgb/iov
+	io_uring_prep_sendmsg(sqe, msghdr->iofd->fd, &msghdr->hdr, msghdr->flags);
+	io_uring_sqe_set_data(sqe, msghdr);
+
+	io_uring_submit(&g_ring.ring);
+
+	return 0;
+}
+
+/*! Take the next msghdr off the tx queue and queue it for transmission.
+ *  \returns 0 if an operation was queued (write_pending is then set), -ENODATA if the tx queue is empty */
+static int iofd_uring_submit_tx(struct osmo_io_fd *iofd)
+{
+	int rc = -EINVAL;
+	struct iofd_msghdr *msghdr;
+
+	msghdr = iofd_txqueue_dequeue(iofd);
+	if (!msghdr)
+		return -ENODATA;
+
+	switch (msghdr->action) {
+	case IOFD_ACT_WRITE:
+		rc = iofd_uring_submit_write(iofd, msghdr);
+		break;
+	case IOFD_ACT_SENDTO:
+		rc = iofd_uring_submit_sendto(iofd, msghdr);
+		break;
+	default:
+		OSMO_ASSERT(0);
+	}
+	if (rc == 0)
+		iofd->u.uring.write_pending = true;
+
+	return rc;
+}
+
+/*! Backend op: enable writing and start transmitting unless a write is already in flight. */
+void iofd_uring_write_enable(struct osmo_io_fd *iofd)
+{
+	iofd->u.uring.write_enabled = true;
+
+	if (iofd->u.uring.write_pending)
+		return;
+
+	iofd_uring_submit_tx(iofd);
+}
+
+/*! Backend op: stop queueing further writes; a write already in flight is not cancelled. */
+void iofd_uring_write_disable(struct osmo_io_fd *iofd)
+{
+	iofd->u.uring.write_enabled = false;
+}
+
+/*! Backend op: stop re-arming reads; a read already in flight is not cancelled. */
+void iofd_uring_read_disable(struct osmo_io_fd *iofd)
+{
+	iofd->u.uring.read_enabled = false;
+}
+
+/*! Backend op: close the file descriptor unless an operation is still in flight.
+ *  \returns 0 without closing if a read or write is pending, the result of close() otherwise */
+int iofd_uring_close(struct osmo_io_fd *iofd)
+{
+	/* NOTE(review): in the pending case nothing in this file closes iofd->fd later on
+	 * (iofd_uring_handle_completion() only frees the iofd) -- confirm the fd is not leaked. */
+	if (iofd->u.uring.read_pending || iofd->u.uring.write_pending)
+		return 0;
+
+	return close(iofd->fd);
+}
+
+/*! osmo_io backend operations implemented on top of io_uring */
+struct iofd_backend_ops iofd_uring_ops = {
+	.close = iofd_uring_close,
+	.write_enable = iofd_uring_write_enable,
+	.write_disable = iofd_uring_write_disable,
+	.read_enable = iofd_uring_read_enable,
+	.read_disable = iofd_uring_read_disable,
+};