pespin has uploaded this change for review.

View Change

stream_cli: Add API osmo_stream_cli_set_tx_queue_max_length()

Change-Id: I3935fb933fe6136d68a9403eebbaf2616c2e5578
---
M TODO-RELEASE
M include/osmocom/netif/stream.h
M src/stream_cli.c
3 files changed, 37 insertions(+), 6 deletions(-)

git pull ssh://gerrit.osmocom.org:29418/libosmo-netif refs/changes/55/38955/1
diff --git a/TODO-RELEASE b/TODO-RELEASE
index 4f59788..74a0abb 100644
--- a/TODO-RELEASE
+++ b/TODO-RELEASE
@@ -8,3 +8,4 @@
# If any interfaces have been removed or changed since the last public release: c:r:0.
#library what description / commit summary line
libosmo-netif add API osmo_stream_cli_set_{ip_dscp,priority}(), osmo_stream_srv_link_set_{ip_dscp,priority}()
+libosmo-netif add API osmo-stream_cli_set_tx_queue_max_length()
diff --git a/include/osmocom/netif/stream.h b/include/osmocom/netif/stream.h
index 6edf915..3b3e04e 100644
--- a/include/osmocom/netif/stream.h
+++ b/include/osmocom/netif/stream.h
@@ -208,6 +208,7 @@
void osmo_stream_cli_set_data(struct osmo_stream_cli *cli, void *data);
void osmo_stream_cli_set_reconnect_timeout(struct osmo_stream_cli *cli, int timeout);
void *osmo_stream_cli_get_data(struct osmo_stream_cli *cli);
+int osmo_stream_cli_set_tx_queue_max_length(struct osmo_stream_cli *cli, unsigned int size);
char *osmo_stream_cli_get_sockname(const struct osmo_stream_cli *cli);
struct osmo_fd *osmo_stream_cli_get_ofd(struct osmo_stream_cli *cli);
int osmo_stream_cli_get_fd(const struct osmo_stream_cli *cli);
diff --git a/src/stream_cli.c b/src/stream_cli.c
index 88d5f9e..a00f32a 100644
--- a/src/stream_cli.c
+++ b/src/stream_cli.c
@@ -96,7 +96,9 @@
struct osmo_fd ofd;
struct osmo_io_fd *iofd;
};
- struct llist_head tx_queue;
+ struct llist_head tx_queue; /* osmo_ofd mode (only): Queue of msgbs */
+ unsigned int tx_queue_count; /* osmo_ofd mode (only): Current amount of msgbs queued */
+ unsigned int tx_queue_max_length; /* Max amount of msgbs which can be enqueued */
struct osmo_timer_list timer;
enum osmo_stream_cli_state state;
char *addr[OSMO_STREAM_MAX_ADDRS];
@@ -321,12 +323,11 @@
struct msgb *msg;
int ret;

- if (llist_empty(&cli->tx_queue)) {
+ msg = msgb_dequeue_count(&cli->tx_queue, &cli->tx_queue_count);
+ if (!msg) { /* done, tx_queue empty */
osmo_fd_write_disable(&cli->ofd);
return 0;
}
- msg = llist_first_entry(&cli->tx_queue, struct msgb, list);
- llist_del(&msg->list);

if (!osmo_stream_cli_is_connected(cli)) {
LOGSCLI(cli, LOGL_ERROR, "send: not connected, dropping data!\n");
@@ -367,6 +368,7 @@
/* Update msgb and re-add it at the start of the queue: */
msgb_pull(msg, ret);
llist_add(&msg->list, &cli->tx_queue);
+ cli->tx_queue_count++;
return 0;
}

@@ -376,6 +378,7 @@
if (err == EAGAIN) {
/* Re-add at the start of the queue to re-attempt: */
llist_add(&msg->list, &cli->tx_queue);
+ cli->tx_queue_count++;
return 0;
}
msgb_free(msg);
@@ -510,6 +513,7 @@
cli->reconnect_timeout = 5; /* default is 5 seconds. */
cli->segmentation_cb = NULL;
INIT_LLIST_HEAD(&cli->tx_queue);
+ cli->tx_queue_max_length = 1024; /* Default tx queue size, msgbs. */

cli->ma_pars.sctp.version = 0;

@@ -863,6 +867,24 @@
return cli->data;
}

+/*! Set the maximum length queue of the stream client.
+ * \param[in] cli Stream Client to modify
+ * \param[in] size maximum amount of msgbs which can be queued in the internal tx queue.
+ * \returns 0 on success, negative on error.
+ *
+ * The maximum length queue default value is 1024 msgbs. */
+int osmo_stream_cli_set_tx_queue_max_length(struct osmo_stream_cli *cli, unsigned int size)
+{
+ cli->tx_queue_max_length = size;
+
+ if (cli->iofd) /* Otherwise, this will be done in osmo_stream_cli_open() */
+ osmo_iofd_set_txqueue_max_length(cli->iofd, cli->tx_queue_max_length);
+
+ /* XXX: Here, in OSMO_STREAM_MODE_OSMO_FD mode we could check current
+ * size of cli->tx_queue and shrink it from the front or back... */
+ return 0;
+}
+
/*! Retrieve the stream client socket description.
* Calling this function will build a string that describes the socket in terms of its local/remote
* address/port. The returned name is stored in a static buffer; it is hence not re-entrant or thread-safe.
@@ -944,6 +966,7 @@
OSMO_ASSERT(!stream_cli_close(cli));
osmo_timer_del(&cli->timer);
msgb_queue_free(&cli->tx_queue);
+ cli->tx_queue_count = 0;
/* if we are in a user callback, delay freeing. */
if (cli->in_cb_mask != 0) {
LOGSCLI(cli, LOGL_DEBUG, "delay free() in_cb_mask=0x%02x\n", cli->in_cb_mask);
@@ -1204,8 +1227,8 @@
if (!cli->iofd)
goto error_close_socket;

+ osmo_iofd_set_txqueue_max_length(cli->iofd, cli->tx_queue_max_length);
osmo_iofd_notify_connected(cli->iofd);
-
configure_cli_segmentation_cb(cli, cli->segmentation_cb);

if (osmo_iofd_register(cli->iofd, fd) < 0)
@@ -1253,7 +1276,12 @@

switch (cli->mode) {
case OSMO_STREAM_MODE_OSMO_FD:
- msgb_enqueue(&cli->tx_queue, msg);
+ if (cli->tx_queue_count >= cli->tx_queue_max_length) {
+ LOGSCLI(cli, LOGL_ERROR, "send: tx queue full, dropping msg!\n");
+ msgb_free(msg);
+ return;
+ }
+ msgb_enqueue_count(&cli->tx_queue, msg, &cli->tx_queue_count);
osmo_fd_write_enable(&cli->ofd);
break;
case OSMO_STREAM_MODE_OSMO_IO:
@@ -1350,6 +1378,7 @@
switch (cli->mode) {
case OSMO_STREAM_MODE_OSMO_FD:
msgb_queue_free(&cli->tx_queue);
+ cli->tx_queue_count = 0;
/* If in state 'connecting', keep WRITE flag up to receive
* socket connection signal and then transition to STATE_CONNECTED: */
if (cli->state == STREAM_CLI_STATE_CONNECTED)

To view, visit change 38955. To unsubscribe, or for help writing mail filters, visit settings.

Gerrit-MessageType: newchange
Gerrit-Project: libosmo-netif
Gerrit-Branch: master
Gerrit-Change-Id: I3935fb933fe6136d68a9403eebbaf2616c2e5578
Gerrit-Change-Number: 38955
Gerrit-PatchSet: 1
Gerrit-Owner: pespin <pespin@sysmocom.de>