arehbein has uploaded this change for review. (
https://gerrit.osmocom.org/c/libosmo-netif/+/33193 )
Change subject: Add osmo_io support to osmo_stream_cli and osmo_stream_srv
......................................................................
Add osmo_io support to osmo_stream_cli and osmo_stream_srv
Change-Id: I2f52c7107c392b6f4b0bf2a84f8c873c084a200c
---
M include/osmocom/netif/stream.h
M src/stream.c
2 files changed, 403 insertions(+), 36 deletions(-)
git pull ssh://gerrit.osmocom.org:29418/libosmo-netif refs/changes/93/33193/1
diff --git a/include/osmocom/netif/stream.h b/include/osmocom/netif/stream.h
index a52e7c4..0e983a4 100644
--- a/include/osmocom/netif/stream.h
+++ b/include/osmocom/netif/stream.h
@@ -45,6 +45,7 @@
struct osmo_stream_srv;
struct osmo_stream_srv *osmo_stream_srv_create(void *ctx, struct osmo_stream_srv_link
*link, int fd, int (*cb)(struct osmo_stream_srv *conn), int (*closed_cb)(struct
osmo_stream_srv *conn), void *data);
+struct osmo_stream_srv *osmo_stream_srv_create_iofd(void *ctx, const char *name, struct
osmo_stream_srv_link *link, int fd, int (*read_cb)(struct osmo_stream_srv *conn, struct
msgb *msg), int (*closed_cb)(struct osmo_stream_srv *conn), void *data);
void *osmo_stream_srv_get_data(struct osmo_stream_srv *conn);
struct osmo_stream_srv_link *osmo_stream_srv_get_master(struct osmo_stream_srv *conn);
struct osmo_fd *osmo_stream_srv_get_ofd(struct osmo_stream_srv *srv);
@@ -79,10 +80,12 @@
void osmo_stream_cli_set_connect_cb(struct osmo_stream_cli *cli, int (*connect_cb)(struct
osmo_stream_cli *cli));
void osmo_stream_cli_set_disconnect_cb(struct osmo_stream_cli *cli, int
(*disconnect_cb)(struct osmo_stream_cli *cli));
void osmo_stream_cli_set_read_cb(struct osmo_stream_cli *cli, int (*read_cb)(struct
osmo_stream_cli *cli));
+void osmo_stream_cli_set_iofd_read_cb(struct osmo_stream_cli *cli, int (*read_cb)(struct
osmo_stream_cli *cli, struct msgb* msg));
void osmo_stream_cli_reconnect(struct osmo_stream_cli *cli);
bool osmo_stream_cli_is_connected(struct osmo_stream_cli *cli);
struct osmo_stream_cli *osmo_stream_cli_create(void *ctx);
+struct osmo_stream_cli *osmo_stream_cli_create_iofd(void *ctx, const char *name);
void osmo_stream_cli_destroy(struct osmo_stream_cli *cli);
int osmo_stream_cli_open(struct osmo_stream_cli *cli);
diff --git a/src/stream.c b/src/stream.c
index 6afb7ca..6388928 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -37,6 +37,8 @@
#include <osmocom/core/utils.h>
#include <osmocom/gsm/tlv.h>
#include <osmocom/core/msgb.h>
+#include <osmocom/core/osmo_io.h>
+#include <osmocom/core/panic.h>
#include <osmocom/core/logging.h>
#include <osmocom/core/talloc.h>
#include <osmocom/core/socket.h>
@@ -247,8 +249,17 @@
#define OSMO_STREAM_MAX_ADDRS 1
#endif
+enum osmo_stream_mode {
+ OSMO_STREAM_MODE_OSMO_FD,
+ OSMO_STREAM_MODE_OSMO_IO,
+};
+
struct osmo_stream_cli {
- struct osmo_fd ofd;
+ enum osmo_stream_mode mode;
+ union {
+ struct osmo_fd ofd;
+ struct osmo_io_fd *iofd;
+ };
struct llist_head tx_queue;
struct osmo_timer_list timer;
enum osmo_stream_cli_state state;
@@ -264,6 +275,7 @@
int (*connect_cb)(struct osmo_stream_cli *cli);
int (*disconnect_cb)(struct osmo_stream_cli *cli);
int (*read_cb)(struct osmo_stream_cli *cli);
+ int (*iofd_read_cb)(struct osmo_stream_cli *cli, struct msgb *msg);
int (*write_cb)(struct osmo_stream_cli *cli);
void *data;
int flags;
@@ -300,17 +312,40 @@
return cli->state == STREAM_CLI_STATE_CONNECTED;
}
-/*! \brief Close an Osmocom Stream Client
- * \param[in] cli Osmocom Stream Client to be closed
- * We unregister the socket fd from the osmocom select() loop
- * abstraction and close the socket */
-void osmo_stream_cli_close(struct osmo_stream_cli *cli)
+static void osmo_stream_cli_close_iofd(struct osmo_stream_cli *cli)
+{
+ if (!cli->iofd)
+ return;
+
+ osmo_iofd_close(cli->iofd);
+}
+
+static void osmo_stream_cli_close_ofd(struct osmo_stream_cli *cli)
{
if (cli->ofd.fd == -1)
return;
osmo_fd_unregister(&cli->ofd);
close(cli->ofd.fd);
cli->ofd.fd = -1;
+}
+
+/*! \brief Close an Osmocom Stream Client
+ * \param[in] cli Osmocom Stream Client to be closed
+ * We unregister the socket fd from the osmocom select() loop
+ * abstraction and close the socket */
+void osmo_stream_cli_close(struct osmo_stream_cli *cli)
+{
+ if (cli->state == STREAM_CLI_STATE_CLOSED)
+ return;
+
+ switch (cli->mode) {
+ case OSMO_STREAM_MODE_OSMO_FD:
+ osmo_stream_cli_close_ofd(cli);
+ break;
+ case OSMO_STREAM_MODE_OSMO_IO:
+ osmo_stream_cli_close_iofd(cli);
+ break;
+ }
if (cli->state == STREAM_CLI_STATE_CONNECTED) {
LOGSCLI(cli, LOGL_DEBUG, "connection closed\n");
@@ -323,7 +358,14 @@
static inline int osmo_stream_cli_fd(const struct osmo_stream_cli *cli)
{
- return cli->ofd.fd;
+ switch (cli->mode) {
+ case OSMO_STREAM_MODE_OSMO_FD:
+ return cli->ofd.fd;
+ case OSMO_STREAM_MODE_OSMO_IO:
+ return osmo_iofd_get_fd(cli->iofd);
+ default:
+ return -ENOTSUP;
+ }
}
static void osmo_stream_cli_read(struct osmo_stream_cli *cli)
@@ -331,7 +373,7 @@
LOGSCLI(cli, LOGL_DEBUG, "message received\n");
if (cli->read_cb)
- cli->read_cb(cli);
+ cli->read_cb(cli); // TODO: Return value?
}
static int osmo_stream_cli_write(struct osmo_stream_cli *cli)
@@ -457,12 +499,12 @@
break;
}
if (cli->connect_cb)
- cli->connect_cb(cli);
+ cli->connect_cb(cli); // TODO: Check return value?
break;
case STREAM_CLI_STATE_CONNECTED:
if (what & OSMO_FD_READ) {
LOGSCLI(cli, LOGL_DEBUG, "connected read\n");
- osmo_stream_cli_read(cli);
+ osmo_stream_cli_read(cli); // TODO: Check return value and return early?
}
if (what & OSMO_FD_WRITE) {
LOGSCLI(cli, LOGL_DEBUG, "connected write\n");
@@ -492,6 +534,7 @@
if (!cli)
return NULL;
+ cli->mode = OSMO_STREAM_MODE_OSMO_FD;
cli->sk_domain = AF_UNSPEC;
cli->sk_type = SOCK_STREAM;
cli->proto = IPPROTO_TCP;
@@ -507,6 +550,146 @@
return cli;
}
+static inline void osmo_stream_cli_handle_connecting(struct osmo_stream_cli *cli, int
fd)
+{
+ int ret, error;
+ socklen_t len = sizeof(error);
+
+ ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len);
+ if (ret >= 0 && error > 0) {
+ osmo_stream_cli_reconnect(cli);
+ return;
+ }
+
+ LOGSCLI(cli, LOGL_DEBUG, "connection done.\n");
+ cli->state = STREAM_CLI_STATE_CONNECTED;
+ switch (cli->sk_domain) {
+ case AF_UNIX:
+ _setsockopt_nosigpipe(cli);
+ break;
+ case AF_UNSPEC:
+ case AF_INET:
+ case AF_INET6:
+ if (cli->proto == IPPROTO_SCTP) {
+ _setsockopt_nosigpipe(cli);
+ sctp_sock_activate_events(fd);
+ }
+ break;
+ default:
+ break;
+ }
+ if (cli->connect_cb)
+ cli->connect_cb(cli);
+}
+
+static void handle_connecting(struct osmo_stream_cli *cli, int res)
+{
+ int error, ret = res;
+ socklen_t len = sizeof(error);
+
+ int fd = osmo_stream_cli_fd(cli);
+
+ if (ret < 0) {
+ osmo_stream_cli_reconnect(cli);
+ return;
+ }
+ ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len);
+ if (ret >= 0 && error > 0) {
+ osmo_stream_cli_reconnect(cli);
+ return;
+ }
+
+ LOGSCLI(cli, LOGL_DEBUG, "connection done.\n");
+ cli->state = STREAM_CLI_STATE_CONNECTED;
+ switch (cli->sk_domain) {
+ case AF_UNIX:
+ _setsockopt_nosigpipe(cli);
+ break;
+ case AF_UNSPEC:
+ case AF_INET:
+ case AF_INET6:
+ if (cli->proto == IPPROTO_SCTP) {
+ //FIXME!!
+ _setsockopt_nosigpipe(cli);
+ sctp_sock_activate_events(fd);
+ }
+ break;
+ default:
+ break;
+ }
+ if (cli->connect_cb)
+ cli->connect_cb(cli);
+}
+
+static void stream_cli_iofd_read_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg)
+{
+ struct osmo_stream_cli *cli = osmo_iofd_get_data(iofd);
+
+ switch (cli->state) {
+ case STREAM_CLI_STATE_CONNECTING:
+ handle_connecting(cli, res);
+ break;
+ case STREAM_CLI_STATE_CONNECTED:
+ if (res == 0)
+ osmo_stream_cli_reconnect(cli);
+ else if (cli->iofd_read_cb)
+ cli->iofd_read_cb(cli, msg);
+ break;
+ default:
+ osmo_panic("osmo_stream_cli_write_cb() called with unexpected state %d\n",
cli->state);
+ }
+}
+
+static void stream_cli_iofd_write_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg)
+{
+ struct osmo_stream_cli *cli = osmo_iofd_get_data(iofd);
+
+ switch (cli->state) {
+ case STREAM_CLI_STATE_CONNECTING:
+ handle_connecting(cli, res);
+ break;
+ case STREAM_CLI_STATE_CONNECTED:
+ if (msg && res <= 0) {
+ osmo_stream_cli_reconnect(cli);
+ LOGSCLI(cli, LOGL_ERROR, "error %d to send\n", res);
+ }
+ break;
+ default:
+ osmo_panic("osmo_stream_cli_write_cb() called with unexpected state %d\n",
cli->state);
+ }
+}
+
+static struct osmo_io_ops osmo_stream_cli_ioops = {
+ .read_cb = stream_cli_iofd_read_cb,
+ .write_cb = stream_cli_iofd_write_cb,
+
+ .segmentation_cb = NULL,
+};
+
+
+struct osmo_stream_cli *osmo_stream_cli_create_iofd(void *ctx, const char *name)
+{
+ struct osmo_stream_cli *cli;
+
+ cli = talloc_zero(ctx, struct osmo_stream_cli);
+ if (!cli)
+ return NULL;
+
+ cli->mode = OSMO_STREAM_MODE_OSMO_IO;
+ cli->sk_domain = AF_UNSPEC;
+ cli->sk_type = SOCK_STREAM;
+ cli->proto = IPPROTO_TCP;
+
+ cli->iofd = osmo_iofd_setup(ctx, -1, name, OSMO_IO_FD_MODE_READ_WRITE,
&osmo_stream_cli_ioops, cli);
+
+ cli->state = STREAM_CLI_STATE_CLOSED;
+ osmo_timer_setup(&cli->timer, cli_timer_cb, cli);
+ cli->reconnect_timeout = 5; /* default is 5 seconds. */
+ INIT_LLIST_HEAD(&cli->tx_queue);
+
+ return cli;
+}
+
/*! \brief Set the remote address to which we connect
* \param[in] cli Stream Client to modify
* \param[in] addr Remote IP address
@@ -694,9 +877,20 @@
struct osmo_fd *
osmo_stream_cli_get_ofd(struct osmo_stream_cli *cli)
{
+ OSMO_ASSERT(cli->mode == OSMO_STREAM_MODE_OSMO_FD);
return &cli->ofd;
}
+/*! \brief Get OsmoIO File Descriptor of the stream client socket
+ * \param[in] cli Stream Client to modify
+ * \returns Pointer to \ref osmo_iofd */
+struct osmo_io_fd *
+osmo_stream_cli_get_iofd(struct osmo_stream_cli *cli)
+{
+ OSMO_ASSERT(cli->mode == OSMO_STREAM_MODE_OSMO_IO);
+ return cli->iofd;
+}
+
/*! \brief Set the call-back function called on connect of the stream client socket
* \param[in] cli Stream Client to modify
* \param[in] connect_cb Call-back function to be called upon connect */
@@ -717,15 +911,29 @@
}
/*! \brief Set the call-back function called to read from the stream client socket
+ * Only for osmo_stream_cli created with osmo_stream_cli_create()
* \param[in] cli Stream Client to modify
* \param[in] read_cb Call-back function to be called when we want to read */
void
osmo_stream_cli_set_read_cb(struct osmo_stream_cli *cli,
int (*read_cb)(struct osmo_stream_cli *cli))
{
+ OSMO_ASSERT(cli->mode == OSMO_STREAM_MODE_OSMO_FD);
cli->read_cb = read_cb;
}
+/*! \brief Set the call-back function called to read from the stream client socket
+ * Only for oamo_stream_cli created with osmo_stream_cli_create_iofd()
+ * \param[in] cli Stream Client to modify
+ * \param[in] read_cb Call-back function to be called when we want to read */
+void
+osmo_stream_cli_set_iofd_read_cb(struct osmo_stream_cli *cli,
+ int (*read_cb)(struct osmo_stream_cli *cli, struct msgb* msg))
+{
+ OSMO_ASSERT(cli->mode == OSMO_STREAM_MODE_OSMO_IO);
+ cli->iofd_read_cb = read_cb;
+}
+
/*! \brief Destroy a Osmocom stream client (includes close)
* \param[in] cli Stream Client to destroy */
void osmo_stream_cli_destroy(struct osmo_stream_cli *cli)
@@ -864,9 +1072,21 @@
goto error_close_socket;
}
- osmo_fd_setup(&cli->ofd, fd, OSMO_FD_READ | OSMO_FD_WRITE, cli->ofd.cb,
cli->ofd.data, cli->ofd.priv_nr);
- if (osmo_fd_register(&cli->ofd) < 0)
- goto error_close_socket;
+ switch (cli->mode) {
+ case OSMO_STREAM_MODE_OSMO_FD:
+ osmo_fd_setup(&cli->ofd, fd, OSMO_FD_READ | OSMO_FD_WRITE, cli->ofd.cb,
cli->ofd.data, cli->ofd.priv_nr);
+ if (osmo_fd_register(&cli->ofd) < 0)
+ goto error_close_socket;
+ break;
+ case OSMO_STREAM_MODE_OSMO_IO:
+ if (osmo_iofd_register(cli->iofd, fd) < 0)
+ goto error_close_socket;
+ osmo_iofd_read_enable(cli->iofd);
+ osmo_iofd_write_enable(cli->iofd);
+ break;
+ default:
+ OSMO_ASSERT(false);
+ }
cli->state = STREAM_CLI_STATE_CONNECTING;
return 0;
@@ -874,7 +1094,7 @@
error_close_socket:
cli->state = STREAM_CLI_STATE_CLOSED;
close(cli->ofd.fd);
- cli->ofd.fd = -1;
+ cli->ofd.fd = -1; // XXX
return -EIO;
}
@@ -893,8 +1113,17 @@
{
OSMO_ASSERT(cli);
OSMO_ASSERT(msg);
- msgb_enqueue(&cli->tx_queue, msg);
- osmo_fd_write_enable(&cli->ofd);
+
+ switch (cli->mode) {
+ case OSMO_STREAM_MODE_OSMO_FD:
+ msgb_enqueue(&cli->tx_queue, msg);
+ osmo_fd_write_enable(&cli->ofd);
+ break;
+ case OSMO_STREAM_MODE_OSMO_IO:
+ osmo_iofd_write_msgb(cli->iofd, msg);
+ osmo_iofd_write_enable(cli->iofd);
+ break;
+ }
}
/*! \brief Receive data via an Osmocom stream client
@@ -926,11 +1155,19 @@
void osmo_stream_cli_clear_tx_queue(struct osmo_stream_cli *cli)
{
- msgb_queue_free(&cli->tx_queue);
- /* If in state 'connecting', keep WRITE flag up to receive
- * socket connection signal and then transition to STATE_CONNECTED: */
- if (cli->state == STREAM_CLI_STATE_CONNECTED)
- osmo_fd_write_disable(&cli->ofd);
+ switch (cli->mode) {
+ case OSMO_STREAM_MODE_OSMO_FD:
+ msgb_queue_free(&cli->tx_queue);
+ /* If in state 'connecting', keep WRITE flag up to receive
+ * socket connection signal and then transition to STATE_CONNECTED: */
+ if (cli->state == STREAM_CLI_STATE_CONNECTED)
+ osmo_fd_write_disable(&cli->ofd);
+ break;
+ case OSMO_STREAM_MODE_OSMO_IO:
+ osmo_iofd_txqueue_clear(cli->iofd);
+ osmo_iofd_write_disable(cli->iofd);
+ break;
+ }
}
/*
@@ -941,7 +1178,11 @@
#define OSMO_STREAM_SRV_F_NODELAY (1 << 1)
struct osmo_stream_srv_link {
- struct osmo_fd ofd;
+ enum osmo_stream_mode mode;
+ union {
+ struct osmo_fd ofd;
+ struct osmo_io_fd *iofd;
+ };
char *addr[OSMO_STREAM_MAX_ADDRS];
uint8_t addrcnt;
uint16_t port;
@@ -953,7 +1194,7 @@
int flags;
};
-static int osmo_stream_srv_fd_cb(struct osmo_fd *ofd, unsigned int what)
+static int osmo_stream_srv_ofd_cb(struct osmo_fd *ofd, unsigned int what)
{
int ret;
int sock_fd;
@@ -1038,7 +1279,23 @@
link->sk_domain = AF_UNSPEC;
link->sk_type = SOCK_STREAM;
link->proto = IPPROTO_TCP;
- osmo_fd_setup(&link->ofd, -1, OSMO_FD_READ | OSMO_FD_WRITE,
osmo_stream_srv_fd_cb, link, 0);
+ osmo_fd_setup(&link->ofd, -1, OSMO_FD_READ | OSMO_FD_WRITE,
osmo_stream_srv_ofd_cb, link, 0);
+
+ return link;
+}
+
+struct osmo_stream_srv_link *osmo_stream_srv_link_create_iofd(void *ctx, const char
*name)
+{
+ struct osmo_stream_srv_link *link;
+
+ link = talloc_zero(ctx, struct osmo_stream_srv_link);
+ if (!link)
+ return NULL;
+
+ link->sk_domain = AF_UNSPEC;
+ link->sk_type = SOCK_STREAM;
+ link->proto = IPPROTO_TCP;
+ osmo_fd_setup(&link->ofd, -1, OSMO_FD_READ | OSMO_FD_WRITE,
osmo_stream_srv_ofd_cb, link, 0);
return link;
}
@@ -1304,10 +1561,15 @@
struct osmo_stream_srv {
struct osmo_stream_srv_link *srv;
- struct osmo_fd ofd;
+ enum osmo_stream_mode mode;
+ union {
+ struct osmo_fd ofd;
+ struct osmo_io_fd *iofd;
+ };
struct llist_head tx_queue;
int (*closed_cb)(struct osmo_stream_srv *peer);
- int (*cb)(struct osmo_stream_srv *peer);
+ int (*read_cb)(struct osmo_stream_srv *peer);
+ int (*iofd_read_cb)(struct osmo_stream_srv *peer, struct msgb *msg);
void *data;
int flags;
};
@@ -1323,8 +1585,8 @@
return 0;
}
- if (conn->cb)
- rc = conn->cb(conn);
+ if (conn->read_cb)
+ rc = conn->read_cb(conn);
return rc;
}
@@ -1415,12 +1677,13 @@
struct osmo_stream_srv *
osmo_stream_srv_create(void *ctx, struct osmo_stream_srv_link *link,
int fd,
- int (*cb)(struct osmo_stream_srv *conn),
+ int (*read_cb)(struct osmo_stream_srv *conn),
int (*closed_cb)(struct osmo_stream_srv *conn), void *data)
{
struct osmo_stream_srv *conn;
OSMO_ASSERT(link);
+ OSMO_ASSERT(link->mode == OSMO_STREAM_MODE_OSMO_FD);
conn = talloc_zero(ctx, struct osmo_stream_srv);
if (conn == NULL) {
@@ -1430,7 +1693,7 @@
}
conn->srv = link;
osmo_fd_setup(&conn->ofd, fd, OSMO_FD_READ, osmo_stream_srv_cb, conn, 0);
- conn->cb = cb;
+ conn->read_cb = read_cb;
conn->closed_cb = closed_cb;
conn->data = data;
INIT_LLIST_HEAD(&conn->tx_queue);
@@ -1443,6 +1706,79 @@
return conn;
}
+static void stream_srv_iofd_read_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg)
+{
+ struct osmo_stream_srv *conn = osmo_iofd_get_data(iofd);
+ LOGP(DLINP, LOGL_DEBUG, "message received (res=%d)\n", res);
+
+ if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY) {
+ LOGP(DLINP, LOGL_DEBUG, "Connection is being flushed and closed; ignoring received
message\n");
+ msgb_free(msg);
+ return;
+ }
+
+ if (res <= 0) {
+ osmo_stream_srv_set_flush_and_destroy(conn);
+ if (osmo_iofd_txqueue_len(iofd) == 0)
+ osmo_stream_srv_destroy(conn);
+ } else if (conn->iofd_read_cb) {
+ conn->iofd_read_cb(conn, msg); // TODO: Handle return value?
+ }
+
+ return;
+}
+
+static void stream_srv_iofd_write_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg)
+{
+ struct osmo_stream_srv *conn = osmo_iofd_get_data(iofd);
+ LOGP(DLINP, LOGL_DEBUG, "connected write\n");
+
+ if (res == -1)
+ LOGP(DLINP, LOGL_ERROR, "error to send: %s\n", strerror(errno));
+
+ if (osmo_iofd_txqueue_len(iofd) == 0)
+ if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY)
+ osmo_stream_srv_destroy(conn);
+}
+
+static struct osmo_io_ops srv_ioops = {
+ .read_cb = stream_srv_iofd_read_cb,
+ .write_cb = stream_srv_iofd_write_cb,
+};
+
+struct osmo_stream_srv *
+osmo_stream_srv_create_iofd(void *ctx, const char *name,
+ struct osmo_stream_srv_link *link, int fd,
+ int (*read_cb)(struct osmo_stream_srv *conn, struct msgb *msg),
+ int (*closed_cb)(struct osmo_stream_srv *conn), void *data)
+{
+ struct osmo_stream_srv *conn;
+
+ OSMO_ASSERT(link);
+ //OSMO_ASSERT(link->mode == OSMO_STREAM_MODE_OSMO_IO);
+
+ conn = talloc_zero(ctx, struct osmo_stream_srv);
+ if (conn == NULL) {
+ LOGP(DLINP, LOGL_ERROR, "cannot allocate new peer in srv, "
+ "reason=`%s'\n", strerror(errno));
+ return NULL;
+ }
+ conn->mode = OSMO_STREAM_MODE_OSMO_IO;
+ conn->srv = link;
+ conn->iofd = osmo_iofd_setup(conn, fd, name, OSMO_IO_FD_MODE_READ_WRITE,
&srv_ioops, conn);
+ conn->iofd_read_cb = read_cb;
+ conn->closed_cb = closed_cb;
+ conn->data = data;
+
+ if (osmo_iofd_register(conn->iofd, fd) < 0) {
+ LOGP(DLINP, LOGL_ERROR, "could not register FD\n");
+ talloc_free(conn);
+ return NULL;
+ }
+ osmo_iofd_read_enable(conn->iofd);
+ return conn;
+}
+
/*! \brief Prepare to send out all pending messages on the connection's Tx queue
* and then automatically destroy the stream with osmo_stream_srv_destroy().
* This function disables queuing of new messages on the connection and also
@@ -1480,6 +1816,11 @@
return &conn->ofd;
}
+struct osmo_io_fd *
+osmo_stream_srv_get_iofd(struct osmo_stream_srv *conn)
+{
+ return conn->iofd;
+}
/*! \brief Get the master (Link) from a Stream Server
* \param[in] conn Stream Server of which we want to know the Link
* \returns Link through which the given Stream Server is established */
@@ -1496,12 +1837,19 @@
* \param[in] conn Stream Server to be destroyed */
void osmo_stream_srv_destroy(struct osmo_stream_srv *conn)
{
- osmo_fd_unregister(&conn->ofd);
- close(conn->ofd.fd);
- conn->ofd.fd = -1;
+ switch (conn->mode) {
+ case OSMO_STREAM_MODE_OSMO_FD:
+ osmo_fd_unregister(&conn->ofd);
+ close(conn->ofd.fd);
+ msgb_queue_free(&conn->tx_queue);
+ conn->ofd.fd = -1;
+ break;
+ case OSMO_STREAM_MODE_OSMO_IO:
+ osmo_iofd_free(conn->iofd);
+ break;
+ }
if (conn->closed_cb)
conn->closed_cb(conn);
- msgb_queue_free(&conn->tx_queue);
talloc_free(conn);
}
@@ -1518,8 +1866,15 @@
return;
}
- msgb_enqueue(&conn->tx_queue, msg);
- osmo_fd_write_enable(&conn->ofd);
+ switch (conn->mode) {
+ case OSMO_STREAM_MODE_OSMO_FD:
+ msgb_enqueue(&conn->tx_queue, msg);
+ osmo_fd_write_enable(&conn->ofd);
+ break;
+ case OSMO_STREAM_MODE_OSMO_IO:
+ osmo_iofd_write_msgb(conn->iofd, msg);
+ break;
+ }
}
#ifdef HAVE_LIBSCTP
--
To view, visit
https://gerrit.osmocom.org/c/libosmo-netif/+/33193
To unsubscribe, or for help writing mail filters, visit
https://gerrit.osmocom.org/settings
Gerrit-Project: libosmo-netif
Gerrit-Branch: master
Gerrit-Change-Id: I2f52c7107c392b6f4b0bf2a84f8c873c084a200c
Gerrit-Change-Number: 33193
Gerrit-PatchSet: 1
Gerrit-Owner: arehbein <arehbein(a)sysmocom.de>
Gerrit-CC: daniel <dwillmann(a)sysmocom.de>
Gerrit-MessageType: newchange