laforge has submitted this change. (
https://gerrit.osmocom.org/c/libosmocore/+/40494?usp=email )
Change subject: Send multiple read/recvfrom/recvmsg SQEs in advance
......................................................................
Send multiple read/recvfrom/recvmsg SQEs in advance
Multiple read or recvfrom operations can be submitted via SQEs when
using io_uring. This allows reading multiple packet / more data between
calls of osmo_select_main() the main loop.
Having more than one SQE submitted is optional and the number can be
controlled via environment variable.
Related: OS#6705
Change-Id: Id50a8900fa2fe6de553e5025feae7e1e8d501e30
---
M include/osmocom/core/osmo_io.h
M src/core/libosmocore.map
M src/core/osmo_io.c
M src/core/osmo_io_internal.h
M src/core/osmo_io_uring.c
5 files changed, 103 insertions(+), 14 deletions(-)
Approvals:
pespin: Looks good to me, approved
laforge: Looks good to me, but someone else must approve
Jenkins Builder: Verified
diff --git a/include/osmocom/core/osmo_io.h b/include/osmocom/core/osmo_io.h
index 0b3dc83..0733240 100644
--- a/include/osmocom/core/osmo_io.h
+++ b/include/osmocom/core/osmo_io.h
@@ -213,6 +213,7 @@
enum osmo_io_fd_mode mode, const struct osmo_io_ops *ioops, void *data);
int osmo_iofd_set_cmsg_size(struct osmo_io_fd *iofd, size_t cmsg_size);
int osmo_iofd_set_io_buffers(struct osmo_io_fd *iofd, enum osmo_io_op op, uint8_t
buffers);
+int osmo_iofd_set_sqes(struct osmo_io_fd *iofd, enum osmo_io_op op, uint8_t sqes);
int osmo_iofd_register(struct osmo_io_fd *iofd, int fd);
int osmo_iofd_unregister(struct osmo_io_fd *iofd);
unsigned int osmo_iofd_txqueue_len(struct osmo_io_fd *iofd);
diff --git a/src/core/libosmocore.map b/src/core/libosmocore.map
index 239597c..198b31e 100644
--- a/src/core/libosmocore.map
+++ b/src/core/libosmocore.map
@@ -276,6 +276,7 @@
osmo_iofd_set_alloc_info;
osmo_iofd_set_cmsg_size;
osmo_iofd_set_io_buffers;
+osmo_iofd_set_sqes;
osmo_iofd_set_data;
osmo_iofd_set_ioops;
osmo_iofd_set_priv_nr;
diff --git a/src/core/osmo_io.c b/src/core/osmo_io.c
index 62d2362..2f3af06 100644
--- a/src/core/osmo_io.c
+++ b/src/core/osmo_io.c
@@ -816,6 +816,14 @@
iofd->io_read_buffers = 1;
iofd->io_write_buffers = 1;
+ if (osmo_iofd_ops.setup) {
+ int rc = osmo_iofd_ops.setup(iofd);
+ if (rc < 0) {
+ osmo_iofd_free(iofd);
+ return NULL;
+ }
+ }
+
return iofd;
}
@@ -868,6 +876,35 @@
return 0;
}
+/*! Set the number of SQEs that are submitted to an io_unring before completion is
received.
+ *
+ * If the io_using backend is selected, this API function can be used to tell the
osmo_io process how many SQE are
+ * scheduled in advance.
+ * The feature is currently supports scheduling read SQEs only.
+ *
+ * \param[in] iofd the iofd file descriptor
+ * \param[in] op the osmo_io_op (read) to set the number of IO buffers for
+ * \param[in] number of scheduled SQEs
+ * \returns zero on success, a negative value on error
+ */
+int osmo_iofd_set_sqes(struct osmo_io_fd *iofd, enum osmo_io_op op, uint8_t sqes)
+{
+ if (iofd->mode != OSMO_IO_FD_MODE_READ_WRITE)
+ return -EINVAL;
+
+ if (g_io_backend != OSMO_IO_BACKEND_IO_URING)
+ return -EINVAL;
+
+ if (op != OSMO_IO_OP_READ)
+ return -EINVAL;
+
+ if (sqes < 1 || sqes > IOFD_MSGHDR_MAX_READ_SQES)
+ return -EINVAL;
+
+ iofd->u.uring.num_read_sqes = sqes;
+ return 0;
+}
+
/*! Register the osmo_io_fd for active I/O.
*
* Calling this function will register a previously initialized osmo_io_fd for
performing I/O.
diff --git a/src/core/osmo_io_internal.h b/src/core/osmo_io_internal.h
index d28e3d7..f983b77 100644
--- a/src/core/osmo_io_internal.h
+++ b/src/core/osmo_io_internal.h
@@ -25,6 +25,7 @@
#endif
struct iofd_backend_ops {
+ int (*setup)(struct osmo_io_fd *iofd);
int (*register_fd)(struct osmo_io_fd *iofd);
int (*unregister_fd)(struct osmo_io_fd *iofd);
int (*close)(struct osmo_io_fd *iofd);
@@ -49,6 +50,8 @@
#define IOFD_FLAG_ISSET(iofd, flag) ((iofd)->flags & (flag))
+#define IOFD_MSGHDR_MAX_READ_SQES 32
+
struct osmo_io_fd {
/*! linked list for internal management */
struct llist_head list;
@@ -107,7 +110,12 @@
struct {
bool read_enabled;
bool write_enabled;
- void *read_msghdr;
+ /*! requested number of simultaniously submitted read SQEs */
+ uint8_t num_read_sqes;
+ /*! array of simultaneously submitted read SQEs */
+ void *read_msghdr[IOFD_MSGHDR_MAX_READ_SQES];
+ /*! current number of simultaneously submitted read SQEs */
+ uint8_t reads_submitted;
void *write_msghdr;
/* TODO: index into array of registered fd's? */
/* osmo_fd for non-blocking connect handling */
diff --git a/src/core/osmo_io_uring.c b/src/core/osmo_io_uring.c
index 018bdab..1eaa5e6 100644
--- a/src/core/osmo_io_uring.c
+++ b/src/core/osmo_io_uring.c
@@ -56,9 +56,13 @@
#define OSMO_IO_URING_BATCH "LIBOSMO_IO_URING_BATCH"
+#define OSMO_IO_URING_READ_SQE "LIBOSMO_IO_URING_READ_SQE"
+
bool g_io_uring_batch = false;
bool g_io_uring_submit_needed = false;
+static int g_io_uring_read_sqes = 1;
+
struct osmo_io_uring {
struct osmo_fd event_ofd;
struct io_uring ring;
@@ -103,6 +107,15 @@
if (rc < 0)
osmo_panic("failure during io_uring_queue_init(): %s\n", strerror(-rc));
+ if ((env = getenv(OSMO_IO_URING_READ_SQE))) {
+ g_io_uring_read_sqes = atoi(env);
+ if (g_io_uring_read_sqes < 1 || g_io_uring_read_sqes > IOFD_MSGHDR_MAX_READ_SQES)
{
+ fprintf(stderr, "Invalid osmo_uring read SQEs requested: \"%s\"\n
Allowed range: 1..%d\n",
+ env, IOFD_MSGHDR_MAX_READ_SQES);
+ exit(1);
+ }
+ }
+
rc = eventfd(0, 0);
if (rc < 0) {
io_uring_queue_exit(&g_ring.ring);
@@ -134,7 +147,7 @@
g_io_uring_submit_needed = true;
}
-static void iofd_uring_submit_recv(struct osmo_io_fd *iofd, enum iofd_msg_action action)
+static void iofd_uring_submit_recv_sqe(struct osmo_io_fd *iofd, enum iofd_msg_action
action)
{
struct msgb *msg;
struct iofd_msghdr *msghdr;
@@ -196,15 +209,37 @@
iofd_io_uring_submit();
- /* NOTE: This only works if we have one read per fd */
- iofd->u.uring.read_msghdr = msghdr;
+ iofd->u.uring.read_msghdr[iofd->u.uring.reads_submitted] = msghdr;
+ iofd->u.uring.reads_submitted++;
+}
+
+static void iofd_uring_submit_recv(struct osmo_io_fd *iofd, enum iofd_msg_action action)
+{
+ /* Submit more read SQEs in advance, if requested. */
+ while (iofd->u.uring.reads_submitted < iofd->u.uring.num_read_sqes)
+ iofd_uring_submit_recv_sqe(iofd, action);
}
/*! completion call-back for READ/RECVFROM */
static void iofd_uring_handle_recv(struct iofd_msghdr *msghdr, int rc)
{
struct osmo_io_fd *iofd = msghdr->iofd;
- uint8_t idx;
+ uint8_t idx, i;
+
+ /* Find which read_msghdr is completed and remove from list. */
+ for (idx = 0; idx < iofd->u.uring.reads_submitted; idx++) {
+ if (iofd->u.uring.read_msghdr[idx] == msghdr)
+ break;
+ }
+ if (idx == iofd->u.uring.reads_submitted) {
+ LOGP(DLIO, LOGL_FATAL, "Read SQE completion, but msghdr not found, please
fix!\n");
+ return;
+ }
+ /* Remove entry at idx. */
+ iofd->u.uring.reads_submitted--;
+ for (i = idx; i < iofd->u.uring.reads_submitted; i++)
+ iofd->u.uring.read_msghdr[i] = iofd->u.uring.read_msghdr[i + 1];
+ iofd->u.uring.read_msghdr[i] = NULL;
for (idx = 0; idx < msghdr->io_len; idx++) {
struct msgb *msg = msghdr->msg[idx];
@@ -238,9 +273,6 @@
if (iofd->u.uring.read_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
iofd_uring_submit_recv(iofd, msghdr->action);
- else
- iofd->u.uring.read_msghdr = NULL;
-
iofd_msghdr_free(msghdr);
}
@@ -300,7 +332,7 @@
IOFD_FLAG_UNSET(iofd, IOFD_FLAG_IN_CALLBACK);
- if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) && !iofd->u.uring.read_msghdr
&& !iofd->u.uring.write_msghdr)
+ if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) &&
!iofd->u.uring.reads_submitted && !iofd->u.uring.write_msghdr)
talloc_free(iofd);
}
@@ -410,11 +442,18 @@
if (iofd->u.uring.read_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
iofd_uring_read_enable(iofd);
- if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) && !iofd->u.uring.read_msghdr
&& !iofd->u.uring.write_msghdr)
+ if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) &&
!iofd->u.uring.reads_submitted && !iofd->u.uring.write_msghdr)
talloc_free(iofd);
return 0;
}
+static int iofd_uring_setup(struct osmo_io_fd *iofd)
+{
+ iofd->u.uring.num_read_sqes = g_io_uring_read_sqes;
+
+ return 0;
+}
+
static int iofd_uring_register(struct osmo_io_fd *iofd)
{
if (iofd->mode != OSMO_IO_FD_MODE_RECVMSG_SENDMSG)
@@ -440,18 +479,20 @@
{
struct io_uring_sqe *sqe;
struct iofd_msghdr *msghdr;
+ uint8_t idx;
- if (iofd->u.uring.read_msghdr) {
- msghdr = iofd->u.uring.read_msghdr;
+ for (idx = 0; idx < iofd->u.uring.reads_submitted; idx++) {
+ msghdr = iofd->u.uring.read_msghdr[idx];
+ iofd->u.uring.read_msghdr[idx] = NULL;
sqe = io_uring_get_sqe(&g_ring.ring);
OSMO_ASSERT(sqe != NULL);
io_uring_sqe_set_data(sqe, NULL);
LOGPIO(iofd, LOGL_DEBUG, "Cancelling read\n");
- iofd->u.uring.read_msghdr = NULL;
talloc_steal(OTC_GLOBAL, msghdr);
msghdr->iofd = NULL;
io_uring_prep_cancel(sqe, msghdr, 0);
}
+ iofd->u.uring.reads_submitted = 0;
if (iofd->u.uring.write_msghdr) {
msghdr = iofd->u.uring.write_msghdr;
@@ -533,7 +574,7 @@
{
iofd->u.uring.read_enabled = true;
- if (iofd->u.uring.read_msghdr)
+ if (iofd->u.uring.reads_submitted)
return;
/* This function is called again, once the socket is connected. */
@@ -580,6 +621,7 @@
}
const struct iofd_backend_ops iofd_uring_ops = {
+ .setup = iofd_uring_setup,
.register_fd = iofd_uring_register,
.unregister_fd = iofd_uring_unregister,
.close = iofd_uring_close,
--
To view, visit
https://gerrit.osmocom.org/c/libosmocore/+/40494?usp=email
To unsubscribe, or for help writing mail filters, visit
https://gerrit.osmocom.org/settings?usp=email
Gerrit-MessageType: merged
Gerrit-Project: libosmocore
Gerrit-Branch: master
Gerrit-Change-Id: Id50a8900fa2fe6de553e5025feae7e1e8d501e30
Gerrit-Change-Number: 40494
Gerrit-PatchSet: 16
Gerrit-Owner: jolly <andreas(a)eversberg.eu>
Gerrit-Reviewer: Jenkins Builder
Gerrit-Reviewer: laforge <laforge(a)osmocom.org>
Gerrit-Reviewer: pespin <pespin(a)sysmocom.de>