pespin has submitted this change. ( https://gerrit.osmocom.org/c/libosmo-netif/+/38911?usp=email )
Change subject: stream_cli: Support destroy object within user callback ......................................................................
stream_cli: Support destroy object within user callback
This commit improves the code in stream_cli so that the user can call osmo_stream_cli_destroy() from within a callback.
For instance, consider the following call sequence:
  read_cb(len=0)
    -> osmo_stream_cli_reconnect()
      -> osmo_stream_cli_close()
        -> disconnect_cb()
          -> [user calls osmo_stream_cli_destroy()]
  [the stack unwinds and execution continues into the user's read_cb()]
  [the user accesses msg, which has already been freed]
This allows stream_cli to be used by callers that handle destruction and reconnection of stream_cli objects on their own, such as the libosmo-abis e1_input/sign_link stack.
Change-Id: I952938474fa2780bf3c906cbdffb2d024b03c1b7 --- M src/stream_cli.c M tests/stream/stream_test.err 2 files changed, 149 insertions(+), 45 deletions(-)
Approvals: daniel: Looks good to me, approved Jenkins Builder: Verified laforge: Looks good to me, but someone else must approve
diff --git a/src/stream_cli.c b/src/stream_cli.c index 9fc8101..88d5f9e 100644 --- a/src/stream_cli.c +++ b/src/stream_cli.c @@ -83,6 +83,11 @@ #define OSMO_STREAM_CLI_F_RECONF (1 << 0) #define OSMO_STREAM_CLI_F_NODELAY (1 << 1)
+/* Mark whether the object is currently in a user callback. */ +#define IN_CB_MASK_CONNECT_CB (1 << 0) +#define IN_CB_MASK_DISCONNECT_CB (1 << 1) +#define IN_CB_MASK_READ_CB (1 << 2) + struct osmo_stream_cli { char *name; char sockname[OSMO_SOCK_NAME_MAXLEN]; @@ -114,12 +119,33 @@ int flags; int reconnect_timeout; struct osmo_sock_init2_multiaddr_pars ma_pars; + uint8_t in_cb_mask; /* IN_CB_MASK_* */ + bool delay_free; };
/*! \addtogroup stream_cli * @{ */
+/* return true if freed */ +static inline bool free_delayed_if_needed(struct osmo_stream_cli *cli) +{ + /* Nobody requested delayed free, skip */ + if (!cli->delay_free) + return false; + /* Check for other callbacks active in case we were e.g. in: + * read_cb() -> [user] -> osmo_steam_client_close() -> disconnect_cb() --> [user] --> osmo_stream_client_destroy() + * or: + * connect_cb() -> [user] -> osmo_steam_client_close() -> disconnect_cb() --> [user] --> osmo_stream_client_destroy() + */ + if (cli->in_cb_mask != 0) + return false; + + LOGSCLI(cli, LOGL_DEBUG, "free(delayed)\n"); + talloc_free(cli); + return true; +} + static void stream_cli_close_iofd(struct osmo_stream_cli *cli) { if (!cli->iofd) @@ -140,18 +166,21 @@
/*! Close an Osmocom Stream Client. * \param[in] cli Osmocom Stream Client to be closed + * \return true if stream was freed due to disconnect_cb, false otherwise * We unregister the socket fd from the osmocom select() loop * abstraction and close the socket */ -void osmo_stream_cli_close(struct osmo_stream_cli *cli) +static bool stream_cli_close(struct osmo_stream_cli *cli) { int old_state = cli->state; + LOGSCLI(cli, LOGL_DEBUG, "close()\n");
+ /* This guards against reentrant close through disconnect_cb(): */ if (cli->state == STREAM_CLI_STATE_CLOSED) - return; + return false; if (cli->state == STREAM_CLI_STATE_WAIT_RECONNECT) { osmo_timer_del(&cli->timer); cli->state = STREAM_CLI_STATE_CLOSED; - return; + return false; }
@@ -172,10 +201,45 @@ * Also, if reconnect is disabled by user, notify the user that connect() failed: */ if (old_state == STREAM_CLI_STATE_CONNECTED || (old_state == STREAM_CLI_STATE_CONNECTING && cli->reconnect_timeout < 0)) { - LOGSCLI(cli, LOGL_DEBUG, "connection closed\n"); + cli->in_cb_mask |= IN_CB_MASK_DISCONNECT_CB; if (cli->disconnect_cb) cli->disconnect_cb(cli); + cli->in_cb_mask &= ~IN_CB_MASK_DISCONNECT_CB; + return free_delayed_if_needed(cli); } + return false; +} + +/*! Close an Osmocom Stream Client. + * \param[in] cli Osmocom Stream Client to be closed + * We unregister the socket fd from the osmocom select() loop + * abstraction and close the socket */ +void osmo_stream_cli_close(struct osmo_stream_cli *cli) +{ + stream_cli_close(cli); +} + +/*! Re-connect an Osmocom Stream Client. + * If re-connection is enabled for this client + * (which is the case unless negative timeout was explicitly set via osmo_stream_cli_set_reconnect_timeout() call), + * we close any existing connection (if any) and schedule a re-connect timer */ +static bool stream_cli_reconnect(struct osmo_stream_cli *cli) +{ + bool freed = stream_cli_close(cli); + + if (freed) + return true; + + if (cli->reconnect_timeout < 0) { + LOGSCLI(cli, LOGL_INFO, "not reconnecting, disabled\n"); + return false; + } + + cli->state = STREAM_CLI_STATE_WAIT_RECONNECT; + LOGSCLI(cli, LOGL_INFO, "retrying reconnect in %d seconds...\n", + cli->reconnect_timeout); + osmo_timer_schedule(&cli->timer, cli->reconnect_timeout, 0); + return false; }
/*! Re-connect an Osmocom Stream Client. @@ -184,17 +248,7 @@ * we close any existing connection (if any) and schedule a re-connect timer */ void osmo_stream_cli_reconnect(struct osmo_stream_cli *cli) { - osmo_stream_cli_close(cli); - - if (cli->reconnect_timeout < 0) { - LOGSCLI(cli, LOGL_INFO, "not reconnecting, disabled\n"); - return; - } - - cli->state = STREAM_CLI_STATE_WAIT_RECONNECT; - LOGSCLI(cli, LOGL_INFO, "retrying reconnect in %d seconds...\n", - cli->reconnect_timeout); - osmo_timer_schedule(&cli->timer, cli->reconnect_timeout, 0); + stream_cli_reconnect(cli); }
/*! Check if Osmocom Stream Client is in connected state. @@ -247,12 +301,16 @@ return cli->iofd; }
-static void stream_cli_read(struct osmo_stream_cli *cli) +/* Return true if read_cb caused a delayed_free, hence cli not available anymore. */ +static bool stream_cli_read(struct osmo_stream_cli *cli) { LOGSCLI(cli, LOGL_DEBUG, "message received\n");
+ cli->in_cb_mask |= IN_CB_MASK_READ_CB; if (cli->read_cb) cli->read_cb(cli); + cli->in_cb_mask &= ~IN_CB_MASK_READ_CB; + return free_delayed_if_needed(cli); }
static int stream_cli_write(struct osmo_stream_cli *cli) @@ -321,7 +379,7 @@ return 0; } msgb_free(msg); - osmo_stream_cli_reconnect(cli); + stream_cli_reconnect(cli); return 0; }
@@ -357,13 +415,13 @@
if (ret < 0) { LOGSCLI(cli, LOGL_ERROR, "connect failed (%d)\n", res); - osmo_stream_cli_reconnect(cli); + stream_cli_reconnect(cli); return; } ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len); if (ret >= 0 && error > 0) { LOGSCLI(cli, LOGL_ERROR, "connect so_error (%d)\n", error); - osmo_stream_cli_reconnect(cli); + stream_cli_reconnect(cli); return; }
@@ -392,8 +450,11 @@ default: break; } + cli->in_cb_mask |= IN_CB_MASK_CONNECT_CB; if (cli->connect_cb) cli->connect_cb(cli); + cli->in_cb_mask &= ~IN_CB_MASK_CONNECT_CB; + free_delayed_if_needed(cli); }
static int osmo_stream_cli_fd_cb(struct osmo_fd *ofd, unsigned int what) @@ -407,7 +468,8 @@ case STREAM_CLI_STATE_CONNECTED: if (what & OSMO_FD_READ) { LOGSCLI(cli, LOGL_DEBUG, "connected read\n"); - stream_cli_read(cli); + if (stream_cli_read(cli) == true) + break; /* cli (and hence ofd) freed, done. */ } if (what & OSMO_FD_WRITE) { LOGSCLI(cli, LOGL_DEBUG, "connected write\n"); @@ -457,6 +519,7 @@ static void stream_cli_iofd_read_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg) { struct osmo_stream_cli *cli = osmo_iofd_get_data(iofd); + bool freed;
switch (cli->state) { case STREAM_CLI_STATE_CONNECTING: @@ -468,21 +531,29 @@ case -EPIPE: case -ECONNRESET: LOGSCLI(cli, LOGL_ERROR, "lost connection with srv (%d)\n", res); - osmo_stream_cli_reconnect(cli); + freed = stream_cli_reconnect(cli); break; case 0: LOGSCLI(cli, LOGL_NOTICE, "connection closed with srv\n"); - osmo_stream_cli_reconnect(cli); + freed = stream_cli_reconnect(cli); break; default: LOGSCLI(cli, LOGL_DEBUG, "received %d bytes from srv\n", res); + freed = false; break; } + if (freed) + return; /* msg was also freed as part of the talloc tree. */ /* Notify user of new data or error: */ - if (cli->iofd_read_cb) - cli->iofd_read_cb(cli, res, msg); - else + if (!cli->iofd_read_cb) { msgb_free(msg); + return; + } + cli->in_cb_mask |= IN_CB_MASK_READ_CB; + cli->iofd_read_cb(cli, res, msg); + cli->in_cb_mask &= ~IN_CB_MASK_READ_CB; + OSMO_ASSERT(cli->in_cb_mask == 0); + free_delayed_if_needed(cli); break; default: osmo_panic("%s() called with unexpected state %d\n", __func__, cli->state); @@ -499,8 +570,8 @@ break; case STREAM_CLI_STATE_CONNECTED: if (msg && res <= 0) { - osmo_stream_cli_reconnect(cli); LOGSCLI(cli, LOGL_ERROR, "received error %d in response to send\n", res); + stream_cli_reconnect(cli); } break; default: @@ -519,6 +590,7 @@ static void stream_cli_iofd_recvmsg_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg, const struct msghdr *msgh) { struct osmo_stream_cli *cli = osmo_iofd_get_data(iofd); + bool freed;
res = stream_iofd_sctp_recvmsg_trailer(iofd, msg, res, msgh);
@@ -532,20 +604,28 @@ case -EPIPE: case -ECONNRESET: LOGSCLI(cli, LOGL_ERROR, "lost connection with srv (%d)\n", res); - osmo_stream_cli_reconnect(cli); + freed = stream_cli_reconnect(cli); break; case 0: LOGSCLI(cli, LOGL_NOTICE, "connection closed with srv\n"); - osmo_stream_cli_reconnect(cli); + freed = stream_cli_reconnect(cli); break; default: + freed = false; break; } + if (freed) + return; /* msg was also freed as part of the talloc tree. */ /* Notify user of new data or error: */ - if (cli->iofd_read_cb) - cli->iofd_read_cb(cli, res, msg); - else + if (!cli->iofd_read_cb) { msgb_free(msg); + return; + } + cli->in_cb_mask |= IN_CB_MASK_READ_CB; + cli->iofd_read_cb(cli, res, msg); + cli->in_cb_mask &= ~IN_CB_MASK_READ_CB; + OSMO_ASSERT(cli->in_cb_mask == 0); + free_delayed_if_needed(cli); break; default: osmo_panic("%s() called with unexpected state %d\n", __func__, cli->state); @@ -860,10 +940,18 @@ * \param[in] cli Stream Client to destroy */ void osmo_stream_cli_destroy(struct osmo_stream_cli *cli) { - osmo_stream_cli_close(cli); + LOGSCLI(cli, LOGL_DEBUG, "destroy()\n"); + OSMO_ASSERT(!stream_cli_close(cli)); osmo_timer_del(&cli->timer); msgb_queue_free(&cli->tx_queue); - talloc_free(cli); + /* if we are in a user callback, delay freeing. */ + if (cli->in_cb_mask != 0) { + LOGSCLI(cli, LOGL_DEBUG, "delay free() in_cb_mask=0x%02x\n", cli->in_cb_mask); + cli->delay_free = true; + } else { + LOGSCLI(cli, LOGL_DEBUG, "free(destroy)\n"); + talloc_free(cli); + } }
/*! DEPRECATED: use osmo_stream_cli_set_reconnect_timeout() or osmo_stream_cli_reconnect() instead! @@ -877,8 +965,10 @@ int ret;
/* we are reconfiguring this socket, close existing first. */ - if ((cli->flags & OSMO_STREAM_CLI_F_RECONF) && cli->ofd.fd >= 0) - osmo_stream_cli_close(cli); + if ((cli->flags & OSMO_STREAM_CLI_F_RECONF) && cli->ofd.fd >= 0) { + if (stream_cli_close(cli) == true) + return -ENAVAIL; /* freed */ + }
cli->flags &= ~OSMO_STREAM_CLI_F_RECONF;
@@ -904,7 +994,7 @@ if (ret < 0) { LOGSCLI(cli, LOGL_ERROR, "connect: socket creation error (%d)\n", ret); if (reconnect) - osmo_stream_cli_reconnect(cli); + stream_cli_reconnect(cli); return ret; } osmo_fd_setup(&cli->ofd, ret, OSMO_FD_READ | OSMO_FD_WRITE, osmo_stream_cli_fd_cb, cli, 0); @@ -1031,8 +1121,10 @@ unsigned int local_addrcnt;
/* we are reconfiguring this socket, close existing first. */ - if ((cli->flags & OSMO_STREAM_CLI_F_RECONF) && osmo_stream_cli_get_fd(cli) >= 0) - osmo_stream_cli_close(cli); + if ((cli->flags & OSMO_STREAM_CLI_F_RECONF) && osmo_stream_cli_get_fd(cli) >= 0) { + if (stream_cli_close(cli) == true) + return -ENAVAIL; /* freed */ + }
cli->flags &= ~OSMO_STREAM_CLI_F_RECONF;
@@ -1075,7 +1167,7 @@
if (ret < 0) { LOGSCLI(cli, LOGL_ERROR, "connect: socket creation error (%d)\n", ret); - osmo_stream_cli_reconnect(cli); + stream_cli_reconnect(cli); return ret; }
@@ -1239,7 +1331,7 @@ LOGSCLI(cli, LOGL_ERROR, "lost connection with srv (%d)\n", errno); else LOGSCLI(cli, LOGL_ERROR, "recv failed (%d)\n", errno); - osmo_stream_cli_reconnect(cli); + stream_cli_reconnect(cli); return ret; } else if (ret == 0) { LOGSCLI(cli, LOGL_ERROR, "connection closed with srv\n"); diff --git a/tests/stream/stream_test.err b/tests/stream/stream_test.err index 5180409..8950706 100644 --- a/tests/stream/stream_test.err +++ b/tests/stream/stream_test.err @@ -1,3 +1,4 @@ +CLICONN(cli_test,){CLOSED} close() SRV(srv_link_test,127.0.0.11:1111) accept()ed new link from 127.0.0.1:8976 CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTING} connection established
@@ -32,7 +33,7 @@ CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTED} connected read CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTED} message received CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTED} connection closed with srv -CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} connection closed +CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTED} close() CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){WAIT_RECONNECT} retrying reconnect in 9 seconds...
{11.000008} autoreconnecting test step 4 [client OK, server OK], FD reg 0 @@ -41,7 +42,7 @@ {11.000009} autoreconnecting test step 3 [client OK, server OK], FD reg 1 SRV(srv_link_test,127.0.0.11:1111) accept()ed new link from 127.0.0.1:8976 CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTING} connection established -CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} connection closed +CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTED} close()
{11.000010} autoreconnecting test step 2 [client OK, server OK], FD reg 0 SRVCONN(srv_test,r=127.0.0.1:8976<->l=127.0.0.11:1111) connected read/write (what=0x1) @@ -49,6 +50,10 @@ SRVCONN(srv_test,r=127.0.0.1:8976<->l=127.0.0.11:1111) connection closed with client
{11.000011} autoreconnecting test step 1 [client OK, server NA], FD reg 0 +CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} destroy() +CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} close() +CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} free(destroy) +CLICONN(cli_test,){CLOSED} close() SRV(srv_link_test,127.0.0.11:1111) accept()ed new link from 127.0.0.1:8976 CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTING} connection established
@@ -83,10 +88,13 @@ CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTED} connected read CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTED} message received CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTED} connection closed with srv -CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} connection closed +CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTED} close() CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} not reconnecting, disabled
{20.000019} non-reconnecting test step 0 [client OK, server OK], FD reg 0 +CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} destroy() +CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} close() +CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} free(destroy) SRV(srv_link_test,127.0.0.11:1111) accept()ed new link from 127.0.0.1:8977 CLICONN(,r=127.0.0.11:1111<->l=127.0.0.1:8977){CONNECTING} connection established SRVCONN(,r=127.0.0.1:8977<->l=127.0.0.11:1111) received 24 bytes from client @@ -98,7 +106,9 @@ SRVCONN(,r=127.0.0.1:8977<->l=127.0.0.11:1111) received 10 bytes from client SRVCONN(,r=127.0.0.1:8977<->l=127.0.0.11:1111) received 10 bytes from client SRVCONN(,r=127.0.0.1:8977<->l=127.0.0.11:1111) received 10 bytes from client -CLICONN(,r=127.0.0.11:1111<->l=127.0.0.1:8977){CLOSED} connection closed +CLICONN(,r=127.0.0.11:1111<->l=127.0.0.1:8977){CONNECTED} destroy() +CLICONN(,r=127.0.0.11:1111<->l=127.0.0.1:8977){CONNECTED} close() +CLICONN(,r=127.0.0.11:1111<->l=127.0.0.1:8977){CLOSED} free(destroy) SRV(srv_link_test,127.0.0.11:1112) accept()ed new link from 127.0.0.1:8977 CLICONN(,r=127.0.0.11:1112<->l=127.0.0.1:8977){CONNECTING} connection established SRVCONN(,r=127.0.0.1:8977<->l=127.0.0.11:1112) connected write @@ -112,4 +122,6 @@ CLICONN(,r=127.0.0.11:1112<->l=127.0.0.1:8977){CONNECTED} received 10 bytes from srv CLICONN(,r=127.0.0.11:1112<->l=127.0.0.1:8977){CONNECTED} received 10 bytes from srv CLICONN(,r=127.0.0.11:1112<->l=127.0.0.1:8977){CONNECTED} received 10 bytes from srv -CLICONN(,r=127.0.0.11:1112<->l=127.0.0.1:8977){CLOSED} connection closed +CLICONN(,r=127.0.0.11:1112<->l=127.0.0.1:8977){CONNECTED} destroy() +CLICONN(,r=127.0.0.11:1112<->l=127.0.0.1:8977){CONNECTED} close() +CLICONN(,r=127.0.0.11:1112<->l=127.0.0.1:8977){CLOSED} free(destroy)