pespin has uploaded this change for review. ( https://gerrit.osmocom.org/c/libosmo-netif/+/38955?usp=email )
Change subject: stream_cli: Add API osmo_stream_cli_set_tx_queue_max_length()
......................................................................
stream_cli: Add API osmo_stream_cli_set_tx_queue_max_length()
Change-Id: I3935fb933fe6136d68a9403eebbaf2616c2e5578 --- M TODO-RELEASE M include/osmocom/netif/stream.h M src/stream_cli.c 3 files changed, 37 insertions(+), 6 deletions(-)
git pull ssh://gerrit.osmocom.org:29418/libosmo-netif refs/changes/55/38955/1
diff --git a/TODO-RELEASE b/TODO-RELEASE index 4f59788..74a0abb 100644 --- a/TODO-RELEASE +++ b/TODO-RELEASE @@ -8,3 +8,4 @@ # If any interfaces have been removed or changed since the last public release: c:r:0. #library what description / commit summary line libosmo-netif add API osmo_stream_cli_set_{ip_dscp,priority}(), osmo_stream_srv_link_set_{ip_dscp,priority}() +libosmo-netif add API osmo_stream_cli_set_tx_queue_max_length() diff --git a/include/osmocom/netif/stream.h b/include/osmocom/netif/stream.h index 6edf915..3b3e04e 100644 --- a/include/osmocom/netif/stream.h +++ b/include/osmocom/netif/stream.h @@ -208,6 +208,7 @@ void osmo_stream_cli_set_data(struct osmo_stream_cli *cli, void *data); void osmo_stream_cli_set_reconnect_timeout(struct osmo_stream_cli *cli, int timeout); void *osmo_stream_cli_get_data(struct osmo_stream_cli *cli); +int osmo_stream_cli_set_tx_queue_max_length(struct osmo_stream_cli *cli, unsigned int size); char *osmo_stream_cli_get_sockname(const struct osmo_stream_cli *cli); struct osmo_fd *osmo_stream_cli_get_ofd(struct osmo_stream_cli *cli); int osmo_stream_cli_get_fd(const struct osmo_stream_cli *cli); diff --git a/src/stream_cli.c b/src/stream_cli.c index 88d5f9e..a00f32a 100644 --- a/src/stream_cli.c +++ b/src/stream_cli.c @@ -96,7 +96,9 @@ struct osmo_fd ofd; struct osmo_io_fd *iofd; }; - struct llist_head tx_queue; + struct llist_head tx_queue; /* osmo_ofd mode (only): Queue of msgbs */ + unsigned int tx_queue_count; /* osmo_ofd mode (only): Current amount of msgbs queued */ + unsigned int tx_queue_max_length; /* Max amount of msgbs which can be enqueued */ struct osmo_timer_list timer; enum osmo_stream_cli_state state; char *addr[OSMO_STREAM_MAX_ADDRS]; @@ -321,12 +323,11 @@ struct msgb *msg; int ret;
- if (llist_empty(&cli->tx_queue)) { + msg = msgb_dequeue_count(&cli->tx_queue, &cli->tx_queue_count); + if (!msg) { /* done, tx_queue empty */ osmo_fd_write_disable(&cli->ofd); return 0; } - msg = llist_first_entry(&cli->tx_queue, struct msgb, list); - llist_del(&msg->list);
if (!osmo_stream_cli_is_connected(cli)) { LOGSCLI(cli, LOGL_ERROR, "send: not connected, dropping data!\n"); @@ -367,6 +368,7 @@ /* Update msgb and re-add it at the start of the queue: */ msgb_pull(msg, ret); llist_add(&msg->list, &cli->tx_queue); + cli->tx_queue_count++; return 0; }
@@ -376,6 +378,7 @@ if (err == EAGAIN) { /* Re-add at the start of the queue to re-attempt: */ llist_add(&msg->list, &cli->tx_queue); + cli->tx_queue_count++; return 0; } msgb_free(msg); @@ -510,6 +513,7 @@ cli->reconnect_timeout = 5; /* default is 5 seconds. */ cli->segmentation_cb = NULL; INIT_LLIST_HEAD(&cli->tx_queue); + cli->tx_queue_max_length = 1024; /* Default tx queue size, msgbs. */
cli->ma_pars.sctp.version = 0;
@@ -863,6 +867,24 @@ return cli->data; }
+/*! Set the maximum length of the stream client's tx queue. + * \param[in] cli Stream Client to modify + * \param[in] size maximum number of msgbs which can be queued in the internal tx queue. + * \returns 0 on success, negative on error. + * + * The default maximum tx queue length is 1024 msgbs. */ +int osmo_stream_cli_set_tx_queue_max_length(struct osmo_stream_cli *cli, unsigned int size) +{ + cli->tx_queue_max_length = size; + + if (cli->iofd) /* Otherwise, this will be done in osmo_stream_cli_open() */ + osmo_iofd_set_txqueue_max_length(cli->iofd, cli->tx_queue_max_length); + + /* XXX: Here, in OSMO_STREAM_MODE_OSMO_FD mode we could check current + * size of cli->tx_queue and shrink it from the front or back... */ + return 0; +} + /*! Retrieve the stream client socket description. * Calling this function will build a string that describes the socket in terms of its local/remote * address/port. The returned name is stored in a static buffer; it is hence not re-entrant or thread-safe. @@ -944,6 +966,7 @@ OSMO_ASSERT(!stream_cli_close(cli)); osmo_timer_del(&cli->timer); msgb_queue_free(&cli->tx_queue); + cli->tx_queue_count = 0; /* if we are in a user callback, delay freeing. */ if (cli->in_cb_mask != 0) { LOGSCLI(cli, LOGL_DEBUG, "delay free() in_cb_mask=0x%02x\n", cli->in_cb_mask); @@ -1204,8 +1227,8 @@ if (!cli->iofd) goto error_close_socket;
+ osmo_iofd_set_txqueue_max_length(cli->iofd, cli->tx_queue_max_length); osmo_iofd_notify_connected(cli->iofd); - configure_cli_segmentation_cb(cli, cli->segmentation_cb);
if (osmo_iofd_register(cli->iofd, fd) < 0) @@ -1253,7 +1276,12 @@
switch (cli->mode) { case OSMO_STREAM_MODE_OSMO_FD: - msgb_enqueue(&cli->tx_queue, msg); + if (cli->tx_queue_count >= cli->tx_queue_max_length) { + LOGSCLI(cli, LOGL_ERROR, "send: tx queue full, dropping msg!\n"); + msgb_free(msg); + return; + } + msgb_enqueue_count(&cli->tx_queue, msg, &cli->tx_queue_count); osmo_fd_write_enable(&cli->ofd); break; case OSMO_STREAM_MODE_OSMO_IO: @@ -1350,6 +1378,7 @@ switch (cli->mode) { case OSMO_STREAM_MODE_OSMO_FD: msgb_queue_free(&cli->tx_queue); + cli->tx_queue_count = 0; /* If in state 'connecting', keep WRITE flag up to receive * socket connection signal and then transition to STATE_CONNECTED: */ if (cli->state == STREAM_CLI_STATE_CONNECTED)