pespin submitted this change.

View Change

Approvals: Jenkins Builder: Verified; osmith: Looks good to me, but someone else must approve; pespin: Looks good to me, approved; laforge: Looks good to me, but someone else must approve
stream_srv: Add API osmo_stream_srv_link_set_tx_queue_max_length()

Change-Id: I3c2deac7f7be0cf838834135a548cce70367a905
---
M TODO-RELEASE
M include/osmocom/netif/stream.h
M src/stream_srv.c
3 files changed, 33 insertions(+), 7 deletions(-)

diff --git a/TODO-RELEASE b/TODO-RELEASE
index 74a0abb..268b142 100644
--- a/TODO-RELEASE
+++ b/TODO-RELEASE
@@ -8,4 +8,4 @@
# If any interfaces have been removed or changed since the last public release: c:r:0.
#library what description / commit summary line
libosmo-netif add API osmo_stream_cli_set_{ip_dscp,priority}(), osmo_stream_srv_link_set_{ip_dscp,priority}()
-libosmo-netif add API osmo-stream_cli_set_tx_queue_max_length()
+libosmo-netif add API osmo_stream_cli_set_tx_queue_max_length(), osmo_stream_srv_link_set_tx_queue_max_length()
diff --git a/include/osmocom/netif/stream.h b/include/osmocom/netif/stream.h
index 3b3e04e..a132a27 100644
--- a/include/osmocom/netif/stream.h
+++ b/include/osmocom/netif/stream.h
@@ -81,6 +81,7 @@
void osmo_stream_srv_link_set_accept_cb(struct osmo_stream_srv_link *link, osmo_stream_srv_link_accept_cb_t accept_cb);
void osmo_stream_srv_link_set_data(struct osmo_stream_srv_link *link, void *data);
void *osmo_stream_srv_link_get_data(struct osmo_stream_srv_link *link);
+int osmo_stream_srv_link_set_tx_queue_max_length(struct osmo_stream_srv_link *link, unsigned int size);
char *osmo_stream_srv_link_get_sockname(const struct osmo_stream_srv_link *link);
struct osmo_fd *osmo_stream_srv_link_get_ofd(struct osmo_stream_srv_link *link);
int osmo_stream_srv_link_get_fd(const struct osmo_stream_srv_link *link);
diff --git a/src/stream_srv.c b/src/stream_srv.c
index aa7edbf..4f9323b 100644
--- a/src/stream_srv.c
+++ b/src/stream_srv.c
@@ -87,6 +87,7 @@
osmo_stream_srv_link_accept_cb_t accept_cb;
void *data;
int flags;
+ unsigned int tx_queue_max_length; /* Max amount of msgbs which can be enqueued */
struct osmo_sock_init2_multiaddr_pars ma_pars;
};

@@ -203,6 +204,7 @@
link->sk_domain = AF_UNSPEC;
link->sk_type = SOCK_STREAM;
link->proto = IPPROTO_TCP;
+ link->tx_queue_max_length = 1024; /* Default tx queue size, msgbs. */
osmo_fd_setup(&link->ofd, -1, OSMO_FD_READ | OSMO_FD_WRITE, osmo_stream_srv_link_ofd_cb, link, 0);

link->ma_pars.sctp.version = 0;
@@ -394,6 +396,18 @@
return link->data;
}

+/*! Set the maximum tx queue length of the stream servers accepted and allocated from this server link.
+ * \param[in] link Stream Server Link to modify
+ * \param[in] size maximum amount of msgbs which can be queued in the internal tx queue.
+ * \returns 0 on success, negative on error.
+ *
+ * The default maximum tx queue length is 1024 msgbs. */
+int osmo_stream_srv_link_set_tx_queue_max_length(struct osmo_stream_srv_link *link, unsigned int size)
+{
+ link->tx_queue_max_length = size;
+ return 0;
+}
+
/* Similar to osmo_sock_multiaddr_get_name_buf(), but aimed at listening sockets (only local part): */
static char *get_local_sockname_buf(char *buf, size_t buf_len, const struct osmo_stream_srv_link *link)
{
@@ -643,7 +657,8 @@
struct osmo_fd ofd;
struct osmo_io_fd *iofd;
};
- struct llist_head tx_queue;
+ struct llist_head tx_queue; /* osmo_ofd mode (only): Queue of msgbs */
+ unsigned int tx_queue_count; /* osmo_ofd mode (only): Current amount of msgbs queued */
osmo_stream_srv_closed_cb_t closed_cb;
osmo_stream_srv_read_cb_t read_cb;
osmo_stream_srv_read_cb2_t iofd_read_cb;
@@ -769,12 +784,11 @@
struct msgb *msg;
int ret;

- if (llist_empty(&conn->tx_queue)) {
+ msg = msgb_dequeue_count(&conn->tx_queue, &conn->tx_queue_count);
+ if (!msg) { /* done, tx_queue empty */
osmo_fd_write_disable(&conn->ofd);
return;
}
- msg = llist_first_entry(&conn->tx_queue, struct msgb, list);
- llist_del(&msg->list);

LOGSSRV(conn, LOGL_DEBUG, "sending %u bytes of data\n", msg->len);

@@ -811,6 +825,7 @@
/* Update msgb and re-add it at the start of the queue: */
msgb_pull(msg, ret);
llist_add(&msg->list, &conn->tx_queue);
+ conn->tx_queue_count++;
return;
}

@@ -820,6 +835,7 @@
if (err == EAGAIN) {
/* Re-add at the start of the queue to re-attempt: */
llist_add(&msg->list, &conn->tx_queue);
+ conn->tx_queue_count++;
return;
}
msgb_free(msg);
@@ -923,6 +939,7 @@

conn->mode = OSMO_STREAM_MODE_OSMO_IO;
conn->srv = link;
+ conn->data = data;

osmo_sock_get_name_buf(conn->sockname, sizeof(conn->sockname), fd);

@@ -939,7 +956,8 @@
talloc_free(conn);
return NULL;
}
- conn->data = data;
+
+ osmo_iofd_set_txqueue_max_length(conn->iofd, conn->srv->tx_queue_max_length);

if (osmo_iofd_register(conn->iofd, fd) < 0) {
LOGSSRV(conn, LOGL_ERROR, "could not register FD %d\n", fd);
@@ -1127,6 +1145,7 @@
osmo_fd_unregister(&conn->ofd);
close(conn->ofd.fd);
msgb_queue_free(&conn->tx_queue);
+ conn->tx_queue_count = 0;
conn->ofd.fd = -1;
break;
case OSMO_STREAM_MODE_OSMO_IO:
@@ -1158,7 +1177,12 @@

switch (conn->mode) {
case OSMO_STREAM_MODE_OSMO_FD:
- msgb_enqueue(&conn->tx_queue, msg);
+ if (conn->tx_queue_count >= conn->srv->tx_queue_max_length) {
+ LOGSSRV(conn, LOGL_ERROR, "send: tx queue full, dropping msg!\n");
+ msgb_free(msg);
+ return;
+ }
+ msgb_enqueue_count(&conn->tx_queue, msg, &conn->tx_queue_count);
osmo_fd_write_enable(&conn->ofd);
break;
case OSMO_STREAM_MODE_OSMO_IO:
@@ -1249,6 +1273,7 @@
switch (conn->mode) {
case OSMO_STREAM_MODE_OSMO_FD:
msgb_queue_free(&conn->tx_queue);
+ conn->tx_queue_count = 0;
osmo_fd_write_disable(&conn->ofd);
break;
case OSMO_STREAM_MODE_OSMO_IO:

To view, visit change 38956. To unsubscribe, or for help writing mail filters, visit settings.

Gerrit-MessageType: merged
Gerrit-Project: libosmo-netif
Gerrit-Branch: master
Gerrit-Change-Id: I3c2deac7f7be0cf838834135a548cce70367a905
Gerrit-Change-Number: 38956
Gerrit-PatchSet: 1
Gerrit-Owner: pespin <pespin@sysmocom.de>
Gerrit-Reviewer: Jenkins Builder
Gerrit-Reviewer: daniel <dwillmann@sysmocom.de>
Gerrit-Reviewer: laforge <laforge@osmocom.org>
Gerrit-Reviewer: osmith <osmith@sysmocom.de>
Gerrit-Reviewer: pespin <pespin@sysmocom.de>