laforge has submitted this change. ( https://gerrit.osmocom.org/c/libosmocore/+/40856?usp=email )
Change subject: Put all io_uring related read and write states into sub structures
......................................................................
Put all io_uring related read and write states into sub structures
Related: OS#6705
Change-Id: I3cf4ab6d9aebf5532ad174c90c7b0e9633491c88
---
M src/core/osmo_io.c
M src/core/osmo_io_internal.h
M src/core/osmo_io_uring.c
3 files changed, 70 insertions(+), 63 deletions(-)
Approvals:
  laforge: Looks good to me, approved
  Jenkins Builder: Verified
  pespin: Looks good to me, but someone else must approve
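For orientation before reading the diff: the change is a pure refactor that groups the per-iofd io_uring bookkeeping into read and write sub-structures, so e.g. num_read_sqes becomes read.num_sqes and write_msghdr becomes write.msghdr. The standalone sketch below mirrors the new layout from osmo_io_internal.h; the IOFD_MSGHDR_MAX_READ_SQES value and the example_* names are placeholders for illustration only and are not part of the library.

#include <stdbool.h>
#include <stdint.h>

struct io_uring;        /* only used via pointer in this sketch */

/* Placeholder value for the example; the real constant is defined in osmo_io_internal.h. */
#define IOFD_MSGHDR_MAX_READ_SQES 8

/* Simplified mirror of the io_uring backend state after this change. */
struct example_uring_state {
        struct {
                bool enabled;                   /* read callback registered */
                uint8_t num_sqes;               /* requested number of read SQEs kept in flight */
                void *msghdr[IOFD_MSGHDR_MAX_READ_SQES]; /* submitted read SQEs */
                struct io_uring *ring;          /* ring the read SQEs were submitted to */
                uint8_t sqes_submitted;         /* currently submitted read SQEs */
        } read;
        struct {
                bool enabled;                   /* pending msghdr in tx_queue */
                void *msghdr;                   /* submitted write SQE */
                struct io_uring *ring;          /* ring the write SQE was submitted to */
        } write;
};

int main(void)
{
        struct example_uring_state st = { 0 };

        /* Mirrors what iofd_uring_setup() and the osmo_io.c hunk below now do:
         * the requested read SQE count lives under the read sub-structure. */
        st.read.num_sqes = 2;
        st.read.enabled = true;

        return st.read.num_sqes == 2 ? 0 : 1;
}

Grouping the fields this way makes it obvious which ring, msghdr and counters belong to the read path and which to the write path, which is what the renames throughout osmo_io_uring.c below reflect.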
diff --git a/src/core/osmo_io.c b/src/core/osmo_io.c
index 2f3af06..399a604 100644
--- a/src/core/osmo_io.c
+++ b/src/core/osmo_io.c
@@ -901,7 +901,7 @@
         if (sqes < 1 || sqes > IOFD_MSGHDR_MAX_READ_SQES)
                 return -EINVAL;

-        iofd->u.uring.num_read_sqes = sqes;
+        iofd->u.uring.read.num_sqes = sqes;

         return 0;
 }
diff --git a/src/core/osmo_io_internal.h b/src/core/osmo_io_internal.h
index bf5bb76..f425da2 100644
--- a/src/core/osmo_io_internal.h
+++ b/src/core/osmo_io_internal.h
@@ -108,19 +108,26 @@
                         struct osmo_fd ofd;
                 } poll;
                 struct {
-                        bool read_enabled;
-                        bool write_enabled;
-                        /*! requested number of simultaniously submitted read SQEs */
-                        uint8_t num_read_sqes;
-                        /*! array of simultaneously submitted read SQEs */
-                        void *read_msghdr[IOFD_MSGHDR_MAX_READ_SQES];
-                        /*! ring the read SQEs have been submitted to */
-                        struct io_uring *read_ring;
-                        /*! current number of simultaneously submitted read SQEs */
-                        uint8_t reads_submitted;
-                        void *write_msghdr;
-                        /*! ring the write SQE has been submitted to */
-                        struct io_uring *write_ring;
+                        struct {
+                                /*! read is enabled, due to registration of callback function */
+                                bool enabled;
+                                /*! requested number of simultaniously submitted read SQEs */
+                                uint8_t num_sqes;
+                                /*! array of simultaneously submitted read SQEs */
+                                void *msghdr[IOFD_MSGHDR_MAX_READ_SQES];
+                                /*! ring the read SQEs have been submitted to */
+                                struct io_uring *ring;
+                                /*! current number of simultaneously submitted read SQEs */
+                                uint8_t sqes_submitted;
+                        } read;
+                        struct {
+                                /*! write is enabled, due to pending msghdr in tx_queue */
+                                bool enabled;
+                                /*! submitted write SQE */
+                                void *msghdr;
+                                /*! ring the write SQE has been submitted to */
+                                struct io_uring *ring;
+                        } write;
                         /* TODO: index into array of registered fd's? */
                         /* osmo_fd for non-blocking connect handling */
                         struct osmo_fd connect_ofd;
diff --git a/src/core/osmo_io_uring.c b/src/core/osmo_io_uring.c
index 45a08fe..01b8063 100644
--- a/src/core/osmo_io_uring.c
+++ b/src/core/osmo_io_uring.c
@@ -196,7 +196,7 @@
         struct io_uring_sqe *sqe;
         /* All subsequent read SQEs must be on the same ring. */
-        if (read && iofd->u.uring.reads_submitted > 0 && iofd->u.uring.read_ring != &g_ring->ring)
+        if (read && iofd->u.uring.read.sqes_submitted > 0 && iofd->u.uring.read.ring != &g_ring->ring)
                 return NULL;

         sqe = io_uring_get_sqe_and_count(g_ring);
@@ -204,7 +204,7 @@
                 return sqe;

         /* The current ring is full, subsequent read SQEs on different ring are not allowed. */
-        if (read && iofd->u.uring.reads_submitted > 0)
+        if (read && iofd->u.uring.read.sqes_submitted > 0)
                 return NULL;

         if (g_io_uring_size < IOFD_URING_MAXIMUM_SIZE) {
@@ -246,7 +246,7 @@
         /* Tell iofd_uring_get_sqe() not to allocate a new ring, if we want to enqueue multiple read SQEs. */
         sqe = iofd_uring_get_sqe(iofd, true);
         if (!sqe) {
-                if (iofd->u.uring.reads_submitted > 0)
+                if (iofd->u.uring.read.sqes_submitted > 0)
                         return -EINVAL;
                 LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
                 OSMO_ASSERT(0);
@@ -301,9 +301,9 @@
         iofd_io_uring_submit();

-        iofd->u.uring.read_msghdr[iofd->u.uring.reads_submitted] = msghdr;
-        iofd->u.uring.reads_submitted++;
-        iofd->u.uring.read_ring = &g_ring->ring;
+        iofd->u.uring.read.msghdr[iofd->u.uring.read.sqes_submitted] = msghdr;
+        iofd->u.uring.read.sqes_submitted++;
+        iofd->u.uring.read.ring = &g_ring->ring;

         return 0;
 }
@@ -313,7 +313,7 @@
         int rc;

         /* Submit more read SQEs in advance, if requested. */
-        while (iofd->u.uring.reads_submitted < iofd->u.uring.num_read_sqes) {
+        while (iofd->u.uring.read.sqes_submitted < iofd->u.uring.read.num_sqes) {
                 rc = iofd_uring_submit_recv_sqe(iofd, action);
                 /* Stop, if we cannot enqueue multiple read SQEs in the same ring. */
                 if (rc < 0)
@@ -328,19 +328,19 @@
         uint8_t idx, i;

         /* Find which read_msghdr is completed and remove from list. */
-        for (idx = 0; idx < iofd->u.uring.reads_submitted; idx++) {
-                if (iofd->u.uring.read_msghdr[idx] == msghdr)
+        for (idx = 0; idx < iofd->u.uring.read.sqes_submitted; idx++) {
+                if (iofd->u.uring.read.msghdr[idx] == msghdr)
                         break;
         }
-        if (idx == iofd->u.uring.reads_submitted) {
+        if (idx == iofd->u.uring.read.sqes_submitted) {
                 LOGP(DLIO, LOGL_FATAL, "Read SQE completion, but msghdr not found, please fix!\n");
                 return;
         }
         /* Remove entry at idx. */
-        iofd->u.uring.reads_submitted--;
-        for (i = idx; i < iofd->u.uring.reads_submitted; i++)
-                iofd->u.uring.read_msghdr[i] = iofd->u.uring.read_msghdr[i + 1];
-        iofd->u.uring.read_msghdr[i] = NULL;
+        iofd->u.uring.read.sqes_submitted--;
+        for (i = idx; i < iofd->u.uring.read.sqes_submitted; i++)
+                iofd->u.uring.read.msghdr[i] = iofd->u.uring.read.msghdr[i + 1];
+        iofd->u.uring.read.msghdr[i] = NULL;
         for (idx = 0; idx < msghdr->io_len; idx++) {
                 struct msgb *msg = msghdr->msg[idx];
@@ -359,7 +359,7 @@
                 }

                 /* Check for every iteration, because iofd might get unregistered/closed during receive function. */
-                if (iofd->u.uring.read_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
+                if (iofd->u.uring.read.enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
                         iofd_handle_recv(iofd, msg, chunk, msghdr);
                 else
                         msgb_free(msg);
@@ -372,7 +372,7 @@
                 msghdr->msg[idx] = NULL;
         }

-        if (iofd->u.uring.read_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
+        if (iofd->u.uring.read.enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
                 iofd_uring_submit_recv(iofd, msghdr->action);

         iofd_msghdr_free(msghdr);
@@ -388,12 +388,12 @@
         /* Detach msghdr from iofd. It might get freed here or it is freed during iofd_handle_send_completion().
          * If there is pending data to send, iofd_uring_submit_tx() will attach it again.
          * iofd_handle_send_completion() will invoke a callback function to signal the possibility of write/send.
-         * This callback function might close iofd, leading to the potential freeing of iofd->u.uring.write_msghdr if
+         * This callback function might close iofd, leading to the potential freeing of iofd->u.uring.write.msghdr if
          * still attached. Since iofd_handle_send_completion() frees msghdr at the end of the function, detaching
          * msghdr here prevents a double-free bug. */
-        if (iofd->u.uring.write_msghdr == msghdr) {
-                iofd->u.uring.write_msghdr = NULL;
-                iofd->u.uring.write_ring = NULL;
+        if (iofd->u.uring.write.msghdr == msghdr) {
+                iofd->u.uring.write.msghdr = NULL;
+                iofd->u.uring.write.ring = NULL;
         }

         if (OSMO_UNLIKELY(IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))) {
@@ -407,7 +407,7 @@
         }

         /* submit the next to-be-transmitted message for this file descriptor */
-        if (iofd->u.uring.write_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
+        if (iofd->u.uring.write.enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
                 iofd_uring_submit_tx(iofd);
 }

@@ -435,7 +435,7 @@
         IOFD_FLAG_UNSET(iofd, IOFD_FLAG_IN_CALLBACK);

-        if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) && !iofd->u.uring.reads_submitted && !iofd->u.uring.write_msghdr)
+        if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) && !iofd->u.uring.read.sqes_submitted && !iofd->u.uring.write.msghdr)
                 talloc_free(iofd);
 }
@@ -525,8 +525,8 @@

         iofd_io_uring_submit();

-        iofd->u.uring.write_msghdr = msghdr;
-        iofd->u.uring.write_ring = &g_ring->ring;
+        iofd->u.uring.write.msghdr = msghdr;
+        iofd->u.uring.write.ring = &g_ring->ring;

         return 0;
 }
@@ -565,19 +565,19 @@
         IOFD_FLAG_UNSET(iofd, IOFD_FLAG_IN_CALLBACK);

         /* If write/read notifications are pending, enable it now. */
-        if (iofd->u.uring.write_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
+        if (iofd->u.uring.write.enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
                 iofd_uring_write_enable(iofd);
-        if (iofd->u.uring.read_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
+        if (iofd->u.uring.read.enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
                 iofd_uring_read_enable(iofd);

-        if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) && !iofd->u.uring.reads_submitted && !iofd->u.uring.write_msghdr)
+        if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) && !iofd->u.uring.read.sqes_submitted && !iofd->u.uring.write.msghdr)
                 talloc_free(iofd);
         return 0;
 }

 static int iofd_uring_setup(struct osmo_io_fd *iofd)
 {
-        iofd->u.uring.num_read_sqes = g_io_uring_read_sqes;
+        iofd->u.uring.read.num_sqes = g_io_uring_read_sqes;

         return 0;
 }
@@ -609,10 +609,10 @@
         struct iofd_msghdr *msghdr;
         uint8_t idx;

-        for (idx = 0; idx < iofd->u.uring.reads_submitted; idx++) {
-                struct osmo_io_uring *ring = container_of(iofd->u.uring.read_ring, struct osmo_io_uring, ring);
-                msghdr = iofd->u.uring.read_msghdr[idx];
-                iofd->u.uring.read_msghdr[idx] = NULL;
+        for (idx = 0; idx < iofd->u.uring.read.sqes_submitted; idx++) {
+                struct osmo_io_uring *ring = container_of(iofd->u.uring.read.ring, struct osmo_io_uring, ring);
+                msghdr = iofd->u.uring.read.msghdr[idx];
+                iofd->u.uring.read.msghdr[idx] = NULL;
                 /* Submit SQEs of the current ring, if needed. */
                 if (&ring->ring == &g_ring->ring)
                         osmo_io_uring_submit();
@@ -630,15 +630,15 @@
                 talloc_steal(OTC_GLOBAL, msghdr);
                 msghdr->iofd = NULL;
         }
-        if (iofd->u.uring.reads_submitted) {
-                iofd->u.uring.read_ring = NULL;
-                iofd->u.uring.reads_submitted = 0;
+        if (iofd->u.uring.read.sqes_submitted) {
+                iofd->u.uring.read.ring = NULL;
+                iofd->u.uring.read.sqes_submitted = 0;
         }

-        if (iofd->u.uring.write_msghdr) {
-                struct osmo_io_uring *ring = container_of(iofd->u.uring.write_ring, struct osmo_io_uring, ring);
-                msghdr = iofd->u.uring.write_msghdr;
-                iofd->u.uring.write_msghdr = NULL;
+        if (iofd->u.uring.write.msghdr) {
+                struct osmo_io_uring *ring = container_of(iofd->u.uring.write.ring, struct osmo_io_uring, ring);
+                msghdr = iofd->u.uring.write.msghdr;
+                iofd->u.uring.write.msghdr = NULL;
                 for (int idx = 0; idx < msghdr->io_len; idx++) {
                         msgb_free(msghdr->msg[idx]);
                         msghdr->msg[idx] = NULL;
@@ -659,7 +659,7 @@
                 }
                 talloc_steal(OTC_GLOBAL, msghdr);
                 msghdr->iofd = NULL;
-                iofd->u.uring.write_ring = NULL;
+                iofd->u.uring.write.ring = NULL;
         }
         if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_NOTIFY_CONNECTED)) {
@@ -672,9 +672,9 @@

 static void iofd_uring_write_enable(struct osmo_io_fd *iofd)
 {
-        iofd->u.uring.write_enabled = true;
+        iofd->u.uring.write.enabled = true;

-        if (iofd->u.uring.write_msghdr)
+        if (iofd->u.uring.write.msghdr)
                 return;

         /* This function is called again, once the socket is connected. */
@@ -711,21 +711,21 @@

                 iofd_io_uring_submit();

-                iofd->u.uring.write_msghdr = msghdr;
-                iofd->u.uring.write_ring = &g_ring->ring;
+                iofd->u.uring.write.msghdr = msghdr;
+                iofd->u.uring.write.ring = &g_ring->ring;
         }
 }

 static void iofd_uring_write_disable(struct osmo_io_fd *iofd)
 {
-        iofd->u.uring.write_enabled = false;
+        iofd->u.uring.write.enabled = false;
 }
 static void iofd_uring_read_enable(struct osmo_io_fd *iofd)
 {
-        iofd->u.uring.read_enabled = true;
+        iofd->u.uring.read.enabled = true;

-        if (iofd->u.uring.reads_submitted)
+        if (iofd->u.uring.read.sqes_submitted)
                 return;

         /* This function is called again, once the socket is connected. */
@@ -749,7 +749,7 @@

 static void iofd_uring_read_disable(struct osmo_io_fd *iofd)
 {
-        iofd->u.uring.read_enabled = false;
+        iofd->u.uring.read.enabled = false;
 }

 static int iofd_uring_close(struct osmo_io_fd *iofd)
@@ -765,7 +765,7 @@
         }

         /* OSMO_IO_FD_MODE_RECVMSG_SENDMSG: Don't call this function after enabling read or write. */
-        OSMO_ASSERT(!iofd->u.uring.write_enabled && !iofd->u.uring.read_enabled);
+        OSMO_ASSERT(!iofd->u.uring.write.enabled && !iofd->u.uring.read.enabled);

         /* Set flag to enable temporary osmo_fd during register() time: */
         IOFD_FLAG_SET(iofd, IOFD_FLAG_NOTIFY_CONNECTED);
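As a usage-level illustration of the renamed read-side fields: the completion path in the hunk at -328,19 above looks up the finished msghdr in read.msghdr[], shifts the remaining entries down and decrements read.sqes_submitted. Below is a standalone, hedged re-implementation of just that compaction step; the example_* names are local to this sketch and not part of libosmocore.

#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

#define EXAMPLE_MAX_READ_SQES 8        /* stands in for IOFD_MSGHDR_MAX_READ_SQES */

struct example_read_state {
        void *msghdr[EXAMPLE_MAX_READ_SQES];
        uint8_t sqes_submitted;
};

/* Remove one completed msghdr and keep the remaining entries contiguous,
 * the same way the patched code compacts read.msghdr[].
 * Returns 0 on success, -1 if the msghdr is not found. */
static int example_read_remove(struct example_read_state *rd, void *msghdr)
{
        uint8_t idx, i;

        for (idx = 0; idx < rd->sqes_submitted; idx++) {
                if (rd->msghdr[idx] == msghdr)
                        break;
        }
        if (idx == rd->sqes_submitted)
                return -1;

        rd->sqes_submitted--;
        for (i = idx; i < rd->sqes_submitted; i++)
                rd->msghdr[i] = rd->msghdr[i + 1];
        rd->msghdr[i] = NULL;

        return 0;
}

int main(void)
{
        int a, b, c;
        struct example_read_state rd = {
                .msghdr = { &a, &b, &c },
                .sqes_submitted = 3,
        };

        assert(example_read_remove(&rd, &b) == 0);
        assert(rd.sqes_submitted == 2);
        assert(rd.msghdr[0] == &a && rd.msghdr[1] == &c && rd.msghdr[2] == NULL);

        printf("completed msghdr removed, %u still submitted\n",
               (unsigned int)rd.sqes_submitted);
        return 0;
}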