pespin submitted this change.

View Change

Approvals: osmith: Looks good to me, but someone else must approve; Jenkins Builder: Verified; pespin: Looks good to me, approved; laforge: Looks good to me, but someone else must approve
server: Implement non-blocking write to pcap file with osmo_io

Actual zero-copy passing of the msgb read from the TCP socket will be
implemented in follow-up patches.

Change-Id: I098a9455a2a4cc626444e6fc13aa88c4cc9694f0
Related: SYS#7080
---
M include/osmo-pcap/osmo_pcap_server.h
M src/osmo_pcap_wr_file.c
M src/osmo_server_core.c
3 files changed, 173 insertions(+), 39 deletions(-)

diff --git a/include/osmo-pcap/osmo_pcap_server.h b/include/osmo-pcap/osmo_pcap_server.h
index 9b93888..e71096b 100644
--- a/include/osmo-pcap/osmo_pcap_server.h
+++ b/include/osmo-pcap/osmo_pcap_server.h
@@ -29,6 +29,7 @@

#include <pcap.h>

+#include <osmocom/core/osmo_io.h>
#include <osmocom/core/select.h>
#include <osmocom/core/linuxlist.h>
#include <osmocom/core/write_queue.h>
@@ -73,20 +74,30 @@
SERVER_CTR_NOCLIENT,
};

+struct osmo_pcap_wr_file;
+typedef void (*osmo_pcap_wr_file_flush_completed_cb_t)(struct osmo_pcap_wr_file *wrf, void *data);
struct osmo_pcap_wr_file {
+ struct llist_head entry; /* entry into (osmo_pcap_conn)->wrf_flushing_list */
void *data; /* user backpointer */
/* canonicalized absolute pathname of pcap file we write to */
char *filename;
/* file descriptor of the file we write to */
- int local_fd;
+ struct osmo_io_fd *local_iofd;
/* Current write offset of the file we write to (local_fd) */
off_t wr_offset;
+ /* Number of bytes confirmed to be written, <=wr_offset */
+ off_t wr_completed;
+ osmo_pcap_wr_file_flush_completed_cb_t flush_completed_cb;
};
struct osmo_pcap_wr_file *osmo_pcap_wr_file_alloc(void *ctx, void *data);
void osmo_pcap_wr_file_free(struct osmo_pcap_wr_file *wrf);
+void osmo_pcap_wr_file_set_flush_completed_cb(struct osmo_pcap_wr_file *wrf, osmo_pcap_wr_file_flush_completed_cb_t flush_completed_cb);
int osmo_pcap_wr_file_open(struct osmo_pcap_wr_file *wrf, const char *filename, mode_t mode);
void osmo_pcap_wr_file_close(struct osmo_pcap_wr_file *wrf);
-int osmo_pcap_wr_file_write(struct osmo_pcap_wr_file *wrf, const uint8_t *data, size_t len);
+int osmo_pcap_wr_file_write_msgb(struct osmo_pcap_wr_file *wrf, struct msgb *msg);
+bool osmo_pcap_wr_file_has_pending_writes(const struct osmo_pcap_wr_file *wrf);
+int osmo_pcap_wr_file_flush(struct osmo_pcap_wr_file *wrf, struct llist_head *wrf_flushing_list);
+bool osmo_pcap_wr_file_is_flushing(const struct osmo_pcap_wr_file *wrf);
void osmo_pcap_wr_file_move_to_dir(struct osmo_pcap_wr_file *wrf, const char *dst_dirpath);

struct osmo_pcap_conn {
@@ -103,6 +114,9 @@
/* Remote connection */
struct osmo_stream_srv *srv;
struct osmo_pcap_wr_file *wrf;
+ /* list of osmo_pcap_wr_file->entry.
+ * wrf which we want to close but still have pending writes to be completed */
+ struct llist_head wrf_flushing_list;

/* pcap stuff */
enum osmo_pcap_fmt file_fmt;
diff --git a/src/osmo_pcap_wr_file.c b/src/osmo_pcap_wr_file.c
index a66e5e2..582caa8 100644
--- a/src/osmo_pcap_wr_file.c
+++ b/src/osmo_pcap_wr_file.c
@@ -1,5 +1,5 @@
/*
- * Write to a file
+ * Asynchronous non-blocking write to a file
*
* (C) 2025 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
* All Rights Reserved
@@ -32,14 +32,51 @@
#include <osmo-pcap/common.h>
#include <osmo-pcap/osmo_pcap_server.h>

+static void local_iofd_write_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg)
+{
+ struct osmo_pcap_wr_file *wrf = osmo_iofd_get_data(iofd);
+
+ LOGP(DSERVER, LOGL_DEBUG, "%s,fd=%d,wr_completed=%zu: write_cb(res=%d)\n",
+ osmo_iofd_get_name(iofd), osmo_iofd_get_fd(iofd), (size_t)wrf->wr_completed, res);
+ if (res <= 0) {
+ LOGP(DSERVER, LOGL_ERROR, "%s,fd=%d,wr_completed=%zu: Failed writing: %s (%d)\n",
+ osmo_iofd_get_name(iofd), osmo_iofd_get_fd(iofd), (size_t)wrf->wr_completed,
+ strerror(-res), res);
+ /* Trigger cb to tell user to free it, even if it was not being flushed.
+ * The user must pay special attention to this code path, i.e. the
+ * user can't assume the wrf was actually in flushing state...
+ */
+ if (wrf->flush_completed_cb)
+ wrf->flush_completed_cb(wrf, wrf->data);
+ /* wrf may be freed here. */
+ return;
+ }
+
+ wrf->wr_completed += res;
+
+ if (osmo_pcap_wr_file_is_flushing(wrf)) {
+ if (!osmo_pcap_wr_file_has_pending_writes(wrf)) {
+ LOGP(DSERVER, LOGL_DEBUG, "%s,fd=%d,wr_completed=%zu: closing now after completed data write\n",
+ osmo_iofd_get_name(iofd), osmo_iofd_get_fd(iofd), (size_t)wrf->wr_completed);
+ if (wrf->flush_completed_cb)
+ wrf->flush_completed_cb(wrf, wrf->data);
+ /* wrf may be freed here. */
+ return;
+ }
+ }
+}
+
struct osmo_pcap_wr_file *osmo_pcap_wr_file_alloc(void *ctx, void *data)
{
struct osmo_pcap_wr_file *wrf = talloc_zero(ctx, struct osmo_pcap_wr_file);
OSMO_ASSERT(wrf);

+ /* Initialize entry so that we can know whether we are included in a
+ * list in osmo_pcap_wr_file_is_flushing(): */
+ INIT_LLIST_HEAD(&wrf->entry);
wrf->data = data;
- wrf->local_fd = -1;
wrf->wr_offset = 0;
+ wrf->wr_completed = 0;

return wrf;
}
@@ -49,22 +86,43 @@
if (!wrf)
return;
osmo_pcap_wr_file_close(wrf);
+ if (osmo_pcap_wr_file_is_flushing(wrf))
+ llist_del(&wrf->entry);
talloc_free(wrf);
}

+void osmo_pcap_wr_file_set_flush_completed_cb(struct osmo_pcap_wr_file *wrf, osmo_pcap_wr_file_flush_completed_cb_t flush_completed_cb)
+{
+ wrf->flush_completed_cb = flush_completed_cb;
+}
+
int osmo_pcap_wr_file_open(struct osmo_pcap_wr_file *wrf, const char *filename, mode_t mode)
{
+ struct osmo_io_ops ioops = {
+ .read_cb = NULL,
+ .write_cb = local_iofd_write_cb,
+ };
int rc;
OSMO_ASSERT(filename);
- OSMO_ASSERT(wrf->local_fd == -1);
+ OSMO_ASSERT(wrf->local_iofd == NULL);

- rc = open(filename, O_CREAT|O_WRONLY|O_TRUNC, mode);
+ rc = open(filename, O_CREAT|O_WRONLY|O_TRUNC|O_NONBLOCK, mode);
if (rc < 0) {
LOGP(DSERVER, LOGL_ERROR, "Failed to open file '%s': %s\n",
filename, strerror(errno));
return rc;
}
- wrf->local_fd = rc;
+
+ wrf->local_iofd = osmo_iofd_setup(wrf, rc, filename, OSMO_IO_FD_MODE_READ_WRITE,
+ &ioops, wrf);
+ if (!wrf->local_iofd)
+ return -EBADFD;
+ if (osmo_iofd_register(wrf->local_iofd, -1) < 0) {
+ osmo_iofd_free(wrf->local_iofd);
+ wrf->local_iofd = NULL;
+ return -ENAVAIL;
+ }
+
wrf->filename = talloc_strdup(wrf, filename);
OSMO_ASSERT(wrf->filename);
return rc;
@@ -72,26 +130,55 @@

void osmo_pcap_wr_file_close(struct osmo_pcap_wr_file *wrf)
{
- if (wrf->local_fd > 0) {
- close(wrf->local_fd);
- wrf->local_fd = -1;
- }
+ osmo_iofd_free(wrf->local_iofd);
+ wrf->local_iofd = NULL;
}

-int osmo_pcap_wr_file_write(struct osmo_pcap_wr_file *wrf, const uint8_t *data, size_t len)
+int osmo_pcap_wr_file_write_msgb(struct osmo_pcap_wr_file *wrf, struct msgb *msg)
{
- int rc = write(wrf->local_fd, data, len);
- if (rc >= 0) {
- wrf->wr_offset += rc;
- if (rc != len) {
- LOGP(DSERVER, LOGL_ERROR, "Short write '%s': ret %d != %zu\n",
- wrf->filename, rc, len);
- return -1;
- }
- }
+ int rc = osmo_iofd_write_msgb(wrf->local_iofd, msg);
+ if (rc < 0)
+ return rc;
+ wrf->wr_offset += msgb_length(msg);
return rc;
}

+bool osmo_pcap_wr_file_has_pending_writes(const struct osmo_pcap_wr_file *wrf)
+{
+ return wrf->wr_completed < wrf->wr_offset;
+}
+
+/* Mark the wrf as done writing to it. It will be closed and freed
+ * asynchronously when all data has been written to it.
+ * wrf may be freed during the call to this function, so don't use it anymore. */
+int osmo_pcap_wr_file_flush(struct osmo_pcap_wr_file *wrf, struct llist_head *wrf_flushing_list)
+{
+ if (osmo_pcap_wr_file_is_flushing(wrf)) {
+ LOGP(DSERVER, LOGL_ERROR, "Trying to flush a file which was already being flushed: '%s'\n",
+ wrf->filename);
+ return -EINVAL;
+ }
+
+ if (!osmo_pcap_wr_file_has_pending_writes(wrf)) {
+ if (wrf->flush_completed_cb)
+ wrf->flush_completed_cb(wrf, wrf->data);
+ /* wrf may be freed here. */
+ return 0;
+ }
+
+ /* Put it in the flushing list, it will be closed and freed once pending writes complete. */
+ llist_add_tail(&wrf->entry, wrf_flushing_list);
+ return 0;
+}
+
+/* whether we finished pushing more data to the wrf and we are waiting for it to
+ * finish writing before closing:
+ */
+bool osmo_pcap_wr_file_is_flushing(const struct osmo_pcap_wr_file *wrf)
+{
+ return !llist_empty(&wrf->entry);
+}
+
/* Move file from current dir to dst_dirpath, and updates wrf->filename to point to new location. */
void osmo_pcap_wr_file_move_to_dir(struct osmo_pcap_wr_file *wrf, const char *dst_dirpath)
{
diff --git a/src/osmo_server_core.c b/src/osmo_server_core.c
index 782ead9..04c14c1 100644
--- a/src/osmo_server_core.c
+++ b/src/osmo_server_core.c
@@ -116,6 +116,35 @@
0);
}

+/* wrf is done (all data written, write error, or conn being freed): close, rotate and free it. */
+static void osmo_pcap_wr_file_flush_completed_cb(struct osmo_pcap_wr_file *wrf, void *data)
+{
+	struct osmo_pcap_conn *conn = data;
+	/* off_t may differ from size_t in width and signedness, so cast explicitly for the format string: */
+	if (wrf->wr_completed < wrf->wr_offset) {
+		LOGP(DSERVER, LOGL_NOTICE, "%s: Closing file with pending writes (%lld completed bytes < %lld wrote bytes)\n",
+		     wrf->filename, (long long)wrf->wr_completed, (long long)wrf->wr_offset);
+	}
+
+	if (!osmo_pcap_wr_file_is_flushing(wrf)) {
+		/* If it is not flushing, it probably is still assigned to conn;
+		 * unassign it: */
+		if (conn->wrf == wrf)
+			conn->wrf = NULL;
+	}
+
+	osmo_pcap_wr_file_close(wrf);
+
+	if (conn->server->completed_path)
+		osmo_pcap_wr_file_move_to_dir(wrf, conn->server->completed_path);
+
+	osmo_pcap_conn_event(conn, "closingtracefile", wrf->filename);
+	rate_ctr_inc2(conn->ctrg, PEER_CTR_PROTATE);
+	rate_ctr_inc2(conn->server->ctrg, SERVER_CTR_PROTATE);
+
+	osmo_pcap_wr_file_free(wrf);
+}
+
static inline size_t calc_data_max_len(const struct osmo_pcap_server *server)
{
size_t data_max_len;
@@ -168,6 +197,7 @@
return NULL;
}

+ INIT_LLIST_HEAD(&conn->wrf_flushing_list);
conn->data_max_len = calc_data_max_len(server);
conn->data = talloc_zero_size(conn, sizeof(struct osmo_pcap_data) + conn->data_max_len);
/* a bit nasty. we do not work with ids but names */
@@ -227,7 +257,11 @@

void osmo_pcap_conn_free(struct osmo_pcap_conn *conn)
{
+ struct osmo_pcap_wr_file *wrf;
osmo_pcap_conn_close(conn);
+ /* We are freeing, make sure all files are processed even if we may be losing some data... */
+ while ((wrf = llist_first_entry_or_null(&conn->wrf_flushing_list, struct osmo_pcap_wr_file, entry)))
+ osmo_pcap_wr_file_flush_completed_cb(wrf, conn);
llist_del(&conn->entry);
talloc_free(conn);
}
@@ -237,16 +271,8 @@
if (!conn->wrf)
return;

- osmo_pcap_wr_file_close(conn->wrf);
-
- if (conn->server->completed_path)
- osmo_pcap_wr_file_move_to_dir(conn->wrf, conn->server->completed_path);
-
- osmo_pcap_conn_event(conn, "closingtracefile", conn->wrf->filename);
- rate_ctr_inc2(conn->ctrg, PEER_CTR_PROTATE);
- rate_ctr_inc2(conn->server->ctrg, SERVER_CTR_PROTATE);
-
- osmo_pcap_wr_file_free(conn->wrf);
+ osmo_pcap_wr_file_flush(conn->wrf, &conn->wrf_flushing_list);
+ /* conn->wrf may have been freed or moved to conn->wrf_flushing_list: */
conn->wrf = NULL;
}

@@ -298,9 +324,6 @@
/* omit any storing/creation of the file */
if (!conn->store) {
update_last_write(conn, now);
- /* TODO: Once we support async write, we'll want to schedule close here instead of freeing: */
- osmo_pcap_wr_file_free(conn->wrf);
- conn->wrf = NULL;
return;
}

@@ -323,16 +346,21 @@
}

conn->wrf = osmo_pcap_wr_file_alloc(conn, conn);
+ osmo_pcap_wr_file_set_flush_completed_cb(conn->wrf, osmo_pcap_wr_file_flush_completed_cb);
rc = osmo_pcap_wr_file_open(conn->wrf, curr_filename, conn->server->permission_mask);
talloc_free(curr_filename);
if (rc < 0)
return;

- rc = osmo_pcap_wr_file_write(conn->wrf, conn->file_hdr, conn->file_hdr_len);
- if (rc != conn->file_hdr_len) {
+ /* TODO: get msgb from conn object stored: */
+ struct msgb *msg = msgb_alloc_c(conn->wrf, conn->file_hdr_len, "local_iofd_hdr");
+ memcpy(msgb_put(msg, conn->file_hdr_len), conn->file_hdr, conn->file_hdr_len);
+
+ rc = osmo_pcap_wr_file_write_msgb(conn->wrf, msg);
+ if (rc < 0) {
LOGP(DSERVER, LOGL_ERROR, "Failed to write the header: %d\n", errno);
- osmo_pcap_wr_file_free(conn->wrf);
- conn->wrf = NULL;
+ msgb_free(msg);
+ osmo_pcap_conn_close_trace(conn);
return;
}
update_last_write(conn, now);
@@ -512,9 +540,14 @@
if (!check_restart_pcap_max_size(conn, len))
check_restart_pcap_localtime(conn, now);

- rc = osmo_pcap_wr_file_write(conn->wrf, data, len);
+ /* TODO: get msgb from caller: */
+ struct msgb *msg = msgb_alloc_c(conn->wrf, len, "local_iofd_msg");
+ memcpy(msgb_put(msg, len), data, len);
+
+ rc = osmo_pcap_wr_file_write_msgb(conn->wrf, msg);
if (rc < 0) {
LOGP(DSERVER, LOGL_ERROR, "%s: Failed writing to file\n", conn->name);
+ msgb_free(msg);
return -1;
}
update_last_write(conn, now);

To view, visit change 39367. To unsubscribe, or for help writing mail filters, visit settings.

Gerrit-MessageType: merged
Gerrit-Project: osmo-pcap
Gerrit-Branch: master
Gerrit-Change-Id: I098a9455a2a4cc626444e6fc13aa88c4cc9694f0
Gerrit-Change-Number: 39367
Gerrit-PatchSet: 3
Gerrit-Owner: pespin <pespin@sysmocom.de>
Gerrit-Reviewer: Jenkins Builder
Gerrit-Reviewer: laforge <laforge@osmocom.org>
Gerrit-Reviewer: osmith <osmith@sysmocom.de>
Gerrit-Reviewer: pespin <pespin@sysmocom.de>