diff options
Diffstat (limited to 'src/stream_srv.c')
-rw-r--r-- | src/stream_srv.c | 1215 |
1 files changed, 1215 insertions, 0 deletions
diff --git a/src/stream_srv.c b/src/stream_srv.c new file mode 100644 index 0000000..dad6b7a --- /dev/null +++ b/src/stream_srv.c @@ -0,0 +1,1215 @@ +/* (C) 2011 by Pablo Neira Ayuso <pablo@gnumonks.org> + * (C) 2015-2016 by Harald Welte <laforge@gnumonks.org> + * (C) 2023 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de> + * All Rights Reserved. + * + * SPDX-License-Identifier: GPL-2.0+ + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + */ + +#include <stdio.h> +#include <unistd.h> +#include <stdlib.h> +#include <errno.h> +#include <string.h> +#include <time.h> +#include <sys/fcntl.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <sys/ioctl.h> +#include <arpa/inet.h> +#include <netinet/in.h> +#include <netinet/tcp.h> + +#include <osmocom/core/timer.h> +#include <osmocom/core/select.h> +#include <osmocom/core/utils.h> +#include <osmocom/gsm/tlv.h> +#include <osmocom/core/msgb.h> +#include <osmocom/core/osmo_io.h> +#include <osmocom/core/panic.h> +#include <osmocom/core/logging.h> +#include <osmocom/core/talloc.h> +#include <osmocom/core/socket.h> + +#include <osmocom/netif/stream.h> +#include <osmocom/netif/stream_private.h> + +#include "config.h" + +#include <osmocom/netif/sctp.h> + +/*! \file stream_srv.c */ + +#define LOGSLNK(link, level, fmt, args...) \ + LOGP(DLINP, level, "SRV(%s,%s) " fmt, \ + link->name ? : "", \ + link->sockname, \ + ## args) + +#define LOGSSRV(srv, level, fmt, args...) \ + LOGP(DLINP, level, "SRVCONN(%s,%s) " fmt, \ + srv->name ? : "", \ + srv->sockname, \ + ## args) +/* + * Server side. + */ + +#define OSMO_STREAM_SRV_F_RECONF (1 << 0) +#define OSMO_STREAM_SRV_F_NODELAY (1 << 1) + +struct osmo_stream_srv_link { + struct osmo_fd ofd; + char *name; + char sockname[OSMO_SOCK_MULTIADDR_PEER_STR_MAXLEN]; + char *addr[OSMO_STREAM_MAX_ADDRS]; + uint8_t addrcnt; + uint16_t port; + int sk_domain; + int sk_type; + uint16_t proto; + osmo_stream_srv_link_accept_cb_t accept_cb; + void *data; + int flags; + struct osmo_sock_init2_multiaddr_pars ma_pars; +}; + +static int _setsockopt_nosigpipe(struct osmo_stream_srv_link *link, int new_fd) +{ +#ifdef SO_NOSIGPIPE + int ret; + int val = 1; + ret = setsockopt(new_fd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&val, sizeof(val)); + if (ret < 0) + LOGSLNK(link, LOGL_ERROR, "Failed setting SO_NOSIGPIPE: %s\n", strerror(errno)); + return ret; +#else + return 0; +#endif +} + +static int osmo_stream_srv_link_ofd_cb(struct osmo_fd *ofd, unsigned int what) +{ + int ret; + int sock_fd; + struct osmo_sockaddr osa; + socklen_t sa_len = sizeof(osa.u.sas); + struct osmo_stream_srv_link *link = ofd->data; + + ret = accept(ofd->fd, &osa.u.sa, &sa_len); + if (ret < 0) { + LOGSLNK(link, LOGL_ERROR, "failed to accept from origin peer, reason=`%s'\n", + strerror(errno)); + return ret; + } + sock_fd = ret; + + switch (osa.u.sa.sa_family) { + case AF_UNIX: + LOGSLNK(link, LOGL_INFO, "accept()ed new link on fd %d\n", + sock_fd); + _setsockopt_nosigpipe(link, sock_fd); + break; + case AF_INET6: + case AF_INET: + LOGSLNK(link, LOGL_INFO, "accept()ed new link from %s\n", + osmo_sockaddr_to_str(&osa)); + + if (link->proto == IPPROTO_SCTP) { + _setsockopt_nosigpipe(link, sock_fd); + ret = stream_sctp_sock_activate_events(sock_fd); + if (ret < 0) + goto error_close_socket; + } + break; + default: + LOGSLNK(link, LOGL_ERROR, "accept()ed unexpected address family %d\n", + osa.u.sa.sa_family); + goto error_close_socket; + } + + if (link->flags & OSMO_STREAM_SRV_F_NODELAY) { + ret = stream_setsockopt_nodelay(sock_fd, link->proto, 1); + if (ret < 0) + goto error_close_socket; + } + + if (!link->accept_cb) { + ret = -ENOTSUP; + goto error_close_socket; + } + + ret = link->accept_cb(link, sock_fd); + if (ret) + goto error_close_socket; + return 0; + +error_close_socket: + close(sock_fd); + return ret; +} + +/*! \addtogroup stream_srv + * @{ + */ + +/*! Create an Osmocom Stream Server Link. + * A Stream Server Link is the listen()+accept() "parent" to individual connections from remote clients. + * \param[in] ctx talloc allocation context + * \returns Stream Server Link with default values (AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP) + */ +struct osmo_stream_srv_link *osmo_stream_srv_link_create(void *ctx) +{ + struct osmo_stream_srv_link *link; + + link = talloc_zero(ctx, struct osmo_stream_srv_link); + if (!link) + return NULL; + + link->sk_domain = AF_UNSPEC; + link->sk_type = SOCK_STREAM; + link->proto = IPPROTO_TCP; + osmo_fd_setup(&link->ofd, -1, OSMO_FD_READ | OSMO_FD_WRITE, osmo_stream_srv_link_ofd_cb, link, 0); + + link->ma_pars.sctp.version = 0; + + return link; +} + +/*! Set a name on the srv_link object (used during logging). + * \param[in] link server link whose name is to be set. The name is copied into the osmo_stream_srv_link, so + * the caller memory is not required to be valid beyond the call of this function. + * \param[in] name the name to be set on link + */ +void osmo_stream_srv_link_set_name(struct osmo_stream_srv_link *link, const char *name) +{ + osmo_talloc_replace_string(link, &link->name, name); +} + +/*! Retrieve name previously set on the srv_link object (see osmo_stream_srv_link_set_name()). + * \param[in] link server link whose name is to be retrieved + * \returns The name to be set on link; NULL if never set + */ +const char *osmo_stream_srv_link_get_name(const struct osmo_stream_srv_link *link) +{ + return link->name; +} + +/*! Set the NODELAY socket option to avoid Nagle-like behavior. + * Setting this to nodelay=true will automatically set the NODELAY + * socket option on any socket established via this server link, before + * calling the accept_cb() + * \param[in] link server link whose sockets are to be configured + * \param[in] nodelay whether to set (true) NODELAY after accept + */ +void osmo_stream_srv_link_set_nodelay(struct osmo_stream_srv_link *link, bool nodelay) +{ + if (nodelay) + link->flags |= OSMO_STREAM_SRV_F_NODELAY; + else + link->flags &= ~OSMO_STREAM_SRV_F_NODELAY; +} + +/*! Set the local address to which we bind. + * Any changes to this setting will only become active upon next (re)connect. + * \param[in] link Stream Server Link to modify + * \param[in] addr Local IP address + */ +void osmo_stream_srv_link_set_addr(struct osmo_stream_srv_link *link, + const char *addr) +{ + osmo_stream_srv_link_set_addrs(link, &addr, 1); +} + +/*! Set the local address set to which we bind. + * Useful for protocols allowing bind on more than one address (such as SCTP) + * Any changes to this setting will only become active upon next (re)connect. + * \param[in] link Stream Server Link to modify + * \param[in] addr Local IP address + * \return negative on error, 0 on success + */ +int osmo_stream_srv_link_set_addrs(struct osmo_stream_srv_link *link, const char **addr, size_t addrcnt) +{ + int i = 0; + + if (addrcnt > OSMO_STREAM_MAX_ADDRS) + return -EINVAL; + + for (; i < addrcnt; i++) + osmo_talloc_replace_string(link, &link->addr[i], addr[i]); + for (; i < link->addrcnt; i++) { + talloc_free(link->addr[i]); + link->addr[i] = NULL; + } + + link->addrcnt = addrcnt; + link->flags |= OSMO_STREAM_SRV_F_RECONF; + return 0; +} + +/*! Set the local port number to which we bind. + * Any changes to this setting will only become active upon next (re)connect. + * \param[in] link Stream Server Link to modify + * \param[in] port Local port number + */ +void osmo_stream_srv_link_set_port(struct osmo_stream_srv_link *link, + uint16_t port) +{ + link->port = port; + link->flags |= OSMO_STREAM_SRV_F_RECONF; +} + +/*! Set the protocol for the stream server link. + * Any changes to this setting will only become active upon next (re)connect. + * \param[in] link Stream Server Link to modify + * \param[in] proto Protocol (like IPPROTO_TCP (default), IPPROTO_SCTP, ...) + */ +void +osmo_stream_srv_link_set_proto(struct osmo_stream_srv_link *link, + uint16_t proto) +{ + link->proto = proto; + link->flags |= OSMO_STREAM_SRV_F_RECONF; +} + + +/*! Set the socket type for the stream server link. + * Any changes to this setting will only become active upon next (re)connect. + * \param[in] link Stream Server Link to modify + * \param[in] type Socket Type (like SOCK_STREAM (default), SOCK_SEQPACKET, ...) + * \returns zero on success, negative on error. + */ +int osmo_stream_srv_link_set_type(struct osmo_stream_srv_link *link, int type) +{ + switch (type) { + case SOCK_STREAM: + case SOCK_SEQPACKET: + break; + default: + return -ENOTSUP; + } + link->sk_type = type; + link->flags |= OSMO_STREAM_SRV_F_RECONF; + return 0; +} + +/*! Set the socket type for the stream server link. + * Any changes to this setting will only become active upon next (re)connect. + * \param[in] link Stream Server Link to modify + * \param[in] type Socket Domain (like AF_UNSPEC (default for IP), AF_UNIX, AF_INET, ...) + * \returns zero on success, negative on error. + */ +int osmo_stream_srv_link_set_domain(struct osmo_stream_srv_link *link, int domain) +{ + switch (domain) { + case AF_UNSPEC: + case AF_INET: + case AF_INET6: + case AF_UNIX: + break; + default: + return -ENOTSUP; + } + link->sk_domain = domain; + link->flags |= OSMO_STREAM_SRV_F_RECONF; + return 0; +} + +/*! Set application private data of the stream server link. + * \param[in] link Stream Server Link to modify + * \param[in] data User-specific data (available in call-back functions) */ +void +osmo_stream_srv_link_set_data(struct osmo_stream_srv_link *link, + void *data) +{ + link->data = data; +} + +/*! Retrieve application private data of the stream server link. + * \param[in] link Stream Server Link to modify + * \returns Application private data, as set by \ref osmo_stream_cli_set_data() */ +void *osmo_stream_srv_link_get_data(struct osmo_stream_srv_link *link) +{ + return link->data; +} + +/* Similar to osmo_sock_multiaddr_get_name_buf(), but aimed at listening sockets (only local part): */ +static char *get_local_sockname_buf(char *buf, size_t buf_len, const struct osmo_stream_srv_link *link) +{ + struct osmo_strbuf sb = { .buf = buf, .len = buf_len }; + int rc; + + if (buf_len > 0) + buf[0] = '\0'; + + switch (link->sk_domain) { + case AF_UNSPEC: + /* we assume INET(6) by default upon link creation: */ + case AF_INET: + case AF_INET6: + { + char hostbuf[OSMO_STREAM_MAX_ADDRS][INET6_ADDRSTRLEN]; + size_t num_hostbuf = ARRAY_SIZE(hostbuf); + char portbuf[6]; + bool need_more_bufs; + rc = osmo_sock_multiaddr_get_ip_and_port(link->ofd.fd, link->proto, &hostbuf[0][0], + &num_hostbuf, sizeof(hostbuf[0]), + portbuf, sizeof(portbuf), true); + if (rc < 0) + return NULL; + need_more_bufs = num_hostbuf > ARRAY_SIZE(hostbuf); + if (need_more_bufs) + num_hostbuf = ARRAY_SIZE(hostbuf); + OSMO_STRBUF_APPEND(sb, osmo_multiaddr_ip_and_port_snprintf, + &hostbuf[0][0], num_hostbuf, sizeof(hostbuf[0]), portbuf); + if (need_more_bufs) + OSMO_STRBUF_PRINTF(sb, "<need-more-bufs!>"); + return buf; + } + case AF_UNIX: + { + struct osmo_sockaddr osa; + struct sockaddr_un *sun; + socklen_t len = sizeof(osa.u.sas); + rc = getsockname(link->ofd.fd, &osa.u.sa, &len); + if (rc < 0) { + OSMO_STRBUF_PRINTF(sb, "<error-in-getsockname>"); + return buf; + } + /* Make sure sun_path is NULL terminated: */ + sun = (struct sockaddr_un *)&osa.u.sa; + sun->sun_path[sizeof(sun->sun_path) - 1] = '\0'; + OSMO_STRBUF_PRINTF(sb, "%s", sun->sun_path); + return buf; + } + default: + return NULL; + } +} + +/*! Retrieve description of the stream server link e. g. 127.0.0.1:1234. + * Calling this function will build a string that describes the socket in terms of its local/remote + * address/port. The returned name is stored in a static buffer; it is hence not re-entrant or thread-safe. + * \param[in] link Stream Server Link to examine + * \returns Link description or NULL in case of error */ +char *osmo_stream_srv_link_get_sockname(const struct osmo_stream_srv_link *link) +{ + static char buf[sizeof(link->sockname)]; + + if (!get_local_sockname_buf(buf, sizeof(buf), link)) + return NULL; + return buf; +} + +/*! Retrieve Osmocom File Descriptor of the stream server link. + * \param[in] link Stream Server Link + * \returns Pointer to \ref osmo_fd */ +struct osmo_fd * +osmo_stream_srv_link_get_ofd(struct osmo_stream_srv_link *link) +{ + return &link->ofd; +} + +/*! Retrieve File Descriptor of the stream server link. + * \param[in] conn Stream Server Link + * \returns file descriptor or negative on error */ +int osmo_stream_srv_link_get_fd(const struct osmo_stream_srv_link *link) +{ + return link->ofd.fd; +} + +/*! Set the accept() call-back of the stream server link. + * The provided call-back will be called whenever a new inbound connection + * is accept()ed. The call-back then typically creates a new osmo_stream_srv. + * If the call-back returns a negative value, the file descriptor will be closed. + * \param[in] link Stream Server Link + * \param[in] accept_cb Call-back function executed upon accept() */ +void osmo_stream_srv_link_set_accept_cb(struct osmo_stream_srv_link *link, + int (*accept_cb)(struct osmo_stream_srv_link *link, int fd)) + +{ + link->accept_cb = accept_cb; +} + +/*! Destroy the stream server link. Closes + Releases Memory. + * \param[in] link Stream Server Link */ +void osmo_stream_srv_link_destroy(struct osmo_stream_srv_link *link) +{ + osmo_stream_srv_link_close(link); + talloc_free(link); +} + +/*! Open the stream server link. This actually initializes the + * underlying socket and binds it to the configured ip/port. + * \param[in] link Stream Server Link to open + * \return negative on error, 0 on success */ +int osmo_stream_srv_link_open(struct osmo_stream_srv_link *link) +{ + int ret; + + if (link->ofd.fd >= 0) { + /* No reconfigure needed for existing socket, we are fine */ + if (!(link->flags & OSMO_STREAM_SRV_F_RECONF)) + return 0; + /* we are reconfiguring this socket, close existing first. */ + osmo_stream_srv_link_close(link); + } + + link->flags &= ~OSMO_STREAM_SRV_F_RECONF; + + switch (link->sk_domain) { + case AF_UNIX: + ret = osmo_sock_unix_init(link->sk_type, 0, link->addr[0], OSMO_SOCK_F_BIND); + break; + case AF_UNSPEC: + case AF_INET: + case AF_INET6: + switch (link->proto) { +#ifdef HAVE_LIBSCTP + case IPPROTO_SCTP: + ret = osmo_sock_init2_multiaddr2(link->sk_domain, link->sk_type, link->proto, + (const char **)link->addr, link->addrcnt, link->port, + NULL, 0, 0, OSMO_SOCK_F_BIND, &link->ma_pars); + break; +#endif + default: + ret = osmo_sock_init(link->sk_domain, link->sk_type, link->proto, + link->addr[0], link->port, OSMO_SOCK_F_BIND); + } + break; + default: + ret = -ENOTSUP; + } + if (ret < 0) + return ret; + + link->ofd.fd = ret; + if (osmo_fd_register(&link->ofd) < 0) { + close(ret); + link->ofd.fd = -1; + return -EIO; + } + + get_local_sockname_buf(link->sockname, sizeof(link->sockname), link); + return 0; +} + +/*! Check whether the stream server link is opened. + * \param[in] link Stream Server Link to check */ +bool osmo_stream_srv_link_is_opened(const struct osmo_stream_srv_link *link) +{ + if (!link) + return false; + + if (link->ofd.fd == -1) + return false; + + return true; +} + +/*! Close the stream server link and unregister from select loop. + * Does not destroy the server link, merely closes it! + * \param[in] link Stream Server Link to close */ +void osmo_stream_srv_link_close(struct osmo_stream_srv_link *link) +{ + if (!osmo_stream_srv_link_is_opened(link)) + return; + + osmo_fd_unregister(&link->ofd); + close(link->ofd.fd); + link->ofd.fd = -1; +} + +/*! Set given parameter of stream_srv_link to given value. + * \param[in] cli stream client on which to set parameter. + * \param[in] par identifier of the parameter to be set. + * \param[in] val value of the parameter to be set. + * \param[in] val_len length of the parameter value. + * \returns 0 in success; negative -errno on error. */ +int osmo_stream_srv_link_set_param(struct osmo_stream_srv_link *link, enum osmo_stream_srv_link_param par, + void *val, size_t val_len) +{ + OSMO_ASSERT(link); + uint8_t val8; + + switch (par) { + case OSMO_STREAM_SRV_LINK_PAR_SCTP_SOCKOPT_AUTH_SUPPORTED: + if (!val || val_len != sizeof(uint8_t)) + return -EINVAL; + val8 = *(uint8_t *)val; + link->ma_pars.sctp.sockopt_auth_supported.set = true; + link->ma_pars.sctp.sockopt_auth_supported.abort_on_failure = val8 > 1; + link->ma_pars.sctp.sockopt_auth_supported.value = (val8 == 1 || val8 == 3) ? 1 : 0; + break; + case OSMO_STREAM_SRV_LINK_PAR_SCTP_SOCKOPT_ASCONF_SUPPORTED: + if (!val || val_len != sizeof(uint8_t)) + return -EINVAL; + val8 = *(uint8_t *)val; + link->ma_pars.sctp.sockopt_asconf_supported.set = true; + link->ma_pars.sctp.sockopt_asconf_supported.abort_on_failure = val8 > 1; + link->ma_pars.sctp.sockopt_asconf_supported.value = (val8 == 1 || val8 == 3) ? 1 : 0; + break; + case OSMO_STREAM_SRV_LINK_PAR_SCTP_INIT_NUM_OSTREAMS: + if (!val || val_len != sizeof(uint16_t)) + return -EINVAL; + link->ma_pars.sctp.sockopt_initmsg.set = true; + link->ma_pars.sctp.sockopt_initmsg.num_ostreams_present = true; + link->ma_pars.sctp.sockopt_initmsg.num_ostreams_value = *(uint16_t *)val; + break; + case OSMO_STREAM_SRV_LINK_PAR_SCTP_INIT_MAX_INSTREAMS: + if (!val || val_len != sizeof(uint16_t)) + return -EINVAL; + link->ma_pars.sctp.sockopt_initmsg.set = true; + link->ma_pars.sctp.sockopt_initmsg.max_instreams_present = true; + link->ma_pars.sctp.sockopt_initmsg.max_instreams_value = *(uint16_t *)val; + break; + default: + return -ENOENT; + }; + return 0; +} + +/*! @} */ + +#define OSMO_STREAM_SRV_F_FLUSH_DESTROY (1 << 0) + +struct osmo_stream_srv { + struct osmo_stream_srv_link *srv; + char *name; + char sockname[OSMO_SOCK_NAME_MAXLEN]; + enum osmo_stream_mode mode; + union { + struct osmo_fd ofd; + struct osmo_io_fd *iofd; + }; + struct llist_head tx_queue; + osmo_stream_srv_closed_cb_t closed_cb; + osmo_stream_srv_read_cb_t read_cb; + osmo_stream_srv_read_cb2_t iofd_read_cb; + void *data; + int flags; +}; + +/*! \addtogroup stream_srv + * @{ + */ + +static void stream_srv_iofd_read_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg) +{ + struct osmo_stream_srv *conn = osmo_iofd_get_data(iofd); + + switch (res) { + case -EPIPE: + case -ECONNRESET: + LOGSSRV(conn, LOGL_ERROR, "lost connection with client (%d)\n", res); + break; + case 0: + LOGSSRV(conn, LOGL_NOTICE, "connection closed with client\n"); + break; + default: + LOGSSRV(conn, LOGL_DEBUG, "received %d bytes from client\n", res); + break; + } + if (OSMO_UNLIKELY(conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY)) { + LOGSSRV(conn, LOGL_INFO, "Connection is being flushed and closed; ignoring received message\n"); + msgb_free(msg); + if (osmo_iofd_txqueue_len(iofd) == 0) + osmo_stream_srv_destroy(conn); + return; + } + + if (conn->iofd_read_cb) + conn->iofd_read_cb(conn, res, msg); + else + msgb_free(msg); +} + +static void stream_srv_iofd_write_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg) +{ + struct osmo_stream_srv *conn = osmo_iofd_get_data(iofd); + LOGSSRV(conn, LOGL_DEBUG, "connected write\n"); + + if (res < 0) + LOGSSRV(conn, LOGL_ERROR, "error to send: %s\n", strerror(errno)); + + if (osmo_iofd_txqueue_len(iofd) == 0) + if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY) + osmo_stream_srv_destroy(conn); +} + +static const struct osmo_io_ops srv_ioops = { + .read_cb = stream_srv_iofd_read_cb, + .write_cb = stream_srv_iofd_write_cb, +}; + +#ifdef HAVE_LIBSCTP +static void stream_srv_iofd_recvmsg_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg, const struct msghdr *msgh) +{ + struct osmo_stream_srv *conn = osmo_iofd_get_data(iofd); + LOGSSRV(conn, LOGL_DEBUG, "message received (res=%d)\n", res); + + res = stream_iofd_sctp_recvmsg_trailer(iofd, msg, res, msgh); + + switch (res) { + case -EPIPE: + case -ECONNRESET: + LOGSSRV(conn, LOGL_ERROR, "lost connection with client (%d)\n", res); + break; + case 0: + LOGSSRV(conn, LOGL_NOTICE, "connection closed with client\n"); + break; + default: + if (OSMO_LIKELY(res > 0)) + LOGSSRV(conn, LOGL_DEBUG, "received %u bytes from client\n", res); + break; + } + if (OSMO_UNLIKELY(conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY)) { + LOGSSRV(conn, LOGL_INFO, "Connection is being flushed and closed; ignoring received message\n"); + msgb_free(msg); + if (osmo_iofd_txqueue_len(iofd) == 0) + osmo_stream_srv_destroy(conn); + return; + } + + if (conn->iofd_read_cb) + conn->iofd_read_cb(conn, res, msg); + else + msgb_free(msg); +} + +static const struct osmo_io_ops srv_ioops_sctp = { + .recvmsg_cb = stream_srv_iofd_recvmsg_cb, + .sendmsg_cb = stream_srv_iofd_write_cb, +}; +#endif + +static int osmo_stream_srv_read(struct osmo_stream_srv *conn) +{ + int rc = 0; + + LOGSSRV(conn, LOGL_DEBUG, "message received\n"); + + if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY) { + LOGSSRV(conn, LOGL_INFO, "Connection is being flushed and closed; ignoring received message\n"); + return 0; + } + + if (conn->read_cb) + rc = conn->read_cb(conn); + + return rc; +} + +static void osmo_stream_srv_write(struct osmo_stream_srv *conn) +{ +#ifdef HAVE_LIBSCTP + struct sctp_sndrcvinfo sinfo; +#endif + struct msgb *msg; + int ret; + + if (llist_empty(&conn->tx_queue)) { + osmo_fd_write_disable(&conn->ofd); + return; + } + msg = llist_first_entry(&conn->tx_queue, struct msgb, list); + llist_del(&msg->list); + + LOGSSRV(conn, LOGL_DEBUG, "sending %u bytes of data\n", msg->len); + + switch (conn->srv->sk_domain) { + case AF_UNIX: + ret = send(conn->ofd.fd, msgb_data(msg), msgb_length(msg), 0); + break; + case AF_INET: + case AF_INET6: + case AF_UNSPEC: + switch (conn->srv->proto) { +#ifdef HAVE_LIBSCTP + case IPPROTO_SCTP: + memset(&sinfo, 0, sizeof(sinfo)); + sinfo.sinfo_ppid = htonl(msgb_sctp_ppid(msg)); + sinfo.sinfo_stream = msgb_sctp_stream(msg); + ret = sctp_send(conn->ofd.fd, msgb_data(msg), msgb_length(msg), + &sinfo, MSG_NOSIGNAL); + break; +#endif + case IPPROTO_TCP: + default: + ret = send(conn->ofd.fd, msgb_data(msg), msgb_length(msg), 0); + break; + } + break; + default: + ret = -1; + errno = ENOTSUP; + } + + if (ret >= 0 && ret < msgb_length(msg)) { + LOGSSRV(conn, LOGL_ERROR, "short send: %d < exp %u\n", ret, msgb_length(msg)); + /* Update msgb and re-add it at the start of the queue: */ + msgb_pull(msg, ret); + llist_add(&msg->list, &conn->tx_queue); + return; + } + + if (ret == -1) {/* send(): On error -1 is returned, and errno is set appropriately */ + int err = errno; + LOGSSRV(conn, LOGL_ERROR, "send(len=%u) error: %s\n", msgb_length(msg), strerror(err)); + if (err == EAGAIN) { + /* Re-add at the start of the queue to re-attempt: */ + llist_add(&msg->list, &conn->tx_queue); + return; + } + msgb_free(msg); + osmo_stream_srv_destroy(conn); + return; + } + + msgb_free(msg); + + if (llist_empty(&conn->tx_queue)) { + osmo_fd_write_disable(&conn->ofd); + if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY) + osmo_stream_srv_destroy(conn); + } +} + +static int osmo_stream_srv_cb(struct osmo_fd *ofd, unsigned int what) +{ + struct osmo_stream_srv *conn = ofd->data; + int rc = 0; + + LOGSSRV(conn, LOGL_DEBUG, "connected read/write (what=0x%x)\n", what); + if (what & OSMO_FD_READ) + rc = osmo_stream_srv_read(conn); + if (rc != -EBADF && (what & OSMO_FD_WRITE)) + osmo_stream_srv_write(conn); + + return rc; +} + + +/*! Create a legacy osmo_fd mode Stream Server inside the specified link. + * + * This is the function an application traditionally calls from within the + * accept_cb call-back of the osmo_stream_srv_link. It creates a new + * osmo_stream_srv within that link. + * + * New users/programs should use osmo_stream_srv_create2 to operate in osmo_io + * mode instead. + * + * \param[in] ctx talloc allocation context from which to allocate + * \param[in] link Stream Server Link to which we belong + * \param[in] fd system file descriptor of the new connection + * \param[in] read_cb Call-back to call when the socket is readable + * \param[in] closed_cb Call-back to call when the connection is closed + * \param[in] data User data to save in the new Stream Server struct + * \returns Stream Server in case of success; NULL on error */ +struct osmo_stream_srv * +osmo_stream_srv_create(void *ctx, struct osmo_stream_srv_link *link, int fd, + osmo_stream_srv_read_cb_t read_cb, + osmo_stream_srv_closed_cb_t closed_cb, + void *data) +{ + struct osmo_stream_srv *conn; + + OSMO_ASSERT(link); + + conn = talloc_zero(ctx, struct osmo_stream_srv); + if (conn == NULL) + return NULL; + + conn->mode = OSMO_STREAM_MODE_OSMO_FD; + conn->srv = link; + osmo_fd_setup(&conn->ofd, fd, OSMO_FD_READ, osmo_stream_srv_cb, conn, 0); + conn->read_cb = read_cb; + conn->closed_cb = closed_cb; + conn->data = data; + INIT_LLIST_HEAD(&conn->tx_queue); + + osmo_sock_get_name_buf(conn->sockname, sizeof(conn->sockname), fd); + + if (osmo_fd_register(&conn->ofd) < 0) { + LOGSSRV(conn, LOGL_ERROR, "could not register FD\n"); + talloc_free(conn); + return NULL; + } + return conn; +} + +/*! Create an osmo_iofd mode Stream Server inside the specified link. + * + * This is the function an application typically calls from within the + * accept_cb call-back of the osmo_stream_srv_link. It creates a new + * osmo_stream_srv in osmo_io mode within that link. + * + * \param[in] ctx talloc allocation context from which to allocate + * \param[in] link Stream Server Link to which we belong + * \param[in] fd system file descriptor of the new connection + * \param[in] data User data to save in the new Stream Server struct + * \returns Stream Server in case of success; NULL on error */ +struct osmo_stream_srv * +osmo_stream_srv_create2(void *ctx, struct osmo_stream_srv_link *link, int fd, void *data) +{ + struct osmo_stream_srv *conn; + + OSMO_ASSERT(link); + + conn = talloc_zero(ctx, struct osmo_stream_srv); + if (conn == NULL) + return NULL; + + conn->mode = OSMO_STREAM_MODE_OSMO_IO; + conn->srv = link; + + osmo_sock_get_name_buf(conn->sockname, sizeof(conn->sockname), fd); + + if (link->proto == IPPROTO_SCTP) { + conn->iofd = osmo_iofd_setup(conn, fd, conn->sockname, OSMO_IO_FD_MODE_RECVMSG_SENDMSG, + &srv_ioops_sctp, conn); + if (conn->iofd) + osmo_iofd_set_cmsg_size(conn->iofd, CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))); + } else { + conn->iofd = osmo_iofd_setup(conn, fd, conn->sockname, OSMO_IO_FD_MODE_READ_WRITE, + &srv_ioops, conn); + } + if (!conn->iofd) { + talloc_free(conn); + return NULL; + } + conn->data = data; + + if (osmo_iofd_register(conn->iofd, fd) < 0) { + LOGSSRV(conn, LOGL_ERROR, "could not register FD %d\n", fd); + talloc_free(conn); + return NULL; + } + + return conn; +} + +/*! Set a name on the srv object (used during logging). + * \param[in] conn server whose name is to be set. The name is copied into the osmo_stream_srv_link, so + * the caller memory is not required to be valid beyond the call of this function. + * \param[in] name the name to be set on conn + */ +void osmo_stream_srv_set_name(struct osmo_stream_srv *conn, const char *name) +{ + osmo_talloc_replace_string(conn, &conn->name, name); + if (conn->mode == OSMO_STREAM_MODE_OSMO_IO && conn->iofd) + osmo_iofd_set_name(conn->iofd, name); +} + +/*! Retrieve name previously set on the srv object (see osmo_stream_srv_set_name()). + * \param[in] conn server whose name is to be retrieved + * \returns The name to be set on conn; NULL if never set + */ +const char *osmo_stream_srv_get_name(const struct osmo_stream_srv *conn) +{ + return conn->name; +} + +/*! Set the call-back function for incoming data on an osmo_io stream_srv. + * + * This function only works with osmo_stream_srv in osmo_io mode, created by osmo_stream_srv_create2()! + * + * Whenever data is received on the osmo_stram_srv, the read_cb call-back function of the user application is + * called. + * + * \param[in] conn Stream Server to modify + * \param[in] read_cb Call-back function to be called when data was read */ +void osmo_stream_srv_set_read_cb(struct osmo_stream_srv *conn, + osmo_stream_srv_read_cb2_t read_cb) +{ + OSMO_ASSERT(conn && conn->mode == OSMO_STREAM_MODE_OSMO_IO); + conn->iofd_read_cb = read_cb; +} + +/*! Set the call-back function called when the stream server socket was closed. + * Whenever the socket was closed (network error, client disconnect, etc.), the user-provided + * call-back function given here is called. This is typically used by the application to clean up any of its + * internal state related to this specific client/connection. + * \param[in] conn Stream Server to modify + * \param[in] closed_cb Call-back function to be called when the connection was closed */ +void osmo_stream_srv_set_closed_cb(struct osmo_stream_srv *conn, + osmo_stream_srv_closed_cb_t closed_cb) +{ + OSMO_ASSERT(conn); + conn->closed_cb = closed_cb; +} + +/*! Prepare to send out all pending messages on the connection's Tx queue. + * and then automatically destroy the stream with osmo_stream_srv_destroy(). + * This function disables queuing of new messages on the connection and also + * disables reception of new messages on the connection. + * \param[in] conn Stream Server to modify */ +void osmo_stream_srv_set_flush_and_destroy(struct osmo_stream_srv *conn) +{ + conn->flags |= OSMO_STREAM_SRV_F_FLUSH_DESTROY; +} + +/*! Set application private data of the stream server. + * \param[in] conn Stream Server to modify + * \param[in] data User-specific data (available in call-back functions) */ +void +osmo_stream_srv_set_data(struct osmo_stream_srv *conn, + void *data) +{ + conn->data = data; +} + +/*! Set the segmentation callback for target osmo_stream_srv structure. + * + * A segmentation call-back can optionally be used when a packet based protocol (like TCP) is used within a + * STREAM style socket that does not preserve message boundaries within the stream. If a segmentation + * call-back is given, the osmo_stream_srv library code will makes sure that the read_cb called only for + * complete single messages, and not arbitrary segments of the stream. + * + * This function only works with osmo_stream_srv in osmo_io mode, created by osmo_stream_srv_create2()! + * The connection has to have been established prior to calling this function. + * + * \param[in,out] conn Target Stream Server to modify + * \param[in] segmentation_cb Segmentation callback to be set */ +void osmo_stream_srv_set_segmentation_cb(struct osmo_stream_srv *conn, + osmo_stream_srv_segmentation_cb_t segmentation_cb) +{ + /* Note that the following implies that iofd != NULL, since + * osmo_stream_srv_create2() creates the iofd member, too */ + OSMO_ASSERT(conn->mode == OSMO_STREAM_MODE_OSMO_IO); + /* Copy default settings */ + struct osmo_io_ops conn_ops; + osmo_iofd_get_ioops(conn->iofd, &conn_ops); + /* Set segmentation cb for this connection */ + conn_ops.segmentation_cb = segmentation_cb; + osmo_iofd_set_ioops(conn->iofd, &conn_ops); +} + +/*! Retrieve application private data of the stream server + * \param[in] conn Stream Server + * \returns Application private data, as set by \ref osmo_stream_srv_set_data() */ +void *osmo_stream_srv_get_data(struct osmo_stream_srv *conn) +{ + return conn->data; +} + +/*! Retrieve the stream server socket description. + * The returned name is stored in a static buffer; it is hence not re-entrant or thread-safe! + * \param[in] cli Stream Server to examine + * \returns Socket description or NULL in case of error */ +const char *osmo_stream_srv_get_sockname(const struct osmo_stream_srv *conn) +{ + static char buf[OSMO_STREAM_MAX_ADDRS * OSMO_SOCK_NAME_MAXLEN]; + + osmo_sock_multiaddr_get_name_buf(buf, sizeof(buf), + osmo_stream_srv_get_fd(conn), conn->srv->proto); + + return buf; +} + +/*! Retrieve Osmocom File Descriptor of a stream server in osmo_fd mode. + * \param[in] conn Stream Server + * \returns Pointer to \ref osmo_fd */ +struct osmo_fd * +osmo_stream_srv_get_ofd(struct osmo_stream_srv *conn) +{ + OSMO_ASSERT(conn->mode == OSMO_STREAM_MODE_OSMO_FD); + return &conn->ofd; +} + +/*! Retrieve File Descriptor of the stream server + * \param[in] conn Stream Server + * \returns file descriptor or negative on error */ +int +osmo_stream_srv_get_fd(const struct osmo_stream_srv *conn) +{ + switch (conn->mode) { + case OSMO_STREAM_MODE_OSMO_FD: + return conn->ofd.fd; + case OSMO_STREAM_MODE_OSMO_IO: + if (conn->iofd) + return osmo_iofd_get_fd(conn->iofd); + default: + break; + } + return -EINVAL; +} + +/*! Retrieve osmo_io descriptor of the stream server socket. + * This function must not be called on a stream server in legacy osmo_fd mode! + * \param[in] srv Stream Server of which we want to obtain the osmo_io descriptor + * \returns osmo_io_fd of stream server. */ +struct osmo_io_fd * +osmo_stream_srv_get_iofd(const struct osmo_stream_srv *srv) +{ + OSMO_ASSERT(srv->mode == OSMO_STREAM_MODE_OSMO_IO); + return srv->iofd; +} + +/*! Retrieve the master (Link) from a Stream Server. + * \param[in] conn Stream Server of which we want to know the Link + * \returns Link through which the given Stream Server is established */ +struct osmo_stream_srv_link *osmo_stream_srv_get_master(struct osmo_stream_srv *conn) +{ + return conn->srv; +} + +/*! Destroy given Stream Server. + * This function closes the Stream Server socket, unregisters from the underlying I/O mechanism, invokes the + * connection's closed_cb() callback to allow API users to clean up any associated state they have for this + * connection, and then de-allocates associated memory. + * \param[in] conn Stream Server to be destroyed */ +void osmo_stream_srv_destroy(struct osmo_stream_srv *conn) +{ + switch (conn->mode) { + case OSMO_STREAM_MODE_OSMO_FD: + osmo_fd_unregister(&conn->ofd); + close(conn->ofd.fd); + msgb_queue_free(&conn->tx_queue); + conn->ofd.fd = -1; + break; + case OSMO_STREAM_MODE_OSMO_IO: + osmo_iofd_free(conn->iofd); + conn->iofd = NULL; + break; + default: + OSMO_ASSERT(false); + } + if (conn->closed_cb) + conn->closed_cb(conn); + talloc_free(conn); +} + +/*! Enqueue data to be sent via an Osmocom stream server. + * \param[in] conn Stream Server through which we want to send + * \param[in] msg Message buffer to enqueue in transmit queue */ +void osmo_stream_srv_send(struct osmo_stream_srv *conn, struct msgb *msg) +{ + int rc; + + OSMO_ASSERT(conn); + OSMO_ASSERT(msg); + if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY) { + LOGSSRV(conn, LOGL_DEBUG, "Connection is being flushed and closed; ignoring new outgoing message\n"); + msgb_free(msg); + return; + } + + switch (conn->mode) { + case OSMO_STREAM_MODE_OSMO_FD: + msgb_enqueue(&conn->tx_queue, msg); + osmo_fd_write_enable(&conn->ofd); + break; + case OSMO_STREAM_MODE_OSMO_IO: + if (conn->srv->proto == IPPROTO_SCTP) + rc = stream_iofd_sctp_send_msgb(conn->iofd, msg, MSG_NOSIGNAL); + else + rc = osmo_iofd_write_msgb(conn->iofd, msg); + if (rc < 0) + msgb_free(msg); + break; + default: + OSMO_ASSERT(false); + } +} + +/*! Receive data via an Osmocom stream server in osmo_fd mode. + * \param[in] conn Stream Server from which to receive + * \param msg pre-allocate message buffer to which received data is appended + * \returns number of bytes read, negative on error. + * + * Application programs using the legacy osmo_fd mode of osmo_stream_srv will use + * this function to read/receive from a stream socket after they have been notified that + * it is readable (via select/poll). + * + * If conn is an SCTP connection, additional specific considerations shall be taken: + * - msg->cb is always filled with SCTP ppid, and SCTP stream values, see msgb_sctp_*() APIs. + * - If an SCTP notification was received when reading from the SCTP socket, + * msgb_sctp_msg_flags(msg) will contain bit flag + * OSMO_STREAM_SCTP_MSG_FLAGS_NOTIFICATION set, and the msgb will + * contain a "union sctp_notification" instead of user data. In this case the + * return code will be either 0 (if conn is considered dead after the + * notification) or -EAGAIN (if conn is considered still alive after the + * notification) resembling the standard recv() API. + */ +int osmo_stream_srv_recv(struct osmo_stream_srv *conn, struct msgb *msg) +{ + int ret; + OSMO_ASSERT(conn); + OSMO_ASSERT(msg); + OSMO_ASSERT(conn->mode == OSMO_STREAM_MODE_OSMO_FD); + + switch (conn->srv->sk_domain) { + case AF_UNIX: + ret = recv(conn->ofd.fd, msg->tail, msgb_tailroom(msg), 0); + break; + case AF_INET: + case AF_INET6: + case AF_UNSPEC: + switch (conn->srv->proto) { +#ifdef HAVE_LIBSCTP + case IPPROTO_SCTP: + { + char log_pfx[128]; + snprintf(log_pfx, sizeof(log_pfx), "SRV(%s,%s)", conn->name ? : "", conn->sockname); + ret = stream_sctp_recvmsg_wrapper(conn->ofd.fd, msg, log_pfx); + break; + } +#endif + case IPPROTO_TCP: + default: + ret = recv(conn->ofd.fd, msg->tail, msgb_tailroom(msg), 0); + break; + } + break; + default: + ret = -ENOTSUP; + } + + if (ret < 0) { + if (errno == EPIPE || errno == ECONNRESET) + LOGSSRV(conn, LOGL_ERROR, "lost connection with client\n"); + return ret; + } else if (ret == 0) { + LOGSSRV(conn, LOGL_ERROR, "connection closed with client\n"); + return ret; + } + msgb_put(msg, ret); + LOGSSRV(conn, LOGL_DEBUG, "received %d bytes from client\n", ret); + return ret; +} + +void osmo_stream_srv_clear_tx_queue(struct osmo_stream_srv *conn) +{ + switch (conn->mode) { + case OSMO_STREAM_MODE_OSMO_FD: + msgb_queue_free(&conn->tx_queue); + osmo_fd_write_disable(&conn->ofd); + break; + case OSMO_STREAM_MODE_OSMO_IO: + osmo_iofd_txqueue_clear(conn->iofd); + break; + case OSMO_STREAM_MODE_UNKNOWN: + default: + break; + } + + if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY) + osmo_stream_srv_destroy(conn); +} + +/*! @} */ |