laforge has submitted this change. (
https://gerrit.osmocom.org/c/libosmocore/+/40493?usp=email )
(
10 is the latest approved patch-set.
No files were changed between the latest approved patch-set and the submitted one.
)Change subject: Add multiple messages buffers to io_uring write operations
......................................................................
Add multiple messages buffers to io_uring write operations
Multiple message buffers can be writen by sending a single SQE when
using io_uring. If there is less data written, the completely written
buffers are removed and the partly written buffers are truncated.
Afterwards they are re-queued for next write operation.
Having more than one buffer is optional and the number can be controlled
via environment variable.
Related: OS#6705
Change-Id: I8c4e0a785cf66becd7fb5b2caf718c9724b56686
---
M src/core/osmo_io.c
M src/core/osmo_io_uring.c
2 files changed, 103 insertions(+), 46 deletions(-)
Approvals:
laforge: Looks good to me, but someone else must approve
pespin: Looks good to me, approved
Jenkins Builder: Verified
diff --git a/src/core/osmo_io.c b/src/core/osmo_io.c
index 46885aa..bfc75c6 100644
--- a/src/core/osmo_io.c
+++ b/src/core/osmo_io.c
@@ -417,42 +417,74 @@
*/
void iofd_handle_send_completion(struct osmo_io_fd *iofd, int rc, struct iofd_msghdr
*msghdr)
{
- struct msgb *msg = msghdr->msg[0];
+ int idx, i;
- /* Incomplete write */
- if (rc > 0 && rc < msgb_length(msg)) {
- /* Re-enqueue remaining data */
- msgb_pull(msg, rc);
- msghdr->iov[0].iov_len = msgb_length(msg);
- iofd_txqueue_enqueue_front(iofd, msghdr);
- return;
- }
-
- /* Reenqueue the complete msgb */
+ /* Re-enqueue the complete msgb. */
if (rc == -EAGAIN) {
iofd_txqueue_enqueue_front(iofd, msghdr);
return;
}
- /* All other failure and success cases are handled here */
- switch (msghdr->action) {
- case IOFD_ACT_WRITE:
- if (iofd->io_ops.write_cb)
- iofd->io_ops.write_cb(iofd, rc, msg);
- break;
- case IOFD_ACT_SENDTO:
- if (iofd->io_ops.sendto_cb)
- iofd->io_ops.sendto_cb(iofd, rc, msg, &msghdr->osa);
- break;
- case IOFD_ACT_SENDMSG:
- if (iofd->io_ops.sendmsg_cb)
- iofd->io_ops.sendmsg_cb(iofd, rc, msg);
- break;
- default:
- OSMO_ASSERT(0);
- }
+ for (idx = 0; idx < msghdr->io_len; idx++) {
+ struct msgb *msg = msghdr->msg[idx];
+ int chunk;
- msgb_free(msghdr->msg[0]);
+ /* Incomplete write */
+ if (rc > 0 && rc < msgb_length(msg)) {
+ /* Keep msg with unsent data only. */
+ msgb_pull(msg, rc);
+ msghdr->iov[idx].iov_len = msgb_length(msg);
+ /* Shift all existing buffers down. */
+ if (idx) {
+ msghdr->io_len -= idx;
+ for (i = 0; i < msghdr->io_len; i++) {
+ msghdr->iov[i] = msghdr->iov[idx + i];
+ msghdr->msg[i] = msghdr->msg[idx + i];
+ }
+ for (i = 0; i < idx; i++) {
+ memset(&msghdr->iov[msghdr->io_len + i], 0, sizeof(struct iovec));
+ msghdr->msg[msghdr->io_len + i] = NULL;
+ }
+ msghdr->hdr.msg_iovlen = msghdr->io_len;
+ }
+ /* Re-enqueue remaining buffers. */
+ iofd_txqueue_enqueue_front(iofd, msghdr);
+ return;
+ }
+
+ if (rc >= 0) {
+ chunk = msgb_length(msg);
+ if (rc < chunk)
+ chunk = rc;
+ } else {
+ chunk = rc;
+ }
+
+ /* All other failure and success cases are handled here */
+ switch (msghdr->action) {
+ case IOFD_ACT_WRITE:
+ if (iofd->io_ops.write_cb)
+ iofd->io_ops.write_cb(iofd, chunk, msg);
+ break;
+ case IOFD_ACT_SENDTO:
+ if (iofd->io_ops.sendto_cb)
+ iofd->io_ops.sendto_cb(iofd, chunk, msg, &msghdr->osa);
+ break;
+ case IOFD_ACT_SENDMSG:
+ if (iofd->io_ops.sendmsg_cb)
+ iofd->io_ops.sendmsg_cb(iofd, chunk, msg);
+ break;
+ default:
+ OSMO_ASSERT(0);
+ }
+
+ msgb_free(msghdr->msg[idx]);
+ msghdr->msg[idx] = NULL;
+
+ /* The user can unregister/close the iofd during callback above. */
+ if (!IOFD_FLAG_ISSET(iofd, IOFD_FLAG_FD_REGISTERED))
+ break;
+ }
iofd_msghdr_free(msghdr);
}
@@ -475,6 +507,8 @@
int osmo_iofd_write_msgb(struct osmo_io_fd *iofd, struct msgb *msg)
{
int rc;
+ struct iofd_msghdr *msghdr;
+ int idx;
if (OSMO_UNLIKELY(msgb_length(msg) == 0)) {
LOGPIO(iofd, LOGL_ERROR, "Length is 0, rejecting msgb.\n");
@@ -483,21 +517,35 @@
OSMO_ASSERT(iofd->mode == OSMO_IO_FD_MODE_READ_WRITE);
- struct iofd_msghdr *msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, msg, 0);
- if (!msghdr)
- return -ENOMEM;
+ /* Always try to add msg to last msghdr. Only if it is completely filled, allocate a new
msghdr.
+ * This way all the previous meghdrs in the queue are completely filled. */
+ msghdr = llist_last_entry_or_null(&iofd->tx_queue.msg_queue, struct iofd_msghdr,
list);
+ if (msghdr && msghdr->io_len < iofd->io_write_buffers) {
+ /* Add msg to existing msghdr. */
+ msghdr->msg[msghdr->io_len++] = msg;
+ } else {
+ /* Create new msghdr with msg. */
+ msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, msg, 0);
+ if (!msghdr)
+ return -ENOMEM;
+ msghdr->hdr.msg_iov = &msghdr->iov[0];
+ msghdr->flags = MSG_NOSIGNAL;
+ }
- msghdr->flags = MSG_NOSIGNAL;
- msghdr->iov[0].iov_base = msgb_data(msghdr->msg[0]);
- msghdr->iov[0].iov_len = msgb_length(msghdr->msg[0]);
- msghdr->hdr.msg_iov = &msghdr->iov[0];
- msghdr->hdr.msg_iovlen = 1;
+ /* Add set IO vector to msg. */
+ idx = msghdr->io_len - 1;
+ msghdr->iov[idx].iov_base = msgb_data(msg);
+ msghdr->iov[idx].iov_len = msgb_length(msg);
+ msghdr->hdr.msg_iovlen = msghdr->io_len;
- rc = iofd_txqueue_enqueue(iofd, msghdr);
- if (rc < 0) {
- iofd_msghdr_free(msghdr);
- LOGPIO(iofd, LOGL_ERROR, "enqueueing message failed (%d). Rejecting msgb\n",
rc);
- return rc;
+ /* Only new msghdr will be enqueued. */
+ if (msghdr->io_len == 1) {
+ rc = iofd_txqueue_enqueue(iofd, msghdr);
+ if (rc < 0) {
+ iofd_msghdr_free(msghdr);
+ LOGPIO(iofd, LOGL_ERROR, "enqueueing message failed (%d). Rejecting msgb\n",
rc);
+ return rc;
+ }
}
return 0;
@@ -884,7 +932,10 @@
{
struct iofd_msghdr *hdr;
while ((hdr = iofd_txqueue_dequeue(iofd))) {
- msgb_free(hdr->msg[0]);
+ for (int idx = 0; idx < hdr->io_len; idx++) {
+ msgb_free(hdr->msg[idx]);
+ hdr->msg[idx] = NULL;
+ }
iofd_msghdr_free(hdr);
}
}
diff --git a/src/core/osmo_io_uring.c b/src/core/osmo_io_uring.c
index 754a39a..018bdab 100644
--- a/src/core/osmo_io_uring.c
+++ b/src/core/osmo_io_uring.c
@@ -262,7 +262,10 @@
iofd->u.uring.write_msghdr = NULL;
if (OSMO_UNLIKELY(IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))) {
- msgb_free(msghdr->msg[0]);
+ for (int idx = 0; idx < msghdr->io_len; idx++) {
+ msgb_free(msghdr->msg[idx]);
+ msghdr->msg[idx] = NULL;
+ }
iofd_msghdr_free(msghdr);
} else {
iofd_handle_send_completion(iofd, rc, msghdr);
@@ -351,7 +354,7 @@
switch (msghdr->action) {
case IOFD_ACT_WRITE:
- io_uring_prep_writev(sqe, msghdr->iofd->fd, msghdr->iov, 1, -1);
+ io_uring_prep_writev(sqe, msghdr->iofd->fd, msghdr->iov, msghdr->io_len,
-1);
break;
case IOFD_ACT_SENDTO:
case IOFD_ACT_SENDMSG:
@@ -458,7 +461,10 @@
LOGPIO(iofd, LOGL_DEBUG, "Cancelling write\n");
iofd->u.uring.write_msghdr = NULL;
talloc_steal(OTC_GLOBAL, msghdr);
- msgb_free(msghdr->msg[0]);
+ for (int idx = 0; idx < msghdr->io_len; idx++) {
+ msgb_free(msghdr->msg[idx]);
+ msghdr->msg[idx] = NULL;
+ }
msghdr->iofd = NULL;
io_uring_prep_cancel(sqe, msghdr, 0);
}
--
To view, visit
https://gerrit.osmocom.org/c/libosmocore/+/40493?usp=email
To unsubscribe, or for help writing mail filters, visit
https://gerrit.osmocom.org/settings?usp=email
Gerrit-MessageType: merged
Gerrit-Project: libosmocore
Gerrit-Branch: master
Gerrit-Change-Id: I8c4e0a785cf66becd7fb5b2caf718c9724b56686
Gerrit-Change-Number: 40493
Gerrit-PatchSet: 14
Gerrit-Owner: jolly <andreas(a)eversberg.eu>
Gerrit-Reviewer: Jenkins Builder
Gerrit-Reviewer: laforge <laforge(a)osmocom.org>
Gerrit-Reviewer: pespin <pespin(a)sysmocom.de>