laforge has submitted this change. ( https://gerrit.osmocom.org/c/libosmocore/+/40760?usp=email )
Change subject: osmo-io: Put together message buffers when dequeued from tx queue ......................................................................
osmo-io: Put together message buffers when dequeued from tx queue
Write operations may be incomplete. osmo-io processs will remove complete message buffers after a write operation from msghdr and put the msghdr with the remaining buffers back into tx_queue.
If the user requests multiple messages buffers per write operation, the msghdr of an incomplete write may have less message buffers than the user requested. The remaining buffers are buffers are taken from next msghdr in the queue, if exists.
Change-Id: I97c366211dd266fd58ec252890ec017d6d334534 --- M src/core/osmo_io.c 1 file changed, 36 insertions(+), 5 deletions(-)
Approvals: pespin: Looks good to me, approved laforge: Looks good to me, but someone else must approve Jenkins Builder: Verified
diff --git a/src/core/osmo_io.c b/src/core/osmo_io.c index bfc75c6..62d2362 100644 --- a/src/core/osmo_io.c +++ b/src/core/osmo_io.c @@ -247,21 +247,52 @@ */ struct iofd_msghdr *iofd_txqueue_dequeue(struct osmo_io_fd *iofd) { - struct llist_head *lh; + struct iofd_msghdr *msghdr;
if (iofd->tx_queue.current_length == 0) return NULL;
- lh = iofd->tx_queue.msg_queue.next; + msghdr = llist_first_entry_or_null(&iofd->tx_queue.msg_queue, struct iofd_msghdr, list);
- OSMO_ASSERT(lh); + OSMO_ASSERT(msghdr); iofd->tx_queue.current_length--; - llist_del(lh); + llist_del(&msghdr->list); + + /* Fill up empty buffers in dequeued msghdr with buffers from the next msghdr. + * There can be empty buffers, when a msghdr is queued to the front with incomplete write. */ + while (OSMO_UNLIKELY(msghdr->io_len < iofd->io_write_buffers)) { + struct iofd_msghdr *next; + int i; + + if (iofd->tx_queue.current_length == 0) + break; + next = llist_first_entry_or_null(&iofd->tx_queue.msg_queue, struct iofd_msghdr, list); + OSMO_ASSERT(next->io_len > 0); + /* Get first message buffer from next msghdr and store them in the dequeued one. */ + msghdr->iov[msghdr->io_len] = next->iov[0]; + msghdr->msg[msghdr->io_len] = next->msg[0]; + msghdr->hdr.msg_iovlen = ++msghdr->io_len; + /* Remove the message buffer from the next msghdr and free, if empty. */ + next->io_len--; + for (i = 0; i < next->io_len; i++) { + next->iov[i] = next->iov[i + 1]; + next->msg[i] = next->msg[i + 1]; + } + if (next->io_len == 0) { + iofd->tx_queue.current_length--; + llist_del(&next->list); + iofd_msghdr_free(next); + } else { + memset(&next->iov[next->io_len], 0, sizeof(struct iovec)); + next->msg[next->io_len] = NULL; + next->hdr.msg_iovlen = --next->io_len; + } + }
if (iofd->tx_queue.current_length == 0) osmo_iofd_ops.write_disable(iofd);
- return llist_entry(lh, struct iofd_msghdr, list); + return msghdr; }
/*! Handle segmentation of the msg. If this function returns *_HANDLE_ONE or MORE then the data in msg will contain