pespin has submitted this change. ( https://gerrit.osmocom.org/c/libosmocore/+/39363?usp=email )
Change subject: osmo_io: Support writing to files with mode OSMO_IO_FD_MODE_READ_WRITE ......................................................................
osmo_io: Support writing to files with mode OSMO_IO_FD_MODE_READ_WRITE
Trying to use IORING_OP_SENDMSG on local files fails with "Socket operation on non-socket" errors.
Use io_uring vectorized (readv, writev) since those are available since kernel v5.1, while non-vectorized ones (read, write) were only added later in kernel v5.10.
Change-Id: Iefcbc7d09f429f4ecc611227cb5ef796f50c0539 --- M src/core/osmo_io_poll.c M src/core/osmo_io_uring.c M tests/osmo_io/osmo_io_test.c M tests/osmo_io/osmo_io_test.ok 4 files changed, 138 insertions(+), 24 deletions(-)
Approvals: osmith: Looks good to me, approved laforge: Looks good to me, but someone else must approve Jenkins Builder: Verified
diff --git a/src/core/osmo_io_poll.c b/src/core/osmo_io_poll.c index bf0291e..8782adf 100644 --- a/src/core/osmo_io_poll.c +++ b/src/core/osmo_io_poll.c @@ -56,25 +56,36 @@ OSMO_ASSERT(0); }
- hdr.msg = msg; - hdr.iov[0].iov_base = msg->tail; - hdr.iov[0].iov_len = msgb_tailroom(msg); - hdr.hdr = (struct msghdr) { - .msg_iov = &hdr.iov[0], - .msg_iovlen = 1, - .msg_name = &hdr.osa.u.sa, - .msg_namelen = sizeof(struct osmo_sockaddr), - }; - if (iofd->mode == OSMO_IO_FD_MODE_RECVMSG_SENDMSG) { - hdr.hdr.msg_control = alloca(iofd->cmsg_size); - hdr.hdr.msg_controllen = iofd->cmsg_size; + switch (iofd->mode) { + case OSMO_IO_FD_MODE_READ_WRITE: + rc = read(ofd->fd, msg->tail, msgb_tailroom(msg)); + if (rc > 0) + msgb_put(msg, rc); + iofd_handle_recv(iofd, msg, (rc < 0 && errno > 0) ? -errno : rc, NULL); + break; + case OSMO_IO_FD_MODE_RECVFROM_SENDTO: + case OSMO_IO_FD_MODE_RECVMSG_SENDMSG: + hdr.msg = msg; + hdr.iov[0].iov_base = msg->tail; + hdr.iov[0].iov_len = msgb_tailroom(msg); + hdr.hdr = (struct msghdr) { + .msg_iov = &hdr.iov[0], + .msg_iovlen = 1, + .msg_name = &hdr.osa.u.sa, + .msg_namelen = sizeof(struct osmo_sockaddr), + }; + if (iofd->mode == OSMO_IO_FD_MODE_RECVMSG_SENDMSG) { + hdr.hdr.msg_control = alloca(iofd->cmsg_size); + hdr.hdr.msg_controllen = iofd->cmsg_size; + } + rc = recvmsg(ofd->fd, &hdr.hdr, flags); + if (rc > 0) + msgb_put(msg, rc); + iofd_handle_recv(iofd, msg, (rc < 0 && errno > 0) ? -errno : rc, &hdr); + break; + default: + OSMO_ASSERT(0); } - - rc = recvmsg(ofd->fd, &hdr.hdr, flags); - if (rc > 0) - msgb_put(msg, rc); - - iofd_handle_recv(iofd, msg, (rc < 0 && errno > 0) ? -errno : rc, &hdr); }
if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED)) @@ -83,7 +94,17 @@ if (what & OSMO_FD_WRITE) { struct iofd_msghdr *msghdr = iofd_txqueue_dequeue(iofd); if (msghdr) { - rc = sendmsg(ofd->fd, &msghdr->hdr, msghdr->flags); + switch (iofd->mode) { + case OSMO_IO_FD_MODE_READ_WRITE: + rc = write(ofd->fd, msghdr->iov[0].iov_base, msghdr->iov[0].iov_len); + break; + case OSMO_IO_FD_MODE_RECVFROM_SENDTO: + case OSMO_IO_FD_MODE_RECVMSG_SENDMSG: + rc = sendmsg(ofd->fd, &msghdr->hdr, msghdr->flags); + break; + default: + OSMO_ASSERT(0); + } iofd_handle_send_completion(iofd, (rc < 0 && errno > 0) ? -errno : rc, msghdr); } else { /* Socket is writable, but we have no data to send. A non-blocking/async diff --git a/src/core/osmo_io_uring.c b/src/core/osmo_io_uring.c index 72a465e..b8b240f 100644 --- a/src/core/osmo_io_uring.c +++ b/src/core/osmo_io_uring.c @@ -140,17 +140,17 @@ msghdr->iov[0].iov_len = msgb_tailroom(msg);
switch (action) { - case IOFD_ACT_READ: - break; case IOFD_ACT_RECVMSG: msghdr->hdr.msg_control = msghdr->cmsg; msghdr->hdr.msg_controllen = iofd->cmsg_size; /* fall-through */ case IOFD_ACT_RECVFROM: - msghdr->hdr.msg_iov = &msghdr->iov[0]; - msghdr->hdr.msg_iovlen = 1; msghdr->hdr.msg_name = &msghdr->osa.u.sa; msghdr->hdr.msg_namelen = osmo_sockaddr_size(&msghdr->osa); + /* fall-through */ + case IOFD_ACT_READ: + msghdr->hdr.msg_iov = &msghdr->iov[0]; + msghdr->hdr.msg_iovlen = 1; break; default: OSMO_ASSERT(0); @@ -164,7 +164,7 @@
switch (action) { case IOFD_ACT_READ: - io_uring_prep_readv(sqe, iofd->fd, msghdr->iov, 1, 0); + io_uring_prep_readv(sqe, iofd->fd, msghdr->iov, 1, -1); break; case IOFD_ACT_RECVMSG: case IOFD_ACT_RECVFROM: @@ -307,6 +307,8 @@
switch (msghdr->action) { case IOFD_ACT_WRITE: + io_uring_prep_writev(sqe, msghdr->iofd->fd, msghdr->iov, 1, -1); + break; case IOFD_ACT_SENDTO: case IOFD_ACT_SENDMSG: io_uring_prep_sendmsg(sqe, msghdr->iofd->fd, &msghdr->hdr, msghdr->flags); diff --git a/tests/osmo_io/osmo_io_test.c b/tests/osmo_io/osmo_io_test.c index 93beef4..82de564 100644 --- a/tests/osmo_io/osmo_io_test.c +++ b/tests/osmo_io/osmo_io_test.c @@ -45,6 +45,87 @@
static void *ctx = NULL;
+static unsigned file_read_wr_compl_counter = 0; +static void file_read_cb(struct osmo_io_fd *iofd, int rc, struct msgb *msg) +{ + printf("%s: read() msg with rc=%d\n", osmo_iofd_get_name(iofd), rc); + if (rc < 0) + printf("%s: error: %s\n", osmo_iofd_get_name(iofd), strerror(-rc)); + if (msg) + printf("%s\n", osmo_hexdump(msgb_data(msg), msgb_length(msg))); + file_read_wr_compl_counter++; + talloc_free(msg); + if (rc == 0) + osmo_iofd_close(iofd); +} + +static void file_write_cb(struct osmo_io_fd *iofd, int rc, struct msgb *msg) +{ + printf("%s: write() returned rc=%d\n", osmo_iofd_get_name(iofd), rc); + if (rc < 0) + printf("%s: error: %s\n", osmo_iofd_get_name(iofd), strerror(-rc)); + if (msg) + printf("%s\n", osmo_hexdump(msgb_data(msg), msgb_length(msg))); + file_read_wr_compl_counter++; +} + +static void test_file(void) +{ + struct osmo_io_fd *iofd; + struct msgb *msg; + uint8_t *buf; + int fd; + int rc; + struct osmo_io_ops ioops; + + TEST_START(); + + /* Create temporary file and pass fd to iofd: */ + FILE *fp = tmpfile(); + OSMO_ASSERT(fp); + fd = fileno(fp); + + /* First test writing to the file: */ + printf("Enable write\n"); + ioops = (struct osmo_io_ops){ .write_cb = file_write_cb }; + iofd = osmo_iofd_setup(ctx, fd, "file-iofd", OSMO_IO_FD_MODE_READ_WRITE, &ioops, NULL); + osmo_iofd_register(iofd, fd); + + msg = msgb_alloc(1024, "Test data"); + buf = msgb_put(msg, sizeof(TESTDATA)); + memcpy(buf, TESTDATA, sizeof(TESTDATA)); + osmo_iofd_write_msgb(iofd, msg); + /* Allow enough cycles to handle the messages */ + for (int i = 0; i < 128; i++) { + OSMO_ASSERT(file_read_wr_compl_counter <= 1); + if (file_read_wr_compl_counter == 1) + break; + osmo_select_main(1); + } + OSMO_ASSERT(file_read_wr_compl_counter == 1); + + /* Now, re-configure iofd to only read from the fd. Adjust the read/write offset beforehand: */ + printf("Enable read\n"); + rc = lseek(fd, 0, SEEK_SET); + OSMO_ASSERT(rc == 0); + ioops = (struct osmo_io_ops){ .read_cb = file_read_cb }; + rc = osmo_iofd_set_ioops(iofd, &ioops); + OSMO_ASSERT(rc == 0); + /* Allow enough cycles to handle the message. We expect 2 reads, 2nd read will return 0. */ + for (int i = 0; i < 128; i++) { + OSMO_ASSERT(file_read_wr_compl_counter <= 3); + if (file_read_wr_compl_counter == 3) + break; + osmo_select_main(1); + } + OSMO_ASSERT(file_read_wr_compl_counter == 3); + + osmo_iofd_free(iofd); + + for (int i = 0; i < 128; i++) + osmo_select_main(1); +} + static void read_cb(struct osmo_io_fd *iofd, int rc, struct msgb *msg) { printf("%s: read() msg with len=%d\n", osmo_iofd_get_name(iofd), rc); @@ -171,6 +252,7 @@ log_set_print_category(osmo_stderr_target, 0); log_set_print_category_hex(osmo_stderr_target, 0);
+ test_file(); test_connected(); test_unconnected();
diff --git a/tests/osmo_io/osmo_io_test.ok b/tests/osmo_io/osmo_io_test.ok index 6527c19..6b9e591 100644 --- a/tests/osmo_io/osmo_io_test.ok +++ b/tests/osmo_io/osmo_io_test.ok @@ -1,3 +1,12 @@ +Running test_file +Enable write +file-iofd: write() returned rc=16 +01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 10 +Enable read +file-iofd: read() msg with rc=16 +01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 10 +file-iofd: read() msg with rc=0 + Running test_connected ep1: write() returned rc=0 ep1: write() returned rc=16