jolly has uploaded this change for review. (
https://gerrit.osmocom.org/c/libosmocore/+/40725?usp=email )
Change subject: Automatically increase io_uring size if it is too small
......................................................................
Automatically increase io_uring size if it is too small
The ring may be too small to hold all SQEs before the kernel can
process them. If this happens, a new ring with twice the size of the
old ring is allocated. The old ring is not destroyed, as it still
contains uncompleted elements, some of which may never complete.
For each pending read and write, a pointer to the ring it was submitted
to is stored in the osmo_io_fd, next to the corresponding msghdr. It is
used when cancelling that SQE, because the cancellation must be
performed on the same ring the SQE was created in.
It is unlikely that the old ring has no free slot left for the
cancellation SQE. If this happens, the kernel is given time to consume
pending SQEs until a slot becomes free.
The initial size of the ring can be set via the environment variable
LIBOSMO_IO_URING_QUEUE.
Related: OS#6705
Change-Id: Id9230146acc8d54bfd44834e783c31b37bd64bca
---
M src/core/osmo_io_internal.h
M src/core/osmo_io_uring.c
2 files changed, 63 insertions(+), 25 deletions(-)
git pull ssh://gerrit.osmocom.org:29418/libosmocore refs/changes/25/40725/1
diff --git a/src/core/osmo_io_internal.h b/src/core/osmo_io_internal.h
index 511e0f5..9254f6e 100644
--- a/src/core/osmo_io_internal.h
+++ b/src/core/osmo_io_internal.h
@@ -104,8 +104,10 @@
bool read_enabled;
bool write_enabled;
void *read_msghdr[IOFD_MSGHDR_READ_SQES];
+ void *read_ring[IOFD_MSGHDR_READ_SQES];
int read_len;
void *write_msghdr;
+ void *write_ring;
/* TODO: index into array of registered fd's? */
/* osmo_fd for non-blocking connect handling */
struct osmo_fd connect_ofd;
diff --git a/src/core/osmo_io_uring.c b/src/core/osmo_io_uring.c
index 68508ad..c9fdf2a 100644
--- a/src/core/osmo_io_uring.c
+++ b/src/core/osmo_io_uring.c
@@ -56,12 +56,16 @@
#define OSMO_IO_URING_BATCH "LIBOSMO_IO_URING_BATCH"
+/* Environment variable overriding the initial number of io_uring entries (default: IOFD_URING_ENTRIES). */
+#define OSMO_IO_URING_QUEUE "LIBOSMO_IO_URING_QUEUE"
+
#define OSMO_IO_URING_IOV "LIBOSMO_IO_URING_IOV"
#define OSMO_IO_URING_READ_SQE "LIBOSMO_IO_URING_READ_SQE"
static bool g_io_uring_batch = false;
+/* Size used when creating this thread's next ring. It is per-thread, like g_ring, because each thread
+ * grows (doubles) its own ring when it runs out of SQEs; a process-wide counter would be read and
+ * written concurrently by different threads and one thread's growth would leak into the others. */
+static __thread int g_io_uring_queue = IOFD_URING_ENTRIES;
+
extern int g_io_uring_iov;
static int g_io_uring_read_sqes = 1;
@@ -71,7 +75,7 @@
struct io_uring ring;
};
-static __thread struct osmo_io_uring g_ring;
+/* Current (most recently allocated) io_uring of this thread. It is replaced by a larger ring when it
+ * runs out of SQEs; replaced rings are kept, as they may still have uncompleted submissions. */
+static __thread struct osmo_io_uring *g_ring = NULL;
static void iofd_uring_cqe(struct io_uring *ring);
@@ -106,7 +110,12 @@
if ((env = getenv(OSMO_IO_URING_BATCH)))
g_io_uring_batch = true;
- rc = io_uring_queue_init(IOFD_URING_ENTRIES, &g_ring.ring, 0);
+ /* Honour the environment only when this thread's first ring is created. When called again to grow
+ * the ring, g_io_uring_queue already holds the enlarged size and must not be reset. */
+ if (!g_ring && (env = getenv(OSMO_IO_URING_QUEUE)))
+ g_io_uring_queue = atoi(env);
+
+ /* Allocate a new ring. A previous ring is deliberately not freed, as it may still have
+ * uncompleted submissions. */
+ g_ring = calloc(1, sizeof(*g_ring));
+ if (!g_ring)
+ osmo_panic("failure allocating io_uring\n");
+
+ rc = io_uring_queue_init(g_io_uring_queue, &g_ring->ring, 0);
if (rc < 0)
osmo_panic("failure during io_uring_queue_init(): %s\n", strerror(-rc));
@@ -130,27 +139,47 @@
rc = eventfd(0, 0);
if (rc < 0) {
- io_uring_queue_exit(&g_ring.ring);
+ io_uring_queue_exit(&g_ring->ring);
osmo_panic("failure creating eventfd(0, 0) for io_uring: %s\n",
strerror(-rc));
}
evfd = rc;
- osmo_fd_setup(&g_ring.event_ofd, evfd, OSMO_FD_READ, iofd_uring_poll_cb,
&g_ring.ring, 0);
- rc = osmo_fd_register(&g_ring.event_ofd);
+ osmo_fd_setup(&g_ring->event_ofd, evfd, OSMO_FD_READ, iofd_uring_poll_cb,
&g_ring->ring, 0);
+ rc = osmo_fd_register(&g_ring->event_ofd);
if (rc < 0) {
close(evfd);
- io_uring_queue_exit(&g_ring.ring);
+ io_uring_queue_exit(&g_ring->ring);
osmo_panic("failure registering io_uring-eventfd as osmo_fd: %d\n", rc);
}
- rc = io_uring_register_eventfd(&g_ring.ring, evfd);
+ rc = io_uring_register_eventfd(&g_ring->ring, evfd);
if (rc < 0) {
- osmo_fd_unregister(&g_ring.event_ofd);
+ osmo_fd_unregister(&g_ring->event_ofd);
close(evfd);
- io_uring_queue_exit(&g_ring.ring);
+ io_uring_queue_exit(&g_ring->ring);
osmo_panic("failure registering eventfd with io_uring: %s\n",
strerror(-rc));
}
}
+/*! Get a free SQE from this thread's current io_uring, growing the ring if it is full.
+ * If the current ring has no free SQE, its pending SQEs are submitted and a new ring of twice the
+ * size is created via osmo_iofd_uring_init(). The old ring is kept, because it may still have
+ * uncompleted submissions.
+ * \returns a valid SQE of the (possibly new) current ring; never NULL (asserts instead). */
+static struct io_uring_sqe *osmo_iofd_uring_get_sqe(void)
+{
+ struct io_uring_sqe *sqe;
+
+ sqe = io_uring_get_sqe(&g_ring->ring);
+ if (sqe)
+ return sqe;
+
+ g_io_uring_queue *= 2;
+
+ /* Submit all SQEs of current ring and create a new ring.
+ * The old ring will be kept, as there are uncompleted submissions. */
+ io_uring_submit(&g_ring->ring);
+ osmo_iofd_uring_init();
+
+ /* Log only after the new ring is in place: should a log target itself write via osmo_io,
+ * it must not re-enter this function while the old ring is still full. */
+ LOGP(DLIO, LOGL_NOTICE, "io_uring too small to handle all SQEs, increasing size to %d.\n",
+ g_io_uring_queue);
+
+ sqe = io_uring_get_sqe(&g_ring->ring);
+ OSMO_ASSERT(sqe);
+ return sqe;
+}
static void iofd_uring_submit_recv(struct osmo_io_fd *iofd, enum iofd_msg_action action)
{
@@ -200,7 +229,7 @@
OSMO_ASSERT(0);
}
- sqe = io_uring_get_sqe(&g_ring.ring);
+ sqe = osmo_iofd_uring_get_sqe();
if (!sqe) {
LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
OSMO_ASSERT(0);
@@ -220,9 +249,11 @@
io_uring_sqe_set_data(sqe, msghdr);
if (!g_io_uring_batch)
- io_uring_submit(&g_ring.ring);
+ io_uring_submit(&g_ring->ring);
- iofd->u.uring.read_msghdr[iofd->u.uring.read_len++] = msghdr;
+ iofd->u.uring.read_msghdr[iofd->u.uring.read_len] = msghdr;
+ iofd->u.uring.read_ring[iofd->u.uring.read_len] = &g_ring->ring;
+ iofd->u.uring.read_len++;
/* Submit more read SQEs in advance. */
if (iofd->u.uring.read_len < g_io_uring_read_sqes)
@@ -246,8 +277,10 @@
}
/* Remove entry at idx. */
iofd->u.uring.read_len--;
- for (i = idx; i < iofd->u.uring.read_len; i++)
+ for (i = idx; i < iofd->u.uring.read_len; i++) {
iofd->u.uring.read_msghdr[i] = iofd->u.uring.read_msghdr[i + 1];
+ iofd->u.uring.read_ring[i] = iofd->u.uring.read_ring[i + 1];
+ }
for (idx = 0; idx < msghdr->msg_len; idx++) {
struct msgb *msg = msghdr->msg[idx];
@@ -298,8 +331,10 @@
* This callback function might close iofd, leading to the potential freeing of
iofd->u.uring.write_msghdr if
* still attached. Since iofd_handle_send_completion() frees msghdr at the end of the
function, detaching
* msghdr here prevents a double-free bug. */
- if (iofd->u.uring.write_msghdr == msghdr)
+ if (iofd->u.uring.write_msghdr == msghdr) {
iofd->u.uring.write_msghdr = NULL;
+ iofd->u.uring.write_ring = NULL;
+ }
if (OSMO_UNLIKELY(IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))) {
for (int idx = 0; idx < msghdr->msg_len; idx++)
@@ -382,7 +417,7 @@
if (!msghdr)
return -ENODATA;
- sqe = io_uring_get_sqe(&g_ring.ring);
+ sqe = osmo_iofd_uring_get_sqe();
if (!sqe) {
LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
OSMO_ASSERT(0);
@@ -403,8 +438,9 @@
}
if (!g_io_uring_batch)
- io_uring_submit(&g_ring.ring);
+ io_uring_submit(&g_ring->ring);
iofd->u.uring.write_msghdr = msghdr;
+ iofd->u.uring.write_ring = &g_ring->ring;
return 0;
}
@@ -482,20 +518,19 @@
for (idx = 0; idx < iofd->u.uring.read_len; idx++) {
msghdr = iofd->u.uring.read_msghdr[idx];
- sqe = io_uring_get_sqe(&g_ring.ring);
- OSMO_ASSERT(sqe != NULL);
+ /* The cancellation must be queued on the ring that holds the read SQE. If that ring is full,
+ * keep submitting it: only a submit lets the kernel consume pending SQEs and free a slot. */
+ while ((sqe = io_uring_get_sqe(iofd->u.uring.read_ring[idx])) == NULL)
+ io_uring_submit(iofd->u.uring.read_ring[idx]);
io_uring_sqe_set_data(sqe, NULL);
LOGPIO(iofd, LOGL_DEBUG, "Cancelling read\n");
talloc_steal(OTC_GLOBAL, msghdr);
msghdr->iofd = NULL;
io_uring_prep_cancel(sqe, msghdr, 0);
+ io_uring_submit(iofd->u.uring.read_ring[idx]);
}
iofd->u.uring.read_len = 0;
if (iofd->u.uring.write_msghdr) {
msghdr = iofd->u.uring.write_msghdr;
- sqe = io_uring_get_sqe(&g_ring.ring);
- OSMO_ASSERT(sqe != NULL);
+ /* Same for the write: wait for a free slot in the ring that holds the write SQE. */
+ while ((sqe = io_uring_get_sqe(iofd->u.uring.write_ring)) == NULL)
+ io_uring_submit(iofd->u.uring.write_ring);
io_uring_sqe_set_data(sqe, NULL);
LOGPIO(iofd, LOGL_DEBUG, "Cancelling write\n");
iofd->u.uring.write_msghdr = NULL;
@@ -504,9 +539,9 @@
msgb_free(msghdr->msg[idx]);
msghdr->iofd = NULL;
io_uring_prep_cancel(sqe, msghdr, 0);
+ io_uring_submit(iofd->u.uring.write_ring);
+ iofd->u.uring.write_ring = NULL;
}
- if (!g_io_uring_batch)
- io_uring_submit(&g_ring.ring);
if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_NOTIFY_CONNECTED)) {
osmo_fd_unregister(&iofd->u.uring.connect_ofd);
@@ -547,7 +582,7 @@
msghdr->iov[0].iov_base = msgb_data(msg);
msghdr->iov[0].iov_len = msgb_length(msg);
- sqe = io_uring_get_sqe(&g_ring.ring);
+ sqe = osmo_iofd_uring_get_sqe();
if (!sqe) {
LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
OSMO_ASSERT(0);
@@ -556,8 +591,9 @@
io_uring_sqe_set_data(sqe, msghdr);
if (!g_io_uring_batch)
- io_uring_submit(&g_ring.ring);
+ io_uring_submit(&g_ring->ring);
iofd->u.uring.write_msghdr = msghdr;
+ iofd->u.uring.write_ring = &g_ring->ring;
}
}
@@ -630,7 +666,7 @@
+/*! Flush SQEs queued on this thread's current ring when batching (LIBOSMO_IO_URING_BATCH) is enabled.
+ * Only the current ring needs flushing here: a ring that got replaced had its SQEs submitted by
+ * osmo_iofd_uring_get_sqe() before the switch, and cancellations are submitted immediately. */
void osmo_io_uring_submit(void)
{
if (g_io_uring_batch)
- io_uring_submit(&g_ring.ring);
+ io_uring_submit(&g_ring->ring);
}
#endif /* defined(__linux__) */
--
To view, visit
https://gerrit.osmocom.org/c/libosmocore/+/40725?usp=email
To unsubscribe, or for help writing mail filters, visit
https://gerrit.osmocom.org/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: libosmocore
Gerrit-Branch: master
Gerrit-Change-Id: Id9230146acc8d54bfd44834e783c31b37bd64bca
Gerrit-Change-Number: 40725
Gerrit-PatchSet: 1
Gerrit-Owner: jolly <andreas(a)eversberg.eu>