pespin submitted this change.

View Change

Approvals: daniel: Looks good to me, approved Jenkins Builder: Verified laforge: Looks good to me, but someone else must approve
stream_cli: Support destroy object within user callback

This commit improves the code in stream_cli to allow the user to call
osmo_stream_cli_destroy() from within a callback.

For instance, the following call sequence previously ended in a use-after-free:
read_cb(len=0)
osmo_stream_cli_reconnect()
osmo_stream_cli_close()
disconnect_cb
[user calls osmo_stream_cli_destroy()]
[stack unwinds back into read_cb()]
[user code in read_cb() uses the already freed msg]

This allows stream_cli to be used by users that handle destruction and
reconnection of stream_cli objects on their own, such as the libosmo-abis
e1_input/sign_link stack.

Change-Id: I952938474fa2780bf3c906cbdffb2d024b03c1b7
---
M src/stream_cli.c
M tests/stream/stream_test.err
2 files changed, 149 insertions(+), 45 deletions(-)

diff --git a/src/stream_cli.c b/src/stream_cli.c
index 9fc8101..88d5f9e 100644
--- a/src/stream_cli.c
+++ b/src/stream_cli.c
@@ -83,6 +83,11 @@
#define OSMO_STREAM_CLI_F_RECONF (1 << 0)
#define OSMO_STREAM_CLI_F_NODELAY (1 << 1)

+/* Bits of osmo_stream_cli->in_cb_mask, each set while the matching user callback runs; destroy() defers the free while any bit is set. */
+#define IN_CB_MASK_CONNECT_CB (1 << 0) /* connect_cb() */
+#define IN_CB_MASK_DISCONNECT_CB (1 << 1) /* disconnect_cb() */
+#define IN_CB_MASK_READ_CB (1 << 2) /* read_cb() or iofd_read_cb() */
+
struct osmo_stream_cli {
char *name;
char sockname[OSMO_SOCK_NAME_MAXLEN];
@@ -114,12 +119,33 @@
int flags;
int reconnect_timeout;
struct osmo_sock_init2_multiaddr_pars ma_pars;
+ uint8_t in_cb_mask; /* IN_CB_MASK_* */
+ bool delay_free;
};

/*! \addtogroup stream_cli
* @{
*/

+/* Free cli if a delayed destroy() is pending and no user callback is active anymore; return true if freed. */
+static inline bool free_delayed_if_needed(struct osmo_stream_cli *cli)
+{
+ /* Nobody requested delayed free, skip */
+ if (!cli->delay_free)
+ return false;
+ /* Another user callback may still be on the stack, e.g. when we got here via:
+ * read_cb() -> [user] -> osmo_stream_cli_close() -> disconnect_cb() --> [user] --> osmo_stream_cli_destroy()
+ * or:
+ * connect_cb() -> [user] -> osmo_stream_cli_close() -> disconnect_cb() --> [user] --> osmo_stream_cli_destroy()
+ * in which case the caller that clears the last in_cb_mask bit does the free. */
+ if (cli->in_cb_mask != 0)
+ return false;
+
+ LOGSCLI(cli, LOGL_DEBUG, "free(delayed)\n");
+ talloc_free(cli);
+ return true;
+}
+
static void stream_cli_close_iofd(struct osmo_stream_cli *cli)
{
if (!cli->iofd)
@@ -140,18 +166,21 @@

/*! Close an Osmocom Stream Client.
* \param[in] cli Osmocom Stream Client to be closed
+ * \return true if stream was freed due to disconnect_cb, false otherwise
* We unregister the socket fd from the osmocom select() loop
* abstraction and close the socket */
-void osmo_stream_cli_close(struct osmo_stream_cli *cli)
+static bool stream_cli_close(struct osmo_stream_cli *cli)
{
int old_state = cli->state;
+ LOGSCLI(cli, LOGL_DEBUG, "close()\n");

+ /* This guards against reentrant close through disconnect_cb(): */
if (cli->state == STREAM_CLI_STATE_CLOSED)
- return;
+ return false;
if (cli->state == STREAM_CLI_STATE_WAIT_RECONNECT) {
osmo_timer_del(&cli->timer);
cli->state = STREAM_CLI_STATE_CLOSED;
- return;
+ return false;
}


@@ -172,10 +201,45 @@
* Also, if reconnect is disabled by user, notify the user that connect() failed: */
if (old_state == STREAM_CLI_STATE_CONNECTED ||
(old_state == STREAM_CLI_STATE_CONNECTING && cli->reconnect_timeout < 0)) {
- LOGSCLI(cli, LOGL_DEBUG, "connection closed\n");
+ cli->in_cb_mask |= IN_CB_MASK_DISCONNECT_CB;
if (cli->disconnect_cb)
cli->disconnect_cb(cli);
+ cli->in_cb_mask &= ~IN_CB_MASK_DISCONNECT_CB;
+ return free_delayed_if_needed(cli);
}
+ return false;
+}
+
+/*! Close an Osmocom Stream Client.
+ * \param[in] cli Osmocom Stream Client to be closed (may be freed on return if disconnect_cb destroy()s it)
+ * We unregister the socket fd from the osmocom select() loop
+ * abstraction and close the socket */
+void osmo_stream_cli_close(struct osmo_stream_cli *cli)
+{
+ stream_cli_close(cli); /* "freed" result is internal only; the public API stays void */
+}
+
+/*! Internal variant of osmo_stream_cli_reconnect(): close cli and, if it is still alive, schedule a re-connect.
+ * No timer is armed if reconnect is disabled (negative timeout) or if destroy() was requested from a nested
+ * user callback (delay_free), since cli will then be freed once that callback stack unwinds.
+ * \return true if cli was freed (caller must not touch it anymore), false otherwise */
+static bool stream_cli_reconnect(struct osmo_stream_cli *cli)
+{
+ bool freed = stream_cli_close(cli);
+ /* Freed, or delayed free pending: re-arming cli->timer would leave a dangling timer after talloc_free(). */
+ if (freed || cli->delay_free)
+ return freed;
+
+ if (cli->reconnect_timeout < 0) {
+ LOGSCLI(cli, LOGL_INFO, "not reconnecting, disabled\n");
+ return false;
+ }
+
+ cli->state = STREAM_CLI_STATE_WAIT_RECONNECT;
+ LOGSCLI(cli, LOGL_INFO, "retrying reconnect in %d seconds...\n",
+ cli->reconnect_timeout);
+ osmo_timer_schedule(&cli->timer, cli->reconnect_timeout, 0);
+ return false;
}

/*! Re-connect an Osmocom Stream Client.
@@ -184,17 +248,7 @@
* we close any existing connection (if any) and schedule a re-connect timer */
void osmo_stream_cli_reconnect(struct osmo_stream_cli *cli)
{
- osmo_stream_cli_close(cli);
-
- if (cli->reconnect_timeout < 0) {
- LOGSCLI(cli, LOGL_INFO, "not reconnecting, disabled\n");
- return;
- }
-
- cli->state = STREAM_CLI_STATE_WAIT_RECONNECT;
- LOGSCLI(cli, LOGL_INFO, "retrying reconnect in %d seconds...\n",
- cli->reconnect_timeout);
- osmo_timer_schedule(&cli->timer, cli->reconnect_timeout, 0);
+ stream_cli_reconnect(cli); /* cli may be freed on return, e.g. if disconnect_cb destroy()s it */
}

/*! Check if Osmocom Stream Client is in connected state.
@@ -247,12 +301,16 @@
return cli->iofd;
}

-static void stream_cli_read(struct osmo_stream_cli *cli)
+/* Invoke the user's read_cb; return true if cli was freed meanwhile (delayed destroy()), so the caller must not touch it. */
+static bool stream_cli_read(struct osmo_stream_cli *cli)
{
LOGSCLI(cli, LOGL_DEBUG, "message received\n");

+ cli->in_cb_mask |= IN_CB_MASK_READ_CB; /* makes destroy() from within read_cb defer the free */
+ if (cli->read_cb)
+ cli->read_cb(cli);
+ cli->in_cb_mask &= ~IN_CB_MASK_READ_CB;
+ return free_delayed_if_needed(cli);
}

static int stream_cli_write(struct osmo_stream_cli *cli)
@@ -321,7 +379,7 @@
return 0;
}
msgb_free(msg);
- osmo_stream_cli_reconnect(cli);
+ stream_cli_reconnect(cli);
return 0;
}

@@ -357,13 +415,13 @@

if (ret < 0) {
LOGSCLI(cli, LOGL_ERROR, "connect failed (%d)\n", res);
- osmo_stream_cli_reconnect(cli);
+ stream_cli_reconnect(cli);
return;
}
ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len);
if (ret >= 0 && error > 0) {
LOGSCLI(cli, LOGL_ERROR, "connect so_error (%d)\n", error);
- osmo_stream_cli_reconnect(cli);
+ stream_cli_reconnect(cli);
return;
}

@@ -392,8 +450,11 @@
default:
break;
}
+ cli->in_cb_mask |= IN_CB_MASK_CONNECT_CB;
if (cli->connect_cb)
cli->connect_cb(cli);
+ cli->in_cb_mask &= ~IN_CB_MASK_CONNECT_CB;
+ free_delayed_if_needed(cli);
}

static int osmo_stream_cli_fd_cb(struct osmo_fd *ofd, unsigned int what)
@@ -407,7 +468,8 @@
case STREAM_CLI_STATE_CONNECTED:
if (what & OSMO_FD_READ) {
LOGSCLI(cli, LOGL_DEBUG, "connected read\n");
- stream_cli_read(cli);
+ if (stream_cli_read(cli) == true)
+ break; /* cli (and hence ofd) freed, done. */
}
if (what & OSMO_FD_WRITE) {
LOGSCLI(cli, LOGL_DEBUG, "connected write\n");
@@ -457,6 +519,7 @@
static void stream_cli_iofd_read_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg)
{
struct osmo_stream_cli *cli = osmo_iofd_get_data(iofd);
+ bool freed;

switch (cli->state) {
case STREAM_CLI_STATE_CONNECTING:
@@ -468,21 +531,29 @@
case -EPIPE:
case -ECONNRESET:
LOGSCLI(cli, LOGL_ERROR, "lost connection with srv (%d)\n", res);
- osmo_stream_cli_reconnect(cli);
+ freed = stream_cli_reconnect(cli);
break;
case 0:
LOGSCLI(cli, LOGL_NOTICE, "connection closed with srv\n");
- osmo_stream_cli_reconnect(cli);
+ freed = stream_cli_reconnect(cli);
break;
default:
LOGSCLI(cli, LOGL_DEBUG, "received %d bytes from srv\n", res);
+ freed = false;
break;
}
+ if (freed)
+ return; /* msg was also freed as part of the talloc tree. */
/* Notify user of new data or error: */
- if (cli->iofd_read_cb)
- cli->iofd_read_cb(cli, res, msg);
- else
+ if (!cli->iofd_read_cb) {
msgb_free(msg);
+ return;
+ }
+ cli->in_cb_mask |= IN_CB_MASK_READ_CB;
+ cli->iofd_read_cb(cli, res, msg);
+ cli->in_cb_mask &= ~IN_CB_MASK_READ_CB;
+ OSMO_ASSERT(cli->in_cb_mask == 0);
+ free_delayed_if_needed(cli);
break;
default:
osmo_panic("%s() called with unexpected state %d\n", __func__, cli->state);
@@ -499,8 +570,8 @@
break;
case STREAM_CLI_STATE_CONNECTED:
if (msg && res <= 0) {
- osmo_stream_cli_reconnect(cli);
LOGSCLI(cli, LOGL_ERROR, "received error %d in response to send\n", res);
+ stream_cli_reconnect(cli);
}
break;
default:
@@ -519,6 +590,7 @@
static void stream_cli_iofd_recvmsg_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg, const struct msghdr *msgh)
{
struct osmo_stream_cli *cli = osmo_iofd_get_data(iofd);
+ bool freed;

res = stream_iofd_sctp_recvmsg_trailer(iofd, msg, res, msgh);

@@ -532,20 +604,28 @@
case -EPIPE:
case -ECONNRESET:
LOGSCLI(cli, LOGL_ERROR, "lost connection with srv (%d)\n", res);
- osmo_stream_cli_reconnect(cli);
+ freed = stream_cli_reconnect(cli);
break;
case 0:
LOGSCLI(cli, LOGL_NOTICE, "connection closed with srv\n");
- osmo_stream_cli_reconnect(cli);
+ freed = stream_cli_reconnect(cli);
break;
default:
+ freed = false;
break;
}
+ if (freed)
+ return; /* msg was also freed as part of the talloc tree. */
/* Notify user of new data or error: */
- if (cli->iofd_read_cb)
- cli->iofd_read_cb(cli, res, msg);
- else
+ if (!cli->iofd_read_cb) {
msgb_free(msg);
+ return;
+ }
+ cli->in_cb_mask |= IN_CB_MASK_READ_CB;
+ cli->iofd_read_cb(cli, res, msg);
+ cli->in_cb_mask &= ~IN_CB_MASK_READ_CB;
+ OSMO_ASSERT(cli->in_cb_mask == 0);
+ free_delayed_if_needed(cli);
break;
default:
osmo_panic("%s() called with unexpected state %d\n", __func__, cli->state);
@@ -860,10 +940,18 @@
* \param[in] cli Stream Client to destroy */
void osmo_stream_cli_destroy(struct osmo_stream_cli *cli)
{
- osmo_stream_cli_close(cli);
+ LOGSCLI(cli, LOGL_DEBUG, "destroy()\n");
+ OSMO_ASSERT(!stream_cli_close(cli)); /* NOTE(review): panics if disconnect_cb itself destroy()s cli, as close() then frees it */
osmo_timer_del(&cli->timer);
msgb_queue_free(&cli->tx_queue);
- talloc_free(cli);
+ /* Inside a user callback: defer the free until the callback stack unwinds (see free_delayed_if_needed()). */
+ if (cli->in_cb_mask != 0) {
+ LOGSCLI(cli, LOGL_DEBUG, "delay free() in_cb_mask=0x%02x\n", cli->in_cb_mask);
+ cli->delay_free = true;
+ } else {
+ LOGSCLI(cli, LOGL_DEBUG, "free(destroy)\n");
+ talloc_free(cli);
+ }
}

/*! DEPRECATED: use osmo_stream_cli_set_reconnect_timeout() or osmo_stream_cli_reconnect() instead!
@@ -877,8 +965,10 @@
int ret;

/* we are reconfiguring this socket, close existing first. */
- if ((cli->flags & OSMO_STREAM_CLI_F_RECONF) && cli->ofd.fd >= 0)
- osmo_stream_cli_close(cli);
+ if ((cli->flags & OSMO_STREAM_CLI_F_RECONF) && cli->ofd.fd >= 0) {
+ if (stream_cli_close(cli) == true)
+ return -ENAVAIL; /* freed */
+ }

cli->flags &= ~OSMO_STREAM_CLI_F_RECONF;

@@ -904,7 +994,7 @@
if (ret < 0) {
LOGSCLI(cli, LOGL_ERROR, "connect: socket creation error (%d)\n", ret);
if (reconnect)
- osmo_stream_cli_reconnect(cli);
+ stream_cli_reconnect(cli);
return ret;
}
osmo_fd_setup(&cli->ofd, ret, OSMO_FD_READ | OSMO_FD_WRITE, osmo_stream_cli_fd_cb, cli, 0);
@@ -1031,8 +1121,10 @@
unsigned int local_addrcnt;

/* we are reconfiguring this socket, close existing first. */
- if ((cli->flags & OSMO_STREAM_CLI_F_RECONF) && osmo_stream_cli_get_fd(cli) >= 0)
- osmo_stream_cli_close(cli);
+ if ((cli->flags & OSMO_STREAM_CLI_F_RECONF) && osmo_stream_cli_get_fd(cli) >= 0) {
+ if (stream_cli_close(cli) == true)
+ return -ENAVAIL; /* freed */
+ }

cli->flags &= ~OSMO_STREAM_CLI_F_RECONF;

@@ -1075,7 +1167,7 @@

if (ret < 0) {
LOGSCLI(cli, LOGL_ERROR, "connect: socket creation error (%d)\n", ret);
- osmo_stream_cli_reconnect(cli);
+ stream_cli_reconnect(cli);
return ret;
}

@@ -1239,7 +1331,7 @@
LOGSCLI(cli, LOGL_ERROR, "lost connection with srv (%d)\n", errno);
else
LOGSCLI(cli, LOGL_ERROR, "recv failed (%d)\n", errno);
- osmo_stream_cli_reconnect(cli);
+ stream_cli_reconnect(cli);
return ret;
} else if (ret == 0) {
LOGSCLI(cli, LOGL_ERROR, "connection closed with srv\n");
diff --git a/tests/stream/stream_test.err b/tests/stream/stream_test.err
index 5180409..8950706 100644
--- a/tests/stream/stream_test.err
+++ b/tests/stream/stream_test.err
@@ -1,3 +1,4 @@
+CLICONN(cli_test,){CLOSED} close()
SRV(srv_link_test,127.0.0.11:1111) accept()ed new link from 127.0.0.1:8976
CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTING} connection established

@@ -32,7 +33,7 @@
CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTED} connected read
CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTED} message received
CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTED} connection closed with srv
-CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} connection closed
+CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTED} close()
CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){WAIT_RECONNECT} retrying reconnect in 9 seconds...

{11.000008} autoreconnecting test step 4 [client OK, server OK], FD reg 0
@@ -41,7 +42,7 @@
{11.000009} autoreconnecting test step 3 [client OK, server OK], FD reg 1
SRV(srv_link_test,127.0.0.11:1111) accept()ed new link from 127.0.0.1:8976
CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTING} connection established
-CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} connection closed
+CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTED} close()

{11.000010} autoreconnecting test step 2 [client OK, server OK], FD reg 0
SRVCONN(srv_test,r=127.0.0.1:8976<->l=127.0.0.11:1111) connected read/write (what=0x1)
@@ -49,6 +50,10 @@
SRVCONN(srv_test,r=127.0.0.1:8976<->l=127.0.0.11:1111) connection closed with client

{11.000011} autoreconnecting test step 1 [client OK, server NA], FD reg 0
+CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} destroy()
+CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} close()
+CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} free(destroy)
+CLICONN(cli_test,){CLOSED} close()
SRV(srv_link_test,127.0.0.11:1111) accept()ed new link from 127.0.0.1:8976
CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTING} connection established

@@ -83,10 +88,13 @@
CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTED} connected read
CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTED} message received
CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTED} connection closed with srv
-CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} connection closed
+CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTED} close()
CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} not reconnecting, disabled

{20.000019} non-reconnecting test step 0 [client OK, server OK], FD reg 0
+CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} destroy()
+CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} close()
+CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} free(destroy)
SRV(srv_link_test,127.0.0.11:1111) accept()ed new link from 127.0.0.1:8977
CLICONN(,r=127.0.0.11:1111<->l=127.0.0.1:8977){CONNECTING} connection established
SRVCONN(,r=127.0.0.1:8977<->l=127.0.0.11:1111) received 24 bytes from client
@@ -98,7 +106,9 @@
SRVCONN(,r=127.0.0.1:8977<->l=127.0.0.11:1111) received 10 bytes from client
SRVCONN(,r=127.0.0.1:8977<->l=127.0.0.11:1111) received 10 bytes from client
SRVCONN(,r=127.0.0.1:8977<->l=127.0.0.11:1111) received 10 bytes from client
-CLICONN(,r=127.0.0.11:1111<->l=127.0.0.1:8977){CLOSED} connection closed
+CLICONN(,r=127.0.0.11:1111<->l=127.0.0.1:8977){CONNECTED} destroy()
+CLICONN(,r=127.0.0.11:1111<->l=127.0.0.1:8977){CONNECTED} close()
+CLICONN(,r=127.0.0.11:1111<->l=127.0.0.1:8977){CLOSED} free(destroy)
SRV(srv_link_test,127.0.0.11:1112) accept()ed new link from 127.0.0.1:8977
CLICONN(,r=127.0.0.11:1112<->l=127.0.0.1:8977){CONNECTING} connection established
SRVCONN(,r=127.0.0.1:8977<->l=127.0.0.11:1112) connected write
@@ -112,4 +122,6 @@
CLICONN(,r=127.0.0.11:1112<->l=127.0.0.1:8977){CONNECTED} received 10 bytes from srv
CLICONN(,r=127.0.0.11:1112<->l=127.0.0.1:8977){CONNECTED} received 10 bytes from srv
CLICONN(,r=127.0.0.11:1112<->l=127.0.0.1:8977){CONNECTED} received 10 bytes from srv
-CLICONN(,r=127.0.0.11:1112<->l=127.0.0.1:8977){CLOSED} connection closed
+CLICONN(,r=127.0.0.11:1112<->l=127.0.0.1:8977){CONNECTED} destroy()
+CLICONN(,r=127.0.0.11:1112<->l=127.0.0.1:8977){CONNECTED} close()
+CLICONN(,r=127.0.0.11:1112<->l=127.0.0.1:8977){CLOSED} free(destroy)

To view, visit change 38911. To unsubscribe, or for help writing mail filters, visit settings.

Gerrit-MessageType: merged
Gerrit-Project: libosmo-netif
Gerrit-Branch: master
Gerrit-Change-Id: I952938474fa2780bf3c906cbdffb2d024b03c1b7
Gerrit-Change-Number: 38911
Gerrit-PatchSet: 6
Gerrit-Owner: pespin <pespin@sysmocom.de>
Gerrit-Reviewer: Jenkins Builder
Gerrit-Reviewer: daniel <dwillmann@sysmocom.de>
Gerrit-Reviewer: fixeria <vyanitskiy@sysmocom.de>
Gerrit-Reviewer: laforge <laforge@osmocom.org>
Gerrit-Reviewer: osmith <osmith@sysmocom.de>
Gerrit-Reviewer: pespin <pespin@sysmocom.de>