pespin has uploaded this change for review.

View Change

Port pfcp socket to osmo_iofd

As an indirect consequence, the PFCP socket now gains a Tx queue, which
should avoid sporadic Tx message drops during high load.

Related: OS#6714
Change-Id: If85888b6857899e0aca1fdd1d806c53476b4d239
---
M src/libosmo-pfcp/pfcp_endpoint.c
1 file changed, 73 insertions(+), 65 deletions(-)

git pull ssh://gerrit.osmocom.org:29418/libosmo-pfcp refs/changes/59/39459/1
diff --git a/src/libosmo-pfcp/pfcp_endpoint.c b/src/libosmo-pfcp/pfcp_endpoint.c
index e2b25d4..ee454da 100644
--- a/src/libosmo-pfcp/pfcp_endpoint.c
+++ b/src/libosmo-pfcp/pfcp_endpoint.c
@@ -30,6 +30,7 @@
#include <osmocom/core/tdef.h>
#include <osmocom/core/linuxlist.h>
#include <osmocom/core/hashtable.h>
+#include <osmocom/core/osmo_io.h>

#include <osmocom/pfcp/pfcp_endpoint.h>
#include <osmocom/pfcp/pfcp_msg.h>
@@ -39,7 +40,7 @@
struct osmo_pfcp_endpoint_cfg cfg;

/* PFCP socket */
- struct osmo_fd pfcp_fd;
+ struct osmo_io_fd *iofd;

/* The time at which this endpoint last restarted, as seconds since unix epoch. */
uint32_t recovery_time_stamp;
@@ -138,8 +139,6 @@
hash_init(ep->sent_requests_by_seq_nr);
hash_init(ep->sent_responses_by_seq_nr);

- ep->pfcp_fd.fd = -1;
-
/* time() returns seconds since 1970 (UNIX epoch), but the recovery_time_stamp is coded in the NTP format, which is
* seconds since 1900, the NTP era 0. 2208988800L is the offset between UNIX epoch and NTP era 0.
* TODO: what happens when we enter NTP era 1? Is it sufficient to integer-wrap? */
@@ -233,26 +232,22 @@
/* Directly encode and transmit the message, without storing in the retrans_queue. */
static int osmo_pfcp_endpoint_tx_data_no_logging(struct osmo_pfcp_endpoint *ep, struct osmo_pfcp_msg *m)
{
+ struct msgb *msg;
int rc;

- if (!m->encoded) {
- /* Allocate msgb as child of the message m, so that when m gets deallocated at the end of
- * retransmission queueing, the msgb gets deallocated with it. */
- m->encoded = msgb_alloc_c(m, OSMO_PFCP_MSGB_ALLOC_SIZE, "PFCP-tx");
- OSMO_ASSERT(m->encoded);
- rc = osmo_pfcp_msg_encode(m->encoded, m);
- if (rc) {
- msgb_free(m->encoded);
- m->encoded = NULL;
- return rc;
- }
+ msg = msgb_alloc_c(ep->iofd, OSMO_PFCP_MSGB_ALLOC_SIZE, "PFCP-tx");
+ OSMO_ASSERT(msg);
+
+ rc = osmo_pfcp_msg_encode(msg, m);
+ if (rc) {
+ msgb_free(msg);
+ return rc;
}

- rc = sendto(ep->pfcp_fd.fd, msgb_data(m->encoded), msgb_length(m->encoded), 0,
- (struct sockaddr *)&m->remote_addr, sizeof(m->remote_addr));
- if (rc != msgb_length(m->encoded)) {
- OSMO_LOG_PFCP_MSG(m, LOGL_ERROR, "sendto() failed: rc = %d != length %u\n",
- rc, msgb_length(m->encoded));
+ rc = osmo_iofd_sendto_msgb(ep->iofd, msg, 0, &m->remote_addr);
+ if (rc < 0) {
+ OSMO_LOG_PFCP_MSG(m, LOGL_ERROR, "sendto() failed: rc = %d\n", rc);
+ msgb_free(msg);
return -EIO;
}
return 0;
@@ -442,50 +437,63 @@
}

/* call-back for PFCP socket file descriptor */
-static int osmo_pfcp_fd_cb(struct osmo_fd *ofd, unsigned int what)
+static void osmo_pfcp_iofd_sendto_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg,
+ const struct osmo_sockaddr *daddr)
+{
+ if (OSMO_UNLIKELY(res <= 0)) {
+ char addrbuf[INET6_ADDRSTRLEN];
+ LOGP(DLPFCP, LOGL_ERROR, "PFCP Tx to %s returned %d!\n",
+ osmo_sockaddr_to_str_buf(addrbuf, sizeof(addrbuf), daddr), res);
+ }
+}
+
+static void osmo_pfcp_iofd_recvfrom_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg,
+ const struct osmo_sockaddr *saddr)
{
int rc;
- struct osmo_pfcp_endpoint *ep = ofd->data;
+ struct osmo_pfcp_endpoint *ep = osmo_iofd_get_data(iofd);

- if (what & OSMO_FD_READ) {
- struct osmo_sockaddr remote;
- socklen_t remote_len = sizeof(remote);
- struct msgb *msg = msgb_alloc_c(OTC_SELECT, OSMO_PFCP_MSGB_ALLOC_SIZE, "PFCP-rx");
- if (!msg)
- return -ENOMEM;
-
- msg->l3h = msg->tail;
- rc = recvfrom(ofd->fd, msg->tail, msgb_tailroom(msg), 0, (struct sockaddr *)&remote, &remote_len);
- if (rc <= 0)
- return -EIO;
- msgb_put(msg, rc);
-
- OSMO_ASSERT(ep->cfg.rx_msg_cb);
-
- /* This may be a bundle of PFCP messages. Parse and receive each message received, by shifting l4h
- * through the message bundle. */
- msg->l4h = msg->l3h;
- while (msgb_l4len(msg)) {
- struct osmo_gtlv_load tlv;
- struct osmo_pfcp_msg *m = osmo_pfcp_msg_alloc_rx(OTC_SELECT, &remote);
- m->encoded = msg;
-
- rc = osmo_pfcp_msg_decode_header(&tlv, m, msg);
- if (rc < 0)
- break;
- msg->l4h += rc;
-
- rc = osmo_pfcp_msg_decode_tlv(m, &tlv);
- /* If errors occurred, they have already been logged on DLPFCP. */
- if (rc == 0)
- osmo_pfcp_endpoint_handle_rx(ep, m);
- osmo_pfcp_msg_free(m);
- }
- msgb_free(msg);
+ if (OSMO_UNLIKELY(res <= 0)) {
+ char addrbuf[INET6_ADDRSTRLEN];
+ LOGP(DLPFCP, LOGL_ERROR, "PFCP Rx from %s returned %d!\n",
+ osmo_sockaddr_to_str_buf(addrbuf, sizeof(addrbuf), saddr), res);
+ return;
}
- return 0;
+
+ if (!msg)
+ return;
+
+ msg->l3h = msgb_data(msg);
+
+ OSMO_ASSERT(ep->cfg.rx_msg_cb);
+
+ /* This may be a bundle of PFCP messages. Parse and receive each message received, by shifting l4h
+ * through the message bundle. */
+ msg->l4h = msg->l3h;
+ while (msgb_l4len(msg)) {
+ struct osmo_gtlv_load tlv;
+ struct osmo_pfcp_msg *m = osmo_pfcp_msg_alloc_rx(OTC_SELECT, saddr);
+ m->encoded = msg;
+
+ rc = osmo_pfcp_msg_decode_header(&tlv, m, msg);
+ if (rc < 0)
+ break;
+ msg->l4h += rc;
+
+ rc = osmo_pfcp_msg_decode_tlv(m, &tlv);
+ /* If errors occurred, they have already been logged on DLPFCP. */
+ if (rc == 0)
+ osmo_pfcp_endpoint_handle_rx(ep, m);
+ osmo_pfcp_msg_free(m);
+ }
+ msgb_free(msg);
}

+struct osmo_io_ops ioops = {
+ .sendto_cb = &osmo_pfcp_iofd_sendto_cb,
+ .recvfrom_cb = &osmo_pfcp_iofd_recvfrom_cb,
+};
+
/*! bind a PFCP endpoint to its configured address (ep->cfg.local_addr).
* \return 0 on success, negative on error. */
int osmo_pfcp_endpoint_bind(struct osmo_pfcp_endpoint *ep)
@@ -500,9 +508,12 @@
}

/* create the new socket, binding to configured local address */
- ep->pfcp_fd.cb = osmo_pfcp_fd_cb;
- ep->pfcp_fd.data = ep;
- rc = osmo_sock_init_osa_ofd(&ep->pfcp_fd, SOCK_DGRAM, IPPROTO_UDP, &ep->cfg.local_addr, NULL, OSMO_SOCK_F_BIND);
+ rc = osmo_sock_init_osa(SOCK_DGRAM, IPPROTO_UDP, &ep->cfg.local_addr, NULL, OSMO_SOCK_F_BIND);
+ if (rc < 0)
+ return rc;
+ ep->iofd = osmo_iofd_setup(ep, rc, "pfcp", OSMO_IO_FD_MODE_RECVFROM_SENDTO, &ioops, ep);
+ osmo_iofd_register(ep->iofd, -1);
+ osmo_iofd_set_alloc_info(ep->iofd, OSMO_PFCP_MSGB_ALLOC_SIZE, 0);
if (rc < 0)
return rc;
return 0;
@@ -516,11 +527,8 @@
while ((qe = llist_first_entry_or_null(&ep->sent_responses, struct osmo_pfcp_queue_entry, entry)))
osmo_pfcp_queue_del(qe);

- if (ep->pfcp_fd.fd != -1) {
- osmo_fd_unregister(&ep->pfcp_fd);
- close(ep->pfcp_fd.fd);
- ep->pfcp_fd.fd = -1;
- }
+ osmo_iofd_free(ep->iofd);
+ ep->iofd = NULL;
}

void osmo_pfcp_endpoint_free(struct osmo_pfcp_endpoint **ep)

To view, visit change 39459. To unsubscribe, or for help writing mail filters, visit settings.

Gerrit-MessageType: newchange
Gerrit-Project: libosmo-pfcp
Gerrit-Branch: master
Gerrit-Change-Id: If85888b6857899e0aca1fdd1d806c53476b4d239
Gerrit-Change-Number: 39459
Gerrit-PatchSet: 1
Gerrit-Owner: pespin <pespin@sysmocom.de>