arehbein has uploaded this change for review. ( https://gerrit.osmocom.org/c/libosmo-netif/+/33193 )
Change subject: Add osmo_io support to osmo_stream_cli and osmo_stream_srv ......................................................................
Add osmo_io support to osmo_stream_cli and osmo_stream_srv
Change-Id: I2f52c7107c392b6f4b0bf2a84f8c873c084a200c --- M include/osmocom/netif/stream.h M src/stream.c 2 files changed, 403 insertions(+), 36 deletions(-)
git pull ssh://gerrit.osmocom.org:29418/libosmo-netif refs/changes/93/33193/1
diff --git a/include/osmocom/netif/stream.h b/include/osmocom/netif/stream.h index a52e7c4..0e983a4 100644 --- a/include/osmocom/netif/stream.h +++ b/include/osmocom/netif/stream.h @@ -45,6 +45,7 @@ struct osmo_stream_srv;
struct osmo_stream_srv *osmo_stream_srv_create(void *ctx, struct osmo_stream_srv_link *link, int fd, int (*cb)(struct osmo_stream_srv *conn), int (*closed_cb)(struct osmo_stream_srv *conn), void *data); +struct osmo_stream_srv *osmo_stream_srv_create_iofd(void *ctx, const char *name, struct osmo_stream_srv_link *link, int fd, int (*read_cb)(struct osmo_stream_srv *conn, struct msgb *msg), int (*closed_cb)(struct osmo_stream_srv *conn), void *data); void *osmo_stream_srv_get_data(struct osmo_stream_srv *conn); struct osmo_stream_srv_link *osmo_stream_srv_get_master(struct osmo_stream_srv *conn); struct osmo_fd *osmo_stream_srv_get_ofd(struct osmo_stream_srv *srv); @@ -79,10 +80,12 @@ void osmo_stream_cli_set_connect_cb(struct osmo_stream_cli *cli, int (*connect_cb)(struct osmo_stream_cli *cli)); void osmo_stream_cli_set_disconnect_cb(struct osmo_stream_cli *cli, int (*disconnect_cb)(struct osmo_stream_cli *cli)); void osmo_stream_cli_set_read_cb(struct osmo_stream_cli *cli, int (*read_cb)(struct osmo_stream_cli *cli)); +void osmo_stream_cli_set_iofd_read_cb(struct osmo_stream_cli *cli, int (*read_cb)(struct osmo_stream_cli *cli, struct msgb* msg)); void osmo_stream_cli_reconnect(struct osmo_stream_cli *cli); bool osmo_stream_cli_is_connected(struct osmo_stream_cli *cli);
struct osmo_stream_cli *osmo_stream_cli_create(void *ctx); +struct osmo_stream_cli *osmo_stream_cli_create_iofd(void *ctx, const char *name); void osmo_stream_cli_destroy(struct osmo_stream_cli *cli);
int osmo_stream_cli_open(struct osmo_stream_cli *cli); diff --git a/src/stream.c b/src/stream.c index 6afb7ca..6388928 100644 --- a/src/stream.c +++ b/src/stream.c @@ -37,6 +37,8 @@ #include <osmocom/core/utils.h> #include <osmocom/gsm/tlv.h> #include <osmocom/core/msgb.h> +#include <osmocom/core/osmo_io.h> +#include <osmocom/core/panic.h> #include <osmocom/core/logging.h> #include <osmocom/core/talloc.h> #include <osmocom/core/socket.h> @@ -247,8 +249,17 @@ #define OSMO_STREAM_MAX_ADDRS 1 #endif
+enum osmo_stream_mode { + OSMO_STREAM_MODE_OSMO_FD, + OSMO_STREAM_MODE_OSMO_IO, +}; + struct osmo_stream_cli { - struct osmo_fd ofd; + enum osmo_stream_mode mode; + union { + struct osmo_fd ofd; + struct osmo_io_fd *iofd; + }; struct llist_head tx_queue; struct osmo_timer_list timer; enum osmo_stream_cli_state state; @@ -264,6 +275,7 @@ int (*connect_cb)(struct osmo_stream_cli *cli); int (*disconnect_cb)(struct osmo_stream_cli *cli); int (*read_cb)(struct osmo_stream_cli *cli); + int (*iofd_read_cb)(struct osmo_stream_cli *cli, struct msgb *msg); int (*write_cb)(struct osmo_stream_cli *cli); void *data; int flags; @@ -300,17 +312,40 @@ return cli->state == STREAM_CLI_STATE_CONNECTED; }
-/*! \brief Close an Osmocom Stream Client - * \param[in] cli Osmocom Stream Client to be closed - * We unregister the socket fd from the osmocom select() loop - * abstraction and close the socket */ -void osmo_stream_cli_close(struct osmo_stream_cli *cli) +static void osmo_stream_cli_close_iofd(struct osmo_stream_cli *cli) +{ + if (!cli->iofd) + return; + + osmo_iofd_close(cli->iofd); +} + +static void osmo_stream_cli_close_ofd(struct osmo_stream_cli *cli) { if (cli->ofd.fd == -1) return; osmo_fd_unregister(&cli->ofd); close(cli->ofd.fd); cli->ofd.fd = -1; +} + +/*! \brief Close an Osmocom Stream Client + * \param[in] cli Osmocom Stream Client to be closed + * We unregister the socket fd from the osmocom select() loop + * abstraction and close the socket */ +void osmo_stream_cli_close(struct osmo_stream_cli *cli) +{ + if (cli->state == STREAM_CLI_STATE_CLOSED) + return; + + switch (cli->mode) { + case OSMO_STREAM_MODE_OSMO_FD: + osmo_stream_cli_close_ofd(cli); + break; + case OSMO_STREAM_MODE_OSMO_IO: + osmo_stream_cli_close_iofd(cli); + break; + }
if (cli->state == STREAM_CLI_STATE_CONNECTED) { LOGSCLI(cli, LOGL_DEBUG, "connection closed\n"); @@ -323,7 +358,14 @@
static inline int osmo_stream_cli_fd(const struct osmo_stream_cli *cli) { - return cli->ofd.fd; + switch (cli->mode) { + case OSMO_STREAM_MODE_OSMO_FD: + return cli->ofd.fd; + case OSMO_STREAM_MODE_OSMO_IO: + return osmo_iofd_get_fd(cli->iofd); + default: + return -ENOTSUP; + } }
static void osmo_stream_cli_read(struct osmo_stream_cli *cli) @@ -331,7 +373,7 @@ LOGSCLI(cli, LOGL_DEBUG, "message received\n");
if (cli->read_cb) - cli->read_cb(cli); + cli->read_cb(cli); // TODO: Return value? }
static int osmo_stream_cli_write(struct osmo_stream_cli *cli) @@ -457,12 +499,12 @@ break; } if (cli->connect_cb) - cli->connect_cb(cli); + cli->connect_cb(cli); // TODO: Check return value? break; case STREAM_CLI_STATE_CONNECTED: if (what & OSMO_FD_READ) { LOGSCLI(cli, LOGL_DEBUG, "connected read\n"); - osmo_stream_cli_read(cli); + osmo_stream_cli_read(cli); // TODO: Check return value and return early? } if (what & OSMO_FD_WRITE) { LOGSCLI(cli, LOGL_DEBUG, "connected write\n"); @@ -492,6 +534,7 @@ if (!cli) return NULL;
+ cli->mode = OSMO_STREAM_MODE_OSMO_FD; cli->sk_domain = AF_UNSPEC; cli->sk_type = SOCK_STREAM; cli->proto = IPPROTO_TCP; @@ -507,6 +550,146 @@ return cli; }
+static inline void osmo_stream_cli_handle_connecting(struct osmo_stream_cli *cli, int fd) +{ + int ret, error; + socklen_t len = sizeof(error); + + ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len); + if (ret >= 0 && error > 0) { + osmo_stream_cli_reconnect(cli); + return; + } + + LOGSCLI(cli, LOGL_DEBUG, "connection done.\n"); + cli->state = STREAM_CLI_STATE_CONNECTED; + switch (cli->sk_domain) { + case AF_UNIX: + _setsockopt_nosigpipe(cli); + break; + case AF_UNSPEC: + case AF_INET: + case AF_INET6: + if (cli->proto == IPPROTO_SCTP) { + _setsockopt_nosigpipe(cli); + sctp_sock_activate_events(fd); + } + break; + default: + break; + } + if (cli->connect_cb) + cli->connect_cb(cli); +} + +static void handle_connecting(struct osmo_stream_cli *cli, int res) +{ + int error, ret = res; + socklen_t len = sizeof(error); + + int fd = osmo_stream_cli_fd(cli); + + if (ret < 0) { + osmo_stream_cli_reconnect(cli); + return; + } + ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len); + if (ret >= 0 && error > 0) { + osmo_stream_cli_reconnect(cli); + return; + } + + LOGSCLI(cli, LOGL_DEBUG, "connection done.\n"); + cli->state = STREAM_CLI_STATE_CONNECTED; + switch (cli->sk_domain) { + case AF_UNIX: + _setsockopt_nosigpipe(cli); + break; + case AF_UNSPEC: + case AF_INET: + case AF_INET6: + if (cli->proto == IPPROTO_SCTP) { + //FIXME!! 
+ _setsockopt_nosigpipe(cli); + sctp_sock_activate_events(fd); + } + break; + default: + break; + } + if (cli->connect_cb) + cli->connect_cb(cli); +} + +static void stream_cli_iofd_read_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg) +{ + struct osmo_stream_cli *cli = osmo_iofd_get_data(iofd); + + switch (cli->state) { + case STREAM_CLI_STATE_CONNECTING: + handle_connecting(cli, res); + break; + case STREAM_CLI_STATE_CONNECTED: + if (res == 0) + osmo_stream_cli_reconnect(cli); + else if (cli->iofd_read_cb) + cli->iofd_read_cb(cli, msg); + break; + default: + osmo_panic("osmo_stream_cli_write_cb() called with unexpected state %d\n", cli->state); + } +} + +static void stream_cli_iofd_write_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg) +{ + struct osmo_stream_cli *cli = osmo_iofd_get_data(iofd); + + switch (cli->state) { + case STREAM_CLI_STATE_CONNECTING: + handle_connecting(cli, res); + break; + case STREAM_CLI_STATE_CONNECTED: + if (msg && res <= 0) { + osmo_stream_cli_reconnect(cli); + LOGSCLI(cli, LOGL_ERROR, "error %d to send\n", res); + } + break; + default: + osmo_panic("osmo_stream_cli_write_cb() called with unexpected state %d\n", cli->state); + } +} + +static struct osmo_io_ops osmo_stream_cli_ioops = { + .read_cb = stream_cli_iofd_read_cb, + .write_cb = stream_cli_iofd_write_cb, + + .segmentation_cb = NULL, +}; + + +struct osmo_stream_cli *osmo_stream_cli_create_iofd(void *ctx, const char *name) +{ + struct osmo_stream_cli *cli; + + cli = talloc_zero(ctx, struct osmo_stream_cli); + if (!cli) + return NULL; + + cli->mode = OSMO_STREAM_MODE_OSMO_IO; + cli->sk_domain = AF_UNSPEC; + cli->sk_type = SOCK_STREAM; + cli->proto = IPPROTO_TCP; + + cli->iofd = osmo_iofd_setup(ctx, -1, name, OSMO_IO_FD_MODE_READ_WRITE, &osmo_stream_cli_ioops, cli); + + cli->state = STREAM_CLI_STATE_CLOSED; + osmo_timer_setup(&cli->timer, cli_timer_cb, cli); + cli->reconnect_timeout = 5; /* default is 5 seconds. 
*/ + INIT_LLIST_HEAD(&cli->tx_queue); + + return cli; +} + /*! \brief Set the remote address to which we connect * \param[in] cli Stream Client to modify * \param[in] addr Remote IP address @@ -694,9 +877,20 @@ struct osmo_fd * osmo_stream_cli_get_ofd(struct osmo_stream_cli *cli) { + OSMO_ASSERT(cli->mode == OSMO_STREAM_MODE_OSMO_FD); return &cli->ofd; }
+/*! \brief Get OsmoIO File Descriptor of the stream client socket + * \param[in] cli Stream Client to modify + * \returns Pointer to \ref osmo_iofd */ +struct osmo_io_fd * +osmo_stream_cli_get_iofd(struct osmo_stream_cli *cli) +{ + OSMO_ASSERT(cli->mode == OSMO_STREAM_MODE_OSMO_IO); + return cli->iofd; +} + /*! \brief Set the call-back function called on connect of the stream client socket * \param[in] cli Stream Client to modify * \param[in] connect_cb Call-back function to be called upon connect */ @@ -717,15 +911,29 @@ }
/*! \brief Set the call-back function called to read from the stream client socket + * Only for osmo_stream_cli created with osmo_stream_cli_create() * \param[in] cli Stream Client to modify * \param[in] read_cb Call-back function to be called when we want to read */ void osmo_stream_cli_set_read_cb(struct osmo_stream_cli *cli, int (*read_cb)(struct osmo_stream_cli *cli)) { + OSMO_ASSERT(cli->mode == OSMO_STREAM_MODE_OSMO_FD); cli->read_cb = read_cb; }
+/*! \brief Set the call-back function called to read from the stream client socket + * Only for osmo_stream_cli created with osmo_stream_cli_create_iofd() + * \param[in] cli Stream Client to modify + * \param[in] read_cb Call-back function to be called when we want to read */ +void +osmo_stream_cli_set_iofd_read_cb(struct osmo_stream_cli *cli, + int (*read_cb)(struct osmo_stream_cli *cli, struct msgb* msg)) +{ + OSMO_ASSERT(cli->mode == OSMO_STREAM_MODE_OSMO_IO); + cli->iofd_read_cb = read_cb; +} + /*! \brief Destroy a Osmocom stream client (includes close) * \param[in] cli Stream Client to destroy */ void osmo_stream_cli_destroy(struct osmo_stream_cli *cli) @@ -864,9 +1072,21 @@ goto error_close_socket; }
- osmo_fd_setup(&cli->ofd, fd, OSMO_FD_READ | OSMO_FD_WRITE, cli->ofd.cb, cli->ofd.data, cli->ofd.priv_nr); - if (osmo_fd_register(&cli->ofd) < 0) - goto error_close_socket; + switch (cli->mode) { + case OSMO_STREAM_MODE_OSMO_FD: + osmo_fd_setup(&cli->ofd, fd, OSMO_FD_READ | OSMO_FD_WRITE, cli->ofd.cb, cli->ofd.data, cli->ofd.priv_nr); + if (osmo_fd_register(&cli->ofd) < 0) + goto error_close_socket; + break; + case OSMO_STREAM_MODE_OSMO_IO: + if (osmo_iofd_register(cli->iofd, fd) < 0) + goto error_close_socket; + osmo_iofd_read_enable(cli->iofd); + osmo_iofd_write_enable(cli->iofd); + break; + default: + OSMO_ASSERT(false); + }
cli->state = STREAM_CLI_STATE_CONNECTING; return 0; @@ -874,7 +1094,7 @@ error_close_socket: cli->state = STREAM_CLI_STATE_CLOSED; close(cli->ofd.fd); - cli->ofd.fd = -1; + cli->ofd.fd = -1; // XXX return -EIO; }
@@ -893,8 +1113,17 @@ { OSMO_ASSERT(cli); OSMO_ASSERT(msg); - msgb_enqueue(&cli->tx_queue, msg); - osmo_fd_write_enable(&cli->ofd); + + switch (cli->mode) { + case OSMO_STREAM_MODE_OSMO_FD: + msgb_enqueue(&cli->tx_queue, msg); + osmo_fd_write_enable(&cli->ofd); + break; + case OSMO_STREAM_MODE_OSMO_IO: + osmo_iofd_write_msgb(cli->iofd, msg); + osmo_iofd_write_enable(cli->iofd); + break; + } }
/*! \brief Receive data via an Osmocom stream client @@ -926,11 +1155,19 @@
void osmo_stream_cli_clear_tx_queue(struct osmo_stream_cli *cli) { - msgb_queue_free(&cli->tx_queue); - /* If in state 'connecting', keep WRITE flag up to receive - * socket connection signal and then transition to STATE_CONNECTED: */ - if (cli->state == STREAM_CLI_STATE_CONNECTED) - osmo_fd_write_disable(&cli->ofd); + switch (cli->mode) { + case OSMO_STREAM_MODE_OSMO_FD: + msgb_queue_free(&cli->tx_queue); + /* If in state 'connecting', keep WRITE flag up to receive + * socket connection signal and then transition to STATE_CONNECTED: */ + if (cli->state == STREAM_CLI_STATE_CONNECTED) + osmo_fd_write_disable(&cli->ofd); + break; + case OSMO_STREAM_MODE_OSMO_IO: + osmo_iofd_txqueue_clear(cli->iofd); + osmo_iofd_write_disable(cli->iofd); + break; + } }
/* @@ -941,7 +1178,11 @@ #define OSMO_STREAM_SRV_F_NODELAY (1 << 1)
struct osmo_stream_srv_link { - struct osmo_fd ofd; + enum osmo_stream_mode mode; + union { + struct osmo_fd ofd; + struct osmo_io_fd *iofd; + }; char *addr[OSMO_STREAM_MAX_ADDRS]; uint8_t addrcnt; uint16_t port; @@ -953,7 +1194,7 @@ int flags; };
-static int osmo_stream_srv_fd_cb(struct osmo_fd *ofd, unsigned int what) +static int osmo_stream_srv_ofd_cb(struct osmo_fd *ofd, unsigned int what) { int ret; int sock_fd; @@ -1038,7 +1279,23 @@ link->sk_domain = AF_UNSPEC; link->sk_type = SOCK_STREAM; link->proto = IPPROTO_TCP; - osmo_fd_setup(&link->ofd, -1, OSMO_FD_READ | OSMO_FD_WRITE, osmo_stream_srv_fd_cb, link, 0); + osmo_fd_setup(&link->ofd, -1, OSMO_FD_READ | OSMO_FD_WRITE, osmo_stream_srv_ofd_cb, link, 0); + + return link; +} + +struct osmo_stream_srv_link *osmo_stream_srv_link_create_iofd(void *ctx, const char *name) +{ + struct osmo_stream_srv_link *link; + + link = talloc_zero(ctx, struct osmo_stream_srv_link); + if (!link) + return NULL; + + link->sk_domain = AF_UNSPEC; + link->sk_type = SOCK_STREAM; + link->proto = IPPROTO_TCP; + osmo_fd_setup(&link->ofd, -1, OSMO_FD_READ | OSMO_FD_WRITE, osmo_stream_srv_ofd_cb, link, 0);
return link; } @@ -1304,10 +1561,15 @@
struct osmo_stream_srv { struct osmo_stream_srv_link *srv; - struct osmo_fd ofd; + enum osmo_stream_mode mode; + union { + struct osmo_fd ofd; + struct osmo_io_fd *iofd; + }; struct llist_head tx_queue; int (*closed_cb)(struct osmo_stream_srv *peer); - int (*cb)(struct osmo_stream_srv *peer); + int (*read_cb)(struct osmo_stream_srv *peer); + int (*iofd_read_cb)(struct osmo_stream_srv *peer, struct msgb *msg); void *data; int flags; }; @@ -1323,8 +1585,8 @@ return 0; }
- if (conn->cb) - rc = conn->cb(conn); + if (conn->read_cb) + rc = conn->read_cb(conn);
return rc; } @@ -1415,12 +1677,13 @@ struct osmo_stream_srv * osmo_stream_srv_create(void *ctx, struct osmo_stream_srv_link *link, int fd, - int (*cb)(struct osmo_stream_srv *conn), + int (*read_cb)(struct osmo_stream_srv *conn), int (*closed_cb)(struct osmo_stream_srv *conn), void *data) { struct osmo_stream_srv *conn;
OSMO_ASSERT(link); + OSMO_ASSERT(link->mode == OSMO_STREAM_MODE_OSMO_FD);
conn = talloc_zero(ctx, struct osmo_stream_srv); if (conn == NULL) { @@ -1430,7 +1693,7 @@ } conn->srv = link; osmo_fd_setup(&conn->ofd, fd, OSMO_FD_READ, osmo_stream_srv_cb, conn, 0); - conn->cb = cb; + conn->read_cb = read_cb; conn->closed_cb = closed_cb; conn->data = data; INIT_LLIST_HEAD(&conn->tx_queue); @@ -1443,6 +1706,79 @@ return conn; }
+static void stream_srv_iofd_read_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg) +{ + struct osmo_stream_srv *conn = osmo_iofd_get_data(iofd); + LOGP(DLINP, LOGL_DEBUG, "message received (res=%d)\n", res); + + if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY) { + LOGP(DLINP, LOGL_DEBUG, "Connection is being flushed and closed; ignoring received message\n"); + msgb_free(msg); + return; + } + + if (res <= 0) { + osmo_stream_srv_set_flush_and_destroy(conn); + if (osmo_iofd_txqueue_len(iofd) == 0) + osmo_stream_srv_destroy(conn); + } else if (conn->iofd_read_cb) { + conn->iofd_read_cb(conn, msg); // TODO: Handle return value? + } + + return; +} + +static void stream_srv_iofd_write_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg) +{ + struct osmo_stream_srv *conn = osmo_iofd_get_data(iofd); + LOGP(DLINP, LOGL_DEBUG, "connected write\n"); + + if (res == -1) + LOGP(DLINP, LOGL_ERROR, "error to send: %s\n", strerror(errno)); + + if (osmo_iofd_txqueue_len(iofd) == 0) + if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY) + osmo_stream_srv_destroy(conn); +} + +static struct osmo_io_ops srv_ioops = { + .read_cb = stream_srv_iofd_read_cb, + .write_cb = stream_srv_iofd_write_cb, +}; + +struct osmo_stream_srv * +osmo_stream_srv_create_iofd(void *ctx, const char *name, + struct osmo_stream_srv_link *link, int fd, + int (*read_cb)(struct osmo_stream_srv *conn, struct msgb *msg), + int (*closed_cb)(struct osmo_stream_srv *conn), void *data) +{ + struct osmo_stream_srv *conn; + + OSMO_ASSERT(link); + //OSMO_ASSERT(link->mode == OSMO_STREAM_MODE_OSMO_IO); + + conn = talloc_zero(ctx, struct osmo_stream_srv); + if (conn == NULL) { + LOGP(DLINP, LOGL_ERROR, "cannot allocate new peer in srv, " + "reason=`%s'\n", strerror(errno)); + return NULL; + } + conn->mode = OSMO_STREAM_MODE_OSMO_IO; + conn->srv = link; + conn->iofd = osmo_iofd_setup(conn, fd, name, OSMO_IO_FD_MODE_READ_WRITE, &srv_ioops, conn); + conn->iofd_read_cb = read_cb; + conn->closed_cb = closed_cb; + conn->data 
= data; + + if (osmo_iofd_register(conn->iofd, fd) < 0) { + LOGP(DLINP, LOGL_ERROR, "could not register FD\n"); + talloc_free(conn); + return NULL; + } + osmo_iofd_read_enable(conn->iofd); + return conn; +} + /*! \brief Prepare to send out all pending messages on the connection's Tx queue * and then automatically destroy the stream with osmo_stream_srv_destroy(). * This function disables queuing of new messages on the connection and also @@ -1480,6 +1816,11 @@ return &conn->ofd; }
+struct osmo_io_fd * +osmo_stream_srv_get_iofd(struct osmo_stream_srv *conn) +{ + return conn->iofd; +} /*! \brief Get the master (Link) from a Stream Server * \param[in] conn Stream Server of which we want to know the Link * \returns Link through which the given Stream Server is established */ @@ -1496,12 +1837,19 @@ * \param[in] conn Stream Server to be destroyed */ void osmo_stream_srv_destroy(struct osmo_stream_srv *conn) { - osmo_fd_unregister(&conn->ofd); - close(conn->ofd.fd); - conn->ofd.fd = -1; + switch (conn->mode) { + case OSMO_STREAM_MODE_OSMO_FD: + osmo_fd_unregister(&conn->ofd); + close(conn->ofd.fd); + msgb_queue_free(&conn->tx_queue); + conn->ofd.fd = -1; + break; + case OSMO_STREAM_MODE_OSMO_IO: + osmo_iofd_free(conn->iofd); + break; + } if (conn->closed_cb) conn->closed_cb(conn); - msgb_queue_free(&conn->tx_queue); talloc_free(conn); }
@@ -1518,8 +1866,15 @@ return; }
- msgb_enqueue(&conn->tx_queue, msg); - osmo_fd_write_enable(&conn->ofd); + switch (conn->mode) { + case OSMO_STREAM_MODE_OSMO_FD: + msgb_enqueue(&conn->tx_queue, msg); + osmo_fd_write_enable(&conn->ofd); + break; + case OSMO_STREAM_MODE_OSMO_IO: + osmo_iofd_write_msgb(conn->iofd, msg); + break; + } }
#ifdef HAVE_LIBSCTP