jolly has uploaded this change for review.

View Change

Remove old empty io_uring

A previous patch creates a new io_uring if the current one becomes too
small to store all SQEs. Once all SQEs of the old ring have completed,
the old ring is destroyed.

A counter is incremented whenever an SQE is submitted to an io_uring.
The counter is decremented whenever a CQE is received and handled. This
counter determines when a ring is empty and can be destroyed.

Related: OS#6705
Change-Id: Id2d2a0400ad442198c684ea0ead4eaeaead4c53d
---
M src/core/osmo_io_uring.c
1 file changed, 27 insertions(+), 4 deletions(-)

git pull ssh://gerrit.osmocom.org:29418/libosmocore refs/changes/55/40855/1
diff --git a/src/core/osmo_io_uring.c b/src/core/osmo_io_uring.c
index 1a817dd..a867968 100644
--- a/src/core/osmo_io_uring.c
+++ b/src/core/osmo_io_uring.c
@@ -65,7 +65,7 @@
bool g_io_uring_batch = false;
bool g_io_uring_submit_needed = false;

-static unsigned int g_io_uring_size = IOFD_URING_INITIAL_SIZE;
+static int g_io_uring_size = IOFD_URING_INITIAL_SIZE;

static int g_io_uring_read_sqes = 1;

@@ -73,6 +73,7 @@
struct osmo_fd event_ofd;
struct io_uring ring;
struct llist_head cancel_queue;
+ unsigned int in_flight;
};

static __thread struct osmo_io_uring *g_ring = NULL;
@@ -167,13 +168,27 @@
}
}

+/* Destroy an old ring with no SQEs in flight: exit queue, release its event fd, free it. */
+static void osmo_iofd_uring_exit(struct osmo_io_uring *ring)
+{
+ LOGP(DLIO, LOGL_DEBUG, "Old empty io_uring will be destroyed.\n");
+
+ io_uring_queue_exit(&ring->ring);
+ osmo_fd_unregister(&ring->event_ofd);
+ close(ring->event_ofd.fd);
+
+ talloc_free(ring);
+}
+
static struct io_uring_sqe *iofd_uring_get_sqe(bool current_ring)
{
struct io_uring_sqe *sqe;

sqe = io_uring_get_sqe(&g_ring->ring);
- if (sqe)
+ if (sqe) {
+ g_ring->in_flight++;
return sqe;
+ }

if (g_io_uring_size < IOFD_URING_MAXIMUM_SIZE) {
LOGP(DLIO, LOGL_NOTICE, "io_uring too small to handle all SQEs with its current size of %d. "
@@ -197,6 +212,7 @@

sqe = io_uring_get_sqe(&g_ring->ring);
OSMO_ASSERT(sqe);
+ g_ring->in_flight++;
return sqe;
}

@@ -424,6 +440,7 @@
struct osmo_io_uring *orig_ring = container_of(ring, struct osmo_io_uring, ring);

while (io_uring_peek_cqe(ring, &cqe) == 0) {
+ orig_ring->in_flight--;

msghdr = io_uring_cqe_get_data(cqe);
if (!msghdr) {
@@ -448,13 +465,18 @@
}

/* If there are unsubmitted cancel SQEs, try to queue them now. */
- if (OSMO_LIKELY(llist_empty(&orig_ring->cancel_queue)))
+ if (OSMO_LIKELY(llist_empty(&orig_ring->cancel_queue))) {
+ /* Old ring is empty, remove it. */
+ if (OSMO_UNLIKELY(orig_ring != g_ring && orig_ring->in_flight == 0))
+ osmo_iofd_uring_exit(orig_ring);
return;
+ }
llist_for_each_entry_safe(msghdr, msghdr2, &orig_ring->cancel_queue, list) {
struct io_uring_sqe *sqe;
sqe = io_uring_get_sqe(&orig_ring->ring);
if (!sqe)
break;
+ orig_ring->in_flight++;
io_uring_sqe_set_data(sqe, NULL);
LOGP(DLIO, LOGL_DEBUG, "Cancelling queued read/write\n");
io_uring_prep_cancel(sqe, msghdr, 0);
@@ -462,7 +484,6 @@
msghdr->in_cancel_queue = false;
}
io_uring_submit(&orig_ring->ring);
-
}

/*! will submit the next to-be-transmitted message for given iofd */
@@ -584,6 +605,7 @@
/* If the submission queue is full, use cancel queue. We cannot cancel SQEs on the new ring. */
sqe = io_uring_get_sqe(&ring->ring);
if (sqe) {
+ ring->in_flight++;
io_uring_sqe_set_data(sqe, NULL);
LOGPIO(iofd, LOGL_DEBUG, "Cancelling read\n");
io_uring_prep_cancel(sqe, msghdr, 0);
@@ -614,6 +636,7 @@
/* If the submission queue is full, use cancel queue. We cannot cancel SQEs on the new ring. */
sqe = io_uring_get_sqe(&ring->ring);
if (sqe) {
+ ring->in_flight++;
io_uring_sqe_set_data(sqe, NULL);
LOGPIO(iofd, LOGL_DEBUG, "Cancelling write\n");
io_uring_prep_cancel(sqe, msghdr, 0);

To view, visit change 40855. To unsubscribe, or for help writing mail filters, visit settings.

Gerrit-MessageType: newchange
Gerrit-Project: libosmocore
Gerrit-Branch: master
Gerrit-Change-Id: Id2d2a0400ad442198c684ea0ead4eaeaead4c53d
Gerrit-Change-Number: 40855
Gerrit-PatchSet: 1
Gerrit-Owner: jolly <andreas@eversberg.eu>