pespin has uploaded this change for review. ( https://gerrit.osmocom.org/c/libosmo-pfcp/+/39459?usp=email )
Change subject: Port pfcp socket to osmo_iofd ......................................................................
Port pfcp socket to osmo_iofd
As an indirect consequence, the PFCP socket now gains a Tx queue, which should avoid sporadic Tx message drops under high load.
Related: OS#6714 Change-Id: If85888b6857899e0aca1fdd1d806c53476b4d239 --- M src/libosmo-pfcp/pfcp_endpoint.c 1 file changed, 73 insertions(+), 65 deletions(-)
git pull ssh://gerrit.osmocom.org:29418/libosmo-pfcp refs/changes/59/39459/1
diff --git a/src/libosmo-pfcp/pfcp_endpoint.c b/src/libosmo-pfcp/pfcp_endpoint.c index e2b25d4..ee454da 100644 --- a/src/libosmo-pfcp/pfcp_endpoint.c +++ b/src/libosmo-pfcp/pfcp_endpoint.c @@ -30,6 +30,7 @@ #include <osmocom/core/tdef.h> #include <osmocom/core/linuxlist.h> #include <osmocom/core/hashtable.h> +#include <osmocom/core/osmo_io.h>
#include <osmocom/pfcp/pfcp_endpoint.h> #include <osmocom/pfcp/pfcp_msg.h> @@ -39,7 +40,7 @@ struct osmo_pfcp_endpoint_cfg cfg;
/* PFCP socket */ - struct osmo_fd pfcp_fd; + struct osmo_io_fd *iofd;
/* The time at which this endpoint last restarted, as seconds since unix epoch. */ uint32_t recovery_time_stamp; @@ -138,8 +139,6 @@ hash_init(ep->sent_requests_by_seq_nr); hash_init(ep->sent_responses_by_seq_nr);
- ep->pfcp_fd.fd = -1; - /* time() returns seconds since 1970 (UNIX epoch), but the recovery_time_stamp is coded in the NTP format, which is * seconds since 1900, the NTP era 0. 2208988800L is the offset between UNIX epoch and NTP era 0. * TODO: what happens when we enter NTP era 1? Is it sufficient to integer-wrap? */ @@ -233,26 +232,22 @@ /* Directly encode and transmit the message, without storing in the retrans_queue. */ static int osmo_pfcp_endpoint_tx_data_no_logging(struct osmo_pfcp_endpoint *ep, struct osmo_pfcp_msg *m) { + struct msgb *msg; int rc;
- if (!m->encoded) { - /* Allocate msgb as child of the message m, so that when m gets deallocated at the end of - * retransmission queueing, the msgb gets deallocated with it. */ - m->encoded = msgb_alloc_c(m, OSMO_PFCP_MSGB_ALLOC_SIZE, "PFCP-tx"); - OSMO_ASSERT(m->encoded); - rc = osmo_pfcp_msg_encode(m->encoded, m); - if (rc) { - msgb_free(m->encoded); - m->encoded = NULL; - return rc; - } + msg = msgb_alloc_c(ep->iofd, OSMO_PFCP_MSGB_ALLOC_SIZE, "PFCP-tx"); + OSMO_ASSERT(msg); + + rc = osmo_pfcp_msg_encode(msg, m); + if (rc) { + msgb_free(msg); + return rc; }
- rc = sendto(ep->pfcp_fd.fd, msgb_data(m->encoded), msgb_length(m->encoded), 0, - (struct sockaddr *)&m->remote_addr, sizeof(m->remote_addr)); - if (rc != msgb_length(m->encoded)) { - OSMO_LOG_PFCP_MSG(m, LOGL_ERROR, "sendto() failed: rc = %d != length %u\n", - rc, msgb_length(m->encoded)); + rc = osmo_iofd_sendto_msgb(ep->iofd, msg, 0, &m->remote_addr); + if (rc < 0) { + OSMO_LOG_PFCP_MSG(m, LOGL_ERROR, "sendto() failed: rc = %d\n", rc); + msgb_free(msg); return -EIO; } return 0; @@ -442,50 +437,63 @@ }
/* call-back for PFCP socket file descriptor */ -static int osmo_pfcp_fd_cb(struct osmo_fd *ofd, unsigned int what) +static void osmo_pfcp_iofd_sendto_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg, + const struct osmo_sockaddr *daddr) +{ + if (OSMO_UNLIKELY(res <= 0)) { + char addrbuf[INET6_ADDRSTRLEN]; + LOGP(DLPFCP, LOGL_ERROR, "PFCP Tx to %s returned %d!\n", + osmo_sockaddr_to_str_buf(addrbuf, sizeof(addrbuf), daddr), res); + } +} + +static void osmo_pfcp_iofd_recvfrom_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg, + const struct osmo_sockaddr *saddr) { int rc; - struct osmo_pfcp_endpoint *ep = ofd->data; + struct osmo_pfcp_endpoint *ep = osmo_iofd_get_data(iofd);
- if (what & OSMO_FD_READ) { - struct osmo_sockaddr remote; - socklen_t remote_len = sizeof(remote); - struct msgb *msg = msgb_alloc_c(OTC_SELECT, OSMO_PFCP_MSGB_ALLOC_SIZE, "PFCP-rx"); - if (!msg) - return -ENOMEM; - - msg->l3h = msg->tail; - rc = recvfrom(ofd->fd, msg->tail, msgb_tailroom(msg), 0, (struct sockaddr *)&remote, &remote_len); - if (rc <= 0) - return -EIO; - msgb_put(msg, rc); - - OSMO_ASSERT(ep->cfg.rx_msg_cb); - - /* This may be a bundle of PFCP messages. Parse and receive each message received, by shifting l4h - * through the message bundle. */ - msg->l4h = msg->l3h; - while (msgb_l4len(msg)) { - struct osmo_gtlv_load tlv; - struct osmo_pfcp_msg *m = osmo_pfcp_msg_alloc_rx(OTC_SELECT, &remote); - m->encoded = msg; - - rc = osmo_pfcp_msg_decode_header(&tlv, m, msg); - if (rc < 0) - break; - msg->l4h += rc; - - rc = osmo_pfcp_msg_decode_tlv(m, &tlv); - /* If errors occurred, they have already been logged on DLPFCP. */ - if (rc == 0) - osmo_pfcp_endpoint_handle_rx(ep, m); - osmo_pfcp_msg_free(m); - } - msgb_free(msg); + if (OSMO_UNLIKELY(res <= 0)) { + char addrbuf[INET6_ADDRSTRLEN]; + LOGP(DLPFCP, LOGL_ERROR, "PFCP Rx from %s returned %d!\n", + osmo_sockaddr_to_str_buf(addrbuf, sizeof(addrbuf), saddr), res); + return; } - return 0; + + if (!msg) + return; + + msg->l3h = msgb_data(msg); + + OSMO_ASSERT(ep->cfg.rx_msg_cb); + + /* This may be a bundle of PFCP messages. Parse and receive each message received, by shifting l4h + * through the message bundle. */ + msg->l4h = msg->l3h; + while (msgb_l4len(msg)) { + struct osmo_gtlv_load tlv; + struct osmo_pfcp_msg *m = osmo_pfcp_msg_alloc_rx(OTC_SELECT, saddr); + m->encoded = msg; + + rc = osmo_pfcp_msg_decode_header(&tlv, m, msg); + if (rc < 0) + break; + msg->l4h += rc; + + rc = osmo_pfcp_msg_decode_tlv(m, &tlv); + /* If errors occurred, they have already been logged on DLPFCP. */ + if (rc == 0) + osmo_pfcp_endpoint_handle_rx(ep, m); + osmo_pfcp_msg_free(m); + } + msgb_free(msg); }
+struct osmo_io_ops ioops = { + .sendto_cb = &osmo_pfcp_iofd_sendto_cb, + .recvfrom_cb = &osmo_pfcp_iofd_recvfrom_cb, +}; + /*! bind a PFCP endpoint to its configured address (ep->cfg.local_addr). * \return 0 on success, negative on error. */ int osmo_pfcp_endpoint_bind(struct osmo_pfcp_endpoint *ep) @@ -500,9 +508,12 @@ }
/* create the new socket, binding to configured local address */ - ep->pfcp_fd.cb = osmo_pfcp_fd_cb; - ep->pfcp_fd.data = ep; - rc = osmo_sock_init_osa_ofd(&ep->pfcp_fd, SOCK_DGRAM, IPPROTO_UDP, &ep->cfg.local_addr, NULL, OSMO_SOCK_F_BIND); + rc = osmo_sock_init_osa(SOCK_DGRAM, IPPROTO_UDP, &ep->cfg.local_addr, NULL, OSMO_SOCK_F_BIND); + if (rc < 0) + return rc; + ep->iofd = osmo_iofd_setup(ep, rc, "pfcp", OSMO_IO_FD_MODE_RECVFROM_SENDTO, &ioops, ep); + osmo_iofd_register(ep->iofd, -1); + osmo_iofd_set_alloc_info(ep->iofd, OSMO_PFCP_MSGB_ALLOC_SIZE, 0); if (rc < 0) return rc; return 0; @@ -516,11 +527,8 @@ while ((qe = llist_first_entry_or_null(&ep->sent_responses, struct osmo_pfcp_queue_entry, entry))) osmo_pfcp_queue_del(qe);
- if (ep->pfcp_fd.fd != -1) { - osmo_fd_unregister(&ep->pfcp_fd); - close(ep->pfcp_fd.fd); - ep->pfcp_fd.fd = -1; - } + osmo_iofd_free(ep->iofd); + ep->iofd = NULL; }
void osmo_pfcp_endpoint_free(struct osmo_pfcp_endpoint **ep)