laforge submitted this change.

Approvals:
  Jenkins Builder: Verified
  pespin: Looks good to me, but someone else must approve
  laforge: Looks good to me, approved
Automatically increase io_uring size if too small.

The ring may be too small to hold all SQEs before the kernel gets a
chance to process them. If this happens, a new ring is allocated with
twice the size of the old one. The old ring is not destroyed, as it
still contains uncompleted entries; some of them may never complete.
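
In outline, the growth path looks like this (a condensed sketch of
iofd_uring_get_sqe() from the diff below; logging, the size cap and
the read-SQE restriction are simplified away):

  struct io_uring_sqe *sqe = io_uring_get_sqe(&g_ring->ring);
  if (!sqe) {
          if (g_io_uring_size < IOFD_URING_MAXIMUM_SIZE)
                  g_io_uring_size <<= 1;          /* the new ring gets twice the size */
          osmo_io_uring_submit();                 /* flush pending SQEs of the old ring */
          osmo_iofd_uring_init();                 /* allocate the replacement ring; the old
                                                   * one stays alive for its completions */
          sqe = io_uring_get_sqe(&g_ring->ring);  /* the fresh ring has room */
  }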

A pointer to the current ring is stored within the msghdr structure.
It is used when cancelling an SQE: the cancellation must be performed
on the same ring the SQE was submitted to.
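
At submission time the ring pointer is recorded next to the msghdr
bookkeeping (taken from the read path in the diff below), so a later
cancellation can target the right ring:

  iofd->u.uring.read_msghdr[iofd->u.uring.reads_submitted] = msghdr;
  iofd->u.uring.reads_submitted++;
  iofd->u.uring.read_ring = &g_ring->ring;  /* a cancel SQE must go to this ring */

The write side stores iofd->u.uring.write_ring in the same way.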

It is quite unlikely that the old ring cannot hold the cancellation
SQE. If this happens, the cancellation is queued and submitted once
the ring has room for it.
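
A sketch of that fallback, following the cancellation path in the
diff below ('ring' is the struct osmo_io_uring the SQE was submitted
on, recovered via container_of()):

  sqe = io_uring_get_sqe(&ring->ring);
  if (sqe) {
          io_uring_sqe_set_data(sqe, NULL);
          io_uring_prep_cancel(sqe, msghdr, 0);
          io_uring_submit(&ring->ring);
  } else {
          /* the old ring's SQ is full: park the cancellation; it is
           * drained from iofd_uring_cqe() once slots free up */
          llist_add_tail(&msghdr->list, &ring->cancel_queue);
          msghdr->in_cancel_queue = true;
  }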

The old ring is not removed, because there is currently no counter
to determine when all of its submissions have completed.
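
One conceivable way to lift this limitation (purely illustrative, not
part of this change; the 'pending' and 'orphaned' members and the
ring_put() helper are hypothetical) would be a per-ring counter of
in-flight SQEs:

  struct osmo_io_uring {
          struct osmo_fd event_ofd;
          struct io_uring ring;
          struct llist_head cancel_queue;
          unsigned int pending;  /* hypothetical: ++ on submit, -- on completion */
          bool orphaned;         /* hypothetical: set once replaced by a bigger ring */
  };

  static void ring_put(struct osmo_io_uring *r)  /* hypothetical */
  {
          if (--r->pending == 0 && r->orphaned) {
                  io_uring_queue_exit(&r->ring);
                  talloc_free(r);
          }
  }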

Related: OS#6705
Change-Id: Id9230146acc8d54bfd44834e783c31b37bd64bca
---
M src/core/osmo_io_internal.h
M src/core/osmo_io_uring.c
2 files changed, 148 insertions(+), 39 deletions(-)

diff --git a/src/core/osmo_io_internal.h b/src/core/osmo_io_internal.h
index f983b77..bf5bb76 100644
--- a/src/core/osmo_io_internal.h
+++ b/src/core/osmo_io_internal.h
@@ -114,9 +114,13 @@
uint8_t num_read_sqes;
/*! array of simultaneously submitted read SQEs */
void *read_msghdr[IOFD_MSGHDR_MAX_READ_SQES];
+ /*! ring the read SQEs have been submitted to */
+ struct io_uring *read_ring;
/*! current number of simultaneously submitted read SQEs */
uint8_t reads_submitted;
void *write_msghdr;
+ /*! ring the write SQE has been submitted to */
+ struct io_uring *write_ring;
/* TODO: index into array of registered fd's? */
/* osmo_fd for non-blocking connect handling */
struct osmo_fd connect_ofd;
@@ -157,6 +161,9 @@
/*! I/O file descriptor on which we perform this I/O operation */
struct osmo_io_fd *iofd;

+ /*! whether this msghdr is linked into the ring's cancel_queue */
+ bool in_cancel_queue;
+
/*! control message buffer for passing sctp_sndrcvinfo along */
char cmsg[0]; /* size is determined by iofd->cmsg_size on recvmsg, and by mcghdr->msg_controllen on sendmsg */
};
diff --git a/src/core/osmo_io_uring.c b/src/core/osmo_io_uring.c
index 48f6f0d..12caa77 100644
--- a/src/core/osmo_io_uring.c
+++ b/src/core/osmo_io_uring.c
@@ -35,6 +35,7 @@
#include <string.h>
#include <stdbool.h>
#include <errno.h>
+#include <limits.h>

#include <netinet/in.h>
#include <netinet/sctp.h>
@@ -72,12 +73,15 @@
struct osmo_io_uring {
struct osmo_fd event_ofd;
struct io_uring ring;
+ struct llist_head cancel_queue;
};

-static __thread struct osmo_io_uring g_ring;
+static __thread struct osmo_io_uring *g_ring = NULL;

static void iofd_uring_cqe(struct io_uring *ring);

+void osmo_io_uring_submit(void);
+
/*! read call-back for eventfd notifying us if entries are in the completion queue */
static int iofd_uring_poll_cb(struct osmo_fd *ofd, unsigned int what)
{
@@ -109,7 +113,7 @@
if ((env = getenv(OSMO_IO_URING_BATCH)))
g_io_uring_batch = true;

- if ((env = getenv(OSMO_IO_URING_INITIAL_SIZE))) {
+ if (!g_ring && (env = getenv(OSMO_IO_URING_INITIAL_SIZE))) {
int env_value;
rc = osmo_str_to_int(&env_value, env, 10, 1, IOFD_URING_MAXIMUM_SIZE);
if (rc < 0) {
@@ -124,7 +128,10 @@
g_io_uring_size = env_value;
}

- rc = io_uring_queue_init(g_io_uring_size, &g_ring.ring, 0);
+ g_ring = talloc_zero(OTC_GLOBAL, struct osmo_io_uring);
+ INIT_LLIST_HEAD(&g_ring->cancel_queue);
+
+ rc = io_uring_queue_init(g_io_uring_size, &g_ring->ring, 0);
if (rc < 0)
osmo_panic("failure during io_uring_queue_init(): %s\n", strerror(-rc));

@@ -139,42 +146,88 @@

rc = eventfd(0, 0);
if (rc < 0) {
- io_uring_queue_exit(&g_ring.ring);
+ io_uring_queue_exit(&g_ring->ring);
osmo_panic("failure creating eventfd(0, 0) for io_uring: %s\n", strerror(-rc));
}
evfd = rc;

- osmo_fd_setup(&g_ring.event_ofd, evfd, OSMO_FD_READ, iofd_uring_poll_cb, &g_ring.ring, 0);
- rc = osmo_fd_register(&g_ring.event_ofd);
+ osmo_fd_setup(&g_ring->event_ofd, evfd, OSMO_FD_READ, iofd_uring_poll_cb, &g_ring->ring, 0);
+ rc = osmo_fd_register(&g_ring->event_ofd);
if (rc < 0) {
close(evfd);
- io_uring_queue_exit(&g_ring.ring);
+ io_uring_queue_exit(&g_ring->ring);
osmo_panic("failure registering io_uring-eventfd as osmo_fd: %d\n", rc);
}
- rc = io_uring_register_eventfd(&g_ring.ring, evfd);
+ rc = io_uring_register_eventfd(&g_ring->ring, evfd);
if (rc < 0) {
- osmo_fd_unregister(&g_ring.event_ofd);
+ osmo_fd_unregister(&g_ring->event_ofd);
close(evfd);
- io_uring_queue_exit(&g_ring.ring);
+ io_uring_queue_exit(&g_ring->ring);
osmo_panic("failure registering eventfd with io_uring: %s\n", strerror(-rc));
}
}

+static struct io_uring_sqe *iofd_uring_get_sqe(struct osmo_io_fd *iofd, bool read)
+{
+ struct io_uring_sqe *sqe;
+
+ /* All subsequent read SQEs must be on the same ring. */
+ if (read && iofd->u.uring.reads_submitted > 0 && iofd->u.uring.read_ring != &g_ring->ring)
+ return NULL;
+
+ sqe = io_uring_get_sqe(&g_ring->ring);
+ if (sqe)
+ return sqe;
+
+ /* The current ring is full; subsequent read SQEs must not go to a different ring. */
+ if (read && iofd->u.uring.reads_submitted > 0)
+ return NULL;
+
+ if (g_io_uring_size < IOFD_URING_MAXIMUM_SIZE) {
+ LOGP(DLIO, LOGL_NOTICE, "io_uring too small to handle all SQEs with its current size of %d. "
+ "Increasing io_uring size to %d.\n", g_io_uring_size, g_io_uring_size * 2);
+ g_io_uring_size <<= 1;
+ } else {
+ LOGP(DLIO, LOGL_NOTICE, "io_uring too small to handle all SQEs with its maximum size of %d. "
+ "Adding another one.\n", g_io_uring_size);
+ }
+
+ /* Submit all SQEs of the current ring and create a new ring.
+ * The old ring is kept, as there are uncompleted submissions.
+ * TODO: Destroy the old ring once all submissions have completed. */
+ osmo_io_uring_submit();
+
+ osmo_iofd_uring_init();
+
+ sqe = io_uring_get_sqe(&g_ring->ring);
+ OSMO_ASSERT(sqe);
+ return sqe;
+}
+
static inline void iofd_io_uring_submit(void)
{
if (OSMO_LIKELY(!g_io_uring_batch))
- io_uring_submit(&g_ring.ring);
+ io_uring_submit(&g_ring->ring);
else
g_io_uring_submit_needed = true;
}

-static void iofd_uring_submit_recv_sqe(struct osmo_io_fd *iofd, enum iofd_msg_action action)
+static inline int iofd_uring_submit_recv_sqe(struct osmo_io_fd *iofd, enum iofd_msg_action action)
{
struct msgb *msg;
struct iofd_msghdr *msghdr;
struct io_uring_sqe *sqe;
uint8_t idx;

+ /* Tell iofd_uring_get_sqe() that this is a read SQE, so it does not switch to a new ring while earlier read SQEs are still pending on the current one. */
+ sqe = iofd_uring_get_sqe(iofd, true);
+ if (!sqe) {
+ if (iofd->u.uring.reads_submitted > 0)
+ return -EINVAL;
+ LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
+ OSMO_ASSERT(0);
+ }
+
msg = iofd_msgb_alloc(iofd);
if (!msg) {
LOGPIO(iofd, LOGL_ERROR, "Could not allocate msgb for reading\n");
@@ -209,12 +262,6 @@
OSMO_ASSERT(0);
}

- sqe = io_uring_get_sqe(&g_ring.ring);
- if (!sqe) {
- LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
- OSMO_ASSERT(0);
- }
-
switch (action) {
case IOFD_ACT_READ:
io_uring_prep_readv(sqe, iofd->fd, msghdr->iov, msghdr->hdr.msg_iovlen, -1);
@@ -232,13 +279,22 @@

iofd->u.uring.read_msghdr[iofd->u.uring.reads_submitted] = msghdr;
iofd->u.uring.reads_submitted++;
+ iofd->u.uring.read_ring = &g_ring->ring;
+
+ return 0;
}

static void iofd_uring_submit_recv(struct osmo_io_fd *iofd, enum iofd_msg_action action)
{
+ int rc;
+
/* Submit more read SQEs in advance, if requested. */
- while (iofd->u.uring.reads_submitted < iofd->u.uring.num_read_sqes)
- iofd_uring_submit_recv_sqe(iofd, action);
+ while (iofd->u.uring.reads_submitted < iofd->u.uring.num_read_sqes) {
+ rc = iofd_uring_submit_recv_sqe(iofd, action);
+ /* Stop if we cannot enqueue further read SQEs on the same ring. */
+ if (rc < 0)
+ break;
+ }
}

/*! completion call-back for READ/RECVFROM */
@@ -311,8 +367,10 @@
* This callback function might close iofd, leading to the potential freeing of iofd->u.uring.write_msghdr if
* still attached. Since iofd_handle_send_completion() frees msghdr at the end of the function, detaching
* msghdr here prevents a double-free bug. */
- if (iofd->u.uring.write_msghdr == msghdr)
+ if (iofd->u.uring.write_msghdr == msghdr) {
iofd->u.uring.write_msghdr = NULL;
+ iofd->u.uring.write_ring = NULL;
+ }

if (OSMO_UNLIKELY(IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))) {
for (int idx = 0; idx < msghdr->io_len; idx++) {
@@ -362,7 +420,8 @@
{
int rc;
struct io_uring_cqe *cqe;
- struct iofd_msghdr *msghdr;
+ struct iofd_msghdr *msghdr, *msghdr2;
+ struct osmo_io_uring *orig_ring = container_of(ring, struct osmo_io_uring, ring);

while (io_uring_peek_cqe(ring, &cqe) == 0) {

@@ -374,6 +433,8 @@
}
if (!msghdr->iofd) {
io_uring_cqe_seen(ring, cqe);
+ if (msghdr->in_cancel_queue)
+ llist_del(&msghdr->list);
iofd_msghdr_free(msghdr);
continue;
}
@@ -385,6 +446,23 @@
iofd_uring_handle_completion(msghdr, rc);

}
+
+ /* If there are unsubmitted cancel SQEs, try to queue them now. */
+ if (OSMO_LIKELY(llist_empty(&orig_ring->cancel_queue)))
+ return;
+ llist_for_each_entry_safe(msghdr, msghdr2, &orig_ring->cancel_queue, list) {
+ struct io_uring_sqe *sqe;
+ sqe = io_uring_get_sqe(&orig_ring->ring);
+ if (!sqe)
+ break;
+ io_uring_sqe_set_data(sqe, NULL);
+ LOGP(DLIO, LOGL_DEBUG, "Cancelling queued read/write\n");
+ io_uring_prep_cancel(sqe, msghdr, 0);
+ llist_del(&msghdr->list);
+ msghdr->in_cancel_queue = false;
+ }
+ io_uring_submit(&orig_ring->ring);
+
}

/*! will submit the next to-be-transmitted message for given iofd */
@@ -397,7 +475,7 @@
if (!msghdr)
return -ENODATA;

- sqe = io_uring_get_sqe(&g_ring.ring);
+ sqe = iofd_uring_get_sqe(iofd, false);
if (!sqe) {
LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
OSMO_ASSERT(0);
@@ -420,6 +498,7 @@
iofd_io_uring_submit();

iofd->u.uring.write_msghdr = msghdr;
+ iofd->u.uring.write_ring = &g_ring->ring;

return 0;
}
@@ -503,36 +582,58 @@
uint8_t idx;

for (idx = 0; idx < iofd->u.uring.reads_submitted; idx++) {
+ struct osmo_io_uring *ring = container_of(iofd->u.uring.read_ring, struct osmo_io_uring, ring);
msghdr = iofd->u.uring.read_msghdr[idx];
iofd->u.uring.read_msghdr[idx] = NULL;
- sqe = io_uring_get_sqe(&g_ring.ring);
- OSMO_ASSERT(sqe != NULL);
- io_uring_sqe_set_data(sqe, NULL);
- LOGPIO(iofd, LOGL_DEBUG, "Cancelling read\n");
+ /* Submit SQEs of the current ring, if needed. */
+ if (&ring->ring == &g_ring->ring)
+ osmo_io_uring_submit();
+ /* If the submission queue is full, use the cancel queue. We cannot cancel this SQE via the new ring. */
+ sqe = io_uring_get_sqe(&ring->ring);
+ if (sqe) {
+ io_uring_sqe_set_data(sqe, NULL);
+ LOGPIO(iofd, LOGL_DEBUG, "Cancelling read\n");
+ io_uring_prep_cancel(sqe, msghdr, 0);
+ io_uring_submit(&ring->ring);
+ } else {
+ llist_add_tail(&msghdr->list, &ring->cancel_queue);
+ msghdr->in_cancel_queue = true;
+ }
talloc_steal(OTC_GLOBAL, msghdr);
msghdr->iofd = NULL;
- io_uring_prep_cancel(sqe, msghdr, 0);
}
- iofd->u.uring.reads_submitted = 0;
+ if (iofd->u.uring.reads_submitted) {
+ iofd->u.uring.read_ring = NULL;
+ iofd->u.uring.reads_submitted = 0;
+ }

if (iofd->u.uring.write_msghdr) {
+ struct osmo_io_uring *ring = container_of(iofd->u.uring.write_ring, struct osmo_io_uring, ring);
msghdr = iofd->u.uring.write_msghdr;
- sqe = io_uring_get_sqe(&g_ring.ring);
- OSMO_ASSERT(sqe != NULL);
- io_uring_sqe_set_data(sqe, NULL);
- LOGPIO(iofd, LOGL_DEBUG, "Cancelling write\n");
iofd->u.uring.write_msghdr = NULL;
- talloc_steal(OTC_GLOBAL, msghdr);
for (int idx = 0; idx < msghdr->io_len; idx++) {
msgb_free(msghdr->msg[idx]);
msghdr->msg[idx] = NULL;
}
+ /* Submit SQEs of the current ring, if needed. */
+ if (&ring->ring == &g_ring->ring)
+ osmo_io_uring_submit();
+ /* If the submission queue is full, use the cancel queue. We cannot cancel this SQE via the new ring. */
+ sqe = io_uring_get_sqe(&ring->ring);
+ if (sqe) {
+ io_uring_sqe_set_data(sqe, NULL);
+ LOGPIO(iofd, LOGL_DEBUG, "Cancelling write\n");
+ io_uring_prep_cancel(sqe, msghdr, 0);
+ io_uring_submit(&ring->ring);
+ } else {
+ llist_add_tail(&msghdr->list, &ring->cancel_queue);
+ msghdr->in_cancel_queue = true;
+ }
+ talloc_steal(OTC_GLOBAL, msghdr);
msghdr->iofd = NULL;
- io_uring_prep_cancel(sqe, msghdr, 0);
+ iofd->u.uring.write_ring = NULL;
}

- iofd_io_uring_submit();
-
if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_NOTIFY_CONNECTED)) {
osmo_fd_unregister(&iofd->u.uring.connect_ofd);
IOFD_FLAG_UNSET(iofd, IOFD_FLAG_NOTIFY_CONNECTED);
@@ -572,7 +673,7 @@
msghdr->iov[0].iov_base = msgb_data(msg);
msghdr->iov[0].iov_len = msgb_length(msg);

- sqe = io_uring_get_sqe(&g_ring.ring);
+ sqe = iofd_uring_get_sqe(iofd, false);
if (!sqe) {
LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
OSMO_ASSERT(0);
@@ -583,6 +684,7 @@
iofd_io_uring_submit();

iofd->u.uring.write_msghdr = msghdr;
+ iofd->u.uring.write_ring = &g_ring->ring;
}
}

@@ -656,7 +758,7 @@
void osmo_io_uring_submit(void)
{
if (OSMO_LIKELY(g_io_uring_submit_needed)) {
- io_uring_submit(&g_ring.ring);
+ io_uring_submit(&g_ring->ring);
g_io_uring_submit_needed = false;
}
}

Gerrit-MessageType: merged
Gerrit-Project: libosmocore
Gerrit-Branch: master
Gerrit-Change-Id: Id9230146acc8d54bfd44834e783c31b37bd64bca
Gerrit-Change-Number: 40725
Gerrit-PatchSet: 15
Gerrit-Owner: jolly <andreas@eversberg.eu>
Gerrit-Reviewer: Jenkins Builder
Gerrit-Reviewer: laforge <laforge@osmocom.org>
Gerrit-Reviewer: pespin <pespin@sysmocom.de>