diff options
author | Daniel Willmann <dwillmann@sysmocom.de> | 2023-03-16 16:04:43 +0100 |
---|---|---|
committer | Daniel Willmann <dwillmann@sysmocom.de> | 2023-06-14 17:33:35 +0200 |
commit | 0e7028f742dedefd9fccc2d5b26875fca4036f58 (patch) | |
tree | 1229ae9cca938a0f2c41a462e0940a8c6cb6ded2 | |
parent | 383f721cf79f4570c0c91d52de9c05f453dce21e (diff) |
Add osmo_io support to osmo_stream_cli and osmo_stream_srv
Change-Id: I2f52c7107c392b6f4b0bf2a84f8c873c084a200c
-rw-r--r-- | include/osmocom/netif/stream.h | 5 | ||||
-rw-r--r-- | src/stream.c | 349 |
2 files changed, 325 insertions, 29 deletions
diff --git a/include/osmocom/netif/stream.h b/include/osmocom/netif/stream.h index b9a7c6d..b506142 100644 --- a/include/osmocom/netif/stream.h +++ b/include/osmocom/netif/stream.h @@ -45,6 +45,9 @@ void osmo_stream_srv_link_close(struct osmo_stream_srv_link *link); struct osmo_stream_srv; struct osmo_stream_srv *osmo_stream_srv_create(void *ctx, struct osmo_stream_srv_link *link, int fd, int (*read_cb)(struct osmo_stream_srv *conn), int (*closed_cb)(struct osmo_stream_srv *conn), void *data); +struct osmo_stream_srv *osmo_stream_srv_create2(void *ctx, const char *name, struct osmo_stream_srv_link *link, int fd, void *data); +void osmo_stream_srv_set_read_cb(struct osmo_stream_srv *conn, int (*read_cb)(struct osmo_stream_srv *conn, struct msgb *msg)); +void osmo_stream_srv_set_closed_cb(struct osmo_stream_srv *conn, int (*closed_cb)(struct osmo_stream_srv *conn)); void *osmo_stream_srv_get_data(struct osmo_stream_srv *conn); struct osmo_stream_srv_link *osmo_stream_srv_get_master(struct osmo_stream_srv *conn); struct osmo_fd *osmo_stream_srv_get_ofd(struct osmo_stream_srv *srv); @@ -79,10 +82,12 @@ struct osmo_fd *osmo_stream_cli_get_ofd(struct osmo_stream_cli *cli); void osmo_stream_cli_set_connect_cb(struct osmo_stream_cli *cli, int (*connect_cb)(struct osmo_stream_cli *cli)); void osmo_stream_cli_set_disconnect_cb(struct osmo_stream_cli *cli, int (*disconnect_cb)(struct osmo_stream_cli *cli)); void osmo_stream_cli_set_read_cb(struct osmo_stream_cli *cli, int (*read_cb)(struct osmo_stream_cli *cli)); +void osmo_stream_cli_set_read_cb2(struct osmo_stream_cli *cli, int (*read_cb)(struct osmo_stream_cli *cli, struct msgb *msg)); void osmo_stream_cli_reconnect(struct osmo_stream_cli *cli); bool osmo_stream_cli_is_connected(struct osmo_stream_cli *cli); struct osmo_stream_cli *osmo_stream_cli_create(void *ctx); +struct osmo_stream_cli *osmo_stream_cli_create2(void *ctx, const char *name); void osmo_stream_cli_destroy(struct osmo_stream_cli *cli); int osmo_stream_cli_open(struct osmo_stream_cli *cli); diff --git a/src/stream.c b/src/stream.c index 23efe24..d52b7a0 100644 --- a/src/stream.c +++ b/src/stream.c @@ -37,6 +37,8 @@ #include <osmocom/core/utils.h> #include <osmocom/gsm/tlv.h> #include <osmocom/core/msgb.h> +#include <osmocom/core/osmo_io.h> +#include <osmocom/core/panic.h> #include <osmocom/core/logging.h> #include <osmocom/core/talloc.h> #include <osmocom/core/socket.h> @@ -247,8 +249,19 @@ static const struct value_string stream_cli_state_names[] = { #define OSMO_STREAM_MAX_ADDRS 1 #endif +enum osmo_stream_mode { + OSMO_STREAM_MODE_UNKNOWN, + OSMO_STREAM_MODE_OSMO_FD, + OSMO_STREAM_MODE_OSMO_IO, +}; + struct osmo_stream_cli { - struct osmo_fd ofd; + char *name; + enum osmo_stream_mode mode; + union { + struct osmo_fd ofd; + struct osmo_io_fd *iofd; + }; struct llist_head tx_queue; struct osmo_timer_list timer; enum osmo_stream_cli_state state; @@ -264,6 +277,7 @@ struct osmo_stream_cli { int (*connect_cb)(struct osmo_stream_cli *cli); int (*disconnect_cb)(struct osmo_stream_cli *cli); int (*read_cb)(struct osmo_stream_cli *cli); + int (*iofd_read_cb)(struct osmo_stream_cli *cli, struct msgb *msg); int (*write_cb)(struct osmo_stream_cli *cli); void *data; int flags; @@ -300,6 +314,23 @@ bool osmo_stream_cli_is_connected(struct osmo_stream_cli *cli) return cli->state == STREAM_CLI_STATE_CONNECTED; } +static void osmo_stream_cli_close_iofd(struct osmo_stream_cli *cli) +{ + if (!cli->iofd) + return; + + osmo_iofd_close(cli->iofd); +} + +static void osmo_stream_cli_close_ofd(struct osmo_stream_cli *cli) +{ + if (cli->ofd.fd == -1) + return; + osmo_fd_unregister(&cli->ofd); + close(cli->ofd.fd); + cli->ofd.fd = -1; +} + /*! \brief Close an Osmocom Stream Client * \param[in] cli Osmocom Stream Client to be closed * We unregister the socket fd from the osmocom select() loop @@ -314,9 +345,17 @@ void osmo_stream_cli_close(struct osmo_stream_cli *cli) return; } - osmo_fd_unregister(&cli->ofd); - close(cli->ofd.fd); - cli->ofd.fd = -1; + + switch (cli->mode) { + case OSMO_STREAM_MODE_OSMO_FD: + osmo_stream_cli_close_ofd(cli); + break; + case OSMO_STREAM_MODE_OSMO_IO: + osmo_stream_cli_close_iofd(cli); + break; + default: + OSMO_ASSERT(false); + } if (cli->state == STREAM_CLI_STATE_CONNECTED) { LOGSCLI(cli, LOGL_DEBUG, "connection closed\n"); @@ -329,7 +368,16 @@ void osmo_stream_cli_close(struct osmo_stream_cli *cli) static inline int osmo_stream_cli_fd(const struct osmo_stream_cli *cli) { - return cli->ofd.fd; + switch (cli->mode) { + case OSMO_STREAM_MODE_OSMO_FD: + return cli->ofd.fd; + case OSMO_STREAM_MODE_OSMO_IO: + if (cli->iofd) + return osmo_iofd_get_fd(cli->iofd); + default: + break; + } + return -EINVAL; } static void osmo_stream_cli_read(struct osmo_stream_cli *cli) @@ -445,7 +493,7 @@ static void stream_cli_handle_connecting(struct osmo_stream_cli *cli, int res) /* If messages got enqueued while 'connecting', keep WRITE flag up to dispatch them upon next main loop step */ - if (llist_empty(&cli->tx_queue)) + if (cli->mode == OSMO_STREAM_MODE_OSMO_FD && llist_empty(&cli->tx_queue)) osmo_fd_write_disable(&cli->ofd); LOGSCLI(cli, LOGL_DEBUG, "connection established\n"); @@ -510,13 +558,79 @@ struct osmo_stream_cli *osmo_stream_cli_create(void *ctx) if (!cli) return NULL; + cli->name = ""; + cli->mode = OSMO_STREAM_MODE_UNKNOWN; cli->sk_domain = AF_UNSPEC; cli->sk_type = SOCK_STREAM; cli->proto = IPPROTO_TCP; - cli->ofd.fd = -1; - cli->ofd.priv_nr = 0; /* XXX */ - cli->ofd.cb = osmo_stream_cli_fd_cb; - cli->ofd.data = cli; + cli->state = STREAM_CLI_STATE_CLOSED; + osmo_timer_setup(&cli->timer, cli_timer_cb, cli); + cli->reconnect_timeout = 5; /* default is 5 seconds. */ + INIT_LLIST_HEAD(&cli->tx_queue); + + return cli; +} + +static void stream_cli_iofd_read_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg) +{ + struct osmo_stream_cli *cli = osmo_iofd_get_data(iofd); + + switch (cli->state) { + case STREAM_CLI_STATE_CONNECTING: + stream_cli_handle_connecting(cli, res); + break; + case STREAM_CLI_STATE_CONNECTED: + if (res == 0) + osmo_stream_cli_reconnect(cli); + else if (cli->iofd_read_cb) + cli->iofd_read_cb(cli, msg); + break; + default: + osmo_panic("osmo_stream_cli_write_cb() called with unexpected state %d\n", cli->state); + } +} + +static void stream_cli_iofd_write_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg) +{ + struct osmo_stream_cli *cli = osmo_iofd_get_data(iofd); + + switch (cli->state) { + case STREAM_CLI_STATE_CONNECTING: + stream_cli_handle_connecting(cli, res); + break; + case STREAM_CLI_STATE_CONNECTED: + if (msg && res <= 0) { + osmo_stream_cli_reconnect(cli); + LOGSCLI(cli, LOGL_ERROR, "received error %d in response to send\n", res); + } + break; + default: + osmo_panic("osmo_stream_cli_write_cb() called with unexpected state %d\n", cli->state); + } +} + +static struct osmo_io_ops osmo_stream_cli_ioops = { + .read_cb = stream_cli_iofd_read_cb, + .write_cb = stream_cli_iofd_write_cb, + + .segmentation_cb = NULL, +}; + + +struct osmo_stream_cli *osmo_stream_cli_create2(void *ctx, const char *name) +{ + struct osmo_stream_cli *cli; + + cli = talloc_zero(ctx, struct osmo_stream_cli); + if (!cli) + return NULL; + + cli->name = talloc_strdup(cli, name); + cli->mode = OSMO_STREAM_MODE_UNKNOWN; + cli->sk_domain = AF_UNSPEC; + cli->sk_type = SOCK_STREAM; + cli->proto = IPPROTO_TCP; + cli->state = STREAM_CLI_STATE_CLOSED; osmo_timer_setup(&cli->timer, cli_timer_cb, cli); cli->reconnect_timeout = 5; /* default is 5 seconds. */ @@ -712,6 +826,7 @@ char *osmo_stream_cli_get_sockname(const struct osmo_stream_cli *cli) struct osmo_fd * osmo_stream_cli_get_ofd(struct osmo_stream_cli *cli) { + OSMO_ASSERT(cli->mode == OSMO_STREAM_MODE_OSMO_FD); return &cli->ofd; } @@ -735,15 +850,35 @@ void osmo_stream_cli_set_disconnect_cb(struct osmo_stream_cli *cli, } /*! \brief Set the call-back function called to read from the stream client socket + * Only for osmo_stream_cli created with osmo_stream_cli_create() * \param[in] cli Stream Client to modify * \param[in] read_cb Call-back function to be called when we want to read */ void osmo_stream_cli_set_read_cb(struct osmo_stream_cli *cli, int (*read_cb)(struct osmo_stream_cli *cli)) { + OSMO_ASSERT(cli->mode != OSMO_STREAM_MODE_OSMO_IO); + cli->mode = OSMO_STREAM_MODE_OSMO_FD; + cli->ofd.fd = -1; + cli->ofd.priv_nr = 0; + cli->ofd.cb = osmo_stream_cli_fd_cb; + cli->ofd.data = cli; cli->read_cb = read_cb; } +/*! \brief Set the call-back function called to read from the stream client socket + * Only use this function for osmo_stream_cli created with osmo_stream_cli_create2() + * \param[in] cli Stream Client to modify + * \param[in] read_cb Call-back function to be called when data was read from the socket */ +void +osmo_stream_cli_set_read_cb2(struct osmo_stream_cli *cli, + int (*read_cb)(struct osmo_stream_cli *cli, struct msgb *msg)) +{ + OSMO_ASSERT(cli->mode != OSMO_STREAM_MODE_OSMO_FD); + cli->mode = OSMO_STREAM_MODE_OSMO_IO; + cli->iofd_read_cb = read_cb; +} + /*! \brief Destroy a Osmocom stream client (includes close) * \param[in] cli Stream Client to destroy */ void osmo_stream_cli_destroy(struct osmo_stream_cli *cli) @@ -882,17 +1017,33 @@ int osmo_stream_cli_open(struct osmo_stream_cli *cli) goto error_close_socket; } - osmo_fd_setup(&cli->ofd, fd, OSMO_FD_READ | OSMO_FD_WRITE, cli->ofd.cb, cli->ofd.data, cli->ofd.priv_nr); - if (osmo_fd_register(&cli->ofd) < 0) - goto error_close_socket; + switch (cli->mode) { + case OSMO_STREAM_MODE_OSMO_FD: + osmo_fd_setup(&cli->ofd, fd, OSMO_FD_READ | OSMO_FD_WRITE, cli->ofd.cb, cli->ofd.data, cli->ofd.priv_nr); + if (osmo_fd_register(&cli->ofd) < 0) + goto error_close_socket; + break; + case OSMO_STREAM_MODE_OSMO_IO: + if (!cli->iofd) + cli->iofd = osmo_iofd_setup(cli, fd, cli->name, OSMO_IO_FD_MODE_READ_WRITE, &osmo_stream_cli_ioops, cli); + if (!cli->iofd) + goto error_close_socket; + + if (osmo_iofd_register(cli->iofd, fd) < 0) + goto error_close_socket; + break; + default: + OSMO_ASSERT(false); + } cli->state = STREAM_CLI_STATE_CONNECTING; return 0; error_close_socket: cli->state = STREAM_CLI_STATE_CLOSED; - close(cli->ofd.fd); - cli->ofd.fd = -1; + close(fd); + if (cli->mode == OSMO_STREAM_MODE_OSMO_FD) + cli->ofd.fd = -1; return -EIO; } @@ -911,8 +1062,18 @@ void osmo_stream_cli_send(struct osmo_stream_cli *cli, struct msgb *msg) { OSMO_ASSERT(cli); OSMO_ASSERT(msg); - msgb_enqueue(&cli->tx_queue, msg); - osmo_fd_write_enable(&cli->ofd); + + switch (cli->mode) { + case OSMO_STREAM_MODE_OSMO_FD: + msgb_enqueue(&cli->tx_queue, msg); + osmo_fd_write_enable(&cli->ofd); + break; + case OSMO_STREAM_MODE_OSMO_IO: + osmo_iofd_write_msgb(cli->iofd, msg); + break; + default: + OSMO_ASSERT(false); + } } /*! \brief Receive data via an Osmocom stream client @@ -944,11 +1105,20 @@ int osmo_stream_cli_recv(struct osmo_stream_cli *cli, struct msgb *msg) void osmo_stream_cli_clear_tx_queue(struct osmo_stream_cli *cli) { - msgb_queue_free(&cli->tx_queue); - /* If in state 'connecting', keep WRITE flag up to receive - * socket connection signal and then transition to STATE_CONNECTED: */ - if (cli->state == STREAM_CLI_STATE_CONNECTED) - osmo_fd_write_disable(&cli->ofd); + switch (cli->mode) { + case OSMO_STREAM_MODE_OSMO_FD: + msgb_queue_free(&cli->tx_queue); + /* If in state 'connecting', keep WRITE flag up to receive + * socket connection signal and then transition to STATE_CONNECTED: */ + if (cli->state == STREAM_CLI_STATE_CONNECTED) + osmo_fd_write_disable(&cli->ofd); + break; + case OSMO_STREAM_MODE_OSMO_IO: + osmo_iofd_txqueue_clear(cli->iofd); + break; + default: + OSMO_ASSERT(false); + } } /* @@ -1322,14 +1492,56 @@ void osmo_stream_srv_link_close(struct osmo_stream_srv_link *link) struct osmo_stream_srv { struct osmo_stream_srv_link *srv; - struct osmo_fd ofd; + enum osmo_stream_mode mode; + union { + struct osmo_fd ofd; + struct osmo_io_fd *iofd; + }; struct llist_head tx_queue; int (*closed_cb)(struct osmo_stream_srv *peer); int (*read_cb)(struct osmo_stream_srv *peer); + int (*iofd_read_cb)(struct osmo_stream_srv *peer, struct msgb *msg); void *data; int flags; }; +static void stream_srv_iofd_read_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg) +{ + struct osmo_stream_srv *conn = osmo_iofd_get_data(iofd); + LOGP(DLINP, LOGL_DEBUG, "message received (res=%d)\n", res); + + if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY) { + LOGP(DLINP, LOGL_INFO, "Connection is being flushed and closed; ignoring received message\n"); + msgb_free(msg); + return; + } + + if (res <= 0) { + osmo_stream_srv_set_flush_and_destroy(conn); + if (osmo_iofd_txqueue_len(iofd) == 0) + osmo_stream_srv_destroy(conn); + } else if (conn->iofd_read_cb) { + conn->iofd_read_cb(conn, msg); + } +} + +static void stream_srv_iofd_write_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg) +{ + struct osmo_stream_srv *conn = osmo_iofd_get_data(iofd); + LOGP(DLINP, LOGL_DEBUG, "connected write\n"); + + if (res == -1) + LOGP(DLINP, LOGL_ERROR, "error to send: %s\n", strerror(errno)); + + if (osmo_iofd_txqueue_len(iofd) == 0) + if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY) + osmo_stream_srv_destroy(conn); +} + +static struct osmo_io_ops srv_ioops = { + .read_cb = stream_srv_iofd_read_cb, + .write_cb = stream_srv_iofd_write_cb, +}; static int osmo_stream_srv_read(struct osmo_stream_srv *conn) { int rc = 0; @@ -1429,6 +1641,10 @@ static int osmo_stream_srv_cb(struct osmo_fd *ofd, unsigned int what) /*! \brief Create a Stream Server inside the specified link * \param[in] ctx talloc allocation context from which to allocate * \param[in] link Stream Server Link to which we belong + * \param[in] fd system file descriptor of the new connection + * \param[in] read_cb Call-back to call when the socket is readable + * \param[in] closed_cb Call-back to call when the connection is closed + * \param[in] data User data to save in the new Stream Server struct * \returns Stream Server in case of success; NULL on error */ struct osmo_stream_srv * osmo_stream_srv_create(void *ctx, struct osmo_stream_srv_link *link, @@ -1444,6 +1660,7 @@ osmo_stream_srv_create(void *ctx, struct osmo_stream_srv_link *link, if (conn == NULL) return NULL; + conn->mode = OSMO_STREAM_MODE_OSMO_FD; conn->srv = link; osmo_fd_setup(&conn->ofd, fd, OSMO_FD_READ, osmo_stream_srv_cb, conn, 0); conn->read_cb = read_cb; @@ -1459,6 +1676,62 @@ osmo_stream_srv_create(void *ctx, struct osmo_stream_srv_link *link, return conn; } +/*! \brief Create a Stream Server inside the specified link + * \param[in] ctx talloc allocation context from which to allocate + * \param[in] name name of the connection + * \param[in] link Stream Server Link to which we belong + * \param[in] fd system file descriptor of the new connection + * \param[in] data User data to save in the new Stream Server struct + * \returns Stream Server in case of success; NULL on error */ +struct osmo_stream_srv * +osmo_stream_srv_create2(void *ctx, const char *name, + struct osmo_stream_srv_link *link, int fd, void *data) +{ + struct osmo_stream_srv *conn; + + OSMO_ASSERT(link); + + conn = talloc_zero(ctx, struct osmo_stream_srv); + if (conn == NULL) + return NULL; + + conn->mode = OSMO_STREAM_MODE_OSMO_IO; + conn->srv = link; + conn->iofd = osmo_iofd_setup(conn, fd, name, OSMO_IO_FD_MODE_READ_WRITE, &srv_ioops, conn); + if (!conn->iofd) { + talloc_free(conn); + return NULL; + } + conn->data = data; + + if (osmo_iofd_register(conn->iofd, fd) < 0) { + LOGP(DLINP, LOGL_ERROR, "could not register FD %d\n", fd); + talloc_free(conn); + return NULL; + } + + return conn; +} + +/*! \brief Set the call-back function when data was read from the stream server socket + * Only for osmo_stream_srv created with osmo_stream_srv_create2() + * \param[in] conn Stream Server to modify + * \param[in] read_cb Call-back function to be called when data was read */ +void osmo_stream_srv_set_read_cb(struct osmo_stream_srv *conn, int (*read_cb)(struct osmo_stream_srv *conn, struct msgb *msg)) +{ + OSMO_ASSERT(conn && conn->mode == OSMO_STREAM_MODE_OSMO_IO); + conn->iofd_read_cb = read_cb; +} + +/*! \brief Set the call-back function called when the stream server socket was closed + * \param[in] conn Stream Server to modify + * \param[in] closed_cb Call-back function to be called when the connection was closed */ +void osmo_stream_srv_set_closed_cb(struct osmo_stream_srv *conn, int (*closed_cb)(struct osmo_stream_srv *conn)) +{ + OSMO_ASSERT(conn); + conn->closed_cb = closed_cb; +} + /*! \brief Prepare to send out all pending messages on the connection's Tx queue * and then automatically destroy the stream with osmo_stream_srv_destroy(). * This function disables queuing of new messages on the connection and also @@ -1512,12 +1785,21 @@ struct osmo_stream_srv_link *osmo_stream_srv_get_master(struct osmo_stream_srv * * \param[in] conn Stream Server to be destroyed */ void osmo_stream_srv_destroy(struct osmo_stream_srv *conn) { - osmo_fd_unregister(&conn->ofd); - close(conn->ofd.fd); - conn->ofd.fd = -1; + switch (conn->mode) { + case OSMO_STREAM_MODE_OSMO_FD: + osmo_fd_unregister(&conn->ofd); + close(conn->ofd.fd); + msgb_queue_free(&conn->tx_queue); + conn->ofd.fd = -1; + break; + case OSMO_STREAM_MODE_OSMO_IO: + osmo_iofd_free(conn->iofd); + break; + default: + OSMO_ASSERT(false); + } if (conn->closed_cb) conn->closed_cb(conn); - msgb_queue_free(&conn->tx_queue); talloc_free(conn); } @@ -1534,8 +1816,17 @@ void osmo_stream_srv_send(struct osmo_stream_srv *conn, struct msgb *msg) return; } - msgb_enqueue(&conn->tx_queue, msg); - osmo_fd_write_enable(&conn->ofd); + switch (conn->mode) { + case OSMO_STREAM_MODE_OSMO_FD: + msgb_enqueue(&conn->tx_queue, msg); + osmo_fd_write_enable(&conn->ofd); + break; + case OSMO_STREAM_MODE_OSMO_IO: + osmo_iofd_write_msgb(conn->iofd, msg); + break; + default: + OSMO_ASSERT(false); + } } #ifdef HAVE_LIBSCTP |