pespin has uploaded this change for review. ( https://gerrit.osmocom.org/c/libosmo-netif/+/38956?usp=email )
Change subject: stream_srv: Add API osmo_stream_srv_link_set_tx_queue_max_length() ......................................................................
stream_srv: Add API osmo_stream_srv_link_set_tx_queue_max_length()
Change-Id: I3c2deac7f7be0cf838834135a548cce70367a905 --- M TODO-RELEASE M include/osmocom/netif/stream.h M src/stream_srv.c 3 files changed, 33 insertions(+), 7 deletions(-)
git pull ssh://gerrit.osmocom.org:29418/libosmo-netif refs/changes/56/38956/1
diff --git a/TODO-RELEASE b/TODO-RELEASE index 74a0abb..268b142 100644 --- a/TODO-RELEASE +++ b/TODO-RELEASE @@ -8,4 +8,4 @@ # If any interfaces have been removed or changed since the last public release: c:r:0. #library what description / commit summary line libosmo-netif add API osmo_stream_cli_set_{ip_dscp,priority}(), osmo_stream_srv_link_set_{ip_dscp,priority}() -libosmo-netif add API osmo-stream_cli_set_tx_queue_max_length() +libosmo-netif add API osmo-stream_cli_set_tx_queue_max_length(), osmo_stream_srv_link_set_tx_queue_max_length() diff --git a/include/osmocom/netif/stream.h b/include/osmocom/netif/stream.h index 3b3e04e..a132a27 100644 --- a/include/osmocom/netif/stream.h +++ b/include/osmocom/netif/stream.h @@ -81,6 +81,7 @@ void osmo_stream_srv_link_set_accept_cb(struct osmo_stream_srv_link *link, osmo_stream_srv_link_accept_cb_t accept_cb); void osmo_stream_srv_link_set_data(struct osmo_stream_srv_link *link, void *data); void *osmo_stream_srv_link_get_data(struct osmo_stream_srv_link *link); +int osmo_stream_srv_link_set_tx_queue_max_length(struct osmo_stream_srv_link *link, unsigned int size); char *osmo_stream_srv_link_get_sockname(const struct osmo_stream_srv_link *link); struct osmo_fd *osmo_stream_srv_link_get_ofd(struct osmo_stream_srv_link *link); int osmo_stream_srv_link_get_fd(const struct osmo_stream_srv_link *link); diff --git a/src/stream_srv.c b/src/stream_srv.c index aa7edbf..4f9323b 100644 --- a/src/stream_srv.c +++ b/src/stream_srv.c @@ -87,6 +87,7 @@ osmo_stream_srv_link_accept_cb_t accept_cb; void *data; int flags; + unsigned int tx_queue_max_length; /* Max amount of msgbs which can be enqueued */ struct osmo_sock_init2_multiaddr_pars ma_pars; };
@@ -203,6 +204,7 @@ link->sk_domain = AF_UNSPEC; link->sk_type = SOCK_STREAM; link->proto = IPPROTO_TCP; + link->tx_queue_max_length = 1024; /* Default tx queue size, msgbs. */ osmo_fd_setup(&link->ofd, -1, OSMO_FD_READ | OSMO_FD_WRITE, osmo_stream_srv_link_ofd_cb, link, 0);
link->ma_pars.sctp.version = 0; @@ -394,6 +396,18 @@ return link->data; }
+/*! Set the maximum length queue of the stream servers accepted and allocated from this server link. + * \param[in] link Stream Server Link to modify + * \param[in] size maximum amount of msgbs which can be queued in the internal tx queue. + * \returns 0 on success, negative on error. + * + * The maximum length queue default value is 1024 msgbs. */ +int osmo_stream_srv_link_set_tx_queue_max_length(struct osmo_stream_srv_link *link, unsigned int size) +{ + link->tx_queue_max_length = size; + return 0; +} + /* Similar to osmo_sock_multiaddr_get_name_buf(), but aimed at listening sockets (only local part): */ static char *get_local_sockname_buf(char *buf, size_t buf_len, const struct osmo_stream_srv_link *link) { @@ -643,7 +657,8 @@ struct osmo_fd ofd; struct osmo_io_fd *iofd; }; - struct llist_head tx_queue; + struct llist_head tx_queue; /* osmo_ofd mode (only): Queue of msgbs */ + unsigned int tx_queue_count; /* osmo_ofd mode (only): Current amount of msgbs queued */ osmo_stream_srv_closed_cb_t closed_cb; osmo_stream_srv_read_cb_t read_cb; osmo_stream_srv_read_cb2_t iofd_read_cb; @@ -769,12 +784,11 @@ struct msgb *msg; int ret;
- if (llist_empty(&conn->tx_queue)) { + msg = msgb_dequeue_count(&conn->tx_queue, &conn->tx_queue_count); + if (!msg) { /* done, tx_queue empty */ osmo_fd_write_disable(&conn->ofd); return; } - msg = llist_first_entry(&conn->tx_queue, struct msgb, list); - llist_del(&msg->list);
LOGSSRV(conn, LOGL_DEBUG, "sending %u bytes of data\n", msg->len);
@@ -811,6 +825,7 @@ /* Update msgb and re-add it at the start of the queue: */ msgb_pull(msg, ret); llist_add(&msg->list, &conn->tx_queue); + conn->tx_queue_count++; return; }
@@ -820,6 +835,7 @@ if (err == EAGAIN) { /* Re-add at the start of the queue to re-attempt: */ llist_add(&msg->list, &conn->tx_queue); + conn->tx_queue_count++; return; } msgb_free(msg); @@ -923,6 +939,7 @@
conn->mode = OSMO_STREAM_MODE_OSMO_IO; conn->srv = link; + conn->data = data;
osmo_sock_get_name_buf(conn->sockname, sizeof(conn->sockname), fd);
@@ -939,7 +956,8 @@ talloc_free(conn); return NULL; } - conn->data = data; + + osmo_iofd_set_txqueue_max_length(conn->iofd, conn->srv->tx_queue_max_length);
if (osmo_iofd_register(conn->iofd, fd) < 0) { LOGSSRV(conn, LOGL_ERROR, "could not register FD %d\n", fd); @@ -1127,6 +1145,7 @@ osmo_fd_unregister(&conn->ofd); close(conn->ofd.fd); msgb_queue_free(&conn->tx_queue); + conn->tx_queue_count = 0; conn->ofd.fd = -1; break; case OSMO_STREAM_MODE_OSMO_IO: @@ -1158,7 +1177,12 @@
switch (conn->mode) { case OSMO_STREAM_MODE_OSMO_FD: - msgb_enqueue(&conn->tx_queue, msg); + if (conn->tx_queue_count >= conn->srv->tx_queue_max_length) { + LOGSSRV(conn, LOGL_ERROR, "send: tx queue full, dropping msg!\n"); + msgb_free(msg); + return; + } + msgb_enqueue_count(&conn->tx_queue, msg, &conn->tx_queue_count); osmo_fd_write_enable(&conn->ofd); break; case OSMO_STREAM_MODE_OSMO_IO: @@ -1249,6 +1273,7 @@ switch (conn->mode) { case OSMO_STREAM_MODE_OSMO_FD: msgb_queue_free(&conn->tx_queue); + conn->tx_queue_count = 0; osmo_fd_write_disable(&conn->ofd); break; case OSMO_STREAM_MODE_OSMO_IO: