pespin has uploaded this change for review. ( https://gerrit.osmocom.org/c/osmo-pcap/+/39367?usp=email )
Change subject: server: Implement non-blocking write to pcap file with osmo_io ......................................................................
server: Implement non-blocking write to pcap file with osmo_io
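Actual zero-copy passing of the msgb read from the TCP socket will be implemented in follow-up patches. Note that O_NONBLOCK has no effect on regular files, so writes only become truly asynchronous with osmo_io's io_uring backend.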
Change-Id: I098a9455a2a4cc626444e6fc13aa88c4cc9694f0 Related: SYS#7080 --- M include/osmo-pcap/osmo_pcap_server.h M src/osmo_pcap_wr_file.c M src/osmo_server_core.c 3 files changed, 171 insertions(+), 39 deletions(-)
git pull ssh://gerrit.osmocom.org:29418/osmo-pcap refs/changes/67/39367/1
diff --git a/include/osmo-pcap/osmo_pcap_server.h b/include/osmo-pcap/osmo_pcap_server.h index 9b93888..e71096b 100644 --- a/include/osmo-pcap/osmo_pcap_server.h +++ b/include/osmo-pcap/osmo_pcap_server.h @@ -29,6 +29,7 @@
#include <pcap.h>
+#include <osmocom/core/osmo_io.h> #include <osmocom/core/select.h> #include <osmocom/core/linuxlist.h> #include <osmocom/core/write_queue.h> @@ -73,20 +74,30 @@ SERVER_CTR_NOCLIENT, };
+struct osmo_pcap_wr_file; +typedef void (*osmo_pcap_wr_file_flush_completed_cb_t)(struct osmo_pcap_wr_file *wrf, void *data); struct osmo_pcap_wr_file { + struct llist_head entry; /* entry into (osmo_pcap_conn)->wrf_flushing_list */ void *data; /* user backpointer */ /* canonicalized absolute pathname of pcap file we write to */ char *filename; - /* file descriptor of the file we write to */ - int local_fd; + /* osmo_io fd wrapping the file descriptor of the file we write to */ + struct osmo_io_fd *local_iofd; - /* Current write offset of the file we write to (local_fd) */ + /* Number of bytes submitted for writing to local_iofd so far (current write offset) */ off_t wr_offset; + /* Number of bytes confirmed to be written, <=wr_offset */ + off_t wr_completed; + osmo_pcap_wr_file_flush_completed_cb_t flush_completed_cb; /* called once wrf can be closed: flushed or write error */ }; struct osmo_pcap_wr_file *osmo_pcap_wr_file_alloc(void *ctx, void *data); void osmo_pcap_wr_file_free(struct osmo_pcap_wr_file *wrf); +void osmo_pcap_wr_file_set_flush_completed_cb(struct osmo_pcap_wr_file *wrf, osmo_pcap_wr_file_flush_completed_cb_t flush_completed_cb); int osmo_pcap_wr_file_open(struct osmo_pcap_wr_file *wrf, const char *filename, mode_t mode); void osmo_pcap_wr_file_close(struct osmo_pcap_wr_file *wrf); -int osmo_pcap_wr_file_write(struct osmo_pcap_wr_file *wrf, const uint8_t *data, size_t len); +int osmo_pcap_wr_file_write_msgb(struct osmo_pcap_wr_file *wrf, struct msgb *msg); +bool osmo_pcap_wr_file_has_pending_writes(const struct osmo_pcap_wr_file *wrf); +int osmo_pcap_wr_file_flush(struct osmo_pcap_wr_file *wrf, struct llist_head *wrf_flushing_list); +bool osmo_pcap_wr_file_is_flushing(const struct osmo_pcap_wr_file *wrf); void osmo_pcap_wr_file_move_to_dir(struct osmo_pcap_wr_file *wrf, const char *dst_dirpath);
struct osmo_pcap_conn { @@ -103,6 +114,9 @@ /* Remote connection */ struct osmo_stream_srv *srv; struct osmo_pcap_wr_file *wrf; + /* list of osmo_pcap_wr_file->entry: + * wrfs we want to close but which still have pending writes to be completed */ + struct llist_head wrf_flushing_list;
/* pcap stuff */ enum osmo_pcap_fmt file_fmt; diff --git a/src/osmo_pcap_wr_file.c b/src/osmo_pcap_wr_file.c index a66e5e2..418a129 100644 --- a/src/osmo_pcap_wr_file.c +++ b/src/osmo_pcap_wr_file.c @@ -1,5 +1,5 @@ /* - * Write to a file + * Asynchronous non-blocking write to a file * * (C) 2025 by sysmocom - s.f.m.c. GmbH info@sysmocom.de * All Rights Reserved @@ -32,14 +32,49 @@ #include <osmo-pcap/common.h> #include <osmo-pcap/osmo_pcap_server.h>
+static void local_iofd_write_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg) +{ + struct osmo_pcap_wr_file *wrf = osmo_iofd_get_data(iofd); + + LOGP(DSERVER, LOGL_DEBUG, "%s: write_cb(%s, %d)\n", osmo_iofd_get_name(iofd), wrf->filename, res); + if (res <= 0) { + LOGP(DSERVER, LOGL_ERROR, "%s: Failed writing to file path='%s' fd=%d: %s (%d)\n", + osmo_iofd_get_name(iofd), wrf->filename, osmo_iofd_get_fd(iofd), strerror(-res), res); + /* Trigger cb to tell user to free it, even if it was not being flushed. + * Special attention must be kept at the user regarding this code path, ie. + * user can't assume the wrf was actually in flushing state... + */ + if (wrf->flush_completed_cb) + wrf->flush_completed_cb(wrf, wrf->data); + /* wrf may be freed here. */ + return; + } + + wrf->wr_completed += res; + + if (osmo_pcap_wr_file_is_flushing(wrf)) { + if (!osmo_pcap_wr_file_has_pending_writes(wrf)) { + LOGP(DSERVER, LOGL_DEBUG, "%s: closing now after completed data write path='%s' fd=%d\n", + osmo_iofd_get_name(iofd), wrf->filename, osmo_iofd_get_fd(iofd)); + if (wrf->flush_completed_cb) + wrf->flush_completed_cb(wrf, wrf->data); + /* wrf may be freed here. */ + return; + } + } +} + struct osmo_pcap_wr_file *osmo_pcap_wr_file_alloc(void *ctx, void *data) { struct osmo_pcap_wr_file *wrf = talloc_zero(ctx, struct osmo_pcap_wr_file); OSMO_ASSERT(wrf);
+ /* Initialize entry so that we can know whether we are included in a + * list in osmo_pcap_wr_file_is_flushing(): */ + INIT_LLIST_HEAD(&wrf->entry); wrf->data = data; - wrf->local_fd = -1; wrf->wr_offset = 0; + wrf->wr_completed = 0;
return wrf; } @@ -49,22 +84,45 @@ if (!wrf) return; osmo_pcap_wr_file_close(wrf); + if (osmo_pcap_wr_file_is_flushing(wrf)) + llist_del(&wrf->entry); talloc_free(wrf); }
+void osmo_pcap_wr_file_set_flush_completed_cb(struct osmo_pcap_wr_file *wrf, osmo_pcap_wr_file_flush_completed_cb_t flush_completed_cb) +{ + wrf->flush_completed_cb = flush_completed_cb; +} + int osmo_pcap_wr_file_open(struct osmo_pcap_wr_file *wrf, const char *filename, mode_t mode) { + struct osmo_io_ops ioops = { + .read_cb = NULL, + .write_cb = local_iofd_write_cb, + }; int rc; OSMO_ASSERT(filename); - OSMO_ASSERT(wrf->local_fd == -1); + OSMO_ASSERT(wrf->local_iofd == NULL);
- rc = open(filename, O_CREAT|O_WRONLY|O_TRUNC, mode); + rc = open(filename, O_CREAT|O_WRONLY|O_TRUNC|O_NONBLOCK, mode); if (rc < 0) { LOGP(DSERVER, LOGL_ERROR, "Failed to open file '%s': %s\n", filename, strerror(errno)); return rc; } - wrf->local_fd = rc; + + wrf->local_iofd = osmo_iofd_setup(wrf, rc, filename, OSMO_IO_FD_MODE_READ_WRITE, + &ioops, wrf); + if (!wrf->local_iofd) { + close(rc); /* not owned by any osmo_io_fd yet: don't leak it */ + return -EBADFD; + } + if (osmo_iofd_register(wrf->local_iofd, -1) < 0) { + osmo_iofd_free(wrf->local_iofd); + wrf->local_iofd = NULL; + return -ENAVAIL; + } + wrf->filename = talloc_strdup(wrf, filename); OSMO_ASSERT(wrf->filename); return rc; @@ -72,26 +130,55 @@
void osmo_pcap_wr_file_close(struct osmo_pcap_wr_file *wrf) { - if (wrf->local_fd > 0) { - close(wrf->local_fd); - wrf->local_fd = -1; - } + osmo_iofd_free(wrf->local_iofd); + wrf->local_iofd = NULL; }
-int osmo_pcap_wr_file_write(struct osmo_pcap_wr_file *wrf, const uint8_t *data, size_t len) +int osmo_pcap_wr_file_write_msgb(struct osmo_pcap_wr_file *wrf, struct msgb *msg) { - int rc = write(wrf->local_fd, data, len); - if (rc >= 0) { - wrf->wr_offset += rc; - if (rc != len) { - LOGP(DSERVER, LOGL_ERROR, "Short write '%s': ret %d != %zu\n", - wrf->filename, rc, len); - return -1; - } - } + unsigned int len = msgb_length(msg); /* osmo_io owns msg once submitted: don't touch it afterwards */ + int rc = wrf->local_iofd ? osmo_iofd_write_msgb(wrf->local_iofd, msg) : -EBADF; /* file not open */ + if (rc >= 0) + wrf->wr_offset += len; return rc; }
+bool osmo_pcap_wr_file_has_pending_writes(const struct osmo_pcap_wr_file *wrf) +{ + return wrf->wr_completed < wrf->wr_offset; +} + +/* Mark the wrf as done writing to it. flush_completed_cb is called right away if + * nothing is pending, otherwise once all pending writes complete (or one fails). + * wrf may be freed during the call to this function, so don't use it anymore. */ +int osmo_pcap_wr_file_flush(struct osmo_pcap_wr_file *wrf, struct llist_head *wrf_flushing_list) +{ + if (osmo_pcap_wr_file_is_flushing(wrf)) { + LOGP(DSERVER, LOGL_ERROR, "Trying to flush a file which was already being flushed: '%s'\n", + wrf->filename); + return -EINVAL; + } + + if (!osmo_pcap_wr_file_has_pending_writes(wrf)) { + if (wrf->flush_completed_cb) + wrf->flush_completed_cb(wrf, wrf->data); + /* wrf may be freed here. */ + return 0; + } + + /* Put it in the flushing list; flush_completed_cb will be called once pending writes complete. */ + llist_add_tail(&wrf->entry, wrf_flushing_list); + return 0; +} + +/* Whether we finished pushing data to the wrf and are waiting for its pending + * writes to complete before closing it: + */ +bool osmo_pcap_wr_file_is_flushing(const struct osmo_pcap_wr_file *wrf) +{ + return !llist_empty(&wrf->entry); +} + /* Move file from current dir to dst_dirpath, and updates wrf->filename to point to new location. */ void osmo_pcap_wr_file_move_to_dir(struct osmo_pcap_wr_file *wrf, const char *dst_dirpath) { diff --git a/src/osmo_server_core.c b/src/osmo_server_core.c index 782ead9..bc8c08c 100644 --- a/src/osmo_server_core.c +++ b/src/osmo_server_core.c @@ -116,6 +116,35 @@ 0); }
+/* wrf is done (all written, write failed or conn being freed): close, move to completed_path and free it. */ +static void osmo_pcap_wr_file_flush_completed_cb(struct osmo_pcap_wr_file *wrf, void *data) +{ + struct osmo_pcap_conn *conn = data; + + if (osmo_pcap_wr_file_has_pending_writes(wrf)) { + LOGP(DSERVER, LOGL_NOTICE, "%s: Closing file with pending writes (%lld completed bytes < %lld wrote bytes)\n", + wrf->filename, (long long)wrf->wr_completed, (long long)wrf->wr_offset); + } + + if (!osmo_pcap_wr_file_is_flushing(wrf)) { + /* If it is not flushing, it probably is still assigned to conn; + * unassign it: */ + if (conn->wrf == wrf) + conn->wrf = NULL; + } + + osmo_pcap_wr_file_close(wrf); + + if (conn->server->completed_path) + osmo_pcap_wr_file_move_to_dir(wrf, conn->server->completed_path); + + osmo_pcap_conn_event(conn, "closingtracefile", wrf->filename); + rate_ctr_inc2(conn->ctrg, PEER_CTR_PROTATE); + rate_ctr_inc2(conn->server->ctrg, SERVER_CTR_PROTATE); + + osmo_pcap_wr_file_free(wrf); +} + static inline size_t calc_data_max_len(const struct osmo_pcap_server *server) { size_t data_max_len; @@ -168,6 +197,7 @@ return NULL; }
+ INIT_LLIST_HEAD(&conn->wrf_flushing_list); conn->data_max_len = calc_data_max_len(server); conn->data = talloc_zero_size(conn, sizeof(struct osmo_pcap_data) + conn->data_max_len); /* a bit nasty. we do not work with ids but names */ @@ -227,7 +257,11 @@
void osmo_pcap_conn_free(struct osmo_pcap_conn *conn) { + struct osmo_pcap_wr_file *wrf; osmo_pcap_conn_close(conn); + /* We are being freed: finalize all files still waiting for pending writes, even if some queued data may be lost... */ + while ((wrf = llist_first_entry_or_null(&conn->wrf_flushing_list, struct osmo_pcap_wr_file, entry))) + osmo_pcap_wr_file_flush_completed_cb(wrf, conn); llist_del(&conn->entry); talloc_free(conn); } @@ -237,16 +271,8 @@ if (!conn->wrf) return;
- osmo_pcap_wr_file_close(conn->wrf); - - if (conn->server->completed_path) - osmo_pcap_wr_file_move_to_dir(conn->wrf, conn->server->completed_path); - - osmo_pcap_conn_event(conn, "closingtracefile", conn->wrf->filename); - rate_ctr_inc2(conn->ctrg, PEER_CTR_PROTATE); - rate_ctr_inc2(conn->server->ctrg, SERVER_CTR_PROTATE); - - osmo_pcap_wr_file_free(conn->wrf); + osmo_pcap_wr_file_flush(conn->wrf, &conn->wrf_flushing_list); + /* conn->wrf may have been freed or moved to conn->wrf_flushing_list: */ conn->wrf = NULL; }
@@ -298,9 +324,6 @@ /* omit any storing/creation of the file */ if (!conn->store) { update_last_write(conn, now); - /* TODO: Once we support async write, we'll want to schedule close here instead of freeing: */ - osmo_pcap_wr_file_free(conn->wrf); - conn->wrf = NULL; return; }
@@ -323,16 +346,25 @@ }
conn->wrf = osmo_pcap_wr_file_alloc(conn, conn); + osmo_pcap_wr_file_set_flush_completed_cb(conn->wrf, osmo_pcap_wr_file_flush_completed_cb); rc = osmo_pcap_wr_file_open(conn->wrf, curr_filename, conn->server->permission_mask); talloc_free(curr_filename); - if (rc < 0) + if (rc < 0) { + /* wrf has no open file (local_iofd == NULL); don't keep it as current file */ + osmo_pcap_wr_file_free(conn->wrf); + conn->wrf = NULL; return; + }
- rc = osmo_pcap_wr_file_write(conn->wrf, conn->file_hdr, conn->file_hdr_len); - if (rc != conn->file_hdr_len) { + /* TODO: get msgb from conn object stored: */ + struct msgb *msg = msgb_alloc_c(conn->wrf, conn->file_hdr_len, "local_iofd_hdr"); + memcpy(msgb_put(msg, conn->file_hdr_len), conn->file_hdr, conn->file_hdr_len); + + rc = osmo_pcap_wr_file_write_msgb(conn->wrf, msg); + if (rc < 0) { - LOGP(DSERVER, LOGL_ERROR, "Failed to write the header: %d\n", errno); - osmo_pcap_wr_file_free(conn->wrf); - conn->wrf = NULL; + LOGP(DSERVER, LOGL_ERROR, "Failed to write the header: %d\n", rc); + msgb_free(msg); + osmo_pcap_conn_close_trace(conn); return; } update_last_write(conn, now); @@ -512,9 +544,14 @@ if (!check_restart_pcap_max_size(conn, len)) check_restart_pcap_localtime(conn, now);
- rc = osmo_pcap_wr_file_write(conn->wrf, data, len); + /* TODO: get msgb from caller: */ + struct msgb *msg = msgb_alloc_c(conn->wrf, len, "local_iofd_msg"); + memcpy(msgb_put(msg, len), data, len); + + rc = osmo_pcap_wr_file_write_msgb(conn->wrf, msg); if (rc < 0) { LOGP(DSERVER, LOGL_ERROR, "%s: Failed writing to file\n", conn->name); + msgb_free(msg); return -1; } update_last_write(conn, now);