pespin submitted this change.

View Change

Approvals: fixeria: Looks good to me, but someone else must approve Jenkins Builder: Verified pespin: Looks good to me, approved laforge: Looks good to me, but someone else must approve osmith: Looks good to me, but someone else must approve
stream: Introduce osmo_stream_{cli,srv}_set_segmentation_cb2

This is useful for users requiring to store global state or access
external objects while segmenting.
For instance, figure out if too much data has been received according to
whatever external limits are expected according to configuration.

Change-Id: I6e8dd6ece13397074075f05a1a0a8dbdd80d8848
---
M TODO-RELEASE
M include/osmocom/netif/stream.h
M src/stream_cli.c
M src/stream_srv.c
4 files changed, 105 insertions(+), 23 deletions(-)

diff --git a/TODO-RELEASE b/TODO-RELEASE
index 5092ee7..e66b421 100644
--- a/TODO-RELEASE
+++ b/TODO-RELEASE
@@ -9,4 +9,5 @@
#library what description / commit summary line
libosmo-netif add API osmo_stream_cli_set_{ip_dscp,priority}(), osmo_stream_srv_link_set_{ip_dscp,priority}()
libosmo-netif add API osmo-stream_cli_set_tx_queue_max_length(), osmo_stream_srv_link_set_tx_queue_max_length()
-libosmo-netif add API struct osmo_ipa_ka_fsm_inst
\ No newline at end of file
+libosmo-netif add API struct osmo_ipa_ka_fsm_inst
+libosmo-netif add API osmo_stream_{cli,srv}_set_segmentation_cb2()
diff --git a/include/osmocom/netif/stream.h b/include/osmocom/netif/stream.h
index f63560a..e793a26 100644
--- a/include/osmocom/netif/stream.h
+++ b/include/osmocom/netif/stream.h
@@ -117,6 +117,7 @@
typedef int (*osmo_stream_srv_read_cb2_t)(struct osmo_stream_srv *conn, int res, struct msgb *msg);

typedef int (*osmo_stream_srv_segmentation_cb_t)(struct msgb *msg);
+typedef int (*osmo_stream_srv_segmentation_cb2_t)(struct osmo_stream_srv *conn, struct msgb *msg);

struct osmo_stream_srv *osmo_stream_srv_create(void *ctx, struct osmo_stream_srv_link *link, int fd,
osmo_stream_srv_read_cb_t read_cb,
@@ -138,7 +139,8 @@
void osmo_stream_srv_set_flush_and_destroy(struct osmo_stream_srv *conn);
void osmo_stream_srv_set_data(struct osmo_stream_srv *conn, void *data);

-void osmo_stream_srv_set_segmentation_cb(struct osmo_stream_srv *conn, osmo_stream_srv_segmentation_cb_t segmentation_cb);
+void osmo_stream_srv_set_segmentation_cb(struct osmo_stream_srv *conn, osmo_stream_srv_segmentation_cb_t segmentation_cb) OSMO_DEPRECATED("Use osmo_stream_srv_set_segmentation_cb2() instead");
+void osmo_stream_srv_set_segmentation_cb2(struct osmo_stream_srv *conn, osmo_stream_srv_segmentation_cb2_t segmentation_cb2);

void osmo_stream_srv_send(struct osmo_stream_srv *conn, struct msgb *msg);
int osmo_stream_srv_recv(struct osmo_stream_srv *conn, struct msgb *msg);
@@ -197,6 +199,7 @@
typedef int (*osmo_stream_cli_read_cb2_t)(struct osmo_stream_cli *cli, int res, struct msgb *msg);

typedef int (*osmo_stream_cli_segmentation_cb_t)(struct msgb *msg);
+typedef int (*osmo_stream_cli_segmentation_cb2_t)(struct osmo_stream_cli *cli, struct msgb *msg);

void osmo_stream_cli_set_name(struct osmo_stream_cli *cli, const char *name);
const char *osmo_stream_cli_get_name(const struct osmo_stream_cli *cli);
@@ -224,7 +227,8 @@
void osmo_stream_cli_set_disconnect_cb(struct osmo_stream_cli *cli, osmo_stream_cli_disconnect_cb_t disconnect_cb);
void osmo_stream_cli_set_read_cb(struct osmo_stream_cli *cli, osmo_stream_cli_read_cb_t read_cb);
void osmo_stream_cli_set_read_cb2(struct osmo_stream_cli *cli, osmo_stream_cli_read_cb2_t read_cb);
-void osmo_stream_cli_set_segmentation_cb(struct osmo_stream_cli *cli, osmo_stream_cli_segmentation_cb_t segmentation_cb);
+void osmo_stream_cli_set_segmentation_cb(struct osmo_stream_cli *cli, osmo_stream_cli_segmentation_cb_t segmentation_cb) OSMO_DEPRECATED("Use osmo_stream_cli_set_segmentation_cb2() instead");
+void osmo_stream_cli_set_segmentation_cb2(struct osmo_stream_cli *cli, osmo_stream_cli_segmentation_cb2_t segmentation_cb2);
void osmo_stream_cli_reconnect(struct osmo_stream_cli *cli);
bool osmo_stream_cli_is_connected(struct osmo_stream_cli *cli);

diff --git a/src/stream_cli.c b/src/stream_cli.c
index 48a8858..586b1b0 100644
--- a/src/stream_cli.c
+++ b/src/stream_cli.c
@@ -117,6 +117,7 @@
osmo_stream_cli_read_cb_t read_cb;
osmo_stream_cli_read_cb2_t iofd_read_cb;
osmo_stream_cli_segmentation_cb_t segmentation_cb;
+ osmo_stream_cli_segmentation_cb2_t segmentation_cb2;
void *data;
int flags;
int reconnect_timeout;
@@ -511,7 +512,6 @@
cli->state = STREAM_CLI_STATE_CLOSED;
osmo_timer_setup(&cli->timer, cli_timer_cb, cli);
cli->reconnect_timeout = 5; /* default is 5 seconds. */
- cli->segmentation_cb = NULL;
INIT_LLIST_HEAD(&cli->tx_queue);
cli->tx_queue_max_length = 1024; /* Default tx queue size, msgbs. */

@@ -598,8 +598,6 @@
static const struct osmo_io_ops osmo_stream_cli_ioops = {
.read_cb = stream_cli_iofd_read_cb,
.write_cb = stream_cli_iofd_write_cb,
-
- .segmentation_cb = NULL,
};

#ifdef HAVE_LIBSCTP
@@ -659,8 +657,6 @@
static const struct osmo_io_ops osmo_stream_cli_ioops_sctp = {
.recvmsg_cb = stream_cli_iofd_recvmsg_cb,
.sendmsg_cb = stream_cli_iofd_write_cb,
-
- .segmentation_cb = NULL,
};
#endif

@@ -795,28 +791,58 @@
cli->flags |= OSMO_STREAM_CLI_F_RECONF;
}

+/* Callback from iofd, forward to stream_cli user: */
+static int stream_cli_iofd_segmentation_cb2(struct osmo_io_fd *iofd, struct msgb *msg)
+{
+ struct osmo_stream_cli *cli = osmo_iofd_get_data(iofd);
+ if (cli->segmentation_cb2)
+ return cli->segmentation_cb2(cli, msg);
+ if (cli->segmentation_cb)
+ return cli->segmentation_cb(msg);
+ OSMO_ASSERT(0);
+ return 0;
+}
+
/* Configure client side segmentation for the iofd */
-static void configure_cli_segmentation_cb(struct osmo_stream_cli *cli,
- osmo_stream_cli_segmentation_cb_t segmentation_cb)
+static void configure_cli_segmentation_cb(struct osmo_stream_cli *cli)
{
/* Copy default settings */
struct osmo_io_ops client_ops;
osmo_iofd_get_ioops(cli->iofd, &client_ops);
/* Set segmentation cb for this client */
- client_ops.segmentation_cb = segmentation_cb;
+ if (cli->segmentation_cb || cli->segmentation_cb2)
+ client_ops.segmentation_cb2 = stream_cli_iofd_segmentation_cb2;
+ else
+ client_ops.segmentation_cb2 = NULL;
osmo_iofd_set_ioops(cli->iofd, &client_ops);
}

/*! Set the segmentation callback for the client.
* \param[in,out] cli Stream Client to modify
* \param[in] segmentation_cb Target segmentation callback
+ *
+ * DEPRECATED: Use osmo_cli_set_segmentation_cb2() instead.
*/
void osmo_stream_cli_set_segmentation_cb(struct osmo_stream_cli *cli,
osmo_stream_cli_segmentation_cb_t segmentation_cb)
{
cli->segmentation_cb = segmentation_cb;
+ cli->segmentation_cb2 = NULL;
if (cli->iofd) /* Otherwise, this will be done in osmo_stream_cli_open() */
- configure_cli_segmentation_cb(cli, segmentation_cb);
+ configure_cli_segmentation_cb(cli);
+}
+
+/*! Set the segmentation callback for the client.
+ * \param[in,out] cli Stream Client to modify
+ * \param[in] segmentation_cb2 Target segmentation callback
+ */
+void osmo_stream_cli_set_segmentation_cb2(struct osmo_stream_cli *cli,
+ osmo_stream_cli_segmentation_cb2_t segmentation_cb2)
+{
+ cli->segmentation_cb = NULL;
+ cli->segmentation_cb2 = segmentation_cb2;
+ if (cli->iofd) /* Otherwise, this will be done in osmo_stream_cli_open() */
+ configure_cli_segmentation_cb(cli);
}

/*! Set the socket type for the stream server link.
@@ -1257,7 +1283,7 @@

osmo_iofd_set_txqueue_max_length(cli->iofd, cli->tx_queue_max_length);
osmo_iofd_notify_connected(cli->iofd);
- configure_cli_segmentation_cb(cli, cli->segmentation_cb);
+ configure_cli_segmentation_cb(cli);

if (osmo_iofd_register(cli->iofd, fd) < 0)
goto error_close_socket;
diff --git a/src/stream_srv.c b/src/stream_srv.c
index c74f7b0..c2b187a 100644
--- a/src/stream_srv.c
+++ b/src/stream_srv.c
@@ -664,6 +664,8 @@
osmo_stream_srv_closed_cb_t closed_cb;
osmo_stream_srv_read_cb_t read_cb;
osmo_stream_srv_read_cb2_t iofd_read_cb;
+ osmo_stream_srv_segmentation_cb_t segmentation_cb;
+ osmo_stream_srv_segmentation_cb2_t segmentation_cb2;
void *data;
int flags;
};
@@ -1040,7 +1042,37 @@
conn->data = data;
}

+/* Callback from iofd, forward to stream_srv user: */
+static int stream_srv_iofd_segmentation_cb2(struct osmo_io_fd *iofd, struct msgb *msg)
+{
+ struct osmo_stream_srv *conn = osmo_iofd_get_data(iofd);
+ if (conn->segmentation_cb2)
+ return conn->segmentation_cb2(conn, msg);
+ if (conn->segmentation_cb)
+ return conn->segmentation_cb(msg);
+ OSMO_ASSERT(0);
+ return 0;
+}
+
+/* Configure server side segmentation for the iofd */
+static void configure_srv_segmentation_cb(struct osmo_stream_srv *conn)
+{
+ /* Copy default settings */
+ struct osmo_io_ops client_ops;
+ osmo_iofd_get_ioops(conn->iofd, &client_ops);
+ /* Set segmentation cb for this client */
+ if (conn->segmentation_cb || conn->segmentation_cb2)
+ client_ops.segmentation_cb2 = stream_srv_iofd_segmentation_cb2;
+ else
+ client_ops.segmentation_cb2 = NULL;
+ osmo_iofd_set_ioops(conn->iofd, &client_ops);
+}
+
/*! Set the segmentation callback for target osmo_stream_srv structure.
+ * \param[in,out] conn Target Stream Server to modify
+ * \param[in] segmentation_cb Segmentation callback to be set
+ *
+ * DEPRECATED: Use osmo_stream_srv_set_segmentation_cb2() instead.
*
* A segmentation call-back can optionally be used when a packet based protocol (like TCP) is used within a
* STREAM style socket that does not preserve message boundaries within the stream. If a segmentation
@@ -1048,22 +1080,41 @@
* complete single messages, and not arbitrary segments of the stream.
*
* This function only works with osmo_stream_srv in osmo_io mode, created by osmo_stream_srv_create2()!
- * The connection has to have been established prior to calling this function.
- *
- * \param[in,out] conn Target Stream Server to modify
- * \param[in] segmentation_cb Segmentation callback to be set */
+ * The connection has to have been established prior to calling this function.
+ *
+ */
void osmo_stream_srv_set_segmentation_cb(struct osmo_stream_srv *conn,
osmo_stream_srv_segmentation_cb_t segmentation_cb)
{
/* Note that the following implies that iofd != NULL, since
* osmo_stream_srv_create2() creates the iofd member, too */
OSMO_ASSERT(conn->mode == OSMO_STREAM_MODE_OSMO_IO);
- /* Copy default settings */
- struct osmo_io_ops conn_ops;
- osmo_iofd_get_ioops(conn->iofd, &conn_ops);
- /* Set segmentation cb for this connection */
- conn_ops.segmentation_cb = segmentation_cb;
- osmo_iofd_set_ioops(conn->iofd, &conn_ops);
+ conn->segmentation_cb = segmentation_cb;
+ conn->segmentation_cb2 = NULL;
+ configure_srv_segmentation_cb(conn);
+}
+
+/*! Set the segmentation callback for target osmo_stream_srv structure.
+ * \param[in,out] conn Target Stream Server to modify
+ * \param[in] segmentation_cb2 Segmentation callback to be set
+ *
+ * A segmentation call-back can optionally be used when a packet based protocol (like TCP) is used within a
+ * STREAM style socket that does not preserve message boundaries within the stream. If a segmentation
+ * call-back is given, the osmo_stream_srv library code will makes sure that the read_cb called only for
+ * complete single messages, and not arbitrary segments of the stream.
+ *
+ * This function only works with osmo_stream_srv in osmo_io mode, created by osmo_stream_srv_create2()!
+ * The connection has to have been established prior to calling this function.
+ */
+void osmo_stream_srv_set_segmentation_cb2(struct osmo_stream_srv *conn,
+ osmo_stream_srv_segmentation_cb2_t segmentation_cb2)
+{
+ /* Note that the following implies that iofd != NULL, since
+ * osmo_stream_srv_create2() creates the iofd member, too */
+ OSMO_ASSERT(conn->mode == OSMO_STREAM_MODE_OSMO_IO);
+ conn->segmentation_cb = NULL;
+ conn->segmentation_cb2 = segmentation_cb2;
+ configure_srv_segmentation_cb(conn);
}

/*! Retrieve application private data of the stream server

To view, visit change 39316. To unsubscribe, or for help writing mail filters, visit settings.

Gerrit-MessageType: merged
Gerrit-Project: libosmo-netif
Gerrit-Branch: master
Gerrit-Change-Id: I6e8dd6ece13397074075f05a1a0a8dbdd80d8848
Gerrit-Change-Number: 39316
Gerrit-PatchSet: 3
Gerrit-Owner: pespin <pespin@sysmocom.de>
Gerrit-Reviewer: Jenkins Builder
Gerrit-Reviewer: daniel <dwillmann@sysmocom.de>
Gerrit-Reviewer: fixeria <vyanitskiy@sysmocom.de>
Gerrit-Reviewer: laforge <laforge@osmocom.org>
Gerrit-Reviewer: neels <nhofmeyr@sysmocom.de>
Gerrit-Reviewer: osmith <osmith@sysmocom.de>
Gerrit-Reviewer: pespin <pespin@sysmocom.de>