laforge has submitted this change. ( https://gerrit.osmocom.org/c/libosmocore/+/35078?usp=email )
Change subject: osmo_io: Factor out and use common send function from backend ......................................................................
osmo_io: Factor out and use common send function from backend
This handles reenqueuing a message on EAGAIN and incomplete write
Change-Id: I6da2653d32aedd0e7872be0cf90a841b56462e59 --- M src/core/osmo_io.c M src/core/osmo_io_internal.h M src/core/osmo_io_poll.c M src/core/osmo_io_uring.c 4 files changed, 58 insertions(+), 60 deletions(-)
Approvals: Jenkins Builder: Verified laforge: Looks good to me, approved pespin: Looks good to me, but someone else must approve
diff --git a/src/core/osmo_io.c b/src/core/osmo_io.c index 8507f46..e059f87 100644 --- a/src/core/osmo_io.c +++ b/src/core/osmo_io.c @@ -344,6 +344,46 @@ } }
+/*! completion handler: Calld by osmo_io backend after a given I/O operation has completed + * \param[in] iofd I/O file-descriptor on which I/O has completed + * \param[in] rc return value of the I/O operation + * \param[in] msghdr serialized msghdr containing state of completed I/O + */ +void iofd_handle_send_completion(struct osmo_io_fd *iofd, int rc, struct iofd_msghdr *msghdr) +{ + struct msgb *msg = msghdr->msg; + + /* Incomplete write */ + if (rc > 0 && rc < msgb_length(msg)) { + /* Re-enqueue remaining data */ + msgb_pull(msg, rc); + msghdr->iov[0].iov_len = msgb_length(msg); + iofd_txqueue_enqueue_front(iofd, msghdr); + return; + } + + /* Reenqueue the complete msgb */ + if (rc == -EAGAIN) { + iofd_txqueue_enqueue_front(iofd, msghdr); + return; + } + + /* All other failure and success cases are handled here */ + switch (msghdr->action) { + case IOFD_ACT_WRITE: + iofd->io_ops.write_cb(iofd, rc, msg); + break; + case IOFD_ACT_SENDTO: + iofd->io_ops.sendto_cb(iofd, rc, msg, &msghdr->osa); + break; + default: + OSMO_ASSERT(0); + } + + msgb_free(msghdr->msg); + iofd_msghdr_free(msghdr); +} + /* Public functions */
/*! Send a message through a connected socket. diff --git a/src/core/osmo_io_internal.h b/src/core/osmo_io_internal.h index 73a81e1..e8f4ea2 100644 --- a/src/core/osmo_io_internal.h +++ b/src/core/osmo_io_internal.h @@ -146,6 +146,7 @@ struct msgb *iofd_msgb_pending_or_alloc(struct osmo_io_fd *iofd);
void iofd_handle_recv(struct osmo_io_fd *iofd, struct msgb *msg, int rc, struct iofd_msghdr *msghdr); +void iofd_handle_send_completion(struct osmo_io_fd *iofd, int rc, struct iofd_msghdr *msghdr); void iofd_handle_segmented_read(struct osmo_io_fd *iofd, struct msgb *msg, int rc);
int iofd_txqueue_enqueue(struct osmo_io_fd *iofd, struct iofd_msghdr *msghdr); diff --git a/src/core/osmo_io_poll.c b/src/core/osmo_io_poll.c index 2c1e422..5000dca 100644 --- a/src/core/osmo_io_poll.c +++ b/src/core/osmo_io_poll.c @@ -78,33 +78,8 @@ if (what & OSMO_FD_WRITE) { struct iofd_msghdr *msghdr = iofd_txqueue_dequeue(iofd); if (msghdr) { - msg = msghdr->msg; - rc = sendmsg(ofd->fd, &msghdr->hdr, msghdr->flags); - if (rc > 0 && rc < msgb_length(msg)) { - msgb_pull(msg, rc); - iofd_txqueue_enqueue_front(iofd, msghdr); - return; - } - if (rc == -EAGAIN) { - iofd_txqueue_enqueue_front(iofd, msghdr); - return; - } - - switch (iofd->mode) { - case OSMO_IO_FD_MODE_READ_WRITE: - iofd->io_ops.write_cb(iofd, rc, msg); - break; - case OSMO_IO_FD_MODE_RECVFROM_SENDTO: - iofd->io_ops.sendto_cb(iofd, rc, msg, &msghdr->osa); - break; - case OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND: - OSMO_ASSERT(false); - break; - } - - talloc_free(msghdr); - msgb_free(msg); + iofd_handle_send_completion(iofd, rc, msghdr); } else { if (iofd->mode == OSMO_IO_FD_MODE_READ_WRITE) /* Socket is writable, but we have no data to send. A non-blocking/async diff --git a/src/core/osmo_io_uring.c b/src/core/osmo_io_uring.c index abeea79..1aa17d4 100644 --- a/src/core/osmo_io_uring.c +++ b/src/core/osmo_io_uring.c @@ -186,43 +186,14 @@ static void iofd_uring_handle_tx(struct iofd_msghdr *msghdr, int rc) { struct osmo_io_fd *iofd = msghdr->iofd; - struct msgb *msg = msghdr->msg;
- if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED)) - goto out_free; - - /* Error during write */ - if (rc < 0) { - if (msghdr->action == IOFD_ACT_WRITE) - iofd->io_ops.write_cb(iofd, rc, msg); - else if (msghdr->action == IOFD_ACT_SENDTO) - iofd->io_ops.sendto_cb(iofd, rc, msg, &msghdr->osa); - else - OSMO_ASSERT(0); - goto out_free; + if (OSMO_UNLIKELY(IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))) { + msgb_free(msghdr->msg); + iofd_msghdr_free(msghdr); + } else { + iofd_handle_send_completion(iofd, rc, msghdr); }
- /* Incomplete write */ - if (rc < msgb_length(msg)) { - /* Re-enqueue remaining data */ - msgb_pull(msg, rc); - msghdr->iov[0].iov_len = msgb_length(msg); - iofd_txqueue_enqueue_front(iofd, msghdr); - goto out; - } - - if (msghdr->action == IOFD_ACT_WRITE) - iofd->io_ops.write_cb(iofd, rc, msg); - else if (msghdr->action == IOFD_ACT_SENDTO) - iofd->io_ops.sendto_cb(iofd, rc, msg, &msghdr->osa); - else - OSMO_ASSERT(0); - -out_free: - msgb_free(msghdr->msg); - iofd_msghdr_free(msghdr); - -out: iofd->u.uring.write_msghdr = NULL; /* submit the next to-be-transmitted message for this file descriptor */ if (iofd->u.uring.write_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))