jolly has uploaded this change for review. (
https://gerrit.osmocom.org/c/libosmocore/+/40856?usp=email )
Change subject: Put all io_uring related read and write states into sub structures
......................................................................
Put all io_uring related read and write states into sub structures
Related: OS#6705
Change-Id: I3cf4ab6d9aebf5532ad174c90c7b0e9633491c88
---
M src/core/osmo_io.c
M src/core/osmo_io_internal.h
M src/core/osmo_io_uring.c
3 files changed, 69 insertions(+), 62 deletions(-)
git pull ssh://gerrit.osmocom.org:29418/libosmocore refs/changes/56/40856/1
diff --git a/src/core/osmo_io.c b/src/core/osmo_io.c
index 6a5dcce..6e19231 100644
--- a/src/core/osmo_io.c
+++ b/src/core/osmo_io.c
@@ -893,7 +893,7 @@
if (sqes < 1 || sqes > IOFD_MSGHDR_MAX_READ_SQES)
return -EINVAL;
- iofd->u.uring.num_read_sqes = sqes;
+ iofd->u.uring.read.num_sqes = sqes;
return 0;
}
diff --git a/src/core/osmo_io_internal.h b/src/core/osmo_io_internal.h
index 3d92b85..b417e37 100644
--- a/src/core/osmo_io_internal.h
+++ b/src/core/osmo_io_internal.h
@@ -107,19 +107,26 @@
struct osmo_fd ofd;
} poll;
struct {
- bool read_enabled;
- bool write_enabled;
- /*! requested number of simultaniously submitted read SQEs */
- uint8_t num_read_sqes;
- /*! array of simultaneously submitted read SQEs */
- void *read_msghdr[IOFD_MSGHDR_MAX_READ_SQES];
- /*! ring the read SQEs have been submitted to */
- struct io_uring *read_ring;
- /*! current number of simultaneously submitted read SQEs */
- uint8_t reads_submitted;
- void *write_msghdr;
- /*! ring the write SQE has been submitted to */
- struct io_uring *write_ring;
+ struct {
+ /*! read is enabled, due to registration of callback function */
+ bool enabled;
+ /*! requested number of simultaniously submitted read SQEs */
+ uint8_t num_sqes;
+ /*! array of simultaneously submitted read SQEs */
+ void *msghdr[IOFD_MSGHDR_MAX_READ_SQES];
+ /*! ring the read SQEs have been submitted to */
+ struct io_uring *ring;
+ /*! current number of simultaneously submitted read SQEs */
+ uint8_t sqes_submitted;
+ } read;
+ struct {
+ /*! write is enabled, due to pending msghdr in tx_queue */
+ bool enabled;
+ /*! submitted write SQE */
+ void *msghdr;
+ /*! ring the write SQE has been submitted to */
+ struct io_uring *ring;
+ } write;
/* TODO: index into array of registered fd's? */
/* osmo_fd for non-blocking connect handling */
struct osmo_fd connect_ofd;
diff --git a/src/core/osmo_io_uring.c b/src/core/osmo_io_uring.c
index a867968..b414c0d 100644
--- a/src/core/osmo_io_uring.c
+++ b/src/core/osmo_io_uring.c
@@ -232,13 +232,13 @@
uint8_t idx;
/* All subsequent read SQEs must be on the same ring. */
- if (iofd->u.uring.reads_submitted > 0 && iofd->u.uring.read_ring !=
&g_ring->ring)
+ if (iofd->u.uring.read.sqes_submitted > 0 && iofd->u.uring.read.ring !=
&g_ring->ring)
return -EINVAL;
/* Tell iofd_uring_get_sqe() not to allocate a new ring, if we want to enqueue multiple
read SQEs. */
- sqe = iofd_uring_get_sqe(iofd->u.uring.reads_submitted > 0);
+ sqe = iofd_uring_get_sqe(iofd->u.uring.read.sqes_submitted > 0);
if (!sqe) {
- if (iofd->u.uring.reads_submitted > 0)
+ if (iofd->u.uring.read.sqes_submitted > 0)
return -EINVAL;
LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
OSMO_ASSERT(0);
@@ -293,9 +293,9 @@
iofd_io_uring_submit();
- iofd->u.uring.read_msghdr[iofd->u.uring.reads_submitted] = msghdr;
- iofd->u.uring.reads_submitted++;
- iofd->u.uring.read_ring = &g_ring->ring;
+ iofd->u.uring.read.msghdr[iofd->u.uring.read.sqes_submitted] = msghdr;
+ iofd->u.uring.read.sqes_submitted++;
+ iofd->u.uring.read.ring = &g_ring->ring;
return 0;
}
@@ -305,7 +305,7 @@
int rc;
/* Submit more read SQEs in advance, if requested. */
- while (iofd->u.uring.reads_submitted < ((iofd->u.uring.num_read_sqes) ? :
g_io_uring_read_sqes)) {
+ while (iofd->u.uring.read.sqes_submitted < ((iofd->u.uring.read.num_sqes) ? :
g_io_uring_read_sqes)) {
rc = iofd_uring_submit_recv_sqe(iofd, action);
/* Stop, if we cannot enqueue multiple read SQEs in the same ring. */
if (rc < 0)
@@ -320,19 +320,19 @@
uint8_t idx, i;
/* Find which read_msghdr is completed and remove from list. */
- for (idx = 0; idx < iofd->u.uring.reads_submitted; idx++) {
- if (iofd->u.uring.read_msghdr[idx] == msghdr)
+ for (idx = 0; idx < iofd->u.uring.read.sqes_submitted; idx++) {
+ if (iofd->u.uring.read.msghdr[idx] == msghdr)
break;
}
- if (idx == iofd->u.uring.reads_submitted) {
+ if (idx == iofd->u.uring.read.sqes_submitted) {
LOGP(DLIO, LOGL_FATAL, "Read SQE completion, but msghdr not found, please
fix!\n");
return;
}
/* Remove entry at idx. */
- iofd->u.uring.reads_submitted--;
- for (i = idx; i < iofd->u.uring.reads_submitted; i++)
- iofd->u.uring.read_msghdr[i] = iofd->u.uring.read_msghdr[i + 1];
- iofd->u.uring.read_msghdr[i] = NULL;
+ iofd->u.uring.read.sqes_submitted--;
+ for (i = idx; i < iofd->u.uring.read.sqes_submitted; i++)
+ iofd->u.uring.read.msghdr[i] = iofd->u.uring.read.msghdr[i + 1];
+ iofd->u.uring.read.msghdr[i] = NULL;
for (idx = 0; idx < msghdr->io_len; idx++) {
struct msgb *msg = msghdr->msg[idx];
@@ -351,7 +351,7 @@
}
/* Check for every iteration, because iofd might get unregistered/closed during receive
function. */
- if (iofd->u.uring.read_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
+ if (iofd->u.uring.read.enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
iofd_handle_recv(iofd, msg, chunk, msghdr);
else
msgb_free(msg);
@@ -364,7 +364,7 @@
msghdr->msg[idx] = NULL;
}
- if (iofd->u.uring.read_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
+ if (iofd->u.uring.read.enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
iofd_uring_submit_recv(iofd, msghdr->action);
iofd_msghdr_free(msghdr);
@@ -380,12 +380,12 @@
/* Detach msghdr from iofd. It might get freed here or it is freed during
iofd_handle_send_completion().
* If there is pending data to send, iofd_uring_submit_tx() will attach it again.
* iofd_handle_send_completion() will invoke a callback function to signal the
possibility of write/send.
- * This callback function might close iofd, leading to the potential freeing of
iofd->u.uring.write_msghdr if
+ * This callback function might close iofd, leading to the potential freeing of
iofd->u.uring.write.msghdr if
* still attached. Since iofd_handle_send_completion() frees msghdr at the end of the
function, detaching
* msghdr here prevents a double-free bug. */
- if (iofd->u.uring.write_msghdr == msghdr) {
- iofd->u.uring.write_msghdr = NULL;
- iofd->u.uring.write_ring = NULL;
+ if (iofd->u.uring.write.msghdr == msghdr) {
+ iofd->u.uring.write.msghdr = NULL;
+ iofd->u.uring.write.ring = NULL;
}
if (OSMO_UNLIKELY(IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))) {
@@ -399,7 +399,7 @@
}
/* submit the next to-be-transmitted message for this file descriptor */
- if (iofd->u.uring.write_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
+ if (iofd->u.uring.write.enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
iofd_uring_submit_tx(iofd);
}
@@ -427,7 +427,7 @@
IOFD_FLAG_UNSET(iofd, IOFD_FLAG_IN_CALLBACK);
- if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) &&
!iofd->u.uring.reads_submitted && !iofd->u.uring.write_msghdr)
+ if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) &&
!iofd->u.uring.read.sqes_submitted && !iofd->u.uring.write.msghdr)
talloc_free(iofd);
}
@@ -518,8 +518,8 @@
iofd_io_uring_submit();
- iofd->u.uring.write_msghdr = msghdr;
- iofd->u.uring.write_ring = &g_ring->ring;
+ iofd->u.uring.write.msghdr = msghdr;
+ iofd->u.uring.write.ring = &g_ring->ring;
return 0;
}
@@ -558,12 +558,12 @@
IOFD_FLAG_UNSET(iofd, IOFD_FLAG_IN_CALLBACK);
/* If write/read notifications are pending, enable it now. */
- if (iofd->u.uring.write_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
+ if (iofd->u.uring.write.enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
iofd_uring_write_enable(iofd);
- if (iofd->u.uring.read_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
+ if (iofd->u.uring.read.enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
iofd_uring_read_enable(iofd);
- if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) &&
!iofd->u.uring.reads_submitted && !iofd->u.uring.write_msghdr)
+ if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) &&
!iofd->u.uring.read.sqes_submitted && !iofd->u.uring.write.msghdr)
talloc_free(iofd);
return 0;
}
@@ -595,10 +595,10 @@
struct iofd_msghdr *msghdr;
uint8_t idx;
- for (idx = 0; idx < iofd->u.uring.reads_submitted; idx++) {
- struct osmo_io_uring *ring = container_of(iofd->u.uring.read_ring, struct
osmo_io_uring, ring);
- msghdr = iofd->u.uring.read_msghdr[idx];
- iofd->u.uring.read_msghdr[idx] = NULL;
+ for (idx = 0; idx < iofd->u.uring.read.sqes_submitted; idx++) {
+ struct osmo_io_uring *ring = container_of(iofd->u.uring.read.ring, struct
osmo_io_uring, ring);
+ msghdr = iofd->u.uring.read.msghdr[idx];
+ iofd->u.uring.read.msghdr[idx] = NULL;
/* Submit SQEs of the current ring, if needed. */
if (&ring->ring == &g_ring->ring)
osmo_io_uring_submit();
@@ -617,15 +617,15 @@
talloc_steal(OTC_GLOBAL, msghdr);
msghdr->iofd = NULL;
}
- if (iofd->u.uring.reads_submitted) {
- iofd->u.uring.read_ring = NULL;
- iofd->u.uring.reads_submitted = 0;
+ if (iofd->u.uring.read.sqes_submitted) {
+ iofd->u.uring.read.ring = NULL;
+ iofd->u.uring.read.sqes_submitted = 0;
}
- if (iofd->u.uring.write_msghdr) {
- struct osmo_io_uring *ring = container_of(iofd->u.uring.write_ring, struct
osmo_io_uring, ring);
- msghdr = iofd->u.uring.write_msghdr;
- iofd->u.uring.write_msghdr = NULL;
+ if (iofd->u.uring.write.msghdr) {
+ struct osmo_io_uring *ring = container_of(iofd->u.uring.write.ring, struct
osmo_io_uring, ring);
+ msghdr = iofd->u.uring.write.msghdr;
+ iofd->u.uring.write.msghdr = NULL;
for (int idx = 0; idx < msghdr->io_len; idx++) {
msgb_free(msghdr->msg[idx]);
msghdr->msg[idx] = NULL;
@@ -647,7 +647,7 @@
}
talloc_steal(OTC_GLOBAL, msghdr);
msghdr->iofd = NULL;
- iofd->u.uring.write_ring = NULL;
+ iofd->u.uring.write.ring = NULL;
}
if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_NOTIFY_CONNECTED)) {
@@ -660,9 +660,9 @@
static void iofd_uring_write_enable(struct osmo_io_fd *iofd)
{
- iofd->u.uring.write_enabled = true;
+ iofd->u.uring.write.enabled = true;
- if (iofd->u.uring.write_msghdr)
+ if (iofd->u.uring.write.msghdr)
return;
/* This function is called again, once the socket is connected. */
@@ -699,21 +699,21 @@
iofd_io_uring_submit();
- iofd->u.uring.write_msghdr = msghdr;
- iofd->u.uring.write_ring = &g_ring->ring;
+ iofd->u.uring.write.msghdr = msghdr;
+ iofd->u.uring.write.ring = &g_ring->ring;
}
}
static void iofd_uring_write_disable(struct osmo_io_fd *iofd)
{
- iofd->u.uring.write_enabled = false;
+ iofd->u.uring.write.enabled = false;
}
static void iofd_uring_read_enable(struct osmo_io_fd *iofd)
{
- iofd->u.uring.read_enabled = true;
+ iofd->u.uring.read.enabled = true;
- if (iofd->u.uring.reads_submitted)
+ if (iofd->u.uring.read.sqes_submitted)
return;
/* This function is called again, once the socket is connected. */
@@ -737,7 +737,7 @@
static void iofd_uring_read_disable(struct osmo_io_fd *iofd)
{
- iofd->u.uring.read_enabled = false;
+ iofd->u.uring.read.enabled = false;
}
static int iofd_uring_close(struct osmo_io_fd *iofd)
@@ -753,7 +753,7 @@
}
/* OSMO_IO_FD_MODE_RECVMSG_SENDMSG: Don't call this function after enabling read or
write. */
- OSMO_ASSERT(!iofd->u.uring.write_enabled && !iofd->u.uring.read_enabled);
+ OSMO_ASSERT(!iofd->u.uring.write.enabled && !iofd->u.uring.read.enabled);
/* Set flag to enable temporary osmo_fd during register() time: */
IOFD_FLAG_SET(iofd, IOFD_FLAG_NOTIFY_CONNECTED);
--
To view, visit
https://gerrit.osmocom.org/c/libosmocore/+/40856?usp=email
To unsubscribe, or for help writing mail filters, visit
https://gerrit.osmocom.org/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: libosmocore
Gerrit-Branch: master
Gerrit-Change-Id: I3cf4ab6d9aebf5532ad174c90c7b0e9633491c88
Gerrit-Change-Number: 40856
Gerrit-PatchSet: 1
Gerrit-Owner: jolly <andreas(a)eversberg.eu>