pespin has uploaded this change for review. ( https://gerrit.osmocom.org/c/libosmo-netif/+/38911?usp=email )
Change subject: stream_cli: Support destroy object within user callback ......................................................................
stream_cli: Support destroy object within user callback
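Allow the user to call osmo_stream_cli_destroy() from within a user callback. Previously, the following call chain ended in a use-after-free: read_cb(len=0) -> osmo_stream_cli_reconnect() -> osmo_stream_cli_close() -> disconnect_cb -> [user calls osmo_stream_cli_destroy()] -> [popped stack does read_cb()] -> [user uses freed msg]. The object is now only marked for deletion while any user callback is active, and it is freed once the last active callback has returned.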
Change-Id: I952938474fa2780bf3c906cbdffb2d024b03c1b7 --- M src/stream_cli.c 1 file changed, 174 insertions(+), 88 deletions(-)
git pull ssh://gerrit.osmocom.org:29418/libosmo-netif refs/changes/11/38911/1
diff --git a/src/stream_cli.c b/src/stream_cli.c index e70bc35..79d0e8d 100644 --- a/src/stream_cli.c +++ b/src/stream_cli.c @@ -83,6 +83,11 @@ #define OSMO_STREAM_CLI_F_RECONF (1 << 0) #define OSMO_STREAM_CLI_F_NODELAY (1 << 1)
+/* Mark whether the object is currently in a user callback. */ +#define IN_CB_MASK_CONNECT_CB (1 << 0) +#define IN_CB_MASK_DISCONNECT_CB (1 << 1) +#define IN_CB_MASK_READ_CB (1 << 2) + struct osmo_stream_cli { char *name; char sockname[OSMO_SOCK_NAME_MAXLEN]; @@ -114,31 +119,128 @@ int flags; int reconnect_timeout; struct osmo_sock_init2_multiaddr_pars ma_pars; + uint8_t in_cb_mask; /* IN_CB_MASK_* */ + bool delay_free; };
-void osmo_stream_cli_close(struct osmo_stream_cli *cli);
/*! \addtogroup stream_cli * @{ */
+/* return true if freed */ +static inline bool free_delayed_if_needed(struct osmo_stream_cli *cli) +{ + /* Nobody requested delayed free, skip */ + if (!cli->delay_free) + return false; + /* Check for other callbacks active in case we were e.g. in: + * read_cb() -> [user] -> osmo_stream_cli_close() -> disconnect_cb() -> [user] -> osmo_stream_cli_destroy() + * or: + * connect_cb() -> [user] -> osmo_stream_cli_close() -> disconnect_cb() -> [user] -> osmo_stream_cli_destroy() + */ + if (cli->in_cb_mask != 0) + return false; + + LOGSCLI(cli, LOGL_DEBUG, "free(delayed)\n"); + talloc_free(cli); + return true; +} + +static void stream_cli_close_iofd(struct osmo_stream_cli *cli) +{ + if (!cli->iofd) + return; + + osmo_iofd_free(cli->iofd); + cli->iofd = NULL; +} + +static void stream_cli_close_ofd(struct osmo_stream_cli *cli) +{ + if (cli->ofd.fd == -1) + return; + osmo_fd_unregister(&cli->ofd); + close(cli->ofd.fd); + cli->ofd.fd = -1; +} + +/*! Close an Osmocom Stream Client. + * \param[in] cli Osmocom Stream Client to be closed + * \return true if stream was freed due to disconnect_cb, false otherwise + * We unregister the socket fd from the osmocom select() loop + * abstraction and close the socket */ +static bool stream_cli_close(struct osmo_stream_cli *cli) +{ + int old_state = cli->state; + + /* This guards against reentrant close through disconnect_cb(): */ + if (cli->state == STREAM_CLI_STATE_CLOSED) + return false; + if (cli->state == STREAM_CLI_STATE_WAIT_RECONNECT) { + osmo_timer_del(&cli->timer); + cli->state = STREAM_CLI_STATE_CLOSED; + return false; + } + + + switch (cli->mode) { + case OSMO_STREAM_MODE_OSMO_FD: + stream_cli_close_ofd(cli); + break; + case OSMO_STREAM_MODE_OSMO_IO: + stream_cli_close_iofd(cli); + break; + default: + OSMO_ASSERT(false); + } + + cli->state = STREAM_CLI_STATE_CLOSED; + + /* If conn was established, notify the disconnection to the user: + * Also, if reconnect is disabled by user, notify the user that connect() 
failed: */ + if (old_state == STREAM_CLI_STATE_CONNECTED || + (old_state == STREAM_CLI_STATE_CONNECTING && cli->reconnect_timeout < 0)) { + LOGSCLI(cli, LOGL_DEBUG, "connection closed\n"); + cli->in_cb_mask |= IN_CB_MASK_DISCONNECT_CB; + if (cli->disconnect_cb) + cli->disconnect_cb(cli); + cli->in_cb_mask &= ~IN_CB_MASK_DISCONNECT_CB; + return free_delayed_if_needed(cli); + } + return false; +} + +/*! Re-connect an Osmocom Stream Client. + * If re-connection is enabled for this client + * (which is the case unless negative timeout was explicitly set via osmo_stream_cli_set_reconnect_timeout() call), + * we close any existing connection (if any) and schedule a re-connect timer */ +static bool stream_cli_reconnect(struct osmo_stream_cli *cli) +{ + bool freed = stream_cli_close(cli); + + if (freed) + return true; + + if (cli->reconnect_timeout < 0) { + LOGSCLI(cli, LOGL_INFO, "not reconnecting, disabled\n"); + return false; + } + + cli->state = STREAM_CLI_STATE_WAIT_RECONNECT; + LOGSCLI(cli, LOGL_INFO, "retrying reconnect in %d seconds...\n", + cli->reconnect_timeout); + osmo_timer_schedule(&cli->timer, cli->reconnect_timeout, 0); + return false; +} + /*! Re-connect an Osmocom Stream Client. * If re-connection is enabled for this client * (which is the case unless negative timeout was explicitly set via osmo_stream_cli_set_reconnect_timeout() call), * we close any existing connection (if any) and schedule a re-connect timer */ void osmo_stream_cli_reconnect(struct osmo_stream_cli *cli) { - osmo_stream_cli_close(cli); - - if (cli->reconnect_timeout < 0) { - LOGSCLI(cli, LOGL_INFO, "not reconnecting, disabled\n"); - return; - } - - cli->state = STREAM_CLI_STATE_WAIT_RECONNECT; - LOGSCLI(cli, LOGL_INFO, "retrying reconnect in %d seconds...\n", - cli->reconnect_timeout); - osmo_timer_schedule(&cli->timer, cli->reconnect_timeout, 0); + stream_cli_reconnect(cli); }
/*! Check if Osmocom Stream Client is in connected state. @@ -161,62 +263,13 @@ cli->state == STREAM_CLI_STATE_CONNECTED; }
-static void osmo_stream_cli_close_iofd(struct osmo_stream_cli *cli) -{ - if (!cli->iofd) - return; - - osmo_iofd_free(cli->iofd); - cli->iofd = NULL; -} - -static void osmo_stream_cli_close_ofd(struct osmo_stream_cli *cli) -{ - if (cli->ofd.fd == -1) - return; - osmo_fd_unregister(&cli->ofd); - close(cli->ofd.fd); - cli->ofd.fd = -1; -} - /*! Close an Osmocom Stream Client. * \param[in] cli Osmocom Stream Client to be closed * We unregister the socket fd from the osmocom select() loop * abstraction and close the socket */ void osmo_stream_cli_close(struct osmo_stream_cli *cli) { - int old_state = cli->state; - - if (cli->state == STREAM_CLI_STATE_CLOSED) - return; - if (cli->state == STREAM_CLI_STATE_WAIT_RECONNECT) { - osmo_timer_del(&cli->timer); - cli->state = STREAM_CLI_STATE_CLOSED; - return; - } - - - switch (cli->mode) { - case OSMO_STREAM_MODE_OSMO_FD: - osmo_stream_cli_close_ofd(cli); - break; - case OSMO_STREAM_MODE_OSMO_IO: - osmo_stream_cli_close_iofd(cli); - break; - default: - OSMO_ASSERT(false); - } - - cli->state = STREAM_CLI_STATE_CLOSED; - - /* If conn was established, notify the disconnection to the user: - * Also, if reconnect is disabled by user, notify the user that connect() failed: */ - if (old_state == STREAM_CLI_STATE_CONNECTED || - (old_state == STREAM_CLI_STATE_CONNECTING && cli->reconnect_timeout < 0)) { - LOGSCLI(cli, LOGL_DEBUG, "connection closed\n"); - if (cli->disconnect_cb) - cli->disconnect_cb(cli); - } + stream_cli_close(cli); }
/*! Retrieve file descriptor of the stream client socket. @@ -249,15 +302,19 @@ return cli->iofd; }
-static void osmo_stream_cli_read(struct osmo_stream_cli *cli) +/* Return true if read_cb caused a delayed_free, hence cli not available anymore. */ +static bool stream_cli_read(struct osmo_stream_cli *cli) { LOGSCLI(cli, LOGL_DEBUG, "message received\n");
+ cli->in_cb_mask |= IN_CB_MASK_READ_CB; if (cli->read_cb) cli->read_cb(cli); + cli->in_cb_mask &= ~IN_CB_MASK_READ_CB; + return free_delayed_if_needed(cli); }
-static int osmo_stream_cli_write(struct osmo_stream_cli *cli) +static int stream_cli_write(struct osmo_stream_cli *cli) { #ifdef HAVE_LIBSCTP struct sctp_sndrcvinfo sinfo; @@ -323,7 +380,7 @@ return 0; } msgb_free(msg); - osmo_stream_cli_reconnect(cli); + stream_cli_reconnect(cli); return 0; }
@@ -359,13 +416,13 @@
if (ret < 0) { LOGSCLI(cli, LOGL_ERROR, "connect failed (%d)\n", res); - osmo_stream_cli_reconnect(cli); + stream_cli_reconnect(cli); return; } ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len); if (ret >= 0 && error > 0) { LOGSCLI(cli, LOGL_ERROR, "connect so_error (%d)\n", error); - osmo_stream_cli_reconnect(cli); + stream_cli_reconnect(cli); return; }
@@ -394,8 +451,11 @@ default: break; } + cli->in_cb_mask |= IN_CB_MASK_CONNECT_CB; if (cli->connect_cb) cli->connect_cb(cli); + cli->in_cb_mask &= ~IN_CB_MASK_CONNECT_CB; + free_delayed_if_needed(cli); }
static int osmo_stream_cli_fd_cb(struct osmo_fd *ofd, unsigned int what) @@ -409,11 +469,12 @@ case STREAM_CLI_STATE_CONNECTED: if (what & OSMO_FD_READ) { LOGSCLI(cli, LOGL_DEBUG, "connected read\n"); - osmo_stream_cli_read(cli); + if (stream_cli_read(cli) == true) + break; /* cli (and hence ofd) freed, done. */ } if (what & OSMO_FD_WRITE) { LOGSCLI(cli, LOGL_DEBUG, "connected write\n"); - osmo_stream_cli_write(cli); + stream_cli_write(cli); } break; default: @@ -459,6 +520,7 @@ static void stream_cli_iofd_read_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg) { struct osmo_stream_cli *cli = osmo_iofd_get_data(iofd); + bool freed;
switch (cli->state) { case STREAM_CLI_STATE_CONNECTING: @@ -470,21 +532,29 @@ case -EPIPE: case -ECONNRESET: LOGSCLI(cli, LOGL_ERROR, "lost connection with srv (%d)\n", res); - osmo_stream_cli_reconnect(cli); + freed = stream_cli_reconnect(cli); break; case 0: LOGSCLI(cli, LOGL_NOTICE, "connection closed with srv\n"); - osmo_stream_cli_reconnect(cli); + freed = stream_cli_reconnect(cli); break; default: LOGSCLI(cli, LOGL_DEBUG, "received %d bytes from srv\n", res); + freed = false; break; } + if (freed) + return; /* msg was also freed as part of the talloc tree. */ /* Notify user of new data or error: */ - if (cli->iofd_read_cb) - cli->iofd_read_cb(cli, res, msg); - else + if (!cli->iofd_read_cb) { msgb_free(msg); + return; + } + cli->in_cb_mask |= IN_CB_MASK_READ_CB; + cli->iofd_read_cb(cli, res, msg); + cli->in_cb_mask &= ~IN_CB_MASK_READ_CB; + OSMO_ASSERT(cli->in_cb_mask == 0); + free_delayed_if_needed(cli); break; default: osmo_panic("%s() called with unexpected state %d\n", __func__, cli->state); @@ -501,8 +571,8 @@ break; case STREAM_CLI_STATE_CONNECTED: if (msg && res <= 0) { - osmo_stream_cli_reconnect(cli); LOGSCLI(cli, LOGL_ERROR, "received error %d in response to send\n", res); + stream_cli_reconnect(cli); } break; default: @@ -521,6 +591,7 @@ static void stream_cli_iofd_recvmsg_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg, const struct msghdr *msgh) { struct osmo_stream_cli *cli = osmo_iofd_get_data(iofd); + bool freed;
res = stream_iofd_sctp_recvmsg_trailer(iofd, msg, res, msgh);
@@ -534,20 +605,28 @@ case -EPIPE: case -ECONNRESET: LOGSCLI(cli, LOGL_ERROR, "lost connection with srv (%d)\n", res); - osmo_stream_cli_reconnect(cli); + freed = stream_cli_reconnect(cli); break; case 0: LOGSCLI(cli, LOGL_NOTICE, "connection closed with srv\n"); - osmo_stream_cli_reconnect(cli); + freed = stream_cli_reconnect(cli); break; default: + freed = 0; break; } + if (freed) + return; /* msg was also freed as part of the talloc tree. */ /* Notify user of new data or error: */ - if (cli->iofd_read_cb) - cli->iofd_read_cb(cli, res, msg); - else + if (!cli->iofd_read_cb) { msgb_free(msg); + return; + } + cli->in_cb_mask |= IN_CB_MASK_READ_CB; + cli->iofd_read_cb(cli, res, msg); + cli->in_cb_mask &= ~IN_CB_MASK_READ_CB; + OSMO_ASSERT(cli->in_cb_mask == 0); + free_delayed_if_needed(cli); break; default: osmo_panic("%s() called with unexpected state %d\n", __func__, cli->state); @@ -863,10 +942,17 @@ void osmo_stream_cli_destroy(struct osmo_stream_cli *cli) { LOGSCLI(cli, LOGL_DEBUG, "destroy()\n"); - osmo_stream_cli_close(cli); + OSMO_ASSERT(!stream_cli_close(cli)); /* BUG IN APP, TRYING TO DESTROY() WHILE ALREADY IN DESTROY()*/ osmo_timer_del(&cli->timer); msgb_queue_free(&cli->tx_queue); - talloc_free(cli); + /* if we are in a user callback, delay freeing. */ + if (cli->in_cb_mask != 0) { + LOGSCLI(cli, LOGL_DEBUG, "delay free() in_cb_mask=0x%02x\n", cli->in_cb_mask); + cli->delay_free = true; + } else { + LOGSCLI(cli, LOGL_DEBUG, "free(destroy)\n"); + talloc_free(cli); + } }
/*! DEPRECATED: use osmo_stream_cli_set_reconnect_timeout() or osmo_stream_cli_reconnect() instead! @@ -881,7 +967,7 @@
/* we are reconfiguring this socket, close existing first. */ if ((cli->flags & OSMO_STREAM_CLI_F_RECONF) && cli->ofd.fd >= 0) - osmo_stream_cli_close(cli); + stream_cli_close(cli);
cli->flags &= ~OSMO_STREAM_CLI_F_RECONF;
@@ -907,7 +993,7 @@ if (ret < 0) { LOGSCLI(cli, LOGL_ERROR, "connect: socket creation error (%d)\n", ret); if (reconnect) - osmo_stream_cli_reconnect(cli); + stream_cli_reconnect(cli); return ret; } osmo_fd_setup(&cli->ofd, ret, OSMO_FD_READ | OSMO_FD_WRITE, osmo_stream_cli_fd_cb, cli, 0); @@ -1035,7 +1121,7 @@
/* we are reconfiguring this socket, close existing first. */ if ((cli->flags & OSMO_STREAM_CLI_F_RECONF) && osmo_stream_cli_get_fd(cli) >= 0) - osmo_stream_cli_close(cli); + stream_cli_close(cli);
cli->flags &= ~OSMO_STREAM_CLI_F_RECONF;
@@ -1078,7 +1164,7 @@
if (ret < 0) { LOGSCLI(cli, LOGL_ERROR, "connect: socket creation error (%d)\n", ret); - osmo_stream_cli_reconnect(cli); + stream_cli_reconnect(cli); return ret; }
@@ -1098,7 +1184,7 @@ break; case OSMO_STREAM_MODE_OSMO_IO: /* Be sure that previous osmo_io instance is freed before creating a new one. */ - osmo_stream_cli_close_iofd(cli); + stream_cli_close_iofd(cli); #ifdef HAVE_LIBSCTP if (cli->proto == IPPROTO_SCTP) { cli->iofd = osmo_iofd_setup(cli, fd, cli->name, OSMO_IO_FD_MODE_RECVMSG_SENDMSG, @@ -1242,7 +1328,7 @@ LOGSCLI(cli, LOGL_ERROR, "lost connection with srv (%d)\n", errno); else LOGSCLI(cli, LOGL_ERROR, "recv failed (%d)\n", errno); - osmo_stream_cli_reconnect(cli); + stream_cli_reconnect(cli); return ret; } else if (ret == 0) { LOGSCLI(cli, LOGL_ERROR, "connection closed with srv\n");