laforge has submitted this change. ( https://gerrit.osmocom.org/c/libosmocore/+/40494?usp=email )
Change subject: Send multiple read/recvfrom/recvmsg SQEs in advance ......................................................................
Send multiple read/recvfrom/recvmsg SQEs in advance
Multiple read or recvfrom operations can be submitted via SQEs when using io_uring. This allows reading multiple packets / more data between calls of osmo_select_main(), the main loop.
Having more than one SQE submitted is optional; the number can be controlled via the LIBOSMO_IO_URING_READ_SQE environment variable.
Related: OS#6705 Change-Id: Id50a8900fa2fe6de553e5025feae7e1e8d501e30 --- M include/osmocom/core/osmo_io.h M src/core/libosmocore.map M src/core/osmo_io.c M src/core/osmo_io_internal.h M src/core/osmo_io_uring.c 5 files changed, 103 insertions(+), 14 deletions(-)
Approvals: pespin: Looks good to me, approved laforge: Looks good to me, but someone else must approve Jenkins Builder: Verified
diff --git a/include/osmocom/core/osmo_io.h b/include/osmocom/core/osmo_io.h index 0b3dc83..0733240 100644 --- a/include/osmocom/core/osmo_io.h +++ b/include/osmocom/core/osmo_io.h @@ -213,6 +213,7 @@ enum osmo_io_fd_mode mode, const struct osmo_io_ops *ioops, void *data); int osmo_iofd_set_cmsg_size(struct osmo_io_fd *iofd, size_t cmsg_size); int osmo_iofd_set_io_buffers(struct osmo_io_fd *iofd, enum osmo_io_op op, uint8_t buffers); +int osmo_iofd_set_sqes(struct osmo_io_fd *iofd, enum osmo_io_op op, uint8_t sqes); int osmo_iofd_register(struct osmo_io_fd *iofd, int fd); int osmo_iofd_unregister(struct osmo_io_fd *iofd); unsigned int osmo_iofd_txqueue_len(struct osmo_io_fd *iofd); diff --git a/src/core/libosmocore.map b/src/core/libosmocore.map index 239597c..198b31e 100644 --- a/src/core/libosmocore.map +++ b/src/core/libosmocore.map @@ -276,6 +276,7 @@ osmo_iofd_set_alloc_info; osmo_iofd_set_cmsg_size; osmo_iofd_set_io_buffers; +osmo_iofd_set_sqes; osmo_iofd_set_data; osmo_iofd_set_ioops; osmo_iofd_set_priv_nr; diff --git a/src/core/osmo_io.c b/src/core/osmo_io.c index 62d2362..2f3af06 100644 --- a/src/core/osmo_io.c +++ b/src/core/osmo_io.c @@ -816,6 +816,14 @@ iofd->io_read_buffers = 1; iofd->io_write_buffers = 1;
+ if (osmo_iofd_ops.setup) { + int rc = osmo_iofd_ops.setup(iofd); + if (rc < 0) { + osmo_iofd_free(iofd); + return NULL; + } + } + return iofd; }
@@ -868,6 +876,35 @@ return 0; }
+/*! Set the number of SQEs that are submitted to an io_uring before completion is received. + * + * If the io_uring backend is selected, this API function can be used to tell the osmo_io process how many SQEs are + * scheduled in advance. + * The feature currently supports scheduling read SQEs only. + * + * \param[in] iofd the iofd file descriptor + * \param[in] op the osmo_io_op (read) to set the number of SQEs for + * \param[in] sqes number of scheduled SQEs + * \returns zero on success, a negative value on error + */ +int osmo_iofd_set_sqes(struct osmo_io_fd *iofd, enum osmo_io_op op, uint8_t sqes) +{ + if (iofd->mode != OSMO_IO_FD_MODE_READ_WRITE) + return -EINVAL; + + if (g_io_backend != OSMO_IO_BACKEND_IO_URING) + return -EINVAL; + + if (op != OSMO_IO_OP_READ) + return -EINVAL; + + if (sqes < 1 || sqes > IOFD_MSGHDR_MAX_READ_SQES) + return -EINVAL; + + iofd->u.uring.num_read_sqes = sqes; + return 0; +} + /*! Register the osmo_io_fd for active I/O. * * Calling this function will register a previously initialized osmo_io_fd for performing I/O. diff --git a/src/core/osmo_io_internal.h b/src/core/osmo_io_internal.h index d28e3d7..f983b77 100644 --- a/src/core/osmo_io_internal.h +++ b/src/core/osmo_io_internal.h @@ -25,6 +25,7 @@ #endif
struct iofd_backend_ops { + int (*setup)(struct osmo_io_fd *iofd); int (*register_fd)(struct osmo_io_fd *iofd); int (*unregister_fd)(struct osmo_io_fd *iofd); int (*close)(struct osmo_io_fd *iofd); @@ -49,6 +50,8 @@
#define IOFD_FLAG_ISSET(iofd, flag) ((iofd)->flags & (flag))
+#define IOFD_MSGHDR_MAX_READ_SQES 32 + struct osmo_io_fd { /*! linked list for internal management */ struct llist_head list; @@ -107,7 +110,12 @@ struct { bool read_enabled; bool write_enabled; - void *read_msghdr; + /*! requested number of simultaneously submitted read SQEs */ + uint8_t num_read_sqes; + /*! array of simultaneously submitted read SQEs */ + void *read_msghdr[IOFD_MSGHDR_MAX_READ_SQES]; + /*! current number of simultaneously submitted read SQEs */ + uint8_t reads_submitted; void *write_msghdr; /* TODO: index into array of registered fd's? */ /* osmo_fd for non-blocking connect handling */ diff --git a/src/core/osmo_io_uring.c b/src/core/osmo_io_uring.c index 018bdab..1eaa5e6 100644 --- a/src/core/osmo_io_uring.c +++ b/src/core/osmo_io_uring.c @@ -56,9 +56,13 @@
#define OSMO_IO_URING_BATCH "LIBOSMO_IO_URING_BATCH"
+#define OSMO_IO_URING_READ_SQE "LIBOSMO_IO_URING_READ_SQE" + bool g_io_uring_batch = false; bool g_io_uring_submit_needed = false;
+static int g_io_uring_read_sqes = 1; + struct osmo_io_uring { struct osmo_fd event_ofd; struct io_uring ring; @@ -103,6 +107,15 @@ if (rc < 0) osmo_panic("failure during io_uring_queue_init(): %s\n", strerror(-rc));
+ if ((env = getenv(OSMO_IO_URING_READ_SQE))) { + g_io_uring_read_sqes = atoi(env); + if (g_io_uring_read_sqes < 1 || g_io_uring_read_sqes > IOFD_MSGHDR_MAX_READ_SQES) { + fprintf(stderr, "Invalid osmo_uring read SQEs requested: \"%s\"\n Allowed range: 1..%d\n", + env, IOFD_MSGHDR_MAX_READ_SQES); + exit(1); + } + } + rc = eventfd(0, 0); if (rc < 0) { io_uring_queue_exit(&g_ring.ring); @@ -134,7 +147,7 @@ g_io_uring_submit_needed = true; }
-static void iofd_uring_submit_recv(struct osmo_io_fd *iofd, enum iofd_msg_action action) +static void iofd_uring_submit_recv_sqe(struct osmo_io_fd *iofd, enum iofd_msg_action action) { struct msgb *msg; struct iofd_msghdr *msghdr; @@ -196,15 +209,37 @@
iofd_io_uring_submit();
- /* NOTE: This only works if we have one read per fd */ - iofd->u.uring.read_msghdr = msghdr; + iofd->u.uring.read_msghdr[iofd->u.uring.reads_submitted] = msghdr; + iofd->u.uring.reads_submitted++; +} + +static void iofd_uring_submit_recv(struct osmo_io_fd *iofd, enum iofd_msg_action action) +{ + /* Submit more read SQEs in advance, if requested. */ + while (iofd->u.uring.reads_submitted < iofd->u.uring.num_read_sqes) + iofd_uring_submit_recv_sqe(iofd, action); }
/*! completion call-back for READ/RECVFROM */ static void iofd_uring_handle_recv(struct iofd_msghdr *msghdr, int rc) { struct osmo_io_fd *iofd = msghdr->iofd; - uint8_t idx; + uint8_t idx, i; + + /* Find which read_msghdr is completed and remove from list. */ + for (idx = 0; idx < iofd->u.uring.reads_submitted; idx++) { + if (iofd->u.uring.read_msghdr[idx] == msghdr) + break; + } + if (idx == iofd->u.uring.reads_submitted) { + LOGP(DLIO, LOGL_FATAL, "Read SQE completion, but msghdr not found, please fix!\n"); + return; + } + /* Remove entry at idx. */ + iofd->u.uring.reads_submitted--; + for (i = idx; i < iofd->u.uring.reads_submitted; i++) + iofd->u.uring.read_msghdr[i] = iofd->u.uring.read_msghdr[i + 1]; + iofd->u.uring.read_msghdr[i] = NULL;
for (idx = 0; idx < msghdr->io_len; idx++) { struct msgb *msg = msghdr->msg[idx]; @@ -238,9 +273,6 @@
if (iofd->u.uring.read_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED)) iofd_uring_submit_recv(iofd, msghdr->action); - else - iofd->u.uring.read_msghdr = NULL; -
iofd_msghdr_free(msghdr); } @@ -300,7 +332,7 @@
IOFD_FLAG_UNSET(iofd, IOFD_FLAG_IN_CALLBACK);
- if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) && !iofd->u.uring.read_msghdr && !iofd->u.uring.write_msghdr) + if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) && !iofd->u.uring.reads_submitted && !iofd->u.uring.write_msghdr) talloc_free(iofd); }
@@ -410,11 +442,18 @@ if (iofd->u.uring.read_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED)) iofd_uring_read_enable(iofd);
- if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) && !iofd->u.uring.read_msghdr && !iofd->u.uring.write_msghdr) + if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) && !iofd->u.uring.reads_submitted && !iofd->u.uring.write_msghdr) talloc_free(iofd); return 0; }
+static int iofd_uring_setup(struct osmo_io_fd *iofd) +{ + iofd->u.uring.num_read_sqes = g_io_uring_read_sqes; + + return 0; +} + static int iofd_uring_register(struct osmo_io_fd *iofd) { if (iofd->mode != OSMO_IO_FD_MODE_RECVMSG_SENDMSG) @@ -440,18 +479,20 @@ { struct io_uring_sqe *sqe; struct iofd_msghdr *msghdr; + uint8_t idx;
- if (iofd->u.uring.read_msghdr) { - msghdr = iofd->u.uring.read_msghdr; + for (idx = 0; idx < iofd->u.uring.reads_submitted; idx++) { + msghdr = iofd->u.uring.read_msghdr[idx]; + iofd->u.uring.read_msghdr[idx] = NULL; sqe = io_uring_get_sqe(&g_ring.ring); OSMO_ASSERT(sqe != NULL); io_uring_sqe_set_data(sqe, NULL); LOGPIO(iofd, LOGL_DEBUG, "Cancelling read\n"); - iofd->u.uring.read_msghdr = NULL; talloc_steal(OTC_GLOBAL, msghdr); msghdr->iofd = NULL; io_uring_prep_cancel(sqe, msghdr, 0); } + iofd->u.uring.reads_submitted = 0;
if (iofd->u.uring.write_msghdr) { msghdr = iofd->u.uring.write_msghdr; @@ -533,7 +574,7 @@ { iofd->u.uring.read_enabled = true;
- if (iofd->u.uring.read_msghdr) + if (iofd->u.uring.reads_submitted) return;
/* This function is called again, once the socket is connected. */ @@ -580,6 +621,7 @@ }
const struct iofd_backend_ops iofd_uring_ops = { + .setup = iofd_uring_setup, .register_fd = iofd_uring_register, .unregister_fd = iofd_uring_unregister, .close = iofd_uring_close,