diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Makefile.am | 13 | ||||
-rw-r--r-- | src/amr.c | 207 | ||||
-rw-r--r-- | src/datagram.c | 46 | ||||
-rw-r--r-- | src/ipa.c | 82 | ||||
-rw-r--r-- | src/jibuf.c | 9 | ||||
-rw-r--r-- | src/osmux.c | 998 | ||||
-rw-r--r-- | src/osmux_input.c | 857 | ||||
-rw-r--r-- | src/osmux_output.c | 396 | ||||
-rw-r--r-- | src/prim.c | 473 | ||||
-rw-r--r-- | src/rs232.c | 32 | ||||
-rw-r--r-- | src/rtp.c | 6 | ||||
-rw-r--r-- | src/sctp.c | 95 | ||||
-rw-r--r-- | src/stream.c | 1232 | ||||
-rw-r--r-- | src/stream_cli.c | 1237 | ||||
-rw-r--r-- | src/stream_srv.c | 1215 |
15 files changed, 4810 insertions, 2088 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 4d8b900..7b38ac0 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1,6 +1,6 @@ # This is _NOT_ the library release version, it's an API version. # Please read Chapter 6 "Library interface versions" of the libtool documentation before making any modification -LIBVERSION=7:0:1 +LIBVERSION=12:0:1 AM_CPPFLAGS = -I$(top_srcdir)/include -I$(top_builddir) AM_CFLAGS= -fPIC -Wall $(LIBOSMOCORE_CFLAGS) $(LIBOSMOGSM_CFLAGS) $(LIBOSMOABIS_CFLAGS) $(COVERAGE_CFLAGS) $(LIBSCTP_CFLAGS) @@ -9,7 +9,7 @@ AM_LDFLAGS = $(COVERAGE_LDFLAGS) lib_LTLIBRARIES = libosmonetif.la libosmonetif_la_LIBADD = $(LIBOSMOCORE_LIBS) $(LIBOSMOGSM_LIBS) $(LIBSCTP_LIBS) -libosmonetif_la_LDFLAGS = -version-info $(LIBVERSION) -no-undefined +libosmonetif_la_LDFLAGS = $(AM_LDFLAGS) -version-info $(LIBVERSION) -no-undefined libosmonetif_la_SOURCES = amr.c \ datagram.c \ @@ -17,6 +17,15 @@ libosmonetif_la_SOURCES = amr.c \ ipa_unit.c \ jibuf.c \ osmux.c \ + osmux_input.c \ + osmux_output.c \ + prim.c \ rs232.c \ rtp.c \ + stream_cli.c \ + stream_srv.c \ stream.c + +if ENABLE_LIBSCTP +libosmonetif_la_SOURCES += sctp.c +endif @@ -14,9 +14,11 @@ #include <unistd.h> #include <string.h> #include <stdbool.h> + +#include <osmocom/core/utils.h> #include <osmocom/netif/amr.h> -/* According to TS 26.101: +/* According to TS 26.101 Table A.1b: * * Frame type AMR code bits bytes * 0 4.75 95 12 @@ -27,8 +29,35 @@ * 5 7.95 159 20 * 6 10.20 204 26 * 7 12.20 244 31 + * 8 AMR SID 39 5 + * 9 GSM-EFR SID 43 5 + * 10 TDMA-EFR SID 38 5 + * 11 PDC-EFR SID 37 5 + * 12 NOT USED + * 13 NOT USED + * 14 NOT USED + * 15 NO DATA 0 0 */ +static int amr_ft_to_bits[AMR_FT_MAX] = { + [AMR_FT_0] = AMR_FT_0_LEN_BITS, + [AMR_FT_1] = AMR_FT_1_LEN_BITS, + [AMR_FT_2] = AMR_FT_2_LEN_BITS, + [AMR_FT_3] = AMR_FT_3_LEN_BITS, + [AMR_FT_4] = AMR_FT_4_LEN_BITS, + [AMR_FT_5] = AMR_FT_5_LEN_BITS, + [AMR_FT_6] = AMR_FT_6_LEN_BITS, + [AMR_FT_7] = AMR_FT_7_LEN_BITS, + [AMR_FT_SID] = AMR_FT_SID_LEN_BITS, + [AMR_FT_GSM_EFR_SID] = AMR_FT_GSM_EFR_SID_LEN_BITS, + [AMR_FT_TDMA_EFR_SID] = AMR_FT_TDMA_EFR_SID_LEN_BITS, + [AMR_FT_PDC_EFR_SID] = AMR_FT_PDC_EFR_SID_LEN_BITS, + [12] = 0, /* for future use */ + [13] = 0, /* for future use */ + [14] = 0, /* for future use */ + [AMR_FT_NO_DATA] = AMR_FT_NO_DATA_LEN_BITS, +}; + static size_t amr_ft_to_bytes[AMR_FT_MAX] = { [AMR_FT_0] = AMR_FT_0_LEN, [AMR_FT_1] = AMR_FT_1_LEN, @@ -39,44 +68,59 @@ static size_t amr_ft_to_bytes[AMR_FT_MAX] = { [AMR_FT_6] = AMR_FT_6_LEN, [AMR_FT_7] = AMR_FT_7_LEN, [AMR_FT_SID] = AMR_FT_SID_LEN, + [AMR_FT_GSM_EFR_SID] = AMR_FT_GSM_EFR_SID_LEN, + [AMR_FT_TDMA_EFR_SID] = AMR_FT_TDMA_EFR_SID_LEN, + [AMR_FT_PDC_EFR_SID] = AMR_FT_PDC_EFR_SID_LEN, + [12] = 0, /* for future use */ + [13] = 0, /* for future use */ + [14] = 0, /* for future use */ + [AMR_FT_NO_DATA] = AMR_FT_NO_DATA_LEN, }; +size_t osmo_amr_bits(uint8_t amr_ft) +{ + OSMO_ASSERT(amr_ft < AMR_FT_MAX); + return amr_ft_to_bits[amr_ft]; +} + size_t osmo_amr_bytes(uint8_t amr_ft) { + OSMO_ASSERT(amr_ft < AMR_FT_MAX); return amr_ft_to_bytes[amr_ft]; } +int osmo_amr_bytes_to_ft(size_t bytes) +{ + int ft; + + for (ft = 0; ft < AMR_FT_PDC_EFR_SID; ft++) { + if (amr_ft_to_bytes[ft] == bytes) + return ft; + } + /* 12-14 not used, jump to 15 (AMR_FT_NO_DATA): */ + if (amr_ft_to_bytes[AMR_FT_NO_DATA] == bytes) + return AMR_FT_NO_DATA; + + return -1; +} + int osmo_amr_ft_valid(uint8_t amr_ft) { - /* - * Extracted from RFC3267: - * - * "... with a FT value in the range 9-14 for AMR ... the whole packet - * SHOULD be discarded." - * - * "... packets containing only NO_DATA frames (FT=15) SHOULD NOT be - * transmitted." - * - * So, let's discard frames with a AMR FT >= 9. - */ - if (amr_ft >= AMR_FT_MAX) - return 0; - - return 1; + return amr_ft <= AMR_FT_PDC_EFR_SID || amr_ft == AMR_FT_NO_DATA; } /*! Check if an AMR frame is octet aligned by looking at the padding bits. * \param[inout] payload user provided memory containing the AMR payload. * \param[in] payload_len overall length of the AMR payload. * \returns true when the payload is octet aligned. */ -bool osmo_amr_is_oa(uint8_t *payload, unsigned int payload_len) +bool osmo_amr_is_oa(const uint8_t *payload, unsigned int payload_len) { /* NOTE: The distinction between octet-aligned and bandwith-efficient * mode normally relys on out of band methods that explicitly select * one of the two modes. (See also RFC 3267, chapter 3.8). However the * A interface in GSM does not provide ways to communicate which mode - * is used exactly used. The following functions uses some heuristics - * to check if an AMR payload is octet aligned or not. */ + * is exactly used. The following functions uses some heuristics to + * check if an AMR payload is octet aligned or not. */ struct amr_hdr *oa_hdr = (struct amr_hdr *)payload; unsigned int frame_len; @@ -103,7 +147,7 @@ bool osmo_amr_is_oa(uint8_t *payload, unsigned int payload_len) /* Match the length of the received payload against the expected frame * length that is defined by the frame type. */ - if(!osmo_amr_ft_valid(oa_hdr->ft)) + if (!osmo_amr_ft_valid(oa_hdr->ft)) return false; frame_len = osmo_amr_bytes(oa_hdr->ft); if (frame_len != payload_len - sizeof(struct amr_hdr)) @@ -119,8 +163,10 @@ bool osmo_amr_is_oa(uint8_t *payload, unsigned int payload_len) int osmo_amr_oa_to_bwe(uint8_t *payload, unsigned int payload_len) { struct amr_hdr *oa_hdr = (struct amr_hdr *)payload; + unsigned int ft = oa_hdr->ft; unsigned int frame_len = payload_len - sizeof(struct amr_hdr); unsigned int i; + int bwe_payload_len; /* This implementation is not capable to handle multi-frame * packets, so we need to make sure that the frame we operate on @@ -128,8 +174,12 @@ int osmo_amr_oa_to_bwe(uint8_t *payload, unsigned int payload_len) if (oa_hdr->f != 0) return -1; + /* Check for valid FT (AMR mode) value */ + if (!osmo_amr_ft_valid(oa_hdr->ft)) + return -1; + /* Move TOC close to CMR */ - payload[0] |= (payload[1] >> 4) & 0x0f; + payload[0] = (payload[0] & 0xf0) | ((payload[1] >> 4) & 0x0f); payload[1] = (payload[1] << 4) & 0xf0; for (i = 0; i < frame_len; i++) { @@ -137,8 +187,10 @@ int osmo_amr_oa_to_bwe(uint8_t *payload, unsigned int payload_len) payload[i + 2] = payload[i + 2] << 6; } - /* The overall saving is one byte! */ - return payload_len - 1; + /* Calculate new payload length */ + bwe_payload_len = OSMO_BYTES_FOR_BITS(AMR_HDR_BWE_LEN_BITS + osmo_amr_bits(ft)); + + return bwe_payload_len; } /*! Convert an AMR frame from bandwith-efficient mode to octet-aligned mode. @@ -150,10 +202,15 @@ int osmo_amr_bwe_to_oa(uint8_t *payload, unsigned int payload_len, unsigned int payload_maxlen) { uint8_t buf[256]; - unsigned int frame_len = payload_len - 1; + /* The header is only valid after shifting first two bytes to OA mode */ + struct amr_hdr_bwe *bwe_hdr = (struct amr_hdr_bwe *)payload; + struct amr_hdr *oa_hdr; unsigned int i; + unsigned int oa_payload_len; + uint8_t ft; - memset(buf, 0, sizeof(buf)); + if (payload_len < sizeof(struct amr_hdr_bwe)) + return -1; if (payload_len + 1 > payload_maxlen) return -1; @@ -161,16 +218,98 @@ int osmo_amr_bwe_to_oa(uint8_t *payload, unsigned int payload_len, if (payload_len + 1 > sizeof(buf)) return -1; - buf[0] = payload[0] & 0xf0; - buf[1] = payload[0] << 4; - buf[1] |= (payload[1] >> 4) & 0x0c; + ft = (bwe_hdr->ft_hi << 1) | bwe_hdr->ft_lo; + if (!osmo_amr_ft_valid(ft)) + return -1; + if (OSMO_BYTES_FOR_BITS(AMR_HDR_BWE_LEN_BITS + osmo_amr_bits(ft)) > payload_len) + return -1; + + memset(buf, 0, sizeof(buf)); + oa_hdr = (struct amr_hdr *)buf; + oa_hdr->cmr = bwe_hdr->cmr; + oa_hdr->f = bwe_hdr->f; + oa_hdr->ft = ft; + oa_hdr->q = bwe_hdr->q; + + /* Calculate new payload length */ + oa_payload_len = 2 + osmo_amr_bytes(ft); + + for (i = 2; i < oa_payload_len - 1; i++) { + buf[i] = payload[i - 1] << 2; + buf[i] |= payload[i] >> 6; + } + buf[i] = payload[i - 1] << 2; + + memcpy(payload, buf, oa_payload_len); + return oa_payload_len; +} + +/*! Convert an AMR frame from bandwith-efficient mode to IuuP/IuFP payload. + * The IuuP/IuPF payload only contains the class a, b, c bits. No header. + * \param[inout] payload user provided memory containing the AMR payload. + * \param[in] payload_len overall length of the AMR payload. + * \param[in] payload_maxlen maximum length of the user provided memory. + * \returns resulting payload length, negative on error. */ +int osmo_amr_bwe_to_iuup(uint8_t *payload, unsigned int payload_len) +{ + /* The header is only valid after shifting first two bytes to OA mode */ + unsigned int i, required_len_bits; + unsigned int amr_speech_len_bytes, amr_speech_len_bits; + uint8_t ft; + + if (payload_len < 2) + return -1; + + /* Calculate new payload length */ + ft = ((payload[0] & 0x07) << 1) | ((payload[1] & 0x80) >> 7); + if (!osmo_amr_ft_valid(ft)) + return -1; + + amr_speech_len_bits = osmo_amr_bits(ft); + amr_speech_len_bytes = osmo_amr_bytes(ft); - for (i = 0; i < frame_len - 1; i++) { - buf[i + 2] = payload[i + 1] << 2; - buf[i + 2] |= payload[i + 2] >> 6; + /* shift of AMR_HDR_BWE_LEN_BITS (10) bits, aka remove BWE Hdr + ToC: */ + required_len_bits = AMR_HDR_BWE_LEN_BITS + amr_speech_len_bits; + if (payload_len < OSMO_BYTES_FOR_BITS(required_len_bits)) + return -1; + + for (i = 0; i < amr_speech_len_bytes; i++) { + /* we have to shift the payload by 10 bits to get only the Class A, B, C bits */ + payload[i] = (payload[i + 1] << 2) | ((payload[i + 2]) >> 6); } - buf[i + 2] = payload[i + 1] << 2; - memcpy(payload, buf, payload_len + 1); - return payload_len + 1; + return amr_speech_len_bytes; +} + +/*! Convert an AMR frame from IuuP/IuFP payload to bandwith-efficient mode. + * The IuuP/IuPF payload only contains the class a, b, c bits. No header. + * The resulting buffer has space at the start prepared to be filled by CMR, TOC. + * \param[inout] payload user provided memory containing the AMR payload. + * \param[in] payload_len overall length of the AMR payload. + * \param[in] payload_maxlen maximum length of the user provided memory (payload_len + 2 required). + * \returns resulting payload length, negative on error. */ +int osmo_amr_iuup_to_bwe(uint8_t *payload, unsigned int payload_len, + unsigned int payload_maxlen) +{ + /* shift all bits by AMR_HDR_BWE_LEN_BITS (10) */ + unsigned int i, required_len_bits, required_len_bytes; + + int ft = osmo_amr_bytes_to_ft(payload_len); + if (ft < 0) + return ft; + + required_len_bits = osmo_amr_bits(ft) + 10; + required_len_bytes = OSMO_BYTES_FOR_BITS(required_len_bits); + if (payload_maxlen < required_len_bytes) + return -1; + + i = payload_len + 1; + payload[i] = (payload[i - 2] << 6); + for (i = payload_len; i >= 2; i--) { + /* we have to shift the payload by AMR_HDR_BWE_LEN_BITS (10) bits to prepend BWE Hdr + ToC */ + payload[i] = (payload[i - 1] >> 2) | (payload[i - 2] << 6); + } + payload[i] = (payload[i - 1] >> 2); + payload[0] = 0; + return required_len_bytes; } diff --git a/src/datagram.c b/src/datagram.c index 634e702..9df0630 100644 --- a/src/datagram.c +++ b/src/datagram.c @@ -41,13 +41,12 @@ #include <osmocom/netif/datagram.h> -/*! \addtogroup datagram Osmocom Datagram Socket +#define OSMO_DGRAM_CLI_F_RECONF (1 << 0) + +#define OSMO_DGRAM_RX_F_RECONF (1 << 0) + +/*! \addtogroup datagram * @{ - * - * This code is intended to abstract any use of datagram type sockets, - * such as UDP. It offers both transmitter and receiver side - * implementations, fully integrated with the libosmocore select loop - * abstraction. */ /*! \file datagram.c @@ -59,8 +58,6 @@ * Client side. */ -#define OSMO_DGRAM_CLI_F_RECONF (1 << 0) - struct osmo_dgram_tx { struct osmo_fd ofd; struct llist_head tx_queue; @@ -95,7 +92,7 @@ static int osmo_dgram_tx_write(struct osmo_dgram_tx *conn) LOGP(DLINP, LOGL_DEBUG, "sending data\n"); if (llist_empty(&conn->tx_queue)) { - conn->ofd.when &= ~BSC_FD_WRITE; + osmo_fd_write_disable(&conn->ofd); return 0; } lh = conn->tx_queue.next; @@ -115,11 +112,11 @@ static int osmo_dgram_tx_fd_cb(struct osmo_fd *ofd, unsigned int what) { struct osmo_dgram_tx *conn = ofd->data; - if (what & BSC_FD_WRITE) { + if (what & OSMO_FD_WRITE) { LOGP(DLINP, LOGL_DEBUG, "write\n"); osmo_dgram_tx_write(conn); } - return 0; + return 0; } /*! \brief Create an Osmocom datagram transmitter @@ -135,11 +132,7 @@ struct osmo_dgram_tx *osmo_dgram_tx_create(void *ctx) if (!conn) return NULL; - conn->ofd.fd = -1; - conn->ofd.when |= BSC_FD_READ; - conn->ofd.priv_nr = 0; /* XXX */ - conn->ofd.cb = osmo_dgram_tx_fd_cb; - conn->ofd.data = conn; + osmo_fd_setup(&conn->ofd, -1, OSMO_FD_READ, osmo_dgram_tx_fd_cb, conn, 0); INIT_LLIST_HEAD(&conn->tx_queue); return conn; @@ -240,22 +233,20 @@ void osmo_dgram_tx_send(struct osmo_dgram_tx *conn, struct msgb *msg) { msgb_enqueue(&conn->tx_queue, msg); - conn->ofd.when |= BSC_FD_WRITE; + osmo_fd_write_enable(&conn->ofd); } /* * Server side. */ -#define OSMO_DGRAM_RX_F_RECONF (1 << 0) - struct osmo_dgram_rx { - struct osmo_fd ofd; - char *addr; - uint16_t port; + struct osmo_fd ofd; + char *addr; + uint16_t port; int (*cb)(struct osmo_dgram_rx *conn); - void *data; - unsigned int flags; + void *data; + unsigned int flags; }; /*! \brief Receive data via Osmocom datagram receiver @@ -290,7 +281,7 @@ static int osmo_dgram_rx_cb(struct osmo_fd *ofd, unsigned int what) struct osmo_dgram_rx *conn = ofd->data; LOGP(DLINP, LOGL_DEBUG, "read\n"); - if (what & BSC_FD_READ) + if (what & OSMO_FD_READ) osmo_dgram_rx_read(conn); return 0; @@ -309,10 +300,7 @@ struct osmo_dgram_rx *osmo_dgram_rx_create(void *ctx) if (!conn) return NULL; - conn->ofd.fd = -1; - conn->ofd.when |= BSC_FD_READ; - conn->ofd.cb = osmo_dgram_rx_cb; - conn->ofd.data = conn; + osmo_fd_setup(&conn->ofd, -1, OSMO_FD_READ, osmo_dgram_rx_cb, conn, 0); return conn; } @@ -97,6 +97,11 @@ struct msgb *osmo_ipa_msg_alloc(int headroom) return msg; } +struct msgb *osmo_ipa_ext_msg_alloc(size_t headroom) +{ + return osmo_ipa_msg_alloc(sizeof(struct ipa_head_ext) + headroom); +} + void osmo_ipa_msg_push_header(struct msgb *msg, uint8_t proto) { struct ipa_head *hh; @@ -366,3 +371,80 @@ osmo_ipa_parse_msg_id_resp(struct msgb *msg, struct ipaccess_unit *unit_data) return 0; } + +#define MSG_CB_IPA_INFO_OFFSET 0 + +/* Check and remove headers (in case of p == IPAC_PROTO_OSMO, also the IPA extension header). + * Returns a negative number on error, otherwise the number of octets removed */ +static inline int ipa_check_pull_headers(struct msgb *msg) +{ + int ret; + size_t octets_removed = 0; + msg->l1h = msg->data; + struct ipa_head *ih = (struct ipa_head *)msg->data; + osmo_ipa_msgb_cb_proto(msg) = ih->proto; + + if ((ret = osmo_ipa_process_msg(msg)) < 0) { + LOGP(DLINP, LOGL_ERROR, "Error processing IPA message\n"); + return -EIO; + } + msgb_pull(msg, sizeof(struct ipa_head)); + octets_removed += sizeof(struct ipa_head); + msg->l2h = msg->data; + if (ih->proto != IPAC_PROTO_OSMO) + return octets_removed; + + osmo_ipa_msgb_cb_proto_ext(msg) = msg->data[0]; + msgb_pull(msg, sizeof(struct ipa_head_ext)); + octets_removed += sizeof(struct ipa_head_ext); + return octets_removed; +} + +/*! Segmentation callback used by libosmo-netif streaming backend + * See definition of `struct osmo_io_ops` for callback semantics + * \param[out] msg Original `struct msgb` received via osmo_io + * \returns The total packet length indicated by the first header, + * otherwise negative number on error. Constants: + * -EAGAIN, if the header has not been read yet, + * -ENOBUFS, if the header declares a payload too large + */ +int osmo_ipa_segmentation_cb(struct msgb *msg) +{ + const struct ipa_head *hh = (const struct ipa_head *) msg->data; + size_t payload_len, total_len; + size_t available = msgb_length(msg) + msgb_tailroom(msg); + int removed_octets = 0; + + if (msgb_length(msg) < sizeof(*hh)) { + /* Haven't even read the entire header */ + return -EAGAIN; + } + payload_len = osmo_ntohs(hh->len); + total_len = sizeof(*hh) + payload_len; + if (OSMO_UNLIKELY(available < total_len)) { + LOGP(DLINP, LOGL_ERROR, "Not enough space left in message buffer. " + "Have %zu octets, but need %zu\n", + available, total_len); + return -ENOBUFS; + } + if (total_len <= msgb_length(msg)) { + removed_octets = ipa_check_pull_headers(msg); + if (removed_octets < 0) { + LOGP(DLINP, LOGL_ERROR, "Error pulling IPA headers\n"); + return removed_octets; + } + } + return total_len; +} + +/*! Push IPA headers to a message + * If we have IPAC_PROTO_OSMO this also takes care of the extension header + * \param[out] msg Target message + * \param p Target IPA protocol + * \param pe Target IPA protocol extension. Ignored, unless p equals IPAC_PROTO_OSMO. */ +void osmo_ipa_msg_push_headers(struct msgb *msg, enum ipaccess_proto p, enum ipaccess_proto_ext pe) +{ + if (p == IPAC_PROTO_OSMO) + ipa_prepend_header_ext(msg, pe); + osmo_ipa_msg_push_header(msg, p); +} diff --git a/src/jibuf.c b/src/jibuf.c index 502f6e5..a351248 100644 --- a/src/jibuf.c +++ b/src/jibuf.c @@ -26,10 +26,6 @@ #include <arpa/inet.h> -/*! \addtogroup jibuf Osmocom Jitter Buffer - * @{ - */ - /*! \file jibuf.c * \brief Osmocom Jitter Buffer helpers */ @@ -290,6 +286,11 @@ static void recalc_threshold_delay(struct osmo_jibuf *jb) } +/*! \addtogroup jibuf Osmocom Jitter Buffer + * @{ + */ + + //---------------------------------- /*! \brief Allocate a new jitter buffer instance diff --git a/src/osmux.c b/src/osmux.c index 8b6a115..999aedd 100644 --- a/src/osmux.c +++ b/src/osmux.c @@ -1,7 +1,7 @@ /* * (C) 2012-2017 by Pablo Neira Ayuso <pablo@gnumonks.org> * (C) 2012 by On Waves ehf <http://www.on-waves.com> - * (C) 2015-2017 by sysmocom - s.f.m.c. GmbH + * (C) 2015-2022 by sysmocom - s.f.m.c. GmbH * * SPDX-License-Identifier: GPL-2.0+ * @@ -27,6 +27,14 @@ #include <arpa/inet.h> +#define SNPRINTF_BUFFER_SIZE(ret, remain, offset) \ + if (ret < 0) \ + ret = 0; \ + offset += ret; \ + if (ret > remain) \ + ret = remain; \ + remain -= ret; + /*! \addtogroup osmux Osmocom Multiplex Protocol * @{ * @@ -44,997 +52,22 @@ * \brief Osmocom multiplex protocol helpers */ - -/* This allows you to debug timing reconstruction in the output path */ -#if 0 -#define DEBUG_TIMING 0 -#endif - -/* This allows you to debug osmux message transformations (spamming) */ -#if 0 -#define DEBUG_MSG 0 -#endif - -/* delta time between two RTP messages (in microseconds) */ -#define DELTA_RTP_MSG 20000 -/* delta time between two RTP messages (in samples, 8kHz) */ -#define DELTA_RTP_TIMESTAMP 160 - -static void *osmux_ctx; - static uint32_t osmux_get_payload_len(struct osmux_hdr *osmuxh) { return osmo_amr_bytes(osmuxh->amr_ft) * (osmuxh->ctr+1); } -static uint32_t osmux_ft_dummy_size(uint8_t amr_ft, uint8_t batch_factor) -{ - return sizeof(struct osmux_hdr) + (osmo_amr_bytes(amr_ft) * batch_factor); -} - -struct osmux_hdr *osmux_xfrm_output_pull(struct msgb *msg) -{ - struct osmux_hdr *osmuxh; -next: - osmuxh = NULL; - if (msg->len > sizeof(struct osmux_hdr)) { - size_t len; - - osmuxh = (struct osmux_hdr *)msg->data; - - switch (osmuxh->ft) { - case OSMUX_FT_VOICE_AMR: - break; - case OSMUX_FT_DUMMY: - len = osmux_ft_dummy_size(osmuxh->amr_ft, osmuxh->ctr + 1); - if (msgb_length(msg) < len) { - LOGP(DLMUX, LOGL_ERROR, "Discarding bad Dummy FT: %s\n", - osmo_hexdump(msg->data, msgb_length(msg))); - return NULL; - } - msgb_pull(msg, len); - goto next; - default: - LOGP(DLMUX, LOGL_ERROR, "Discarding unsupported Osmux FT %d\n", - osmuxh->ft); - return NULL; - } - if (!osmo_amr_ft_valid(osmuxh->amr_ft)) { - LOGP(DLMUX, LOGL_ERROR, "Discarding bad AMR FT %d\n", - osmuxh->amr_ft); - return NULL; - } - - len = osmo_amr_bytes(osmuxh->amr_ft) * (osmuxh->ctr+1) + - sizeof(struct osmux_hdr); - - if (msgb_length(msg) < len) { - LOGP(DLMUX, LOGL_ERROR, - "Discarding malformed OSMUX message: %s\n", - osmo_hexdump(msg->data, msgb_length(msg))); - return NULL; - } - - msgb_pull(msg, len); - } else if (msg->len > 0) { - LOGP(DLMUX, LOGL_ERROR, - "remaining %d bytes, broken osmuxhdr?\n", msg->len); - } - - return osmuxh; -} - -static struct msgb * -osmux_rebuild_rtp(struct osmux_out_handle *h, struct osmux_hdr *osmuxh, - void *payload, int payload_len, bool first_pkt) -{ - struct msgb *prev_msg, *out_msg; - struct timespec *prev_ts, *out_ts; - struct rtp_hdr *rtph; - struct amr_hdr *amrh; - struct timespec delta = { .tv_sec = 0, .tv_nsec = DELTA_RTP_MSG*1000 }; - - out_msg = msgb_alloc(sizeof(struct rtp_hdr) + - sizeof(struct amr_hdr) + - osmo_amr_bytes(osmuxh->amr_ft), - "OSMUX test"); - if (out_msg == NULL) - return NULL; - - /* Reconstruct RTP header */ - rtph = (struct rtp_hdr *)out_msg->data; - rtph->csrc_count = 0; - rtph->extension = 0; - rtph->version = RTP_VERSION; - rtph->payload_type = h->rtp_payload_type; - /* ... emulate timestamp and ssrc */ - rtph->timestamp = htonl(h->rtp_timestamp); - rtph->sequence = htons(h->rtp_seq); - rtph->ssrc = htonl(h->rtp_ssrc); - /* rtp packet with the marker bit is always guaranteed to be the first - * one. We want to notify with marker in 2 scenarios: - * 1- Sender told us through osmux frame rtp_m. - * 2- Sntermediate osmux frame lost (seq gap), otherwise rtp receiver only sees - * steady increase of delay - */ - rtph->marker = first_pkt && - (osmuxh->rtp_m || (osmuxh->seq != h->osmux_seq_ack + 1)); - - msgb_put(out_msg, sizeof(struct rtp_hdr)); - - /* Reconstruct AMR header */ - amrh = (struct amr_hdr *)out_msg->tail; - amrh->cmr = osmuxh->amr_cmr; - amrh->f = osmuxh->amr_f; - amrh->ft = osmuxh->amr_ft; - amrh->q = osmuxh->amr_q; - - msgb_put(out_msg, sizeof(struct amr_hdr)); - - /* add AMR speech data */ - memcpy(out_msg->tail, payload, payload_len); - msgb_put(out_msg, payload_len); - - /* bump last RTP sequence number and timestamp that has been used */ - h->rtp_seq++; - h->rtp_timestamp += DELTA_RTP_TIMESTAMP; - - out_ts = ((struct timespec *)&((out_msg)->cb[0])); - if (first_pkt || llist_empty(&h->list)) { - osmo_clock_gettime(CLOCK_MONOTONIC, out_ts); - } else { - prev_msg = llist_last_entry(&h->list, struct msgb, list); - prev_ts = ((struct timespec *)&((prev_msg)->cb[0])); - timespecadd(prev_ts, &delta, out_ts); - } - - return out_msg; -} - -int osmux_xfrm_output(struct osmux_hdr *osmuxh, struct osmux_out_handle *h, - struct llist_head *list) -{ - struct msgb *msg; - int i; - - INIT_LLIST_HEAD(list); - - for (i=0; i<osmuxh->ctr+1; i++) { - struct rtp_hdr *rtph; - - msg = osmux_rebuild_rtp(h, osmuxh, - osmux_get_payload(osmuxh) + - i * osmo_amr_bytes(osmuxh->amr_ft), - osmo_amr_bytes(osmuxh->amr_ft), !i); - if (msg == NULL) - continue; - - rtph = osmo_rtp_get_hdr(msg); - if (rtph == NULL) - continue; - -#ifdef DEBUG_MSG - { - char buf[4096]; - - osmo_rtp_snprintf(buf, sizeof(buf), msg); - buf[sizeof(buf)-1] = '\0'; - LOGP(DLMUX, LOGL_DEBUG, "to BTS: %s\n", buf); - } -#endif - llist_add_tail(&msg->list, list); - } - - /* Update last seen seq number: */ - h->osmux_seq_ack = osmuxh->seq; - - return i; -} - -static void osmux_xfrm_output_trigger(void *data) -{ - struct osmux_out_handle *h = data; - struct timespec delay_ts, now; - struct msgb *msg, *next; - - llist_for_each_entry_safe(msg, next, &h->list, list) { - osmo_clock_gettime(CLOCK_MONOTONIC, &now); - struct timespec *msg_ts = ((struct timespec *)&((msg)->cb[0])); - if (timespeccmp(msg_ts, &now, >)) { - timespecsub(msg_ts, &now, &delay_ts); - osmo_timer_schedule(&h->timer, - delay_ts.tv_sec, delay_ts.tv_nsec / 1000); - return; - } - - /* Transmit the rtp packet */ - llist_del(&msg->list); - if (h->tx_cb) - h->tx_cb(msg, h->data); - else - msgb_free(msg); - } -} - -/*! \brief Generate RTP packets from osmux frame AMR payload set and schedule - * them for transmission at appropiate time. - * \param[in] h the osmux out handle handling a specific CID - * \param[in] osmuxh Buffer pointing to osmux frame header structure and AMR payload - * \return Number of generated RTP packets - * - * The osmux frame passed to this function must be of the type OSMUX_FT_VOICE_AMR. - * The generated RTP packets are kept into h's internal list and sent to the - * callback configured through osmux_xfrm_output_set_tx_cb when are ready to be - * transmitted according to schedule. - */ -int osmux_xfrm_output_sched(struct osmux_out_handle *h, struct osmux_hdr *osmuxh) -{ - struct timespec now, *msg_ts; - struct msgb *msg; - int i; - bool was_empty = llist_empty(&h->list); - - if (!was_empty) { - /* If we received new data it means we are behind schedule and - * we should flush all previous quickly */ - osmo_clock_gettime(CLOCK_MONOTONIC, &now); - llist_for_each_entry(msg, &h->list, list) { - msg_ts = ((struct timespec *)&((msg)->cb[0])); - *msg_ts = now; - } - osmo_timer_schedule(&h->timer, 0, 0); - } - - for (i=0; i<osmuxh->ctr+1; i++) { - struct rtp_hdr *rtph; - - msg = osmux_rebuild_rtp(h, osmuxh, - osmux_get_payload(osmuxh) + - i * osmo_amr_bytes(osmuxh->amr_ft), - osmo_amr_bytes(osmuxh->amr_ft), !i); - if (msg == NULL) - continue; - - rtph = osmo_rtp_get_hdr(msg); - if (rtph == NULL) - continue; - - llist_add_tail(&msg->list, &h->list); - } - - /* Update last seen seq number: */ - h->osmux_seq_ack = osmuxh->seq; - - /* In case list is still empty after parsing messages, no need to rearm */ - if(was_empty && !llist_empty(&h->list)) - osmux_xfrm_output_trigger(h); - return i; -} - -/*! \brief Flush all scheduled RTP packets still pending to be transmitted - * \param[in] h the osmux out handle to flush - * - * This function will immediately call the transmit callback for all queued RTP - * packets, making sure the list ends up empty. It will also stop all internal - * timers to make sure the osmux_out_handle can be dropped or re-used by calling - * osmux_xfrm_output on it. - */ -void osmux_xfrm_output_flush(struct osmux_out_handle *h) -{ - struct msgb *msg, *next; - - if (osmo_timer_pending(&h->timer)) - osmo_timer_del(&h->timer); - - llist_for_each_entry_safe(msg, next, &h->list, list) { - llist_del(&msg->list); - if (h->tx_cb) - h->tx_cb(msg, h->data); - else - msgb_free(msg); - } -} - -struct osmux_batch { - struct osmo_timer_list timer; - struct osmux_hdr *osmuxh; - struct llist_head circuit_list; - unsigned int remaining_bytes; - uint8_t seq; - uint32_t nmsgs; - int ndummy; -}; - -struct osmux_circuit { - struct llist_head head; - int ccid; - struct llist_head msg_list; - int nmsgs; - int dummy; -}; - -static int osmux_batch_enqueue(struct msgb *msg, struct osmux_circuit *circuit, - uint8_t batch_factor) -{ - /* Validate amount of messages per batch. The counter field of the - * osmux header is just 3 bits long, so make sure it doesn't overflow. - */ - if (circuit->nmsgs >= batch_factor || circuit->nmsgs >= 8) { - struct rtp_hdr *rtph; - - rtph = osmo_rtp_get_hdr(msg); - if (rtph == NULL) - return -1; - - LOGP(DLMUX, LOGL_DEBUG, "Batch is full for RTP " - "ssrc=%u\n", rtph->ssrc); - return -1; - } - - llist_add_tail(&msg->list, &circuit->msg_list); - circuit->nmsgs++; - return 0; -} - -static void osmux_batch_dequeue(struct msgb *msg, struct osmux_circuit *circuit) -{ - llist_del(&msg->list); - circuit->nmsgs--; -} - -static void osmux_circuit_del_msgs(struct osmux_batch *batch, struct osmux_circuit *circuit) -{ - struct msgb *cur, *tmp; - llist_for_each_entry_safe(cur, tmp, &circuit->msg_list, list) { - osmux_batch_dequeue(cur, circuit); - msgb_free(cur); - batch->nmsgs--; - } -} - -struct osmux_input_state { - struct msgb *out_msg; - struct msgb *msg; - struct rtp_hdr *rtph; - struct amr_hdr *amrh; - uint32_t amr_payload_len; - int ccid; - int add_osmux_hdr; -}; - -static int osmux_batch_put(struct osmux_batch *batch, - struct osmux_input_state *state) -{ - struct osmux_hdr *osmuxh; - - if (state->add_osmux_hdr) { - osmuxh = (struct osmux_hdr *)state->out_msg->tail; - osmuxh->ft = OSMUX_FT_VOICE_AMR; - osmuxh->ctr = 0; - osmuxh->rtp_m = osmuxh->rtp_m || state->rtph->marker; - osmuxh->amr_f = state->amrh->f; - osmuxh->amr_q= state->amrh->q; - osmuxh->seq = batch->seq++; - osmuxh->circuit_id = state->ccid; - osmuxh->amr_cmr = state->amrh->cmr; - osmuxh->amr_ft = state->amrh->ft; - msgb_put(state->out_msg, sizeof(struct osmux_hdr)); - - /* annotate current osmux header */ - batch->osmuxh = osmuxh; - } else { - if (batch->osmuxh->ctr == 0x7) { - LOGP(DLMUX, LOGL_ERROR, "cannot add msg=%p, " - "too many messages for this RTP ssrc=%u\n", - state->msg, state->rtph->ssrc); - return 0; - } - batch->osmuxh->ctr++; - } - - memcpy(state->out_msg->tail, osmo_amr_get_payload(state->amrh), - state->amr_payload_len); - msgb_put(state->out_msg, state->amr_payload_len); - - return 0; -} - -static int osmux_xfrm_encode_amr(struct osmux_batch *batch, - struct osmux_input_state *state) -{ - uint32_t amr_len; - - state->amrh = osmo_rtp_get_payload(state->rtph, state->msg, &amr_len); - if (state->amrh == NULL) - return -1; - - state->amr_payload_len = amr_len - sizeof(struct amr_hdr); - - if (osmux_batch_put(batch, state) < 0) - return -1; - - return 0; -} - -static void osmux_encode_dummy(struct osmux_batch *batch, uint8_t batch_factor, - struct osmux_input_state *state) -{ - struct osmux_hdr *osmuxh; - /* TODO: This should be configurable at some point. */ - uint32_t payload_size = osmux_ft_dummy_size(AMR_FT_3, batch_factor) - - sizeof(struct osmux_hdr); - - osmuxh = (struct osmux_hdr *)state->out_msg->tail; - osmuxh->ft = OSMUX_FT_DUMMY; - osmuxh->ctr = batch_factor - 1; - osmuxh->amr_f = 0; - osmuxh->amr_q= 0; - osmuxh->seq = 0; - osmuxh->circuit_id = state->ccid; - osmuxh->amr_cmr = 0; - osmuxh->amr_ft = AMR_FT_3; - msgb_put(state->out_msg, sizeof(struct osmux_hdr)); - - memset(state->out_msg->tail, 0xff, payload_size); - msgb_put(state->out_msg, payload_size); -} - -static struct msgb *osmux_build_batch(struct osmux_batch *batch, - uint32_t batch_size, uint8_t batch_factor) -{ - struct msgb *batch_msg; - struct osmux_circuit *circuit; - -#ifdef DEBUG_MSG - LOGP(DLMUX, LOGL_DEBUG, "Now building batch\n"); -#endif - - batch_msg = msgb_alloc(batch_size, "osmux"); - if (batch_msg == NULL) { - LOGP(DLMUX, LOGL_ERROR, "Not enough memory\n"); - return NULL; - } - - llist_for_each_entry(circuit, &batch->circuit_list, head) { - struct msgb *cur, *tmp; - int ctr = 0; - - if (circuit->dummy) { - struct osmux_input_state state = { - .out_msg = batch_msg, - .ccid = circuit->ccid, - }; - osmux_encode_dummy(batch, batch_factor, &state); - continue; - } - - llist_for_each_entry_safe(cur, tmp, &circuit->msg_list, list) { - struct osmux_input_state state = { - .msg = cur, - .out_msg = batch_msg, - .ccid = circuit->ccid, - }; -#ifdef DEBUG_MSG - char buf[4096]; - - osmo_rtp_snprintf(buf, sizeof(buf), cur); - buf[sizeof(buf)-1] = '\0'; - LOGP(DLMUX, LOGL_DEBUG, "to BSC-NAT: %s\n", buf); -#endif - - state.rtph = osmo_rtp_get_hdr(cur); - if (state.rtph == NULL) - return NULL; - - if (ctr == 0) { -#ifdef DEBUG_MSG - LOGP(DLMUX, LOGL_DEBUG, "add osmux header\n"); -#endif - state.add_osmux_hdr = 1; - } - - osmux_xfrm_encode_amr(batch, &state); - osmux_batch_dequeue(cur, circuit); - msgb_free(cur); - ctr++; - batch->nmsgs--; - } - } - return batch_msg; -} - -void osmux_xfrm_input_deliver(struct osmux_in_handle *h) -{ - struct msgb *batch_msg; - struct osmux_batch *batch = (struct osmux_batch *)h->internal_data; - -#ifdef DEBUG_MSG - LOGP(DLMUX, LOGL_DEBUG, "invoking delivery function\n"); -#endif - batch_msg = osmux_build_batch(batch, h->batch_size, h->batch_factor); - if (!batch_msg) - return; - h->stats.output_osmux_msgs++; - h->stats.output_osmux_bytes += batch_msg->len; - - h->deliver(batch_msg, h->data); - osmo_timer_del(&batch->timer); - batch->remaining_bytes = h->batch_size; - - if (batch->ndummy) { - osmo_timer_schedule(&batch->timer, 0, - h->batch_factor * DELTA_RTP_MSG); - } -} - -static void osmux_batch_timer_expired(void *data) -{ - struct osmux_in_handle *h = data; - -#ifdef DEBUG_MSG - LOGP(DLMUX, LOGL_DEBUG, "osmux_batch_timer_expired\n"); -#endif - osmux_xfrm_input_deliver(h); -} - -static int osmux_rtp_amr_payload_len(struct msgb *msg, struct rtp_hdr *rtph) -{ - struct amr_hdr *amrh; - unsigned int amr_len; - int amr_payload_len; - - amrh = osmo_rtp_get_payload(rtph, msg, &amr_len); - if (amrh == NULL) - return -1; - - if (!osmo_amr_ft_valid(amrh->ft)) - return -1; - - amr_payload_len = amr_len - sizeof(struct amr_hdr); - - /* The AMR payload does not fit with what we expect */ - if (osmo_amr_bytes(amrh->ft) != amr_payload_len) { - LOGP(DLMUX, LOGL_ERROR, - "Bad AMR frame, expected %zd bytes, got %d bytes\n", - osmo_amr_bytes(amrh->ft), amr_len); - return -1; - } - return amr_payload_len; -} - -static void osmux_replay_lost_packets(struct osmux_circuit *circuit, - struct rtp_hdr *cur_rtph, int batch_factor) -{ - int16_t diff; - struct msgb *last; - struct rtp_hdr *rtph; - int i; - - /* Have we see any RTP packet in this batch before? */ - if (llist_empty(&circuit->msg_list)) - return; - - /* Get last RTP packet seen in this batch */ - last = llist_entry(circuit->msg_list.prev, struct msgb, list); - rtph = osmo_rtp_get_hdr(last); - if (rtph == NULL) - return; - - diff = ntohs(cur_rtph->sequence) - ntohs(rtph->sequence); - - /* Lifesaver: make sure bugs don't spawn lots of clones */ - if (diff > 16) - diff = 16; - - /* If diff between last RTP packet seen and this one is > 1, - * then we lost several RTP packets, let's replay them. - */ - for (i=1; i<diff; i++) { - struct msgb *clone; - - /* Clone last RTP packet seen */ - clone = msgb_alloc(last->data_len, "RTP clone"); - if (!clone) - continue; - - memcpy(clone->data, last->data, last->len); - msgb_put(clone, last->len); - - /* The original RTP message has been already sanity check. */ - rtph = osmo_rtp_get_hdr(clone); - - /* Adjust sequence number and timestamp */ - rtph->sequence = htons(ntohs(rtph->sequence) + i); - rtph->timestamp = htonl(ntohl(rtph->timestamp) + - DELTA_RTP_TIMESTAMP); - - /* No more room in this batch, skip padding with more clones */ - if (osmux_batch_enqueue(clone, circuit, batch_factor) < 0) { - msgb_free(clone); - break; - } - - LOGP(DLMUX, LOGL_ERROR, "adding cloned RTP\n"); - } -} - -static struct osmux_circuit * -osmux_batch_find_circuit(struct osmux_batch *batch, int ccid) -{ - struct osmux_circuit *circuit; - - llist_for_each_entry(circuit, &batch->circuit_list, head) { - if (circuit->ccid == ccid) - return circuit; - } - return NULL; -} - -static struct osmux_circuit * -osmux_batch_add_circuit(struct osmux_batch *batch, int ccid, int dummy, - uint8_t batch_factor) -{ - struct osmux_circuit *circuit; - - circuit = osmux_batch_find_circuit(batch, ccid); - if (circuit != NULL) { - LOGP(DLMUX, LOGL_ERROR, "circuit %u already exists!\n", ccid); - return NULL; - } - - circuit = talloc_zero(osmux_ctx, struct osmux_circuit); - if (circuit == NULL) { - LOGP(DLMUX, LOGL_ERROR, "OOM on circuit %u\n", ccid); - return NULL; - } - - circuit->ccid = ccid; - INIT_LLIST_HEAD(&circuit->msg_list); - llist_add_tail(&circuit->head, &batch->circuit_list); - - if (dummy) { - circuit->dummy = dummy; - batch->ndummy++; - if (!osmo_timer_pending(&batch->timer)) - osmo_timer_schedule(&batch->timer, 0, - batch_factor * DELTA_RTP_MSG); - } - return circuit; -} - -static void osmux_batch_del_circuit(struct osmux_batch *batch, struct osmux_circuit *circuit) -{ - if (circuit->dummy) - batch->ndummy--; - llist_del(&circuit->head); - osmux_circuit_del_msgs(batch, circuit); - talloc_free(circuit); -} - -static int -osmux_batch_add(struct osmux_batch *batch, uint32_t batch_factor, struct msgb *msg, - struct rtp_hdr *rtph, int ccid) -{ - int bytes = 0, amr_payload_len; - struct osmux_circuit *circuit; - struct msgb *cur; - - circuit = osmux_batch_find_circuit(batch, ccid); - if (!circuit) - return -1; - - /* We've seen the first RTP message, disable dummy padding */ - if (circuit->dummy) { - circuit->dummy = 0; - batch->ndummy--; - } - amr_payload_len = osmux_rtp_amr_payload_len(msg, rtph); - if (amr_payload_len < 0) - return -1; - - /* First check if there is room for this message in the batch */ - bytes += amr_payload_len; - if (circuit->nmsgs == 0) - bytes += sizeof(struct osmux_hdr); - - /* No room, sorry. You'll have to retry */ - if (bytes > batch->remaining_bytes) - return 1; - - /* Init of talkspurt (RTP M marker bit) needs to be in the first AMR slot - * of the OSMUX packet, enforce sending previous batch if required: - */ - if (rtph->marker && circuit->nmsgs != 0) - return 1; - - - /* Extra validation: check if this message already exists, should not - * happen but make sure we don't propagate duplicated messages. - */ - llist_for_each_entry(cur, &circuit->msg_list, list) { - struct rtp_hdr *rtph2 = osmo_rtp_get_hdr(cur); - if (rtph2 == NULL) - return -1; - - /* Already exists message with this sequence, skip */ - if (rtph2->sequence == rtph->sequence) { - LOGP(DLMUX, LOGL_ERROR, "already exists " - "message with seq=%u, skip it\n", - rtph->sequence); - return -1; - } - } - /* Handle RTP packet loss scenario */ - osmux_replay_lost_packets(circuit, rtph, batch_factor); - - /* This batch is full, force batch delivery */ - if (osmux_batch_enqueue(msg, circuit, batch_factor) < 0) - return 1; - -#ifdef DEBUG_MSG - LOGP(DLMUX, LOGL_DEBUG, "adding msg with ssrc=%u to batch\n", - rtph->ssrc); -#endif - - /* Update remaining room in this batch */ - batch->remaining_bytes -= bytes; - - if (batch->nmsgs == 0) { -#ifdef DEBUG_MSG - LOGP(DLMUX, LOGL_DEBUG, "osmux start timer batch\n"); -#endif - osmo_timer_schedule(&batch->timer, 0, - batch_factor * DELTA_RTP_MSG); - } - batch->nmsgs++; - - return 0; -} - -/** - * osmux_xfrm_input - add RTP message to OSmux batch - * \param msg: RTP message that you want to batch into one OSmux message - * - * If 0 is returned, this indicates that the message has been batched or that - * an error occured and we have skipped the message. If 1 is returned, you - * have to invoke osmux_xfrm_input_deliver and try again. - * - * The function takes care of releasing the messages in case of error and - * when building the batch. - */ -int osmux_xfrm_input(struct osmux_in_handle *h, struct msgb *msg, int ccid) -{ - int ret; - struct rtp_hdr *rtph; - struct osmux_batch *batch = (struct osmux_batch *)h->internal_data; - - /* Ignore too big RTP/RTCP messages, most likely forged. Sanity check - * to avoid a possible forever loop in the caller. - */ - if (msg->len > h->batch_size - sizeof(struct osmux_hdr)) { - msgb_free(msg); - return 0; - } - - rtph = osmo_rtp_get_hdr(msg); - if (rtph == NULL) { - msgb_free(msg); - return 0; - } - - switch(rtph->payload_type) { - case RTP_PT_RTCP: - msgb_free(msg); - return 0; - default: - /* The RTP payload type is dynamically allocated, - * although we use static ones. Assume that we always - * receive AMR traffic. - */ - - /* Add this RTP to the OSMUX batch */ - ret = osmux_batch_add(batch, h->batch_factor, - msg, rtph, ccid); - if (ret < 0) { - /* Cannot put this message into the batch. - * Malformed, duplicated, OOM. Drop it and tell - * the upper layer that we have digest it. - */ - msgb_free(msg); - return 0; - } - - h->stats.input_rtp_msgs++; - h->stats.input_rtp_bytes += msg->len; - break; - } - return ret; -} - -void osmux_xfrm_input_init(struct osmux_in_handle *h) -{ - struct osmux_batch *batch; - - /* Default to osmux packet size if not specified */ - if (h->batch_size == 0) - h->batch_size = OSMUX_BATCH_DEFAULT_MAX; - - batch = talloc_zero(osmux_ctx, struct osmux_batch); - if (batch == NULL) - return; - - INIT_LLIST_HEAD(&batch->circuit_list); - batch->remaining_bytes = h->batch_size; - osmo_timer_setup(&batch->timer, osmux_batch_timer_expired, h); - - h->internal_data = (void *)batch; - - LOGP(DLMUX, LOGL_DEBUG, "initialized osmux input converter\n"); -} - -/*! \brief Set transmission callback to call when a generated RTP packet is to be transmitted - * \param[in] h the osmux out handle handling a specific CID - * \param[in] osmuxh Buffer pointing to osmux frame header structure and AMR payload - * \return Number of generated RTP packets - * - * This Function sets the callback called by the interal timer set by - * osmux_xfrm_out_sched function. - */ -void osmux_xfrm_output_set_tx_cb(struct osmux_out_handle *h, - void (*tx_cb)(struct msgb *msg, void *data), - void *data) -{ - h->tx_cb = tx_cb; - h->data = data; -} - -int osmux_xfrm_input_open_circuit(struct osmux_in_handle *h, int ccid, - int dummy) -{ - struct osmux_batch *batch = (struct osmux_batch *)h->internal_data; - - return osmux_batch_add_circuit(batch, ccid, dummy, h->batch_factor) ? 0 : -1; -} - -void osmux_xfrm_input_close_circuit(struct osmux_in_handle *h, int ccid) -{ - struct osmux_batch *batch = (struct osmux_batch *)h->internal_data; - struct osmux_circuit *circuit; - - circuit = osmux_batch_find_circuit(batch, ccid); - if (circuit == NULL) - return; - - osmux_batch_del_circuit(batch, circuit); -} - -void osmux_xfrm_input_fini(struct osmux_in_handle *h) -{ - struct osmux_batch *batch = (struct osmux_batch *)h->internal_data; - struct osmux_circuit *circuit, *next; - - llist_for_each_entry_safe(circuit, next, &batch->circuit_list, head) - osmux_batch_del_circuit(batch, circuit); - - osmo_timer_del(&batch->timer); - talloc_free(batch); -} - -struct osmux_tx_handle { - struct osmo_timer_list timer; - struct msgb *msg; - void (*tx_cb)(struct msgb *msg, void *data); - void *data; -#ifdef DEBUG_TIMING - struct timeval start; - struct timeval when; -#endif -}; - -static void osmux_tx_cb(void *data) -{ - struct osmux_tx_handle *h = data; -#ifdef DEBUG_TIMING - struct timeval now, diff; - - osmo_gettimeofday(&now, NULL); - timersub(&now, &h->start, &diff); - timersub(&diff,&h->when, &diff); - LOGP(DLMUX, LOGL_DEBUG, "we are lagging %lu.%.6lu in scheduled " - "transmissions\n", diff.tv_sec, diff.tv_usec); -#endif - - h->tx_cb(h->msg, h->data); - - talloc_free(h); -} - -static void -osmux_tx(struct msgb *msg, struct timeval *when, - void (*tx_cb)(struct msgb *msg, void *data), void *data) -{ - struct osmux_tx_handle *h; - - h = talloc_zero(osmux_ctx, struct osmux_tx_handle); - if (h == NULL) - return; - - h->msg = msg; - h->tx_cb = tx_cb; - h->data = data; - osmo_timer_setup(&h->timer, osmux_tx_cb, h); - -#ifdef DEBUG_TIMING - osmo_gettimeofday(&h->start, NULL); - h->when.tv_sec = when->tv_sec; - h->when.tv_usec = when->tv_usec; -#endif - /* send it now */ - if (when->tv_sec == 0 && when->tv_usec == 0) { - osmux_tx_cb(h); - return; - } - osmo_timer_schedule(&h->timer, when->tv_sec, when->tv_usec); -} - -void -osmux_tx_sched(struct llist_head *list, - void (*tx_cb)(struct msgb *msg, void *data), void *data) -{ - struct msgb *cur, *tmp; - struct timeval delta = { .tv_sec = 0, .tv_usec = DELTA_RTP_MSG }; - struct timeval when; - - timerclear(&when); - - llist_for_each_entry_safe(cur, tmp, list, list) { - -#ifdef DEBUG_MSG - LOGP(DLMUX, LOGL_DEBUG, "scheduled transmision in %lu.%6lu " - "seconds, msg=%p\n", when.tv_sec, when.tv_usec, cur); -#endif - llist_del(&cur->list); - osmux_tx(cur, &when, tx_cb, data); - timeradd(&when, &delta, &when); - } -} - -void osmux_xfrm_output_init2(struct osmux_out_handle *h, uint32_t rtp_ssrc, uint8_t rtp_payload_type) -{ - memset(h, 0, sizeof(*h)); - h->rtp_seq = (uint16_t)random(); - h->rtp_timestamp = (uint32_t)random(); - h->rtp_ssrc = rtp_ssrc; - h->rtp_payload_type = rtp_payload_type; - INIT_LLIST_HEAD(&h->list); - osmo_timer_setup(&h->timer, osmux_xfrm_output_trigger, h); -} - -void osmux_xfrm_output_init(struct osmux_out_handle *h, uint32_t rtp_ssrc) -{ - /* backward compatibility with old users, where 98 was harcoded in osmux_rebuild_rtp() */ - osmux_xfrm_output_init2(h, rtp_ssrc, 98); -} - -#define SNPRINTF_BUFFER_SIZE(ret, remain, offset) \ - if (ret < 0) \ - ret = 0; \ - offset += ret; \ - if (ret > remain) \ - ret = remain; \ - remain -= ret; - static int osmux_snprintf_header(char *buf, size_t size, struct osmux_hdr *osmuxh) { unsigned int remain = size, offset = 0; int ret; ret = snprintf(buf, remain, "OSMUX seq=%03u ccid=%03u " - "ft=%01u ctr=%01u " + "ft=%01u rtp_m=%01u ctr=%01u " "amr_f=%01u amr_q=%01u " - "amr_ft=%02u amr_cmr=%02u ", + "amr_ft=%02u amr_cmr=%02u", osmuxh->seq, osmuxh->circuit_id, - osmuxh->ft, osmuxh->ctr, + osmuxh->ft, osmuxh->rtp_m, osmuxh->ctr, osmuxh->amr_f, osmuxh->amr_q, osmuxh->amr_ft, osmuxh->amr_cmr); SNPRINTF_BUFFER_SIZE(ret, remain, offset); @@ -1093,7 +126,10 @@ int osmux_snprintf(char *buf, size_t size, struct msgb *msg) return -1; } osmuxh = (struct osmux_hdr *)((uint8_t *)msg->data + msg_off); - + if (msg_off) { + ret = snprintf(buf + offset, remain, ", "); + SNPRINTF_BUFFER_SIZE(ret, remain, offset); + } ret = osmux_snprintf_header(buf + offset, remain, osmuxh); SNPRINTF_BUFFER_SIZE(ret, remain, offset); @@ -1123,6 +159,8 @@ int osmux_snprintf(char *buf, size_t size, struct msgb *msg) } if (osmuxh->ft == OSMUX_FT_VOICE_AMR) { + ret = snprintf(buf + offset, remain, " "); + SNPRINTF_BUFFER_SIZE(ret, remain, offset); ret = osmux_snprintf_payload(buf + offset, remain, osmux_get_payload(osmuxh), payload_len); diff --git a/src/osmux_input.c b/src/osmux_input.c new file mode 100644 index 0000000..2184a08 --- /dev/null +++ b/src/osmux_input.c @@ -0,0 +1,857 @@ +/* + * (C) 2012-2017 by Pablo Neira Ayuso <pablo@gnumonks.org> + * (C) 2012 by On Waves ehf <http://www.on-waves.com> + * (C) 2015-2022 by sysmocom - s.f.m.c. GmbH + * + * SPDX-License-Identifier: GPL-2.0+ + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + */ + +#include <stdio.h> +#include <string.h> +#include <inttypes.h> + +#include <osmocom/core/msgb.h> +#include <osmocom/core/timer.h> +#include <osmocom/core/timer_compat.h> +#include <osmocom/core/select.h> +#include <osmocom/core/talloc.h> +#include <osmocom/core/logging.h> + +#include <osmocom/netif/amr.h> +#include <osmocom/netif/rtp.h> +#include <osmocom/netif/osmux.h> + +#include <arpa/inet.h> + +/* This allows you to debug osmux message transformations (spamming) */ +#if 0 +#define DEBUG_MSG 0 +#endif + +/* delta time between two RTP messages (in microseconds) */ +#define DELTA_RTP_MSG 20000 +/* delta time between two RTP messages (in samples, 8kHz) */ +#define DELTA_RTP_TIMESTAMP 160 + +#define LOGMUXLK_(link, lvl, fmt, args ...) \ + LOGP(DLMUX, lvl, "[%s,%u/%" PRIu16 "]" fmt, \ + (link)->name, (link)->h->batch_size - (link)->remaining_bytes, \ + (link)->h->batch_size, \ + ## args) + +#define LOGMUXLK(link, lvl, fmt, args ...) \ + LOGMUXLK_(link, lvl, " " fmt, ## args) + +#define LOGMUXCID(link, circuit, lvl, fmt, args ...) \ + LOGMUXLK_(link, lvl, "[CID=%" PRIu8 ",batched=%u/%u] " fmt, \ + (circuit)->ccid, (circuit)->nmsgs, (link)->h->batch_factor, ## args) + +/*! \addtogroup osmux Osmocom Multiplex Protocol + * @{ + * + * This code implements a variety of utility functions related to the + * OSMUX user-plane multiplexing protocol, an efficient alternative to + * plain UDP/RTP streams for voice transport in back-haul of cellular + * networks. + * + * For information about the OSMUX protocol design, please see the + * OSMUX reference manual at + * http://ftp.osmocom.org/docs/latest/osmux-reference.pdf + */ + +/*! \file osmux_input.c + * \brief Osmocom multiplex protocol helpers (input) + */ + +static void *osmux_ctx; + +static uint32_t osmux_ft_dummy_size(uint8_t amr_ft, uint8_t batch_factor) +{ + return sizeof(struct osmux_hdr) + (osmo_amr_bytes(amr_ft) * batch_factor); +} + +/* This is (struct osmux_in_handle)->internal_data. + * TODO: API have been defined to access all fields of osmux_in_handle + * (deprecated osmux_xfrm_input_init()), hence at some point we remove struct + * osmux_in_handle definition from osmux.h and we move it here internally and + * merge it with struct osmux_link. + */ +struct osmux_link { + struct osmo_timer_list timer; + struct osmux_hdr *osmuxh; + struct llist_head circuit_list; + unsigned int remaining_bytes; + uint32_t nmsgs; + int ndummy; + char *name; + struct osmux_in_handle *h; /* backpointer to parent object */ +}; + +struct osmux_circuit { + struct llist_head head; + int ccid; + struct llist_head msg_list; + int nmsgs; + int dummy; + uint8_t seq; + int32_t last_transmitted_rtp_seq; /* -1 = unset */ + uint32_t last_transmitted_rtp_ts; /* Check last_transmitted_rtp_seq = -1 to detect unset */ +}; + +/* Used internally to temporarily cache all parsed content of an RTP pkt from user to be transmitted as Osmux */ +struct osmux_in_req { + struct osmux_circuit *circuit; + struct msgb *msg; + struct rtp_hdr *rtph; + uint32_t rtp_payload_len; + struct amr_hdr *amrh; + int amr_payload_len; +}; + +/* returns: 1 if batch is full, 0 if batch still not full, negative on error. */ +static int osmux_circuit_enqueue(struct osmux_link *link, struct osmux_circuit *circuit, struct msgb *msg) +{ + /* Validate amount of messages per batch. The counter field of the + * osmux header is just 3 bits long, so make sure it doesn't overflow. + */ + OSMO_ASSERT(link->h->batch_factor <= 8); + if (circuit->nmsgs >= link->h->batch_factor) { + struct rtp_hdr *rtph; + + rtph = osmo_rtp_get_hdr(msg); + if (rtph == NULL) + return -1; + + LOGMUXCID(link, circuit, LOGL_DEBUG, "Batch is full for RTP sssrc=%u\n", rtph->ssrc); + return 1; + } + + llist_add_tail(&msg->list, &circuit->msg_list); + circuit->nmsgs++; + return 0; +} + +static void osmux_circuit_dequeue(struct osmux_circuit *circuit, struct msgb *msg) +{ + llist_del(&msg->list); + circuit->nmsgs--; +} + +static void osmux_circuit_del_msgs(struct osmux_link *link, struct osmux_circuit *circuit) +{ + struct msgb *cur, *tmp; + llist_for_each_entry_safe(cur, tmp, &circuit->msg_list, list) { + osmux_circuit_dequeue(circuit, cur); + msgb_free(cur); + link->nmsgs--; + } +} + +struct osmux_input_state { + struct msgb *out_msg; + struct msgb *msg; + struct rtp_hdr *rtph; + struct amr_hdr *amrh; + uint32_t amr_payload_len; + struct osmux_circuit *circuit; + int add_osmux_hdr; +}; + +static int osmux_link_put(struct osmux_link *link, struct osmux_input_state *state) +{ + uint16_t rtp_seqnum = ntohs(state->rtph->sequence); + + if (state->add_osmux_hdr) { + bool seq_jump = state->circuit->last_transmitted_rtp_seq != -1 && + ((state->circuit->last_transmitted_rtp_seq + 1) & 0xffff) != rtp_seqnum; + struct osmux_hdr *osmuxh; + osmuxh = (struct osmux_hdr *)msgb_put(state->out_msg, + sizeof(struct osmux_hdr)); + osmuxh->ft = OSMUX_FT_VOICE_AMR; + osmuxh->ctr = 0; + osmuxh->rtp_m = state->rtph->marker || seq_jump; + osmuxh->seq = state->circuit->seq++; + osmuxh->circuit_id = state->circuit->ccid; + osmuxh->amr_ft = state->amrh->ft; + + /* annotate current osmux header */ + link->osmuxh = osmuxh; + } else { + if (link->osmuxh->ctr == 0x7) { + LOGMUXCID(link, state->circuit, LOGL_ERROR, + "Cannot encode RTP pkt ssrc=%u into osmux batch, too many packets\n", + state->rtph->ssrc); + return 0; + } + link->osmuxh->ctr++; + } + /* For fields below, we only use the last included in batch and ignore any previous: */ + link->osmuxh->amr_cmr = state->amrh->cmr; + link->osmuxh->amr_f = state->amrh->f; + link->osmuxh->amr_q = state->amrh->q; + + if (state->amr_payload_len > 0) { + memcpy(state->out_msg->tail, osmo_amr_get_payload(state->amrh), + state->amr_payload_len); + msgb_put(state->out_msg, state->amr_payload_len); + } + + /* Update circuit state of last transmitted incoming RTP seqnum/ts: */ + state->circuit->last_transmitted_rtp_seq = rtp_seqnum; + state->circuit->last_transmitted_rtp_ts = ntohl(state->rtph->timestamp); + return 0; +} + +static void osmux_encode_dummy(struct osmux_link *link, struct osmux_input_state *state) +{ + struct osmux_hdr *osmuxh; + /* TODO: This should be configurable at some point. */ + uint32_t payload_size = osmux_ft_dummy_size(AMR_FT_3, link->h->batch_factor) - + sizeof(struct osmux_hdr); + + osmuxh = (struct osmux_hdr *)state->out_msg->tail; + osmuxh->ft = OSMUX_FT_DUMMY; + osmuxh->ctr = link->h->batch_factor - 1; + osmuxh->amr_f = 0; + osmuxh->amr_q = 0; + osmuxh->seq = 0; + osmuxh->circuit_id = state->circuit->ccid; + osmuxh->amr_cmr = 0; + osmuxh->amr_ft = AMR_FT_3; + msgb_put(state->out_msg, sizeof(struct osmux_hdr)); + + memset(state->out_msg->tail, 0xff, payload_size); + msgb_put(state->out_msg, payload_size); +} + +static struct msgb *osmux_build_batch(struct osmux_link *link) +{ + struct msgb *batch_msg; + struct osmux_circuit *circuit; + +#ifdef DEBUG_MSG + LOGMUXLK(link, LOGL_DEBUG, "Now building batch\n"); +#endif + + batch_msg = msgb_alloc(link->h->batch_size, "osmux"); + if (batch_msg == NULL) { + LOGMUXLK(link, LOGL_ERROR, "Not enough memory\n"); + return NULL; + } + + llist_for_each_entry(circuit, &link->circuit_list, head) { + struct msgb *cur, *tmp; + int ctr = 0; + int prev_amr_ft; + + if (circuit->dummy) { + struct osmux_input_state state = { + .out_msg = batch_msg, + .circuit = circuit, + }; + osmux_encode_dummy(link, &state); + continue; + } + + llist_for_each_entry_safe(cur, tmp, &circuit->msg_list, list) { + struct osmux_input_state state = { + .msg = cur, + .out_msg = batch_msg, + .circuit = circuit, + }; + uint32_t amr_len; +#ifdef DEBUG_MSG + char buf[4096]; + + osmo_rtp_snprintf(buf, sizeof(buf), cur); + buf[sizeof(buf)-1] = '\0'; + LOGMUXCID(link, circuit, LOGL_DEBUG, "to BSC-NAT: %s\n", buf); +#endif + + state.rtph = osmo_rtp_get_hdr(cur); + if (!state.rtph) + return NULL; + state.amrh = osmo_rtp_get_payload(state.rtph, state.msg, &amr_len); + if (!state.amrh) + return NULL; + state.amr_payload_len = amr_len - sizeof(struct amr_hdr); + + if (ctr == 0) { +#ifdef DEBUG_MSG + LOGMUXCID(link, circuit, LOGL_DEBUG, "Add osmux header (First in batch)\n"); +#endif + state.add_osmux_hdr = 1; + } else if (prev_amr_ft != state.amrh->ft) { + /* If AMR FT changed, we have to generate an extra batch osmux header: */ +#ifdef DEBUG_MSG + LOGMUXCID(link, circuit, LOGL_DEBUG, "Add osmux header (New AMR FT)\n"); +#endif + state.add_osmux_hdr = 1; + } + + osmux_link_put(link, &state); + osmux_circuit_dequeue(circuit, cur); + prev_amr_ft = state.amrh->ft; + ctr++; + msgb_free(cur); + link->nmsgs--; + } + } + return batch_msg; +} + +void osmux_xfrm_input_deliver(struct osmux_in_handle *h) +{ + struct msgb *batch_msg; + struct osmux_link *link = (struct osmux_link *)h->internal_data; + +#ifdef DEBUG_MSG + LOGMUXLK(link, LOGL_DEBUG, "Invoking delivery function\n"); +#endif + batch_msg = osmux_build_batch(link); + if (!batch_msg) + return; + h->stats.output_osmux_msgs++; + h->stats.output_osmux_bytes += batch_msg->len; + + h->deliver(batch_msg, h->data); + osmo_timer_del(&link->timer); + link->remaining_bytes = h->batch_size; + + if (link->ndummy) + osmo_timer_schedule(&link->timer, 0, h->batch_factor * DELTA_RTP_MSG); +} + +static void osmux_link_timer_expired(void *data) +{ + struct osmux_in_handle *h = data; + +#ifdef DEBUG_MSG + const struct osmux_link *link = (struct osmux_link *)h->internal_data; + LOGMUXLK(link, LOGL_DEBUG, "Batch delivery timer timeout\n"); +#endif + osmux_xfrm_input_deliver(h); +} + +static int osmux_rtp_amr_payload_len(struct amr_hdr *amrh, uint32_t amr_len) +{ + int amr_payload_len; + + if (!osmo_amr_ft_valid(amrh->ft)) + return -1; + + amr_payload_len = amr_len - sizeof(struct amr_hdr); + + /* The AMR payload does not fit with what we expect */ + if (osmo_amr_bytes(amrh->ft) != amr_payload_len) { + LOGP(DLMUX, LOGL_ERROR, + "Bad AMR frame FT=%u, expected %zd bytes, got %d bytes\n", + amrh->ft, osmo_amr_bytes(amrh->ft), amr_len); + return -1; + } + return amr_payload_len; +} + +/* Last stored AMR FT to be added in the current osmux batch. -1 if none stored yet */ +static int osmux_circuit_get_last_stored_amr_ft(struct osmux_circuit *circuit) +{ + struct msgb *last; + struct rtp_hdr *rtph; + struct amr_hdr *amrh; + uint32_t amr_len; + /* Have we seen any RTP packet in this batch before? */ + if (llist_empty(&circuit->msg_list)) + return -1; + OSMO_ASSERT(circuit->nmsgs > 0); + + /* Get last RTP packet seen in this batch */ + last = llist_entry(circuit->msg_list.prev, struct msgb, list); + rtph = osmo_rtp_get_hdr(last); + amrh = osmo_rtp_get_payload(rtph, last, &amr_len); + if (amrh == NULL) + return -1; + return amrh->ft; + +} + +static struct osmux_circuit * +osmux_link_find_circuit(struct osmux_link *link, int ccid) +{ + struct osmux_circuit *circuit; + + llist_for_each_entry(circuit, &link->circuit_list, head) { + if (circuit->ccid == ccid) + return circuit; + } + return NULL; +} + +static void osmux_link_del_circuit(struct osmux_link *link, struct osmux_circuit *circuit) +{ + if (circuit->dummy) + link->ndummy--; + llist_del(&circuit->head); + osmux_circuit_del_msgs(link, circuit); + talloc_free(circuit); +} + +/* returns: 1 if batch is full, 0 if batch still not full, negative on error. */ +static int osmux_link_add(struct osmux_link *link, const struct osmux_in_req *req) +{ + unsigned int needed_bytes = 0; + int rc; + /* Init of talkspurt (RTP M marker bit) needs to be in the first AMR slot + * of the OSMUX packet, enforce sending previous batch if required: + */ + if (req->rtph->marker && req->circuit->nmsgs != 0) + return 1; + + /* First check if there is room for this message in the batch */ + /* First in batch comes after the batch header: */ + if (req->circuit->nmsgs == 0) + needed_bytes += sizeof(struct osmux_hdr); + /* If AMR FT changes in the middle of current batch a new header is + * required to adapt to size change: */ + else if (osmux_circuit_get_last_stored_amr_ft(req->circuit) != req->amrh->ft) + needed_bytes += sizeof(struct osmux_hdr); + needed_bytes += req->amr_payload_len; + + /* No room, sorry. You'll have to retry */ + if (needed_bytes > link->remaining_bytes) + return 1; + + /* This batch is full, force batch delivery */ + rc = osmux_circuit_enqueue(link, req->circuit, req->msg); + if (rc != 0) + return rc; + +#ifdef DEBUG_MSG + LOGMUXCID(link, req->circuit, LOGL_DEBUG, "Adding msg with ssrc=%u to batch\n", + req->rtph->ssrc); +#endif + + /* Update remaining room in this batch */ + link->remaining_bytes -= needed_bytes; + + if (link->nmsgs == 0) { +#ifdef DEBUG_MSG + LOGMUXLK(link, LOGL_DEBUG, "Osmux start batch delivery timer\n"); +#endif + osmo_timer_schedule(&link->timer, 0, + link->h->batch_factor * DELTA_RTP_MSG); + } + link->nmsgs++; + + return 0; +}; + +/* returns: 1 if batch is full, 0 if batch still not full, negative on error. */ +static int osmux_replay_lost_packets(struct osmux_link *link, const struct osmux_in_req *req) +{ + int16_t diff; + uint16_t lost_pkts; + struct msgb *copy_from; + uint16_t last_seq, cur_seq; + uint32_t last_ts; + int i, rc; + struct osmux_in_req clone_req; + + /* If M bit is set, this is a sync point, so any sort of seq jump is expected and has no real meaning. */ + if (req->rtph->marker) + return 0; + + /* Have we seen any RTP packet in this batch before? */ + if (llist_empty(&req->circuit->msg_list)) { + /* Since current batch is empty, it can be assumed: + * 1- circuit->last_transmitted_rtp_seq is either unset or really contains the last RTP enqueued + * 2- This pkt will generate an osmuxhdr and hence there's no + * restriction on the FT, as opposite to the case where the batch + * is half full + * Conclusion: + * 1- It is fine using circuit->last_transmitted_rtp_seq as last enqueued RTP header to detect seqnum jumps. + * 2- It is fine filling holes at the start of the batch by using current req->rtph. + */ + if (req->circuit->last_transmitted_rtp_seq == -1) + return 0; /* first message in circuit, do nothing */ + copy_from = req->msg; + last_seq = req->circuit->last_transmitted_rtp_seq; + last_ts = req->circuit->last_transmitted_rtp_ts; + } else { + /* Get last RTP packet seen in this batch, so that we simply keep filling with same osmuxhdr */ + struct rtp_hdr *last_rtph; + copy_from = llist_entry(req->circuit->msg_list.prev, struct msgb, list); + last_rtph = osmo_rtp_get_hdr(copy_from); + if (last_rtph == NULL) + return -1; + last_seq = ntohs(last_rtph->sequence); + last_ts = ntohl(last_rtph->timestamp); + } + cur_seq = ntohs(req->rtph->sequence); + diff = cur_seq - last_seq; + + /* If diff between last RTP packet seen and this one is > 1, + * then we lost several RTP packets, let's replay them. + */ + if (diff <= 1) + return 0; + lost_pkts = diff - 1; + + LOGMUXCID(link, req->circuit, LOGL_INFO, + "RTP seq jump detected: %" PRIu16 " -> %" PRIu16 " (%" PRId16 + " lost packets)\n", + last_seq, cur_seq, lost_pkts); + + /* We know we can feed only up to batch_factor before osmux_link_add() + * returning 1 signalling "transmission needed, call deliver() and retry". + * Hence, it doesn't make sense to even attempt recreating a big number of + * RTP packets (>batch_factor). + */ + if (lost_pkts > link->h->batch_factor - req->circuit->nmsgs) { + if (llist_empty(&req->circuit->msg_list)) { + /* If we are starting a batch, it doesn't make sense to keep filling entire + * batches with lost packets, since it could potentially end up in a loop if + * the lost_pkts value is huge. Instead, avoid recreating packets and let the + * osmux encoder set an M bit on the osmuxhdr when acting upon current req->rtph. + */ + return 0; + } + lost_pkts = link->h->batch_factor - req->circuit->nmsgs; + } + + rc = 0; + clone_req = *req; + for (i = 0; i < lost_pkts; i++) { + /* Clone last (or new if last not available) RTP packet seen */ + clone_req.msg = msgb_copy(copy_from, "RTP clone"); + if (!clone_req.msg) + continue; + + /* The original RTP message has been already sanity checked. */ + clone_req.rtph = osmo_rtp_get_hdr(clone_req.msg); + clone_req.amrh = osmo_rtp_get_payload(clone_req.rtph, clone_req.msg, &clone_req.rtp_payload_len); + clone_req.amr_payload_len = osmux_rtp_amr_payload_len(clone_req.amrh, clone_req.rtp_payload_len); + + /* Faking a follow up RTP pkt here, so no Marker bit: */ + clone_req.rtph->marker = false; + /* Adjust sequence number and timestamp */ + clone_req.rtph->sequence = htons(last_seq + 1 + i); + clone_req.rtph->timestamp = last_ts + ((1 + i) * DELTA_RTP_TIMESTAMP); + rc = osmux_link_add(link, &clone_req); + /* No more room in this batch, skip padding with more clones */ + if (rc != 0) { + msgb_free(clone_req.msg); + return rc; + } + } + return rc; +} + +/* returns: 1 if batch is full, 0 if batch still not full, negative on error. */ +static int osmux_link_handle_rtp_req(struct osmux_link *link, struct osmux_in_req *req) +{ + struct msgb *cur, *next; + int rc; + + /* We've seen the first RTP message, disable dummy padding */ + if (req->circuit->dummy) { + req->circuit->dummy = 0; + link->ndummy--; + } + + /* Extra validation: check if this message already exists, should not + * happen but make sure we don't propagate duplicated messages. + */ + llist_for_each_entry_safe(cur, next, &req->circuit->msg_list, list) { + struct rtp_hdr *rtph2 = osmo_rtp_get_hdr(cur); + OSMO_ASSERT(rtph2); + + /* Already exists message with this sequence. Let's copy over + * the new RTP, since there's the chance that the existing one may + * be a forged copy we did when we detected a hole. */ + if (rtph2->sequence == req->rtph->sequence) { + if (msgb_length(cur) != msgb_length(req->msg)) { + /* Different packet size, AMR FT may have changed. Let's avoid changing it to + * break accounted size to be written (would need new osmux_hdr, etc.) */ + LOGMUXCID(link, req->circuit, LOGL_NOTICE, + "RTP pkt with seq=%u and different len %u != %u already exists, skip it\n", + ntohs(req->rtph->sequence), msgb_length(cur), msgb_length(req->msg)); + return -1; + } + LOGMUXCID(link, req->circuit, LOGL_INFO, + "RTP pkt with seq=%u already exists, replace it\n", + ntohs(req->rtph->sequence)); + __llist_add(&req->msg->list, &cur->list, cur->list.next); + llist_del(&cur->list); + msgb_free(cur); + return 0; + } + } + + /* Handle RTP packet loss scenario */ + rc = osmux_replay_lost_packets(link, req); + if (rc != 0) + return rc; + + return osmux_link_add(link, req); +} + +/** + * osmux_xfrm_input - add RTP message to OSmux batch + * \param msg: RTP message that you want to batch into one OSmux message + * + * If 0 is returned, this indicates that the message has been batched and the + * msgb is now owned by the osmux layer. + * If negative value is returned, an error occurred and the message has been + * dropped (and freed). + * If 1 is returned, you have to invoke osmux_xfrm_input_deliver and try again. + * + * The function takes care of releasing the messages in case of error and + * when building the batch. + */ +int osmux_xfrm_input(struct osmux_in_handle *h, struct msgb *msg, int ccid) +{ + int ret; + struct osmux_link *link = (struct osmux_link *)h->internal_data; + struct osmux_in_req req = { + .msg = msg, + .rtph = osmo_rtp_get_hdr(msg), + .circuit = osmux_link_find_circuit(link, ccid), + }; + + if (!req.circuit) { + LOGMUXLK(link, LOGL_INFO, "Couldn't find circuit CID=%u\n", ccid); + goto err_free; + } + + if (!req.rtph) { + LOGMUXCID(link, req.circuit, LOGL_NOTICE, "msg not containing an RTP header\n"); + goto err_free; + } + + /* Ignore too big RTP/RTCP messages, most likely forged. Sanity check + * to avoid a possible forever loop in the caller. + */ + if (msg->len > h->batch_size - sizeof(struct osmux_hdr)) { + LOGMUXCID(link, req.circuit, LOGL_NOTICE, + "RTP payload too big (%u) for configured batch size (%u)\n", + msg->len, h->batch_size); + goto err_free; + } + + switch (req.rtph->payload_type) { + case RTP_PT_RTCP: + LOGMUXCID(link, req.circuit, LOGL_INFO, "Dropping RTCP packet\n"); + msgb_free(msg); + return 0; + default: + /* The RTP payload type is dynamically allocated, + * although we use static ones. Assume that we always + * receive AMR traffic. + */ + req.amrh = osmo_rtp_get_payload(req.rtph, req.msg, &req.rtp_payload_len); + if (req.amrh == NULL) + goto err_free; + req.amr_payload_len = osmux_rtp_amr_payload_len(req.amrh, req.rtp_payload_len); + if (req.amr_payload_len < 0) { + LOGMUXCID(link, req.circuit, LOGL_NOTICE, "AMR payload invalid\n"); + goto err_free; + } + + /* Add this RTP to the OSMUX batch */ + ret = osmux_link_handle_rtp_req(link, &req); + if (ret < 0) { + /* Cannot put this message into the batch. + * Malformed, duplicated, OOM. Drop it and tell + * the upper layer that we have digest it. + */ + LOGMUXCID(link, req.circuit, LOGL_DEBUG, "Dropping RTP packet instead of adding to batch\n"); + goto err_free; + } + + h->stats.input_rtp_msgs++; + h->stats.input_rtp_bytes += msg->len; + break; + } + return ret; + +err_free: + msgb_free(msg); + return -1; +} + +static int osmux_xfrm_input_talloc_destructor(struct osmux_in_handle *h) +{ + struct osmux_link *link = (struct osmux_link *)h->internal_data; + struct osmux_circuit *circuit, *next; + + llist_for_each_entry_safe(circuit, next, &link->circuit_list, head) + osmux_link_del_circuit(link, circuit); + + osmo_timer_del(&link->timer); + talloc_free(link); + return 0; +} + +static unsigned int next_default_name_idx = 0; +/*! \brief Allocate a new osmux in handle (osmux source, tx side) + * \param[in] ctx talloc context to use when allocating the returned struct + * \return Allocated osmux in handle + * + * This object contains configuration and state to handle a group of circuits (trunk), + * receiving RTP packets from the upper layer (API user) and sending batched & + * trunked Osmux messages containing all the data of those circuits down the + * stack outgoing network Osmux messages. + * Returned pointer can be freed with regular talloc_free, all pending messages + * in queue and all internal data will be freed. */ +struct osmux_in_handle *osmux_xfrm_input_alloc(void *ctx) +{ + struct osmux_in_handle *h; + struct osmux_link *link; + + h = talloc_zero(ctx, struct osmux_in_handle); + OSMO_ASSERT(h); + + h->batch_size = OSMUX_BATCH_DEFAULT_MAX; + + link = talloc_zero(h, struct osmux_link); + OSMO_ASSERT(link); + INIT_LLIST_HEAD(&link->circuit_list); + link->h = h; + link->remaining_bytes = h->batch_size; + link->name = talloc_asprintf(link, "input-%u", next_default_name_idx++); + osmo_timer_setup(&link->timer, osmux_link_timer_expired, h); + + h->internal_data = (void *)link; + + LOGMUXLK(link, LOGL_DEBUG, "Initialized osmux input converter\n"); + + talloc_set_destructor(h, osmux_xfrm_input_talloc_destructor); + return h; +} + +/*! \deprecated: Use osmux_xfrm_input_alloc() instead */ +void osmux_xfrm_input_init(struct osmux_in_handle *h) +{ + struct osmux_link *link; + + /* Default to osmux packet size if not specified */ + if (h->batch_size == 0) + h->batch_size = OSMUX_BATCH_DEFAULT_MAX; + + link = talloc_zero(osmux_ctx, struct osmux_link); + if (link == NULL) + return; + INIT_LLIST_HEAD(&link->circuit_list); + link->h = h; + link->remaining_bytes = h->batch_size; + link->name = talloc_asprintf(link, "%u", next_default_name_idx++); + osmo_timer_setup(&link->timer, osmux_link_timer_expired, h); + + h->internal_data = (void *)link; + + LOGMUXLK(link, LOGL_DEBUG, "Initialized osmux input converter\n"); +} + +int osmux_xfrm_input_set_batch_factor(struct osmux_in_handle *h, uint8_t batch_factor) +{ + if (batch_factor > 8) + return -1; + h->batch_factor = batch_factor; + return 0; +} + +void osmux_xfrm_input_set_batch_size(struct osmux_in_handle *h, uint16_t batch_size) +{ + if (batch_size == 0) + h->batch_size = OSMUX_BATCH_DEFAULT_MAX; + else + h->batch_size = batch_size; +} + +void osmux_xfrm_input_set_initial_seqnum(struct osmux_in_handle *h, uint8_t osmux_seqnum) +{ + h->osmux_seq = osmux_seqnum; +} + +void osmux_xfrm_input_set_deliver_cb(struct osmux_in_handle *h, + void (*deliver_cb)(struct msgb *msg, void *data), void *data) +{ + h->deliver = deliver_cb; + h->data = data; +} + +void *osmux_xfrm_input_get_deliver_cb_data(struct osmux_in_handle *h) +{ + return h->data; +} + +void osmux_xfrm_input_set_name(struct osmux_in_handle *h, const char *name) +{ + struct osmux_link *link = (struct osmux_link *)h->internal_data; + osmo_talloc_replace_string(link, &link->name, name); +} + +int osmux_xfrm_input_open_circuit(struct osmux_in_handle *h, int ccid, + int dummy) +{ + struct osmux_link *link = (struct osmux_link *)h->internal_data; + struct osmux_circuit *circuit; + + circuit = osmux_link_find_circuit(link, ccid); + if (circuit != NULL) { + LOGMUXLK(link, LOGL_ERROR, "circuit %u already exists!\n", ccid); + return -1; + } + + circuit = talloc_zero(osmux_ctx, struct osmux_circuit); + if (circuit == NULL) { + LOGMUXLK(link, LOGL_ERROR, "OOM on circuit %u\n", ccid); + return -1; + } + + circuit->ccid = ccid; + circuit->seq = h->osmux_seq; + circuit->last_transmitted_rtp_seq = -1; /* field unset */ + INIT_LLIST_HEAD(&circuit->msg_list); + llist_add_tail(&circuit->head, &link->circuit_list); + + if (dummy) { + circuit->dummy = dummy; + link->ndummy++; + if (!osmo_timer_pending(&link->timer)) + osmo_timer_schedule(&link->timer, 0, + h->batch_factor * DELTA_RTP_MSG); + } + LOGMUXCID(link, circuit, LOGL_INFO, "Circuit opened successfully\n"); + return 0; +} + +void osmux_xfrm_input_close_circuit(struct osmux_in_handle *h, int ccid) +{ + struct osmux_link *link = (struct osmux_link *)h->internal_data; + struct osmux_circuit *circuit; + + circuit = osmux_link_find_circuit(link, ccid); + if (circuit == NULL) { + LOGMUXLK(link, LOGL_NOTICE, "Unable to close circuit %d: Not found\n", + ccid); + return; + } + + LOGMUXCID(link, circuit, LOGL_INFO, "Closing circuit\n"); + + osmux_link_del_circuit(link, circuit); +} + +/*! \deprecated: Use talloc_free() instead (will call osmux_xfrm_input_talloc_destructor()) */ +void osmux_xfrm_input_fini(struct osmux_in_handle *h) +{ + (void)osmux_xfrm_input_talloc_destructor(h); +} + +/*! @} */ diff --git a/src/osmux_output.c b/src/osmux_output.c new file mode 100644 index 0000000..664ed60 --- /dev/null +++ b/src/osmux_output.c @@ -0,0 +1,396 @@ +/* + * (C) 2012-2017 by Pablo Neira Ayuso <pablo@gnumonks.org> + * (C) 2012 by On Waves ehf <http://www.on-waves.com> + * (C) 2015-2022 by sysmocom - s.f.m.c. GmbH + * + * SPDX-License-Identifier: GPL-2.0+ + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + */ + +#include <stdio.h> +#include <string.h> + +#include <osmocom/core/msgb.h> +#include <osmocom/core/timer.h> +#include <osmocom/core/timer_compat.h> +#include <osmocom/core/select.h> +#include <osmocom/core/talloc.h> +#include <osmocom/core/logging.h> + +#include <osmocom/netif/amr.h> +#include <osmocom/netif/rtp.h> +#include <osmocom/netif/osmux.h> + +#include <arpa/inet.h> + +/* delta time between two RTP messages (in microseconds) */ +#define DELTA_RTP_MSG 20000 +/* delta time between two RTP messages (in samples, 8kHz) */ +#define DELTA_RTP_TIMESTAMP 160 + + +/*! \addtogroup osmux Osmocom Multiplex Protocol + * @{ + * + * This code implements a variety of utility functions related to the + * OSMUX user-plane multiplexing protocol, an efficient alternative to + * plain UDP/RTP streams for voice transport in back-haul of cellular + * networks. + * + * For information about the OSMUX protocol design, please see the + * OSMUX reference manual at + * http://ftp.osmocom.org/docs/latest/osmux-reference.pdf + */ + +/*! \file osmux_output.c + * \brief Osmocom multiplex protocol helpers (output) + */ +static uint32_t osmux_ft_dummy_size(uint8_t amr_ft, uint8_t batch_factor) +{ + return sizeof(struct osmux_hdr) + (osmo_amr_bytes(amr_ft) * batch_factor); +} + +struct osmux_hdr *osmux_xfrm_output_pull(struct msgb *msg) +{ + struct osmux_hdr *osmuxh; + size_t len; + +next: + if (msgb_length(msg) == 0) + return NULL; /* base case, we drained the msg successfully, tell user it is done. */ + + if (msgb_length(msg) < sizeof(struct osmux_hdr)) { + LOGP(DLMUX, LOGL_ERROR, "remaining %d bytes, broken osmuxhdr?\n", msgb_length(msg)); + return NULL; + } + + osmuxh = (struct osmux_hdr *)msgb_data(msg); + switch (osmuxh->ft) { + case OSMUX_FT_VOICE_AMR: + if (!osmo_amr_ft_valid(osmuxh->amr_ft)) { + LOGP(DLMUX, LOGL_ERROR, "Discarding bad AMR FT %d\n", osmuxh->amr_ft); + return NULL; + } + len = osmo_amr_bytes(osmuxh->amr_ft) * (osmuxh->ctr + 1) + sizeof(struct osmux_hdr); + if (msgb_length(msg) < len) { + LOGP(DLMUX, LOGL_ERROR, + "Discarding malformed OSMUX message: %s\n", + osmo_hexdump(msgb_data(msg), msgb_length(msg))); + return NULL; + } + msgb_pull(msg, len); + return osmuxh; + + case OSMUX_FT_DUMMY: + if (!osmo_amr_ft_valid(osmuxh->amr_ft)) { + LOGP(DLMUX, LOGL_ERROR, "Discarding bad Dummy FT: amr_ft=%u\n", osmuxh->amr_ft); + return NULL; + } + len = osmux_ft_dummy_size(osmuxh->amr_ft, osmuxh->ctr + 1); + if (msgb_length(msg) < len) { + LOGP(DLMUX, LOGL_ERROR, "Discarding bad Dummy FT: %s\n", + osmo_hexdump(msgb_data(msg), msgb_length(msg))); + return NULL; + } + msgb_pull(msg, len); + goto next; + + default: + LOGP(DLMUX, LOGL_ERROR, "Discarding unsupported Osmux FT %d\n", + osmuxh->ft); + return NULL; + } +} + +static struct msgb * +osmux_rebuild_rtp(struct osmux_out_handle *h, struct osmux_hdr *osmuxh, + void *payload, int payload_len, bool first_pkt) +{ + struct msgb *prev_msg, *out_msg; + struct timespec *prev_ts, *out_ts; + struct rtp_hdr *rtph; + struct amr_hdr *amrh; + struct timespec delta = { .tv_sec = 0, .tv_nsec = DELTA_RTP_MSG*1000 }; + unsigned int msg_len = sizeof(struct rtp_hdr) + + sizeof(struct amr_hdr) + + payload_len; + + if (h->rtp_msgb_alloc_cb) { + out_msg = h->rtp_msgb_alloc_cb(h->rtp_msgb_alloc_cb_data, msg_len); + } else { + out_msg = msgb_alloc(msg_len, "osmux-rtp"); + } + if (out_msg == NULL) + return NULL; + + /* Reconstruct RTP header */ + rtph = (struct rtp_hdr *)out_msg->data; + rtph->csrc_count = 0; + rtph->extension = 0; + rtph->version = RTP_VERSION; + rtph->payload_type = h->rtp_payload_type; + /* ... emulate timestamp and ssrc */ + rtph->timestamp = htonl(h->rtp_timestamp); + rtph->sequence = htons(h->rtp_seq); + rtph->ssrc = htonl(h->rtp_ssrc); + /* rtp packet with the marker bit is always guaranteed to be the first + * one. We want to notify with marker in 2 scenarios: + * 1- Sender told us through osmux frame rtp_m. + * 2- Intermediate osmux frame lost (seq gap), otherwise rtp receiver only sees + * steady increase of delay + */ + rtph->marker = first_pkt && + (osmuxh->rtp_m || (osmuxh->seq != ((h->osmux_seq_ack + 1) & 0xff))); + + msgb_put(out_msg, sizeof(struct rtp_hdr)); + + /* Reconstruct AMR header */ + amrh = (struct amr_hdr *)out_msg->tail; + amrh->cmr = osmuxh->amr_cmr; + amrh->f = osmuxh->amr_f; + amrh->ft = osmuxh->amr_ft; + amrh->q = osmuxh->amr_q; + + msgb_put(out_msg, sizeof(struct amr_hdr)); + + /* add AMR speech data */ + if (payload_len > 0) { + memcpy(out_msg->tail, payload, payload_len); + msgb_put(out_msg, payload_len); + } + + /* bump last RTP sequence number and timestamp that has been used */ + h->rtp_seq++; + h->rtp_timestamp += DELTA_RTP_TIMESTAMP; + + out_ts = ((struct timespec *)&((out_msg)->cb[0])); + if (first_pkt || llist_empty(&h->list)) { + osmo_clock_gettime(CLOCK_MONOTONIC, out_ts); + } else { + prev_msg = llist_last_entry(&h->list, struct msgb, list); + prev_ts = ((struct timespec *)&((prev_msg)->cb[0])); + timespecadd(prev_ts, &delta, out_ts); + } + + return out_msg; +} + +static void osmux_xfrm_output_trigger(void *data) +{ + struct osmux_out_handle *h = data; + struct timespec delay_ts, now; + struct msgb *msg, *next; + + llist_for_each_entry_safe(msg, next, &h->list, list) { + osmo_clock_gettime(CLOCK_MONOTONIC, &now); + struct timespec *msg_ts = ((struct timespec *)&((msg)->cb[0])); + if (timespeccmp(msg_ts, &now, >)) { + timespecsub(msg_ts, &now, &delay_ts); + osmo_timer_schedule(&h->timer, + delay_ts.tv_sec, delay_ts.tv_nsec / 1000); + return; + } + + /* Transmit the rtp packet */ + llist_del(&msg->list); + if (h->tx_cb) + h->tx_cb(msg, h->data); + else + msgb_free(msg); + } +} + +/*! \brief Generate RTP packets from osmux frame AMR payload set and schedule + * them for transmission at appropriate time. + * \param[in] h the osmux out handle handling a specific CID + * \param[in] osmuxh Buffer pointing to osmux frame header structure and AMR payload + * \return Number of generated RTP packets + * + * The osmux frame passed to this function must be of the type OSMUX_FT_VOICE_AMR. + * The generated RTP packets are kept into h's internal list and sent to the + * callback configured through osmux_xfrm_output_set_tx_cb when are ready to be + * transmitted according to schedule. + */ +int osmux_xfrm_output_sched(struct osmux_out_handle *h, struct osmux_hdr *osmuxh) +{ + struct timespec now, *msg_ts; + struct msgb *msg; + int i; + bool was_empty = llist_empty(&h->list); + + if (!was_empty) { + /* If we received new data it means we are behind schedule and + * we should flush all previous quickly */ + osmo_clock_gettime(CLOCK_MONOTONIC, &now); + llist_for_each_entry(msg, &h->list, list) { + msg_ts = ((struct timespec *)&((msg)->cb[0])); + *msg_ts = now; + } + osmo_timer_schedule(&h->timer, 0, 0); + } + + for (i = 0; i < osmuxh->ctr + 1; i++) { + struct rtp_hdr *rtph; + + msg = osmux_rebuild_rtp(h, osmuxh, + osmux_get_payload(osmuxh) + + i * osmo_amr_bytes(osmuxh->amr_ft), + osmo_amr_bytes(osmuxh->amr_ft), !i); + if (msg == NULL) + continue; + + rtph = osmo_rtp_get_hdr(msg); + if (rtph == NULL) + continue; + + llist_add_tail(&msg->list, &h->list); + } + + /* Update last seen seq number: */ + h->osmux_seq_ack = osmuxh->seq; + + /* In case list is still empty after parsing messages, no need to rearm */ + if (was_empty && !llist_empty(&h->list)) + osmux_xfrm_output_trigger(h); + return i; +} + +/*! \brief Flush all scheduled RTP packets still pending to be transmitted + * \param[in] h the osmux out handle to flush + * + * This function will immediately call the transmit callback for all queued RTP + * packets, making sure the list ends up empty. It will also stop all internal + * timers to make sure the osmux_out_handle can be dropped or re-used by calling + * osmux_xfrm_output on it. + */ +void osmux_xfrm_output_flush(struct osmux_out_handle *h) +{ + struct msgb *msg, *next; + + if (osmo_timer_pending(&h->timer)) + osmo_timer_del(&h->timer); + + llist_for_each_entry_safe(msg, next, &h->list, list) { + llist_del(&msg->list); + if (h->tx_cb) + h->tx_cb(msg, h->data); + else + msgb_free(msg); + } +} + +struct osmux_tx_handle { + struct osmo_timer_list timer; + struct msgb *msg; + void (*tx_cb)(struct msgb *msg, void *data); + void *data; +}; + +static int osmux_xfrm_output_talloc_destructor(struct osmux_out_handle *h) +{ + osmux_xfrm_output_flush(h); + return 0; +} + +/* Placeholder to avoid init code duplication while keeping backward + * compatilbility with deprecated osmux_xfrm_output_init{2}() APIs. */ +static void _osmux_xfrm_output_init(struct osmux_out_handle *h, uint32_t rtp_ssrc, uint8_t rtp_payload_type) +{ + h->rtp_seq = (uint16_t)random(); + h->rtp_timestamp = (uint32_t)random(); + h->rtp_ssrc = rtp_ssrc; + h->rtp_payload_type = rtp_payload_type; + INIT_LLIST_HEAD(&h->list); + osmo_timer_setup(&h->timer, osmux_xfrm_output_trigger, h); +} + +/*! \brief Allocate a new osmux out handle + * \param[in] ctx talloc context to use when allocating the returned struct + * \return Allocated osmux out handle + * + * This object contains configuration and state to handle a specific CID in + * incoming network Osmux messages, repackaging the frames for that CID as RTP + * packets and pushing them up the protocol stack. + * Returned pointer can be freed with regular talloc_free, queue will be flushed + * and all internal data will be freed. */ +struct osmux_out_handle *osmux_xfrm_output_alloc(void *ctx) +{ + struct osmux_out_handle *h; + + h = talloc_zero(ctx, struct osmux_out_handle); + OSMO_ASSERT(h); + + _osmux_xfrm_output_init(h, (uint32_t)random(), 98); + + talloc_set_destructor(h, osmux_xfrm_output_talloc_destructor); + return h; +} + +/*! \deprecated: Use osmux_xfrm_output_alloc() and osmux_xfrm_output_set_rtp_*() instead */ +void osmux_xfrm_output_init2(struct osmux_out_handle *h, uint32_t rtp_ssrc, uint8_t rtp_payload_type) +{ + memset(h, 0, sizeof(*h)); + _osmux_xfrm_output_init(h, rtp_ssrc, rtp_payload_type); +} + +/*! \deprecated: Use osmux_xfrm_output_alloc() and osmux_xfrm_output_set_rtp_*() instead */ +void osmux_xfrm_output_init(struct osmux_out_handle *h, uint32_t rtp_ssrc) +{ + /* backward compatibility with old users, where 98 was harcoded in osmux_rebuild_rtp() */ + memset(h, 0, sizeof(*h)); + _osmux_xfrm_output_init(h, rtp_ssrc, 98); +} + +/*! \brief Set transmission callback to call when a generated RTP packet is to be transmitted + * \param[in] h the osmux out handle handling a specific CID + * \param[in] osmuxh Buffer pointing to osmux frame header structure and AMR payload + * \return Number of generated RTP packets + * + * This Function sets the callback called by the interal timer set by + * osmux_xfrm_out_sched function. + */ +void osmux_xfrm_output_set_tx_cb(struct osmux_out_handle *h, + void (*tx_cb)(struct msgb *msg, void *data), + void *data) +{ + h->tx_cb = tx_cb; + h->data = data; +} + +/*! \brief Set callback to call when an RTP packet to be generated is to be allocated + * \param[in] h the osmux out handle handling a specific CID + * \param[in] cb User defined msgb alloc function for generated RTP pkts + * \param[in] cb_data Opaque data pointer set by user and passed in \ref cb + * \return msgb structure to be used to fill in generated RTP pkt content + */ +void osmux_xfrm_output_set_rtp_msgb_alloc_cb(struct osmux_out_handle *h, + rtp_msgb_alloc_cb_t cb, + void *cb_data) +{ + h->rtp_msgb_alloc_cb = cb; + h->rtp_msgb_alloc_cb_data = cb_data; +} + +/*! \brief Set SSRC of generated RTP packets from Osmux frames + * \param[in] h the osmux out handle handling a specific CID + * \param[in] rtp_ssrc the RTP SSRC to set + */ +void osmux_xfrm_output_set_rtp_ssrc(struct osmux_out_handle *h, uint32_t rtp_ssrc) +{ + h->rtp_ssrc = rtp_ssrc; +} + +/*! \brief Set Payload Type of generated RTP packets from Osmux frames + * \param[in] h the osmux out handle handling a specific CID + * \param[in] rtp_payload_type the RTP Payload Type to set + */ +void osmux_xfrm_output_set_rtp_pl_type(struct osmux_out_handle *h, uint32_t rtp_payload_type) +{ + h->rtp_payload_type = rtp_payload_type; +} + +/*! @} */ diff --git a/src/prim.c b/src/prim.c new file mode 100644 index 0000000..eae1064 --- /dev/null +++ b/src/prim.c @@ -0,0 +1,473 @@ +/* (C) 2021 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de> + * Author: Pau Espin Pedrol <pespin@sysmocom.de> + * All Rights Reserved + * + * SPDX-License-Identifier: GPL-2.0+ + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + */ + +#include <stdio.h> +#include <unistd.h> +#include <stdlib.h> +#include <string.h> +#include <errno.h> +#include <assert.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <inttypes.h> + +#include <osmocom/core/talloc.h> +#include <osmocom/core/select.h> +#include <osmocom/core/socket.h> +#include <osmocom/core/logging.h> + +#include <osmocom/netif/prim.h> +#include <osmocom/netif/stream.h> + +struct osmo_prim_pkt_hdr { + uint32_t sap; /*!< Service Access Point Identifier */ + uint16_t primitive; /*!< Primitive number */ + uint16_t operation; /*! Primitive Operation (enum osmo_prim_operation) */ +} __attribute__ ((packed)); + +/* Here we take advantage of the fact that sizeof(struct + * osmo_prim_pkt_hdr) <= sizeof(struct osmo_prim_hdr), so we don't need + * to allocate headroom when serializing later. + */ +osmo_static_assert(sizeof(struct osmo_prim_pkt_hdr) <= sizeof(struct osmo_prim_hdr), + osmo_prim_msgb_alloc_validate_headroom); + +/*! Allocate a primitive of given type and its associated msgb. +* \param[in] sap Service Access Point +* \param[in] primitive Primitive Number +* \param[in] operation Primitive Operation (REQ/RESP/IND/CONF) +* \param[in] alloc_len Total length (including struct osmo_prim_hdr) to allocate for the primitive +* \returns Pointer to allocated prim_hdr inisde its own msgb. The osmo_prim_hdr +* is pre-alocated & pre-filled. +*/ +struct osmo_prim_hdr *osmo_prim_msgb_alloc(unsigned int sap, unsigned int primitive, + enum osmo_prim_operation operation, size_t alloc_len) +{ + struct msgb *msg; + struct osmo_prim_hdr *oph; + + if (alloc_len < sizeof(*oph)) + return NULL; + + msg = msgb_alloc(alloc_len, "osmo_prim_msgb_alloc"); + oph = (struct osmo_prim_hdr *)msgb_put(msg, sizeof(*oph)); + osmo_prim_init(oph, sap, primitive, operation, msg); + msg->l2h = msg->tail; + + return oph; +} + +struct osmo_prim_srv_link { + void *priv; + char *addr; + int log_cat; /* Defaults to DLGLOBAL */ + struct osmo_stream_srv_link *stream; + osmo_prim_srv_conn_cb opened_conn_cb; + osmo_prim_srv_conn_cb closed_conn_cb; + osmo_prim_srv_rx_sapi_version rx_sapi_version_cb; + osmo_prim_srv_rx_cb rx_cb; + size_t rx_msgb_alloc_len; +}; + +struct osmo_prim_srv { + void *priv; + struct osmo_prim_srv_link *link; /* backpointer */ + struct osmo_stream_srv *stream; +}; + +/****************************** + * CONTROL SAP + ******************************/ +#define OSMO_PRIM_CTL_SAPI 0xffffffff +#define OSMO_PRIM_CTL_API_VERSION 0 + +enum sap_ctl_prim_type { + SAP_CTL_PRIM_HELLO, + _SAP_CTL_PRIM_MAX +}; + +const struct value_string sap_ctl_prim_type_names[] = { + OSMO_VALUE_STRING(SAP_CTL_PRIM_HELLO), + { 0, NULL } +}; + +/* HNB_CTL_PRIM_HELLO.ind, UL */ +struct sap_ctl_hello_param { + uint32_t sapi; /* SAPI for which we negotiate version */ + uint16_t api_version; /* The intended version */ +} __attribute__ ((packed)); + +struct sap_ctl_prim { + struct osmo_prim_hdr hdr; + union { + struct sap_ctl_hello_param hello_req; + struct sap_ctl_hello_param hello_cnf; + } u; +} __attribute__ ((packed)); + +static struct sap_ctl_prim *_sap_ctl_makeprim_hello_cnf(uint32_t sapi, uint16_t api_version) +{ + struct sap_ctl_prim *ctl_prim; + + ctl_prim = (struct sap_ctl_prim *)osmo_prim_msgb_alloc( + OSMO_PRIM_CTL_SAPI, SAP_CTL_PRIM_HELLO, PRIM_OP_CONFIRM, + sizeof(struct osmo_prim_hdr) + sizeof(struct sap_ctl_hello_param)); + msgb_put(ctl_prim->hdr.msg, sizeof(struct sap_ctl_hello_param)); + ctl_prim->u.hello_cnf.sapi = sapi; + ctl_prim->u.hello_cnf.api_version = api_version; + + return ctl_prim; +} + +/****************************** + * osmo_prim_srv + ******************************/ +#define LOGSRV(srv, lvl, fmt, args...) LOGP((srv)->link->log_cat, lvl, fmt, ## args) + +static int _srv_sap_ctl_rx_hello_req(struct osmo_prim_srv *prim_srv, struct sap_ctl_hello_param *hello_ind) +{ + struct sap_ctl_prim *prim_resp; + int rc; + + LOGSRV(prim_srv, LOGL_INFO, "Rx CTL-HELLO.req SAPI=%u API_VERSION=%u\n", hello_ind->sapi, hello_ind->api_version); + + if (hello_ind->sapi == OSMO_PRIM_CTL_SAPI) + rc = hello_ind->api_version == OSMO_PRIM_CTL_API_VERSION ? OSMO_PRIM_CTL_API_VERSION : -1; + else if (prim_srv->link->rx_sapi_version_cb) + rc = prim_srv->link->rx_sapi_version_cb(prim_srv, hello_ind->sapi, hello_ind->api_version); + else /* Accept whatever version by default: */ + rc = hello_ind->api_version; + + if (rc < 0) { + LOGSRV(prim_srv, LOGL_ERROR, + "SAPI=%u API_VERSION=%u not supported! destroying connection\n", + hello_ind->sapi, hello_ind->api_version); + osmo_stream_srv_set_flush_and_destroy(prim_srv->stream); + return rc; + } + prim_resp = _sap_ctl_makeprim_hello_cnf(hello_ind->sapi, (uint16_t)rc); + LOGSRV(prim_srv, LOGL_INFO, "Tx CTL-HELLO.cnf SAPI=%u API_VERSION=%u\n", + hello_ind->sapi, prim_resp->u.hello_cnf.api_version); + rc = osmo_prim_srv_send(prim_srv, prim_resp->hdr.msg); + return rc; +} + +static int _srv_sap_ctl_rx(struct osmo_prim_srv *prim_srv, struct osmo_prim_hdr *oph) +{ + switch (oph->operation) { + case PRIM_OP_REQUEST: + switch (oph->primitive) { + case SAP_CTL_PRIM_HELLO: + return _srv_sap_ctl_rx_hello_req(prim_srv, (struct sap_ctl_hello_param *)msgb_data(oph->msg)); + default: + LOGSRV(prim_srv, LOGL_ERROR, "Rx unknown CTL SAP primitive %u (len=%u)\n", + oph->primitive, msgb_length(oph->msg)); + return -EINVAL; + } + break; + case PRIM_OP_RESPONSE: + case PRIM_OP_INDICATION: + case PRIM_OP_CONFIRM: + default: + LOGSRV(prim_srv, LOGL_ERROR, "Rx CTL SAP unexpected primitive operation %s-%s (len=%u)\n", + get_value_string(sap_ctl_prim_type_names, oph->primitive), + get_value_string(osmo_prim_op_names, oph->operation), + msgb_length(oph->msg)); + return -EINVAL; + } +} + +static int _osmo_prim_srv_read_cb(struct osmo_stream_srv *srv) +{ + struct osmo_prim_srv *prim_srv = osmo_stream_srv_get_data(srv); + struct osmo_prim_pkt_hdr *pkth; + struct msgb *msg; + struct osmo_prim_hdr oph; + int rc; + + msg = msgb_alloc_c(prim_srv, sizeof(*pkth) + prim_srv->link->rx_msgb_alloc_len, + "osmo_prim_srv_link_rx"); + if (!msg) + return -ENOMEM; + rc = osmo_stream_srv_recv(srv, msg); + if (rc == 0) + goto close; + + if (rc < 0) { + if (errno == EAGAIN) { + msgb_free(msg); + return 0; + } + goto close; + } + + if (rc < sizeof(*pkth)) { + LOGSRV(prim_srv, LOGL_ERROR, "Received %d bytes on UD Socket, but primitive hdr size " + "is %zu, discarding\n", rc, sizeof(*pkth)); + msgb_free(msg); + return 0; + } + pkth = (struct osmo_prim_pkt_hdr *)msgb_data(msg); + + /* De-serialize message: */ + osmo_prim_init(&oph, pkth->sap, pkth->primitive, pkth->operation, msg); + msgb_pull(msg, sizeof(*pkth)); + + switch (oph.sap) { + case OSMO_PRIM_CTL_SAPI: + rc = _srv_sap_ctl_rx(prim_srv, &oph); + break; + default: + if (prim_srv->link->rx_cb) + rc = prim_srv->link->rx_cb(prim_srv, &oph); + break; + } + /* as we always synchronously process the message in _osmo_prim_srv_link_rx() and + * its callbacks, we can free the message here. */ + msgb_free(msg); + + return rc; + +close: + msgb_free(msg); + osmo_prim_srv_close(prim_srv); + return -1; +} + +static void osmo_prim_srv_free(struct osmo_prim_srv *prim_srv); +static int _osmo_prim_srv_closed_cb(struct osmo_stream_srv *srv) +{ + struct osmo_prim_srv *prim_srv = osmo_stream_srv_get_data(srv); + struct osmo_prim_srv_link *prim_link = prim_srv->link; + if (prim_link->closed_conn_cb) + return prim_link->closed_conn_cb(prim_srv); + osmo_prim_srv_free(prim_srv); + return 0; +} + +/*! Allocate a primitive of given type and its associated msgb. +* \param[in] srv The osmo_prim_srv_link instance where message is to be sent through +* \param[in] msg msgb containing osmo_prim_hdr plus extra content, allocated through \ref osmo_prim_msgb_alloc() +* \returns zero on success, negative on error */ +int osmo_prim_srv_send(struct osmo_prim_srv *prim_srv, struct msgb *msg) +{ + struct osmo_prim_hdr *oph; + struct osmo_prim_pkt_hdr *pkth; + unsigned int sap; + unsigned int primitive; + enum osmo_prim_operation operation; + + OSMO_ASSERT(prim_srv); + + /* Serialize the oph: */ + oph = (struct osmo_prim_hdr *)msgb_data(msg); + OSMO_ASSERT(oph && msgb_length(msg) >= sizeof(*oph)); + sap = oph->sap; + primitive = oph->primitive; + operation = oph->operation; + msgb_pull(msg, sizeof(*oph)); + pkth = (struct osmo_prim_pkt_hdr *)msgb_push(msg, sizeof(*pkth)); + pkth->sap = sap; + pkth->primitive = primitive; + pkth->operation = operation; + + /* Finally enqueue the msg */ + osmo_stream_srv_send(prim_srv->stream, msg); + + return 0; +} + +static struct osmo_prim_srv *osmo_prim_srv_alloc(struct osmo_prim_srv_link *prim_link, int fd) +{ + struct osmo_prim_srv *prim_srv; + prim_srv = talloc_zero(prim_link, struct osmo_prim_srv); + if (!prim_srv) + return NULL; + prim_srv->link = prim_link; + prim_srv->stream = osmo_stream_srv_create(prim_link, prim_link->stream, fd, + _osmo_prim_srv_read_cb, + _osmo_prim_srv_closed_cb, + prim_srv); + if (!prim_srv->stream) { + talloc_free(prim_srv); + return NULL; + } + /* Inherit link priv pointer by default, user can later set it through API: */ + prim_srv->priv = prim_link->priv; + return prim_srv; +} + +static void osmo_prim_srv_free(struct osmo_prim_srv *prim_srv) +{ + talloc_free(prim_srv); +} + +void osmo_prim_srv_set_name(struct osmo_prim_srv *prim_srv, const char *name) +{ + osmo_stream_srv_set_name(prim_srv->stream, name); +} + +struct osmo_prim_srv_link *osmo_prim_srv_get_link(struct osmo_prim_srv *prim_srv) +{ + return prim_srv->link; +} + +void osmo_prim_srv_set_priv(struct osmo_prim_srv *prim_srv, void *priv) +{ + prim_srv->priv = priv; +} + +void *osmo_prim_srv_get_priv(const struct osmo_prim_srv *prim_srv) +{ + return prim_srv->priv; +} + +void osmo_prim_srv_close(struct osmo_prim_srv *prim_srv) +{ + osmo_stream_srv_destroy(prim_srv->stream); + /* we free prim_srv in _osmo_prim_srv_closed_cb() */ +} + +/****************************** + * osmo_prim_srv_link + ******************************/ + +#define LOGSRVLINK(srv, lvl, fmt, args...) LOGP((srv)->log_cat, lvl, fmt, ## args) + +/* accept connection coming from PCU */ +static int _osmo_prim_srv_link_accept(struct osmo_stream_srv_link *link, int fd) +{ + struct osmo_prim_srv *prim_srv; + struct osmo_prim_srv_link *prim_link = osmo_stream_srv_link_get_data(link); + + prim_srv = osmo_prim_srv_alloc(prim_link, fd); + + if (prim_link->opened_conn_cb) + return prim_link->opened_conn_cb(prim_srv); + + return 0; +} + +struct osmo_prim_srv_link *osmo_prim_srv_link_alloc(void *ctx) +{ + struct osmo_prim_srv_link *prim_link; + prim_link = talloc_zero(ctx, struct osmo_prim_srv_link); + if (!prim_link) + return NULL; + prim_link->stream = osmo_stream_srv_link_create(prim_link); + if (!prim_link->stream) { + talloc_free(prim_link); + return NULL; + } + osmo_stream_srv_link_set_data(prim_link->stream, prim_link); + osmo_stream_srv_link_set_domain(prim_link->stream, AF_UNIX); + osmo_stream_srv_link_set_type(prim_link->stream, SOCK_SEQPACKET); + osmo_stream_srv_link_set_accept_cb(prim_link->stream, _osmo_prim_srv_link_accept); + + prim_link->log_cat = DLGLOBAL; + prim_link->rx_msgb_alloc_len = 1600 - sizeof(struct osmo_prim_pkt_hdr); + return prim_link; +} + +void osmo_prim_srv_link_free(struct osmo_prim_srv_link *prim_link) +{ + if (!prim_link) + return; + + if (prim_link->stream) { + osmo_stream_srv_link_close(prim_link->stream); + osmo_stream_srv_link_destroy(prim_link->stream); + prim_link->stream = NULL; + } + talloc_free(prim_link); +} + +void osmo_prim_srv_link_set_name(struct osmo_prim_srv_link *prim_link, const char *name) +{ + osmo_stream_srv_link_set_name(prim_link->stream, name); +} + +int osmo_prim_srv_link_set_addr(struct osmo_prim_srv_link *prim_link, const char *path) +{ + osmo_talloc_replace_string(prim_link, &prim_link->addr, path); + osmo_stream_srv_link_set_addr(prim_link->stream, path); + return 0; +} + +const char *osmo_prim_srv_link_get_addr(struct osmo_prim_srv_link *prim_link) +{ + return prim_link->addr; +} + +void osmo_prim_srv_link_set_priv(struct osmo_prim_srv_link *prim_link, void *priv) +{ + prim_link->priv = priv; +} + +void *osmo_prim_srv_link_get_priv(const struct osmo_prim_srv_link *prim_link) +{ + return prim_link->priv; +} + +void osmo_prim_srv_link_set_log_category(struct osmo_prim_srv_link *prim_link, int log_cat) +{ + prim_link->log_cat = log_cat; +} + +void osmo_prim_srv_link_set_opened_conn_cb(struct osmo_prim_srv_link *prim_link, osmo_prim_srv_conn_cb opened_conn_cb) +{ + prim_link->opened_conn_cb = opened_conn_cb; +} +void osmo_prim_srv_link_set_closed_conn_cb(struct osmo_prim_srv_link *prim_link, osmo_prim_srv_conn_cb closed_conn_cb) +{ + prim_link->closed_conn_cb = closed_conn_cb; +} + +void osmo_prim_srv_link_set_rx_sapi_version_cb(struct osmo_prim_srv_link *prim_link, osmo_prim_srv_rx_sapi_version rx_sapi_version_cb) +{ + prim_link->rx_sapi_version_cb = rx_sapi_version_cb; +} + +void osmo_prim_srv_link_set_rx_cb(struct osmo_prim_srv_link *prim_link, osmo_prim_srv_rx_cb rx_cb) +{ + prim_link->rx_cb = rx_cb; +} + +void osmo_prim_srv_link_set_rx_msgb_alloc_len(struct osmo_prim_srv_link *prim_link, size_t alloc_len) +{ + prim_link->rx_msgb_alloc_len = alloc_len; +} + +int osmo_prim_srv_link_open(struct osmo_prim_srv_link *prim_link) +{ + int rc; + + if (!prim_link->addr) { + LOGSRVLINK(prim_link, LOGL_ERROR, "Cannot open, Address not configured\n"); + return -1; + } + + rc = osmo_stream_srv_link_open(prim_link->stream); + + LOGSRVLINK(prim_link, LOGL_INFO, "Started listening on Lower Layer Unix Domain Socket: %s\n", prim_link->addr); + + return rc; +} diff --git a/src/rs232.c b/src/rs232.c index 28f1ba0..b20c111 100644 --- a/src/rs232.c +++ b/src/rs232.c @@ -62,7 +62,7 @@ void rs232_tx_timer_cb(void *ptr) struct osmo_rs232 *r = ptr; /* we're again ready to transmit. */ - r->ofd.when |= BSC_FD_WRITE; + osmo_fd_write_enable(&r->ofd); } static int handle_ser_write(struct osmo_fd *bfd) @@ -72,15 +72,15 @@ static int handle_ser_write(struct osmo_fd *bfd) struct msgb *msg; int written; - LOGP(DLINP, LOGL_DEBUG, "writing data to rs232\n"); + LOGP(DLINP, LOGL_DEBUG, "writing data to rs232\n"); - if (llist_empty(&r->tx_queue)) { - r->ofd.when &= ~BSC_FD_WRITE; - return 0; - } - lh = r->tx_queue.next; - llist_del(lh); - msg = llist_entry(lh, struct msgb, list); + if (llist_empty(&r->tx_queue)) { + osmo_fd_write_disable(&r->ofd); + return 0; + } + lh = r->tx_queue.next; + llist_del(lh); + msg = llist_entry(lh, struct msgb, list); written = write(bfd->fd, msg->data, msg->len); if (written < msg->len) { @@ -92,7 +92,7 @@ static int handle_ser_write(struct osmo_fd *bfd) /* We've got more data to write, but we have to wait to make it. */ if (!llist_empty(&r->tx_queue) && r->cfg.delay_us) { - r->ofd.when &= ~BSC_FD_WRITE; + osmo_fd_write_disable(&r->ofd); osmo_timer_schedule(&r->tx_timer, 0, r->cfg.delay_us); } return 0; @@ -114,13 +114,13 @@ static int serial_fd_cb(struct osmo_fd *bfd, unsigned int what) { int rc = 0; - if (what & BSC_FD_READ) + if (what & OSMO_FD_READ) rc = handle_ser_read(bfd); if (rc < 0) return rc; - if (what & BSC_FD_WRITE) + if (what & OSMO_FD_WRITE) rc = handle_ser_write(bfd); return rc; @@ -223,9 +223,7 @@ int osmo_rs232_open(struct osmo_rs232 *r) return rc; } - bfd->when = BSC_FD_READ; - bfd->cb = serial_fd_cb; - bfd->data = r; + osmo_fd_setup(bfd, bfd->fd, OSMO_FD_READ, serial_fd_cb, r, 0); rc = osmo_fd_register(bfd); if (rc < 0) { @@ -256,8 +254,8 @@ int osmo_rs232_read(struct osmo_rs232 *r, struct msgb *msg) void osmo_rs232_write(struct osmo_rs232 *r, struct msgb *msg) { - msgb_enqueue(&r->tx_queue, msg); - r->ofd.when |= BSC_FD_WRITE; + msgb_enqueue(&r->tx_queue, msg); + osmo_fd_write_enable(&r->ofd); } void osmo_rs232_close(struct osmo_rs232 *r) @@ -89,13 +89,14 @@ int osmo_rtp_handle_tx_set_timestamp(struct osmo_rtp_handle *h, uint32_t timesta struct rtp_hdr *osmo_rtp_get_hdr(struct msgb *msg) { - struct rtp_hdr *rtph = (struct rtp_hdr *)msg->data; + struct rtp_hdr *rtph; if (msg->len < sizeof(struct rtp_hdr)) { DEBUGPC(DLMUX, "received RTP frame too short (len = %d)\n", msg->len); return NULL; } + rtph = (struct rtp_hdr *)msg->data; if (rtph->version != RTP_VERSION) { DEBUGPC(DLMUX, "received RTP version %d not supported.\n", rtph->version); @@ -198,7 +199,8 @@ osmo_rtp_build(struct osmo_rtp_handle *h, uint8_t payload_type, rtph->timestamp = htonl(h->tx.timestamp); h->tx.timestamp += duration; rtph->ssrc = htonl(h->tx.ssrc); - memcpy(msg->data + sizeof(struct rtp_hdr), data, payload_len); + if (payload_len > 0) + memcpy(msg->data + sizeof(struct rtp_hdr), data, payload_len); msgb_put(msg, sizeof(struct rtp_hdr) + payload_len); return msg; diff --git a/src/sctp.c b/src/sctp.c new file mode 100644 index 0000000..807bdac --- /dev/null +++ b/src/sctp.c @@ -0,0 +1,95 @@ +#include <netinet/sctp.h> +#include <osmocom/netif/sctp.h> + +const struct value_string osmo_sctp_assoc_chg_strs[] = { + { SCTP_COMM_UP, "COMM_UP" }, + { SCTP_COMM_LOST, "COMM_LOST" }, + { SCTP_RESTART, "RESTART" }, + { SCTP_SHUTDOWN_COMP, "SHUTDOWN_COMP" }, + { SCTP_CANT_STR_ASSOC, "CANT_STR_ASSOC" }, + { 0, NULL } +}; + +const struct value_string osmo_sctp_paddr_chg_strs[] = { + { SCTP_ADDR_AVAILABLE, "ADDR_AVAILABLE" }, + { SCTP_ADDR_UNREACHABLE, "ADDR_UNREACHABLE" }, + { SCTP_ADDR_REMOVED, "ADDR_REMOVED" }, + { SCTP_ADDR_ADDED, "ADDR_ADDED" }, + { SCTP_ADDR_MADE_PRIM, "ADDR_MADE_PRIM" }, + { SCTP_ADDR_CONFIRMED, "ADDR_CONFIRMED" }, +#ifdef SCTP_ADDR_PF + { SCTP_ADDR_PF, "ADDR_POTENTIALLY_FAILED" }, +#endif + { 0, NULL } +}; + +const struct value_string osmo_sctp_sn_type_strs[] = { + { SCTP_ASSOC_CHANGE, "ASSOC_CHANGE" }, + { SCTP_PEER_ADDR_CHANGE, "PEER_ADDR_CHANGE" }, + { SCTP_SHUTDOWN_EVENT, "SHUTDOWN_EVENT" }, + { SCTP_SEND_FAILED, "SEND_FAILED" }, + { SCTP_REMOTE_ERROR, "REMOTE_ERROR" }, + { SCTP_PARTIAL_DELIVERY_EVENT, "PARTIAL_DELIVERY_EVENT" }, + { SCTP_ADAPTATION_INDICATION, "ADAPTATION_INDICATION" }, +#ifdef SCTP_AUTHENTICATION_INDICATION + { SCTP_AUTHENTICATION_INDICATION, "AUTHENTICATION_INDICATION" }, +#endif +#ifdef SCTP_SENDER_DRY_EVENT + { SCTP_SENDER_DRY_EVENT, "SENDER_DRY_EVENT" }, +#endif + { 0, NULL } +}; + + +const struct value_string osmo_sctp_sn_error_strs[] = { + { SCTP_FAILED_THRESHOLD, "FAILED_THRESHOLD" }, + { SCTP_RECEIVED_SACK, "RECEIVED_SACK" }, + { SCTP_HEARTBEAT_SUCCESS, "HEARTBEAT_SUCCESS" }, + { SCTP_RESPONSE_TO_USER_REQ, "RESPONSE_TO_USER_REQ" }, + { SCTP_INTERNAL_ERROR, "INTERNAL_ERROR" }, + { SCTP_SHUTDOWN_GUARD_EXPIRES, "SHUTDOWN_GUARD_EXPIRES" }, + { SCTP_PEER_FAULTY, "PEER_FAULTY" }, + { 0, NULL } +}; + +/* rfc4960 section 3.3.10 "Operation Error", in host byte order */ +const struct value_string osmo_sctp_op_error_strs[] = { + { OSMO_SCTP_OP_ERR_INVALID_STREAM_ID, "Invalid Stream Identifier" }, + { OSMO_SCTP_OP_ERR_MISS_MAND_PARAM, "Missing Mandatory Parameter" }, + { OSMO_SCTP_OP_ERR_STALE_COOKIE, "Stale Cookie Error" }, + { OSMO_SCTP_OP_ERR_NO_RESOURCES, "Out of Resource" }, + { OSMO_SCTP_OP_ERR_UNRESOLV_ADDR, "Unresolvable Address" }, + { OSMO_SCTP_OP_ERR_UNKN_CHUNK_TYPE, "Unrecognized Chunk Type" }, + { OSMO_SCTP_OP_ERR_INVALID_MAND_PARAM, "Invalid Mandatory Parameter" }, + { OSMO_SCTP_OP_ERR_UNKN_PARAM, "Unrecognized Parameters" }, + { OSMO_SCTP_OP_ERR_NO_USER_DATA, "No User Data" }, + { OSMO_SCTP_OP_ERR_COOKIE_RX_WHILE_SHUTDOWN, "Cookie Received While Shutting Down" }, + { OSMO_SCTP_OP_ERR_RESTART_ASSC_NEW_ADDR, "Restart of an Association with New Addresses" }, + { OSMO_SCTP_OP_ERR_UNER_INITED_ABORT, "User Initiated Abort" }, + { OSMO_SCTP_OP_ERR_PROTO_VERSION, "Protocol Violation" }, + { 0, NULL } +}; + +/* linux/sctp.h enum sctp_spinfo_state */ +const struct value_string osmo_sctp_spinfo_state_strs[] = { + { SCTP_INACTIVE, "INACTIVE" }, + { SCTP_PF, "POTENTIALLY_FAILED" }, + { SCTP_ACTIVE, "ACTIVE" }, + { SCTP_UNCONFIRMED, "UNCONFIRMED" }, + { SCTP_UNKNOWN, "UNKNOWN" }, + { 0, NULL } +}; + +/* linux/sctp.h enum sctp_sstat_state */ +const struct value_string osmo_sctp_sstat_state_strs[] = { + { SCTP_EMPTY, "EMPTY" }, + { SCTP_CLOSED, "CLOSED" }, + { SCTP_COOKIE_WAIT, "COOKIE_WAIT" }, + { SCTP_COOKIE_ECHOED, "COOKIE_ECHOED" }, + { SCTP_ESTABLISHED, "ESTABLISHED" }, + { SCTP_SHUTDOWN_PENDING, "SHUTDOWN_PENDING" }, + { SCTP_SHUTDOWN_SENT, "SHUTDOWN_SENT" }, + { SCTP_SHUTDOWN_RECEIVED, "SHUTDOWN_RECEIVED" }, + { SCTP_SHUTDOWN_ACK_SENT, "SHUTDOWN_ACK_SENT" }, + { 0, NULL } +}; diff --git a/src/stream.c b/src/stream.c index 3d0b665..f8cbed6 100644 --- a/src/stream.c +++ b/src/stream.c @@ -1,5 +1,6 @@ /* (C) 2011 by Pablo Neira Ayuso <pablo@gnumonks.org> * (C) 2015-2016 by Harald Welte <laforge@gnumonks.org> + * (C) 2023 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de> * All Rights Reserved. * * SPDX-License-Identifier: GPL-2.0+ @@ -37,43 +38,117 @@ #include <osmocom/core/utils.h> #include <osmocom/gsm/tlv.h> #include <osmocom/core/msgb.h> +#include <osmocom/core/osmo_io.h> +#include <osmocom/core/panic.h> #include <osmocom/core/logging.h> #include <osmocom/core/talloc.h> #include <osmocom/core/socket.h> #include <osmocom/netif/stream.h> +#include <osmocom/netif/stream_private.h> #include "config.h" +#include <osmocom/netif/sctp.h> + +/*! \cond private */ + #ifdef HAVE_LIBSCTP -#include <netinet/sctp.h> -#endif -#define LOGSCLI(cli, level, fmt, args...) \ - LOGP(DLINP, level, "[%s] %s(): " fmt, get_value_string(stream_cli_state_names, (cli)->state), __func__, ## args) +/* is any of the bytes from offset .. u8_size in 'u8' non-zero? return offset or -1 if all zero */ +static int byte_nonzero(const uint8_t *u8, unsigned int offset, unsigned int u8_size) +{ + int j; + + for (j = offset; j < u8_size; j++) { + if (u8[j] != 0) + return j; + } + + return -1; +} + +static unsigned int sctp_sockopt_event_subscribe_size = 0; + +static int determine_sctp_sockopt_event_subscribe_size(void) +{ + uint8_t buf[256]; + socklen_t buf_len = sizeof(buf); + int sd, rc; + + /* only do this once */ + if (sctp_sockopt_event_subscribe_size > 0) + return 0; + + sd = socket(AF_INET, SOCK_STREAM, IPPROTO_SCTP); + if (sd < 0) + return sd; + + rc = getsockopt(sd, IPPROTO_SCTP, SCTP_EVENTS, buf, &buf_len); + close(sd); + if (rc < 0) + return rc; + + sctp_sockopt_event_subscribe_size = (unsigned int)buf_len; -/*! \addtogroup stream Osmocom Stream Socket - * @{ + LOGP(DLINP, LOGL_INFO, "sizes of 'struct sctp_event_subscribe': compile-time %zu, kernel: %u\n", + sizeof(struct sctp_event_subscribe), sctp_sockopt_event_subscribe_size); + return 0; +} + +/* Attempt to work around Linux kernel ABI breakage * - * This code is intended to abstract any use of stream-type sockets, - * such as TCP and SCTP. It offers both server and client side - * implementations, fully integrated with the libosmocore select loop - * abstraction. - */ + * The Linux kernel ABI for the SCTP_EVENTS socket option has been broken repeatedly. + * - until commit 35ea82d611da59f8bea44a37996b3b11bb1d3fd7 ( kernel < 4.11), the size is 10 bytes + * - in 4.11 it is 11 bytes + * - in 4.12 .. 5.4 it is 13 bytes + * - in kernels >= 5.5 it is 14 bytes + * + * This wouldn't be a problem if the kernel didn't have a "stupid" assumption that the structure + * size passed by userspace will match 1:1 the length of the structure at kernel compile time. In + * an ideal world, it would just use the known first bytes and assume the remainder is all zero. + * But as it doesn't do that, let's try to work around this */ +static int sctp_setsockopt_events_linux_workaround(int fd, const struct sctp_event_subscribe *event) +{ -/*! \file stream.c - * \brief Osmocom stream socket helpers - */ + const unsigned int compiletime_size = sizeof(*event); + int rc; -/* - * Platforms that don't have MSG_NOSIGNAL (which disables SIGPIPE) - * usually have SO_NOSIGPIPE (set via setsockopt). - */ -#ifndef MSG_NOSIGNAL -#define MSG_NOSIGNAL 0 -#endif + if (determine_sctp_sockopt_event_subscribe_size() < 0) { + LOGP(DLINP, LOGL_ERROR, "Cannot determine SCTP_EVENTS socket option size\n"); + return -1; + } + + if (compiletime_size == sctp_sockopt_event_subscribe_size) { + /* no kernel workaround needed */ + return setsockopt(fd, IPPROTO_SCTP, SCTP_EVENTS, event, compiletime_size); + } else if (compiletime_size < sctp_sockopt_event_subscribe_size) { + /* we are using an older userspace with a more modern kernel and hence need + * to pad the data */ + uint8_t buf[sctp_sockopt_event_subscribe_size]; + + memcpy(buf, event, compiletime_size); + memset(buf + sizeof(*event), 0, sctp_sockopt_event_subscribe_size - compiletime_size); + return setsockopt(fd, IPPROTO_SCTP, SCTP_EVENTS, buf, sctp_sockopt_event_subscribe_size); + } else /* if (compiletime_size > sctp_sockopt_event_subscribe_size) */ { + /* we are using a newer userspace with an older kernel and hence need to truncate + * the data - but only if the caller didn't try to enable any of the events of the + * truncated portion */ + rc = byte_nonzero((const uint8_t *)event, sctp_sockopt_event_subscribe_size, + compiletime_size); + if (rc >= 0) { + LOGP(DLINP, LOGL_ERROR, "Kernel only supports sctp_event_subscribe of %u bytes, " + "but caller tried to enable more modern event at offset %u\n", + sctp_sockopt_event_subscribe_size, rc); + return -1; + } + + return setsockopt(fd, IPPROTO_SCTP, SCTP_EVENTS, event, sctp_sockopt_event_subscribe_size); + } +} +#endif // HAVE_LIBSCTP -static int sctp_sock_activate_events(int fd) +int stream_sctp_sock_activate_events(int fd) { #ifdef HAVE_LIBSCTP struct sctp_event_subscribe event; @@ -84,32 +159,31 @@ static int sctp_sock_activate_events(int fd) event.sctp_data_io_event = 1; event.sctp_association_event = 1; event.sctp_address_event = 1; - event.sctp_address_event = 1; event.sctp_send_failure_event = 1; event.sctp_peer_error_event = 1; event.sctp_shutdown_event = 1; /* IMPORTANT: Do NOT enable sender_dry_event here, see * https://bugzilla.redhat.com/show_bug.cgi?id=1442784 */ - rc = setsockopt(fd, IPPROTO_SCTP, SCTP_EVENTS, - &event, sizeof(event)); + rc = sctp_setsockopt_events_linux_workaround(fd, &event); if (rc < 0) - LOGP(DLINP, LOGL_ERROR, "couldn't activate SCTP events " - "on FD %u\n", fd); + LOGP(DLINP, LOGL_ERROR, "couldn't activate SCTP events on FD %u\n", fd); return rc; #else return -1; #endif } -static int setsockopt_nodelay(int fd, int proto, int on) +int stream_setsockopt_nodelay(int fd, int proto, int on) { int rc; switch (proto) { +#ifdef HAVE_LIBSCTP case IPPROTO_SCTP: rc = setsockopt(fd, IPPROTO_SCTP, SCTP_NODELAY, &on, sizeof(on)); break; +#endif case IPPROTO_TCP: rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)); break; @@ -122,1027 +196,145 @@ static int setsockopt_nodelay(int fd, int proto, int on) return rc; } - -/* - * Client side. - */ - -enum osmo_stream_cli_state { - STREAM_CLI_STATE_NONE = 0, - STREAM_CLI_STATE_CONNECTING = 1, - STREAM_CLI_STATE_CONNECTED = 2, - STREAM_CLI_STATE_MAX -}; - -static const struct value_string stream_cli_state_names[] = { - { STREAM_CLI_STATE_NONE, " NONE" }, - { STREAM_CLI_STATE_CONNECTING, "CONNECTING" }, - { STREAM_CLI_STATE_CONNECTED, " CONNECTED" }, - { 0, NULL } -}; - -#define OSMO_STREAM_CLI_F_RECONF (1 << 0) -#define OSMO_STREAM_CLI_F_NODELAY (1 << 1) - -struct osmo_stream_cli { - struct osmo_fd ofd; - struct llist_head tx_queue; - struct osmo_timer_list timer; - enum osmo_stream_cli_state state; - char *addr; - uint16_t port; - char *local_addr; - uint16_t local_port; - uint16_t proto; - int (*connect_cb)(struct osmo_stream_cli *srv); - int (*disconnect_cb)(struct osmo_stream_cli *srv); - int (*read_cb)(struct osmo_stream_cli *srv); - int (*write_cb)(struct osmo_stream_cli *srv); - void *data; - int flags; - int reconnect_timeout; -}; - -void osmo_stream_cli_close(struct osmo_stream_cli *cli); - -/*! \brief Re-connect an Osmocom Stream Client - * If re-connection is enabled for this client - * (which is the case unless negative timeout was explicitly set via osmo_stream_cli_set_reconnect_timeout() call), - * we close any existing connection (if any) and schedule a re-connect timer */ -void osmo_stream_cli_reconnect(struct osmo_stream_cli *cli) -{ - osmo_stream_cli_close(cli); - - if (cli->reconnect_timeout < 0) { - LOGSCLI(cli, LOGL_INFO, "not reconnecting, disabled.\n"); - return; - } - - LOGSCLI(cli, LOGL_INFO, "retrying in %d seconds...\n", - cli->reconnect_timeout); - osmo_timer_schedule(&cli->timer, cli->reconnect_timeout, 0); - cli->state = STREAM_CLI_STATE_CONNECTING; -} - -/*! \brief Close an Osmocom Stream Client - * \param[in] cli Osmocom Stream Client to be closed - * We unregister the socket fd from the osmocom select() loop - * abstraction and close the socket */ -void osmo_stream_cli_close(struct osmo_stream_cli *cli) -{ - if (cli->ofd.fd == -1) - return; - osmo_fd_unregister(&cli->ofd); - close(cli->ofd.fd); - cli->ofd.fd = -1; - - if (cli->state == STREAM_CLI_STATE_CONNECTED) { - LOGSCLI(cli, LOGL_DEBUG, "connection closed\n"); - if (cli->disconnect_cb) - cli->disconnect_cb(cli); - } - - cli->state = STREAM_CLI_STATE_NONE; -} - -static void osmo_stream_cli_read(struct osmo_stream_cli *cli) -{ - LOGSCLI(cli, LOGL_DEBUG, "message received\n"); - - if (cli->read_cb) - cli->read_cb(cli); -} - -static int osmo_stream_cli_write(struct osmo_stream_cli *cli) -{ #ifdef HAVE_LIBSCTP - struct sctp_sndrcvinfo sinfo; -#endif - struct msgb *msg; - struct llist_head *lh; - int ret; - - LOGSCLI(cli, LOGL_DEBUG, "sending data\n"); - - if (llist_empty(&cli->tx_queue)) { - cli->ofd.when &= ~BSC_FD_WRITE; - return 0; - } - lh = cli->tx_queue.next; - llist_del(lh); - msg = llist_entry(lh, struct msgb, list); - - if (cli->state == STREAM_CLI_STATE_CONNECTING) { - LOGSCLI(cli, LOGL_ERROR, "not connected, dropping data!\n"); - return 0; - } - - switch (cli->proto) { -#ifdef HAVE_LIBSCTP - case IPPROTO_SCTP: - memset(&sinfo, 0, sizeof(sinfo)); - sinfo.sinfo_ppid = htonl(msgb_sctp_ppid(msg)); - sinfo.sinfo_stream = msgb_sctp_stream(msg); - ret = sctp_send(cli->ofd.fd, msg->data, msgb_length(msg), - &sinfo, MSG_NOSIGNAL); - break; -#endif - case IPPROTO_TCP: - default: - ret = send(cli->ofd.fd, msg->data, msg->len, 0); - break; - } - if (ret < 0) { - if (errno == EPIPE || errno == ENOTCONN) { - osmo_stream_cli_reconnect(cli); - } - LOGSCLI(cli, LOGL_ERROR, "error to send\n"); - } - msgb_free(msg); - return 0; -} - -static int osmo_stream_cli_fd_cb(struct osmo_fd *ofd, unsigned int what) +static int stream_sctp_recvmsg_trailer(const char *log_pfx, struct msgb *msg, int ret, const struct sctp_sndrcvinfo *sinfo, int flags) { - struct osmo_stream_cli *cli = ofd->data; - int error, ret; - socklen_t len = sizeof(error); - - switch(cli->state) { - case STREAM_CLI_STATE_CONNECTING: - ret = getsockopt(ofd->fd, SOL_SOCKET, SO_ERROR, &error, &len); - if (ret >= 0 && error > 0) { - osmo_stream_cli_reconnect(cli); - return 0; - } - ofd->when &= ~BSC_FD_WRITE; - LOGSCLI(cli, LOGL_DEBUG, "connection done.\n"); - cli->state = STREAM_CLI_STATE_CONNECTED; - if (cli->proto == IPPROTO_SCTP) { -#ifdef SO_NOSIGPIPE - int val = 1; - - ret = setsockopt(ofd->fd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&val, sizeof(val)); - if (ret < 0) - LOGSCLI(cli, LOGL_DEBUG, "Failed setting SO_NOSIGPIPE: %s\n", strerror(errno)); -#endif - sctp_sock_activate_events(ofd->fd); - } - if (cli->connect_cb) - cli->connect_cb(cli); - break; - case STREAM_CLI_STATE_CONNECTED: - if (what & BSC_FD_READ) { - LOGSCLI(cli, LOGL_DEBUG, "connected read\n"); - osmo_stream_cli_read(cli); - } - if (what & BSC_FD_WRITE) { - LOGSCLI(cli, LOGL_DEBUG, "connected write\n"); - osmo_stream_cli_write(cli); - } - break; - default: - break; - } - return 0; -} - -static void cli_timer_cb(void *data); - -/*! \brief Create an Osmocom stream client - * \param[in] ctx talloc context from which to allocate memory - * This function allocates a new \ref osmo_stream_cli and initializes - * it with default values (5s reconnect timer, TCP protocol) */ -struct osmo_stream_cli *osmo_stream_cli_create(void *ctx) -{ - struct osmo_stream_cli *cli; - - cli = talloc_zero(ctx, struct osmo_stream_cli); - if (!cli) - return NULL; - - cli->proto = IPPROTO_TCP; - cli->ofd.fd = -1; - cli->ofd.when |= BSC_FD_READ | BSC_FD_WRITE; - cli->ofd.priv_nr = 0; /* XXX */ - cli->ofd.cb = osmo_stream_cli_fd_cb; - cli->ofd.data = cli; - cli->state = STREAM_CLI_STATE_CONNECTING; - osmo_timer_setup(&cli->timer, cli_timer_cb, cli); - cli->reconnect_timeout = 5; /* default is 5 seconds. */ - INIT_LLIST_HEAD(&cli->tx_queue); - - return cli; -} - -/*! \brief Set the remote address to which we connect - * \param[in] cli Stream Client to modify - * \param[in] addr Remote IP address - */ -void -osmo_stream_cli_set_addr(struct osmo_stream_cli *cli, const char *addr) -{ - osmo_talloc_replace_string(cli, &cli->addr, addr); - cli->flags |= OSMO_STREAM_CLI_F_RECONF; -} - -/*! \brief Set the remote port number to which we connect - * \param[in] cli Stream Client to modify - * \param[in] port Remote port number - */ -void -osmo_stream_cli_set_port(struct osmo_stream_cli *cli, uint16_t port) -{ - cli->port = port; - cli->flags |= OSMO_STREAM_CLI_F_RECONF; -} - -/*! \brief Set the local port number for the socket (to be bound to) - * \param[in] cli Stream Client to modify - * \param[in] port Local port number - */ -void -osmo_stream_cli_set_local_port(struct osmo_stream_cli *cli, uint16_t port) -{ - cli->local_port = port; - cli->flags |= OSMO_STREAM_CLI_F_RECONF; -} - -/*! \brief Set the local address for the socket (to be bound to) - * \param[in] cli Stream Client to modify - * \param[in] port Local host name - */ -void -osmo_stream_cli_set_local_addr(struct osmo_stream_cli *cli, const char *addr) -{ - osmo_talloc_replace_string(cli, &cli->local_addr, addr); - cli->flags |= OSMO_STREAM_CLI_F_RECONF; -} - -/*! \brief Set the protocol for the stream client socket - * \param[in] cli Stream Client to modify - * \param[in] proto Protocol (like IPPROTO_TCP (default), IPPROTO_SCTP, ...) - */ -void -osmo_stream_cli_set_proto(struct osmo_stream_cli *cli, uint16_t proto) -{ - cli->proto = proto; - cli->flags |= OSMO_STREAM_CLI_F_RECONF; -} - -/*! \brief Set the reconnect time of the stream client socket - * \param[in] cli Stream Client to modify - * \param[in] timeout Re-connect timeout in seconds or negative value to disable auto-reconnection */ -void -osmo_stream_cli_set_reconnect_timeout(struct osmo_stream_cli *cli, int timeout) -{ - cli->reconnect_timeout = timeout; -} - -/*! \brief Set application private data of the stream client socket - * \param[in] cli Stream Client to modify - * \param[in] data User-specific data (available in call-back functions) */ -void -osmo_stream_cli_set_data(struct osmo_stream_cli *cli, void *data) -{ - cli->data = data; -} - -/*! \brief Get application private data of the stream client socket - * \param[in] cli Stream Client to modify - * \returns Application private data, as set by \ref osmo_stream_cli_set_data() */ -void *osmo_stream_cli_get_data(struct osmo_stream_cli *cli) -{ - return cli->data; -} - -/*! \brief Get the stream client socket description. - * \param[in] cli Stream Client to examine - * \returns Socket description or NULL in case of error */ -char *osmo_stream_cli_get_sockname(const struct osmo_stream_cli *cli) -{ - static char buf[OSMO_SOCK_NAME_MAXLEN]; - - osmo_sock_get_name_buf(buf, OSMO_SOCK_NAME_MAXLEN, cli->ofd.fd); - - return buf; -} - -/*! \brief Get Osmocom File Descriptor of the stream client socket - * \param[in] cli Stream Client to modify - * \returns Pointer to \ref osmo_fd */ -struct osmo_fd * -osmo_stream_cli_get_ofd(struct osmo_stream_cli *cli) -{ - return &cli->ofd; -} - -/*! \brief Set the call-back function called on connect of the stream client socket - * \param[in] cli Stream Client to modify - * \param[in] connect_cb Call-back function to be called upon connect */ -void -osmo_stream_cli_set_connect_cb(struct osmo_stream_cli *cli, - int (*connect_cb)(struct osmo_stream_cli *cli)) -{ - cli->connect_cb = connect_cb; -} - -/*! \brief Set the call-back function called on disconnect of the stream client socket - * \param[in] cli Stream Client to modify - * \param[in] disconnect_cb Call-back function to be called upon disconnect */ -void osmo_stream_cli_set_disconnect_cb(struct osmo_stream_cli *cli, - int (*disconnect_cb)(struct osmo_stream_cli *cli)) -{ - cli->disconnect_cb = disconnect_cb; -} - -/*! \brief Set the call-back function called to read from the stream client socket - * \param[in] cli Stream Client to modify - * \param[in] read_cb Call-back function to be called when we want to read */ -void -osmo_stream_cli_set_read_cb(struct osmo_stream_cli *cli, - int (*read_cb)(struct osmo_stream_cli *cli)) -{ - cli->read_cb = read_cb; -} - -/*! \brief Destroy a Osmocom stream client (includes close) - * \param[in] cli Stream Client to destroy */ -void osmo_stream_cli_destroy(struct osmo_stream_cli *cli) -{ - osmo_stream_cli_close(cli); - osmo_timer_del(&cli->timer); - msgb_queue_free(&cli->tx_queue); - talloc_free(cli); -} - -/*! \brief DEPRECATED: use osmo_stream_cli_set_reconnect_timeout() or osmo_stream_cli_reconnect() instead! - * Open connection of an Osmocom stream client - * \param[in] cli Stream Client to connect - * \param[in] reconect 1 if we should not automatically reconnect - */ -int osmo_stream_cli_open2(struct osmo_stream_cli *cli, int reconnect) -{ - int ret; - - /* we are reconfiguring this socket, close existing first. */ - if ((cli->flags & OSMO_STREAM_CLI_F_RECONF) && cli->ofd.fd >= 0) - osmo_stream_cli_close(cli); - - cli->flags &= ~OSMO_STREAM_CLI_F_RECONF; - - ret = osmo_sock_init2(AF_INET, SOCK_STREAM, cli->proto, - cli->local_addr, cli->local_port, - cli->addr, cli->port, - OSMO_SOCK_F_CONNECT|OSMO_SOCK_F_BIND|OSMO_SOCK_F_NONBLOCK); - if (ret < 0) { - if (reconnect) - osmo_stream_cli_reconnect(cli); - return ret; + msgb_sctp_msg_flags(msg) = 0; + if (OSMO_LIKELY(sinfo)) { + msgb_sctp_ppid(msg) = ntohl(sinfo->sinfo_ppid); + msgb_sctp_stream(msg) = sinfo->sinfo_stream; } - cli->ofd.fd = ret; - - if (cli->flags & OSMO_STREAM_CLI_F_NODELAY) { - ret = setsockopt_nodelay(cli->ofd.fd, cli->proto, 1); - if (ret < 0) - goto error_close_socket; - } - - if (osmo_fd_register(&cli->ofd) < 0) - goto error_close_socket; - - return 0; - -error_close_socket: - close(ret); - cli->ofd.fd = -1; - return -EIO; -} - -/*! \brief Set the NODELAY socket option to avoid Nagle-like behavior - * Setting this to nodelay=true will automatically set the NODELAY - * socket option on any socket established via \ref osmo_stream_cli_open - * or any re-connect. You have to set this _before_ opening the - * socket. - * \param[in] cli Stream client whose sockets are to be configured - * \param[in] nodelay whether to set (true) NODELAY before connect() - */ -void osmo_stream_cli_set_nodelay(struct osmo_stream_cli *cli, bool nodelay) -{ - if (nodelay) - cli->flags |= OSMO_STREAM_CLI_F_NODELAY; - else - cli->flags &= ~OSMO_STREAM_CLI_F_NODELAY; -} - -/*! \brief Open connection of an Osmocom stream client - * By default the client will automatically reconnect after default timeout. - * To disable this, use osmo_stream_cli_set_reconnect_timeout() before calling this function. - * \param[in] cli Stream Client to connect */ -int osmo_stream_cli_open(struct osmo_stream_cli *cli) -{ - int ret; - - /* we are reconfiguring this socket, close existing first. */ - if ((cli->flags & OSMO_STREAM_CLI_F_RECONF) && cli->ofd.fd >= 0) - osmo_stream_cli_close(cli); - - cli->flags &= ~OSMO_STREAM_CLI_F_RECONF; - - ret = osmo_sock_init2(AF_INET, SOCK_STREAM, cli->proto, - cli->local_addr, cli->local_port, - cli->addr, cli->port, - OSMO_SOCK_F_CONNECT|OSMO_SOCK_F_BIND|OSMO_SOCK_F_NONBLOCK); - if (ret < 0) { - osmo_stream_cli_reconnect(cli); - return ret; - } - cli->ofd.fd = ret; - - if (cli->flags & OSMO_STREAM_CLI_F_NODELAY) { - ret = setsockopt_nodelay(cli->ofd.fd, cli->proto, 1); - if (ret < 0) - goto error_close_socket; - } - - if (osmo_fd_register(&cli->ofd) < 0) - goto error_close_socket; - - return 0; - -error_close_socket: - close(ret); - cli->ofd.fd = -1; - return -EIO; -} -static void cli_timer_cb(void *data) -{ - struct osmo_stream_cli *cli = data; - - LOGSCLI(cli, LOGL_DEBUG, "reconnecting.\n"); - - switch(cli->state) { - case STREAM_CLI_STATE_CONNECTING: - cli->ofd.when |= BSC_FD_READ | BSC_FD_WRITE; - osmo_stream_cli_open(cli); - break; - default: - break; - } -} - -/*! \brief Enqueue data to be sent via an Osmocom stream client - * \param[in] cli Stream Client through which we want to send - * \param[in] msg Message buffer to enqueue in transmit queue */ -void osmo_stream_cli_send(struct osmo_stream_cli *cli, struct msgb *msg) -{ - msgb_enqueue(&cli->tx_queue, msg); - cli->ofd.when |= BSC_FD_WRITE; -} - -/*! \brief Receive data via an Osmocom stream client - * \param[in] cli Stream Client through which we want to send - * \param msg pre-allocate message buffer to which received data is appended - * \returns number of bytes read; <=0 in case of error */ -int osmo_stream_cli_recv(struct osmo_stream_cli *cli, struct msgb *msg) -{ - int ret; - - ret = recv(cli->ofd.fd, msg->data, msg->data_len, 0); - if (ret < 0) { - if (errno == EPIPE || errno == ECONNRESET) { - LOGSCLI(cli, LOGL_ERROR, "lost connection with srv\n"); + if (flags & MSG_NOTIFICATION) { + char buf[512]; + struct osmo_strbuf sb = { .buf = buf, .len = sizeof(buf) }; + int logl = LOGL_INFO; + union sctp_notification *notif = (union sctp_notification *) msg->data; + + OSMO_STRBUF_PRINTF(sb, "%s NOTIFICATION %s flags=0x%x", log_pfx, + osmo_sctp_sn_type_str(notif->sn_header.sn_type), notif->sn_header.sn_flags); + msgb_put(msg, sizeof(union sctp_notification)); + msgb_sctp_msg_flags(msg) = OSMO_STREAM_SCTP_MSG_FLAGS_NOTIFICATION; + ret = -EAGAIN; + + switch (notif->sn_header.sn_type) { + case SCTP_ASSOC_CHANGE: + OSMO_STRBUF_PRINTF(sb, " %s", osmo_sctp_assoc_chg_str(notif->sn_assoc_change.sac_state)); + switch (notif->sn_assoc_change.sac_state) { + case SCTP_COMM_UP: + break; + case SCTP_COMM_LOST: + OSMO_STRBUF_PRINTF(sb, " (err: %s)", + osmo_sctp_sn_error_str(notif->sn_assoc_change.sac_error)); + /* Handle this like a regular disconnect */ + ret = 0; + break; + case SCTP_RESTART: + case SCTP_SHUTDOWN_COMP: + logl = LOGL_NOTICE; + break; + case SCTP_CANT_STR_ASSOC: + break; + } + break; + case SCTP_SEND_FAILED: + logl = LOGL_ERROR; + break; + case SCTP_PEER_ADDR_CHANGE: + { + char addr_str[INET6_ADDRSTRLEN + 10]; + struct sockaddr_storage sa = notif->sn_paddr_change.spc_aaddr; + osmo_sockaddr_to_str_buf(addr_str, sizeof(addr_str), + (const struct osmo_sockaddr *)&sa); + OSMO_STRBUF_PRINTF(sb, " %s %s err=%s", + osmo_sctp_paddr_chg_str(notif->sn_paddr_change.spc_state), addr_str, + (notif->sn_paddr_change.spc_state == SCTP_ADDR_UNREACHABLE) ? + osmo_sctp_sn_error_str(notif->sn_paddr_change.spc_error) : "None"); + } + break; + case SCTP_SHUTDOWN_EVENT: + logl = LOGL_NOTICE; + /* RFC6458 3.1.4: Any attempt to send more data will cause sendmsg() + * to return with an ESHUTDOWN error. */ + break; + case SCTP_REMOTE_ERROR: + logl = LOGL_NOTICE; + OSMO_STRBUF_PRINTF(sb, " %s", osmo_sctp_op_error_str(ntohs(notif->sn_remote_error.sre_error))); + break; } - osmo_stream_cli_reconnect(cli); - return ret; - } else if (ret == 0) { - LOGSCLI(cli, LOGL_ERROR, "connection closed with srv\n"); - osmo_stream_cli_reconnect(cli); + LOGP(DLINP, logl, "%s\n", buf); return ret; } - msgb_put(msg, ret); - LOGSCLI(cli, LOGL_DEBUG, "received %d bytes from srv\n", ret); - return ret; -} -/* - * Server side. - */ - -#define OSMO_STREAM_SRV_F_RECONF (1 << 0) -#define OSMO_STREAM_SRV_F_NODELAY (1 << 1) - -struct osmo_stream_srv_link { - struct osmo_fd ofd; - char *addr; - uint16_t port; - uint16_t proto; - int (*accept_cb)(struct osmo_stream_srv_link *srv, int fd); - void *data; - int flags; -}; - -static int osmo_stream_srv_fd_cb(struct osmo_fd *ofd, unsigned int what) -{ - int ret; - int sock_fd; - struct sockaddr_in sa; - socklen_t sa_len = sizeof(sa); - struct osmo_stream_srv_link *link = ofd->data; + if (OSMO_UNLIKELY(ret > 0 && !sinfo)) + LOGP(DLINP, LOGL_ERROR, "%s sctp_recvmsg without SNDRCV cmsg?!?\n", log_pfx); - ret = accept(ofd->fd, (struct sockaddr *)&sa, &sa_len); - if (ret < 0) { - LOGP(DLINP, LOGL_ERROR, "failed to accept from origin " - "peer, reason=`%s'\n", strerror(errno)); - return ret; - } - LOGP(DLINP, LOGL_DEBUG, "accept()ed new link from %s to port %u\n", - inet_ntoa(sa.sin_addr), link->port); - sock_fd = ret; - - if (link->proto == IPPROTO_SCTP) { - ret = sctp_sock_activate_events(sock_fd); - if (ret < 0) - goto error_close_socket; - } - - if (link->flags & OSMO_STREAM_SRV_F_NODELAY) { - ret = setsockopt_nodelay(sock_fd, link->proto, 1); - if (ret < 0) - goto error_close_socket; - } - - if (!link->accept_cb) { - ret = -ENOTSUP; - goto error_close_socket; - } - - ret = link->accept_cb(link, sock_fd); - if (ret) - goto error_close_socket; - return 0; - -error_close_socket: - close(sock_fd); return ret; } -/*! \brief Create an Osmocom Stream Server Link - * A Stream Server Link is the listen()+accept() "parent" to individual - * Stream Servers - * \param[in] ctx talloc allocation context - * \returns Stream Server Link with default values (TCP) - */ -struct osmo_stream_srv_link *osmo_stream_srv_link_create(void *ctx) -{ - struct osmo_stream_srv_link *link; - - link = talloc_zero(ctx, struct osmo_stream_srv_link); - if (!link) - return NULL; - - link->proto = IPPROTO_TCP; - link->ofd.fd = -1; - link->ofd.when |= BSC_FD_READ | BSC_FD_WRITE; - link->ofd.cb = osmo_stream_srv_fd_cb; - link->ofd.data = link; - - return link; -} - -/*! \brief Set the NODELAY socket option to avoid Nagle-like behavior - * Setting this to nodelay=true will automatically set the NODELAY - * socket option on any socket established via this server link, before - * calling the accept_cb() - * \param[in] link server link whose sockets are to be configured - * \param[in] nodelay whether to set (true) NODELAY after accept - */ -void osmo_stream_srv_link_set_nodelay(struct osmo_stream_srv_link *link, bool nodelay) -{ - if (nodelay) - link->flags |= OSMO_STREAM_SRV_F_NODELAY; - else - link->flags &= ~OSMO_STREAM_SRV_F_NODELAY; -} - -/*! \brief Set the local address to which we bind - * \param[in] link Stream Server Link to modify - * \param[in] addr Local IP address - */ -void osmo_stream_srv_link_set_addr(struct osmo_stream_srv_link *link, - const char *addr) -{ - osmo_talloc_replace_string(link, &link->addr, addr); - link->flags |= OSMO_STREAM_SRV_F_RECONF; -} - -/*! \brief Set the local port number to which we bind - * \param[in] link Stream Server Link to modify - * \param[in] port Local port number - */ -void osmo_stream_srv_link_set_port(struct osmo_stream_srv_link *link, - uint16_t port) -{ - link->port = port; - link->flags |= OSMO_STREAM_SRV_F_RECONF; -} - -/*! \brief Set the protocol for the stream server link - * \param[in] link Stream Server Link to modify - * \param[in] proto Protocol (like IPPROTO_TCP (default), IPPROTO_SCTP, ...) - */ -void -osmo_stream_srv_link_set_proto(struct osmo_stream_srv_link *link, - uint16_t proto) -{ - link->proto = proto; - link->flags |= OSMO_STREAM_SRV_F_RECONF; -} - -/*! \brief Set application private data of the stream server link - * \param[in] link Stream Server Link to modify - * \param[in] data User-specific data (available in call-back functions) */ -void -osmo_stream_srv_link_set_data(struct osmo_stream_srv_link *link, - void *data) -{ - link->data = data; -} - -/*! \brief Get application private data of the stream server link - * \param[in] link Stream Server Link to modify - * \returns Application private data, as set by \ref osmo_stream_cli_set_data() */ -void *osmo_stream_srv_link_get_data(struct osmo_stream_srv_link *link) -{ - return link->data; -} - -/*! \brief Get description of the stream server link e. g. 127.0.0.1:1234 - * \param[in] link Stream Server Link to examine - * \returns Link description or NULL in case of error */ -char *osmo_stream_srv_link_get_sockname(const struct osmo_stream_srv_link *link) -{ - static char buf[INET6_ADDRSTRLEN + 6]; - int rc = osmo_sock_get_local_ip(link->ofd.fd, buf, INET6_ADDRSTRLEN); - if (rc < 0) - return NULL; - - buf[strnlen(buf, INET6_ADDRSTRLEN + 6)] = ':'; - - rc = osmo_sock_get_local_ip_port(link->ofd.fd, buf + strnlen(buf, INET6_ADDRSTRLEN + 6), 6); - if (rc < 0) - return NULL; - - return buf; -} - -/*! \brief Get Osmocom File Descriptor of the stream server link - * \param[in] link Stream Server Link - * \returns Pointer to \ref osmo_fd */ -struct osmo_fd * -osmo_stream_srv_link_get_ofd(struct osmo_stream_srv_link *link) -{ - return &link->ofd; -} - -/*! \brief Set the accept() call-back of the stream server link - * \param[in] link Stream Server Link - * \param[in] accept_cb Call-back function executed upon accept() */ -void osmo_stream_srv_link_set_accept_cb(struct osmo_stream_srv_link *link, - int (*accept_cb)(struct osmo_stream_srv_link *link, int fd)) - -{ - link->accept_cb = accept_cb; -} - -/*! \brief Destroy the stream server link. Closes + Releases Memory. - * \param[in] link Stream Server Link */ -void osmo_stream_srv_link_destroy(struct osmo_stream_srv_link *link) -{ - osmo_stream_srv_link_close(link); - talloc_free(link); -} - -/*! \brief Open the stream server link. This actually initializes the - * underlying socket and binds it to the configured ip/port - * \param[in] link Stream Server Link to open */ -int osmo_stream_srv_link_open(struct osmo_stream_srv_link *link) -{ - int ret; - - if (link->ofd.fd >= 0) { - /* No reconfigure needed for existing socket, we are fine */ - if (!(link->flags & OSMO_STREAM_SRV_F_RECONF)) - return 0; - /* we are reconfiguring this socket, close existing first. */ - osmo_stream_srv_link_close(link); - } - - link->flags &= ~OSMO_STREAM_SRV_F_RECONF; - - ret = osmo_sock_init(AF_INET, SOCK_STREAM, link->proto, - link->addr, link->port, OSMO_SOCK_F_BIND); - if (ret < 0) - return ret; - - link->ofd.fd = ret; - if (osmo_fd_register(&link->ofd) < 0) { - close(ret); - link->ofd.fd = -1; - return -EIO; - } - return 0; -} - -/*! \brief Close the stream server link and unregister from select loop - * Does not destroy the server link, merely closes it! - * \param[in] link Stream Server Link to close */ -void osmo_stream_srv_link_close(struct osmo_stream_srv_link *link) -{ - if (link->ofd.fd == -1) - return; - osmo_fd_unregister(&link->ofd); - close(link->ofd.fd); - link->ofd.fd = -1; -} - -#define OSMO_STREAM_SRV_F_FLUSH_DESTROY (1 << 0) - -struct osmo_stream_srv { - struct osmo_stream_srv_link *srv; - struct osmo_fd ofd; - struct llist_head tx_queue; - int (*closed_cb)(struct osmo_stream_srv *peer); - int (*cb)(struct osmo_stream_srv *peer); - void *data; - int flags; -}; - -static int osmo_stream_srv_read(struct osmo_stream_srv *conn) -{ - int rc = 0; - - LOGP(DLINP, LOGL_DEBUG, "message received\n"); - - if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY) { - LOGP(DLINP, LOGL_DEBUG, "Connection is being flushed and closed; ignoring received message\n"); - return 0; - } - - if (conn->cb) - rc = conn->cb(conn); - - return rc; -} - -static void osmo_stream_srv_write(struct osmo_stream_srv *conn) +/*! wrapper for regular synchronous sctp_recvmsg(3) */ +int stream_sctp_recvmsg_wrapper(int fd, struct msgb *msg, const char *log_pfx) { -#ifdef HAVE_LIBSCTP struct sctp_sndrcvinfo sinfo; -#endif - struct msgb *msg; - struct llist_head *lh; + int flags = 0; int ret; - LOGP(DLINP, LOGL_DEBUG, "sending data\n"); - - if (llist_empty(&conn->tx_queue)) { - conn->ofd.when &= ~BSC_FD_WRITE; - return; - } - lh = conn->tx_queue.next; - llist_del(lh); - msg = llist_entry(lh, struct msgb, list); - - switch (conn->srv->proto) { -#ifdef HAVE_LIBSCTP - case IPPROTO_SCTP: - memset(&sinfo, 0, sizeof(sinfo)); - sinfo.sinfo_ppid = htonl(msgb_sctp_ppid(msg)); - sinfo.sinfo_stream = msgb_sctp_stream(msg); - ret = sctp_send(conn->ofd.fd, msg->data, msgb_length(msg), - &sinfo, MSG_NOSIGNAL); - break; -#endif - case IPPROTO_TCP: - default: - ret = send(conn->ofd.fd, msg->data, msg->len, 0); - break; - } - if (ret < 0) { - LOGP(DLINP, LOGL_ERROR, "error to send\n"); - } - msgb_free(msg); - - if (llist_empty(&conn->tx_queue) && (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY)) - osmo_stream_srv_destroy(conn); + ret = sctp_recvmsg(fd, msg->tail, msgb_tailroom(msg), NULL, NULL, &sinfo, &flags); + return stream_sctp_recvmsg_trailer(log_pfx, msg, ret, &sinfo, flags); } -static int osmo_stream_srv_cb(struct osmo_fd *ofd, unsigned int what) +/*! wrapper for osmo_io asynchronous recvmsg response */ +int stream_iofd_sctp_recvmsg_trailer(struct osmo_io_fd *iofd, struct msgb *msg, int ret, const struct msghdr *msgh) { - struct osmo_stream_srv *conn = ofd->data; - int rc = 0; - - LOGP(DLINP, LOGL_DEBUG, "connected read/write\n"); - if (what & BSC_FD_READ) - rc = osmo_stream_srv_read(conn); - if (rc != -EBADF && (what & BSC_FD_WRITE)) - osmo_stream_srv_write(conn); - - return rc; -} + const struct sctp_sndrcvinfo *sinfo = NULL; + struct cmsghdr *cmsg = NULL; -/*! \brief Create a Stream Server inside the specified link - * \param[in] ctx talloc allocation context from which to allocate - * \param[in] link Stream Server Link to which we belong - * \returns Stream Server in case of success; NULL on error */ -struct osmo_stream_srv * -osmo_stream_srv_create(void *ctx, struct osmo_stream_srv_link *link, - int fd, - int (*cb)(struct osmo_stream_srv *conn), - int (*closed_cb)(struct osmo_stream_srv *conn), void *data) -{ - struct osmo_stream_srv *conn; - - conn = talloc_zero(ctx, struct osmo_stream_srv); - if (conn == NULL) { - LOGP(DLINP, LOGL_ERROR, "cannot allocate new peer in srv, " - "reason=`%s'\n", strerror(errno)); - return NULL; - } - conn->srv = link; - conn->ofd.fd = fd; - conn->ofd.data = conn; - conn->ofd.cb = osmo_stream_srv_cb; - conn->ofd.when = BSC_FD_READ; - conn->cb = cb; - conn->closed_cb = closed_cb; - conn->data = data; - INIT_LLIST_HEAD(&conn->tx_queue); - - if (osmo_fd_register(&conn->ofd) < 0) { - LOGP(DLINP, LOGL_ERROR, "could not register FD\n"); - talloc_free(conn); - return NULL; + for (cmsg = CMSG_FIRSTHDR((struct msghdr *) msgh); cmsg != NULL; + cmsg = CMSG_NXTHDR((struct msghdr *) msgh, cmsg)) { + if (cmsg->cmsg_level == IPPROTO_SCTP && cmsg->cmsg_type == SCTP_SNDRCV) { + sinfo = (const struct sctp_sndrcvinfo *)CMSG_DATA(cmsg); + break; + } } - return conn; -} -/*! \brief Prepare to send out all pending messages on the connection's Tx queue - * and then automatically destroy the stream with osmo_stream_srv_destroy(). - * This function disables queuing of new messages on the connection and also - * disables reception of new messages on the connection. - * \param[in] conn Stream Server to modify */ -void osmo_stream_srv_set_flush_and_destroy(struct osmo_stream_srv *conn) -{ - conn->flags |= OSMO_STREAM_SRV_F_FLUSH_DESTROY; -} - -/*! \brief Set application private data of the stream server - * \param[in] conn Stream Server to modify - * \param[in] data User-specific data (available in call-back functions) */ -void -osmo_stream_srv_set_data(struct osmo_stream_srv *conn, - void *data) -{ - conn->data = data; -} - -/*! \brief Get application private data of the stream server - * \param[in] conn Stream Server - * \returns Application private data, as set by \ref osmo_stream_srv_set_data() */ -void *osmo_stream_srv_get_data(struct osmo_stream_srv *conn) -{ - return conn->data; + return stream_sctp_recvmsg_trailer(osmo_iofd_get_name(iofd), msg, ret, sinfo, msgh->msg_flags); } -/*! \brief Get Osmocom File Descriptor of the stream server - * \param[in] conn Stream Server - * \returns Pointer to \ref osmo_fd */ -struct osmo_fd * -osmo_stream_srv_get_ofd(struct osmo_stream_srv *conn) +/*! Send a message through a connected SCTP socket, similar to sctp_sendmsg(). + * + * Appends the message to the internal transmit queue. + * If the function returns success (0), it will take ownership of the msgb and + * internally call msgb_free() after the write request completes. + * In case of an error the msgb needs to be freed by the caller. + * + * \param[in] iofd file descriptor to write to + * \param[in] msg message buffer to send; uses msgb_sctp_ppid/msg_sctp_stream + * \param[in] sendmsg_flags Flags to pass to the send call + * \returns 0 in case of success; a negative value in case of error + */ +int stream_iofd_sctp_send_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int sendmsg_flags) { - return &conn->ofd; -} + struct msghdr outmsg = {}; + char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; + struct sctp_sndrcvinfo *sinfo; + struct cmsghdr *cmsg; -/*! \brief Get the master (Link) from a Stream Server - * \param[in] conn Stream Server of which we want to know the Link - * \returns Link through which the given Stream Server is established */ -struct osmo_stream_srv_link *osmo_stream_srv_get_master(struct osmo_stream_srv *conn) -{ - return conn->srv; -} + outmsg.msg_control = outcmsg; + outmsg.msg_controllen = sizeof(outcmsg); -/*! \brief Destroy given Stream Server - * This function closes the Stream Server socket, unregisters from - * select loop, invokes the connection's closed_cb() callback to allow API - * users to clean up any associated state they have for this connection, - * and then de-allocates associated memory. - * \param[in] conn Stream Server to be destroyed */ -void osmo_stream_srv_destroy(struct osmo_stream_srv *conn) -{ - close(conn->ofd.fd); - osmo_fd_unregister(&conn->ofd); - if (conn->closed_cb) - conn->closed_cb(conn); - msgb_queue_free(&conn->tx_queue); - talloc_free(conn); -} + cmsg = CMSG_FIRSTHDR(&outmsg); + cmsg->cmsg_level = IPPROTO_SCTP; + cmsg->cmsg_type = SCTP_SNDRCV; + cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); -/*! \brief Enqueue data to be sent via an Osmocom stream server - * \param[in] conn Stream Server through which we want to send - * \param[in] msg Message buffer to enqueue in transmit queue */ -void osmo_stream_srv_send(struct osmo_stream_srv *conn, struct msgb *msg) -{ - if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY) { - LOGP(DLINP, LOGL_DEBUG, "Connection is being flushed and closed; ignoring new outgoing message\n"); - return; - } + outmsg.msg_controllen = cmsg->cmsg_len; + sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg); + memset(sinfo, 0, sizeof(struct sctp_sndrcvinfo)); + sinfo->sinfo_ppid = htonl(msgb_sctp_ppid(msg)); + sinfo->sinfo_stream = msgb_sctp_stream(msg); - msgb_enqueue(&conn->tx_queue, msg); - conn->ofd.when |= BSC_FD_WRITE; + return osmo_iofd_sendmsg_msgb(iofd, msg, sendmsg_flags, &outmsg); } - -/*! \brief Receive data via Osmocom stream server - * \param[in] conn Stream Server from which to receive - * \param msg pre-allocate message buffer to which received data is appended - * \returns number of bytes read, negative on error. - */ -int osmo_stream_srv_recv(struct osmo_stream_srv *conn, struct msgb *msg) -{ -#ifdef HAVE_LIBSCTP - struct sctp_sndrcvinfo sinfo; - int flags = 0; #endif - int ret; - - if (!msg) - return -EINVAL; - - switch (conn->srv->proto) { -#ifdef HAVE_LIBSCTP - case IPPROTO_SCTP: - ret = sctp_recvmsg(conn->ofd.fd, msgb_data(msg), msgb_tailroom(msg), - NULL, NULL, &sinfo, &flags); - if (flags & MSG_NOTIFICATION) { - union sctp_notification *notif = (union sctp_notification *) msgb_data(msg); - LOGP(DLINP, LOGL_DEBUG, "NOTIFICATION %u flags=0x%x\n", notif->sn_header.sn_type, notif->sn_header.sn_flags); - switch (notif->sn_header.sn_type) { - case SCTP_ASSOC_CHANGE: - LOGP(DLINP, LOGL_DEBUG, "===> ASSOC CHANGE:"); - switch (notif->sn_assoc_change.sac_state) { - case SCTP_COMM_UP: - LOGPC(DLINP, LOGL_DEBUG, " UP\n"); - break; - case SCTP_COMM_LOST: - LOGPC(DLINP, LOGL_DEBUG, " LOST\n"); - break; - case SCTP_RESTART: - LOGPC(DLINP, LOGL_DEBUG, " RESTART\n"); - break; - case SCTP_SHUTDOWN_COMP: - LOGPC(DLINP, LOGL_DEBUG, " SHUTDOWN COMP\n"); - break; - case SCTP_CANT_STR_ASSOC: - LOGPC(DLINP, LOGL_DEBUG, " CANT STR ASSOC\n"); - break; - } - break; - case SCTP_PEER_ADDR_CHANGE: - LOGP(DLINP, LOGL_DEBUG, "===> PEER ADDR CHANGE\n"); - break; - case SCTP_SHUTDOWN_EVENT: - LOGP(DLINP, LOGL_DEBUG, "===> SHUTDOWN EVT\n"); - /* Handle this like a regular disconnect */ - return 0; - break; - } - return -EAGAIN; - } - msgb_sctp_ppid(msg) = ntohl(sinfo.sinfo_ppid); - msgb_sctp_stream(msg) = sinfo.sinfo_stream; - break; -#endif - case IPPROTO_TCP: - default: - ret = recv(conn->ofd.fd, msgb_data(msg), msgb_tailroom(msg), 0); - break; - } - - if (ret < 0) { - if (errno == EPIPE || errno == ECONNRESET) { - LOGP(DLINP, LOGL_ERROR, - "lost connection with srv\n"); - } - return ret; - } else if (ret == 0) { - LOGP(DLINP, LOGL_ERROR, "connection closed with srv\n"); - return ret; - } - msgb_put(msg, ret); - LOGP(DLINP, LOGL_DEBUG, "received %d bytes from client\n", ret); - return ret; -} -/*! @} */ +/*! \endccond */ diff --git a/src/stream_cli.c b/src/stream_cli.c new file mode 100644 index 0000000..d4067d6 --- /dev/null +++ b/src/stream_cli.c @@ -0,0 +1,1237 @@ +/* (C) 2011 by Pablo Neira Ayuso <pablo@gnumonks.org> + * (C) 2015-2016 by Harald Welte <laforge@gnumonks.org> + * (C) 2023 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de> + * All Rights Reserved. + * + * SPDX-License-Identifier: GPL-2.0+ + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + */ + +#include <stdio.h> +#include <unistd.h> +#include <stdlib.h> +#include <errno.h> +#include <string.h> +#include <time.h> +#include <sys/fcntl.h> +#include <sys/socket.h> +#include <sys/ioctl.h> +#include <arpa/inet.h> +#include <netinet/in.h> +#include <netinet/tcp.h> + +#include <osmocom/core/timer.h> +#include <osmocom/core/select.h> +#include <osmocom/core/utils.h> +#include <osmocom/gsm/tlv.h> +#include <osmocom/core/msgb.h> +#include <osmocom/core/osmo_io.h> +#include <osmocom/core/panic.h> +#include <osmocom/core/logging.h> +#include <osmocom/core/talloc.h> +#include <osmocom/core/socket.h> + +#include <osmocom/netif/stream.h> +#include <osmocom/netif/stream_private.h> + +#include "config.h" + +#include <osmocom/netif/sctp.h> + +/*! \file stream_cli.c */ + +#define LOGSCLI(cli, level, fmt, args...) \ + LOGP(DLINP, level, "CLICONN(%s,%s){%s} " fmt, \ + cli->name ? : "", \ + cli->sockname, \ + get_value_string(stream_cli_state_names, (cli)->state), \ + ## args) + +/* + * Client side. + */ + +enum osmo_stream_cli_state { + STREAM_CLI_STATE_CLOSED, /* No fd associated, no timer active */ + STREAM_CLI_STATE_WAIT_RECONNECT, /* No fd associated, has timer active to try to connect again */ + STREAM_CLI_STATE_CONNECTING, /* Fd associated, but connection not yet confirmed by peer or lower layers */ + STREAM_CLI_STATE_CONNECTED, /* Fd associated and connection is established */ + STREAM_CLI_STATE_MAX +}; + +static const struct value_string stream_cli_state_names[] = { + { STREAM_CLI_STATE_CLOSED, "CLOSED" }, + { STREAM_CLI_STATE_WAIT_RECONNECT, "WAIT_RECONNECT" }, + { STREAM_CLI_STATE_CONNECTING, "CONNECTING" }, + { STREAM_CLI_STATE_CONNECTED, "CONNECTED" }, + { 0, NULL } +}; + +#define OSMO_STREAM_CLI_F_RECONF (1 << 0) +#define OSMO_STREAM_CLI_F_NODELAY (1 << 1) + +struct osmo_stream_cli { + char *name; + char sockname[OSMO_SOCK_NAME_MAXLEN]; + enum osmo_stream_mode mode; + union { + struct osmo_fd ofd; + struct osmo_io_fd *iofd; + }; + struct llist_head tx_queue; + struct osmo_timer_list timer; + enum osmo_stream_cli_state state; + char *addr[OSMO_STREAM_MAX_ADDRS]; + uint8_t addrcnt; + uint16_t port; + char *local_addr[OSMO_STREAM_MAX_ADDRS]; + uint8_t local_addrcnt; + uint16_t local_port; + int sk_domain; + int sk_type; + uint16_t proto; + osmo_stream_cli_connect_cb_t connect_cb; + osmo_stream_cli_disconnect_cb_t disconnect_cb; + osmo_stream_cli_read_cb_t read_cb; + osmo_stream_cli_read_cb2_t iofd_read_cb; + osmo_stream_cli_segmentation_cb_t segmentation_cb; + void *data; + int flags; + int reconnect_timeout; + struct osmo_sock_init2_multiaddr_pars ma_pars; +}; + +void osmo_stream_cli_close(struct osmo_stream_cli *cli); + +/*! \addtogroup stream_cli + * @{ + */ + +/*! Re-connect an Osmocom Stream Client. + * If re-connection is enabled for this client + * (which is the case unless negative timeout was explicitly set via osmo_stream_cli_set_reconnect_timeout() call), + * we close any existing connection (if any) and schedule a re-connect timer */ +void osmo_stream_cli_reconnect(struct osmo_stream_cli *cli) +{ + osmo_stream_cli_close(cli); + + if (cli->reconnect_timeout < 0) { + LOGSCLI(cli, LOGL_INFO, "not reconnecting, disabled\n"); + return; + } + + cli->state = STREAM_CLI_STATE_WAIT_RECONNECT; + LOGSCLI(cli, LOGL_INFO, "retrying reconnect in %d seconds...\n", + cli->reconnect_timeout); + osmo_timer_schedule(&cli->timer, cli->reconnect_timeout, 0); +} + +/*! Check if Osmocom Stream Client is in connected state. + * \param[in] cli Osmocom Stream Client + * \return true if connected, false otherwise + */ +bool osmo_stream_cli_is_connected(struct osmo_stream_cli *cli) +{ + return cli->state == STREAM_CLI_STATE_CONNECTED; +} + +static void osmo_stream_cli_close_iofd(struct osmo_stream_cli *cli) +{ + if (!cli->iofd) + return; + + osmo_iofd_free(cli->iofd); + cli->iofd = NULL; +} + +static void osmo_stream_cli_close_ofd(struct osmo_stream_cli *cli) +{ + if (cli->ofd.fd == -1) + return; + osmo_fd_unregister(&cli->ofd); + close(cli->ofd.fd); + cli->ofd.fd = -1; +} + +/*! Close an Osmocom Stream Client. + * \param[in] cli Osmocom Stream Client to be closed + * We unregister the socket fd from the osmocom select() loop + * abstraction and close the socket */ +void osmo_stream_cli_close(struct osmo_stream_cli *cli) +{ + int old_state = cli->state; + + if (cli->state == STREAM_CLI_STATE_CLOSED) + return; + if (cli->state == STREAM_CLI_STATE_WAIT_RECONNECT) { + osmo_timer_del(&cli->timer); + cli->state = STREAM_CLI_STATE_CLOSED; + return; + } + + + switch (cli->mode) { + case OSMO_STREAM_MODE_OSMO_FD: + osmo_stream_cli_close_ofd(cli); + break; + case OSMO_STREAM_MODE_OSMO_IO: + osmo_stream_cli_close_iofd(cli); + break; + default: + OSMO_ASSERT(false); + } + + cli->state = STREAM_CLI_STATE_CLOSED; + + if (old_state == STREAM_CLI_STATE_CONNECTED) { + LOGSCLI(cli, LOGL_DEBUG, "connection closed\n"); + if (cli->disconnect_cb) + cli->disconnect_cb(cli); + } +} + +/*! Retrieve file descriptor of the stream client socket. + * \param[in] cli Stream Client of which we want to obtain the file descriptor + * \returns File descriptor or negative in case of error */ +int +osmo_stream_cli_get_fd(const struct osmo_stream_cli *cli) +{ + switch (cli->mode) { + case OSMO_STREAM_MODE_OSMO_FD: + return cli->ofd.fd; + case OSMO_STREAM_MODE_OSMO_IO: + if (cli->iofd) + return osmo_iofd_get_fd(cli->iofd); + default: + break; + } + return -EINVAL; +} + +/*! Retrieve osmo_io descriptor of the stream client socket. + * This function must not be called on a stream client in legacy osmo_fd mode! + * The iofd is only valid once/after osmo_stream_cli_open() has successfully returned. + * \param[in] cli Stream Client of which we want to obtain the file descriptor + * \returns osmo_io_fd of stream client, or NULL if stream not yet opened. */ +struct osmo_io_fd * +osmo_stream_cli_get_iofd(const struct osmo_stream_cli *cli) +{ + OSMO_ASSERT(cli->mode == OSMO_STREAM_MODE_OSMO_IO); + return cli->iofd; +} + +static void osmo_stream_cli_read(struct osmo_stream_cli *cli) +{ + LOGSCLI(cli, LOGL_DEBUG, "message received\n"); + + if (cli->read_cb) + cli->read_cb(cli); +} + +static int osmo_stream_cli_write(struct osmo_stream_cli *cli) +{ +#ifdef HAVE_LIBSCTP + struct sctp_sndrcvinfo sinfo; +#endif + struct msgb *msg; + int ret; + + if (llist_empty(&cli->tx_queue)) { + osmo_fd_write_disable(&cli->ofd); + return 0; + } + msg = llist_first_entry(&cli->tx_queue, struct msgb, list); + llist_del(&msg->list); + + if (!osmo_stream_cli_is_connected(cli)) { + LOGSCLI(cli, LOGL_ERROR, "send: not connected, dropping data!\n"); + return 0; + } + + LOGSCLI(cli, LOGL_DEBUG, "sending %u bytes of data\n", msgb_length(msg)); + + switch (cli->sk_domain) { + case AF_UNIX: + ret = send(cli->ofd.fd, msgb_data(msg), msgb_length(msg), 0); + break; + case AF_UNSPEC: + case AF_INET: + case AF_INET6: + switch (cli->proto) { +#ifdef HAVE_LIBSCTP + case IPPROTO_SCTP: + memset(&sinfo, 0, sizeof(sinfo)); + sinfo.sinfo_ppid = htonl(msgb_sctp_ppid(msg)); + sinfo.sinfo_stream = msgb_sctp_stream(msg); + ret = sctp_send(cli->ofd.fd, msgb_data(msg), msgb_length(msg), + &sinfo, MSG_NOSIGNAL); + break; +#endif + case IPPROTO_TCP: + default: + ret = send(cli->ofd.fd, msgb_data(msg), msgb_length(msg), 0); + break; + } + break; + default: + ret = -ENOTSUP; + } + + if (ret >= 0 && ret < msgb_length(msg)) { + LOGSCLI(cli, LOGL_ERROR, "short send: %d < exp %u\n", ret, msgb_length(msg)); + /* Update msgb and re-add it at the start of the queue: */ + msgb_pull(msg, ret); + llist_add(&msg->list, &cli->tx_queue); + return 0; + } + + if (ret < 0) { + int err = errno; + LOGSCLI(cli, LOGL_ERROR, "send(len=%u) error: %s\n", msgb_length(msg), strerror(err)); + if (err == EAGAIN) { + /* Re-add at the start of the queue to re-attempt: */ + llist_add(&msg->list, &cli->tx_queue); + return 0; + } + msgb_free(msg); + osmo_stream_cli_reconnect(cli); + return 0; + } + + msgb_free(msg); + + if (llist_empty(&cli->tx_queue)) + osmo_fd_write_disable(&cli->ofd); + + return 0; +} + +static int _setsockopt_nosigpipe(struct osmo_stream_cli *cli) +{ +#ifdef SO_NOSIGPIPE + int ret; + int val = 1; + ret = setsockopt(osmo_stream_cli_get_fd(cli), SOL_SOCKET, SO_NOSIGPIPE, (void *)&val, sizeof(val)); + if (ret < 0) + LOGSCLI(cli, LOGL_ERROR, "Failed setting SO_NOSIGPIPE: %s\n", strerror(errno)); + return ret; +#else + return 0; +#endif +} + +static void stream_cli_handle_connecting(struct osmo_stream_cli *cli, int res) +{ + int error, ret = res; + socklen_t len = sizeof(error); + + int fd = osmo_stream_cli_get_fd(cli); + OSMO_ASSERT(fd >= 0); + + if (ret < 0) { + osmo_stream_cli_reconnect(cli); + return; + } + ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len); + if (ret >= 0 && error > 0) { + osmo_stream_cli_reconnect(cli); + return; + } + + /* If messages got enqueued while 'connecting', keep WRITE flag + up to dispatch them upon next main loop step */ + if (cli->mode == OSMO_STREAM_MODE_OSMO_FD && llist_empty(&cli->tx_queue)) + osmo_fd_write_disable(&cli->ofd); + + /* Update sockname based on socket info: */ + osmo_sock_get_name_buf(cli->sockname, sizeof(cli->sockname), osmo_stream_cli_get_fd(cli)); + + LOGSCLI(cli, LOGL_INFO, "connection established\n"); + cli->state = STREAM_CLI_STATE_CONNECTED; + switch (cli->sk_domain) { + case AF_UNIX: + _setsockopt_nosigpipe(cli); + break; + case AF_UNSPEC: + case AF_INET: + case AF_INET6: + if (cli->proto == IPPROTO_SCTP) { + _setsockopt_nosigpipe(cli); + stream_sctp_sock_activate_events(fd); + } + break; + default: + break; + } + if (cli->connect_cb) + cli->connect_cb(cli); +} + +static int osmo_stream_cli_fd_cb(struct osmo_fd *ofd, unsigned int what) +{ + struct osmo_stream_cli *cli = ofd->data; + + switch (cli->state) { + case STREAM_CLI_STATE_CONNECTING: + stream_cli_handle_connecting(cli, 0); + break; + case STREAM_CLI_STATE_CONNECTED: + if (what & OSMO_FD_READ) { + LOGSCLI(cli, LOGL_DEBUG, "connected read\n"); + osmo_stream_cli_read(cli); + } + if (what & OSMO_FD_WRITE) { + LOGSCLI(cli, LOGL_DEBUG, "connected write\n"); + osmo_stream_cli_write(cli); + } + break; + default: + /* Only CONNECTING and CONNECTED states are expected, since they are the only states + * where FD exists: */ + osmo_panic("%s() called with unexpected state %d\n", __func__, cli->state); + } + return 0; +} + +static void cli_timer_cb(void *data); + +/*! Create an Osmocom stream client. + * \param[in] ctx talloc context from which to allocate memory + * This function allocates a new \ref osmo_stream_cli and initializes + * it with default values (5s reconnect timer, TCP protocol) + * \return allocated stream client, or NULL in case of error + */ +struct osmo_stream_cli *osmo_stream_cli_create(void *ctx) +{ + struct osmo_stream_cli *cli; + + cli = talloc_zero(ctx, struct osmo_stream_cli); + if (!cli) + return NULL; + + cli->mode = OSMO_STREAM_MODE_UNKNOWN; + cli->sk_domain = AF_UNSPEC; + cli->sk_type = SOCK_STREAM; + cli->proto = IPPROTO_TCP; + + cli->state = STREAM_CLI_STATE_CLOSED; + osmo_timer_setup(&cli->timer, cli_timer_cb, cli); + cli->reconnect_timeout = 5; /* default is 5 seconds. */ + cli->segmentation_cb = NULL; + INIT_LLIST_HEAD(&cli->tx_queue); + + cli->ma_pars.sctp.version = 0; + + return cli; +} + +static void stream_cli_iofd_read_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg) +{ + struct osmo_stream_cli *cli = osmo_iofd_get_data(iofd); + + switch (cli->state) { + case STREAM_CLI_STATE_CONNECTING: + msgb_free(msg); + stream_cli_handle_connecting(cli, res); + break; + case STREAM_CLI_STATE_CONNECTED: + switch (res) { + case -EPIPE: + case -ECONNRESET: + LOGSCLI(cli, LOGL_ERROR, "lost connection with srv (%d)\n", res); + osmo_stream_cli_reconnect(cli); + break; + case 0: + LOGSCLI(cli, LOGL_NOTICE, "connection closed with srv\n"); + osmo_stream_cli_reconnect(cli); + break; + default: + LOGSCLI(cli, LOGL_DEBUG, "received %d bytes from srv\n", res); + break; + } + /* Notify user of new data or error: */ + if (cli->iofd_read_cb) + cli->iofd_read_cb(cli, res, msg); + else + msgb_free(msg); + break; + default: + osmo_panic("%s() called with unexpected state %d\n", __func__, cli->state); + } +} + +static void stream_cli_iofd_write_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg) +{ + struct osmo_stream_cli *cli = osmo_iofd_get_data(iofd); + + switch (cli->state) { + case STREAM_CLI_STATE_CONNECTING: + stream_cli_handle_connecting(cli, res); + break; + case STREAM_CLI_STATE_CONNECTED: + if (msg && res <= 0) { + osmo_stream_cli_reconnect(cli); + LOGSCLI(cli, LOGL_ERROR, "received error %d in response to send\n", res); + } + break; + default: + osmo_panic("%s() called with unexpected state %d\n", __func__, cli->state); + } +} + +static const struct osmo_io_ops osmo_stream_cli_ioops = { + .read_cb = stream_cli_iofd_read_cb, + .write_cb = stream_cli_iofd_write_cb, + + .segmentation_cb = NULL, +}; + +#ifdef HAVE_LIBSCTP +static void stream_cli_iofd_recvmsg_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg, const struct msghdr *msgh) +{ + struct osmo_stream_cli *cli = osmo_iofd_get_data(iofd); + + res = stream_iofd_sctp_recvmsg_trailer(iofd, msg, res, msgh); + + switch (cli->state) { + case STREAM_CLI_STATE_CONNECTING: + msgb_free(msg); + stream_cli_handle_connecting(cli, res); + break; + case STREAM_CLI_STATE_CONNECTED: + switch (res) { + case -EPIPE: + case -ECONNRESET: + LOGSCLI(cli, LOGL_ERROR, "lost connection with srv (%d)\n", res); + osmo_stream_cli_reconnect(cli); + break; + case 0: + LOGSCLI(cli, LOGL_NOTICE, "connection closed with srv\n"); + osmo_stream_cli_reconnect(cli); + break; + default: + break; + } + /* Notify user of new data or error: */ + if (cli->iofd_read_cb) + cli->iofd_read_cb(cli, res, msg); + else + msgb_free(msg); + break; + default: + osmo_panic("%s() called with unexpected state %d\n", __func__, cli->state); + } +} + +static const struct osmo_io_ops osmo_stream_cli_ioops_sctp = { + .recvmsg_cb = stream_cli_iofd_recvmsg_cb, + .sendmsg_cb = stream_cli_iofd_write_cb, + + .segmentation_cb = NULL, +}; +#endif + + +/*! Set a name on the cli object (used during logging). + * \param[in] cli stream_cli whose name is to be set + * \param[in] name the name to be set on cli + */ +void osmo_stream_cli_set_name(struct osmo_stream_cli *cli, const char *name) +{ + osmo_talloc_replace_string(cli, &cli->name, name); + if (cli->mode == OSMO_STREAM_MODE_OSMO_IO && cli->iofd) + osmo_iofd_set_name(cli->iofd, name); +} + +/*! Retrieve name previously set on the cli object (see osmo_stream_cli_set_name()). + * \param[in] cli stream_cli whose name is to be retrieved + * \returns The name to be set on cli; NULL if never set + */ +const char *osmo_stream_cli_get_name(const struct osmo_stream_cli *cli) +{ + return cli->name; +} + +/*! Set the remote address to which we connect. + * Any changes to this setting will only become active upon next (re)connect. + * \param[in] cli Stream Client to modify + * \param[in] addr Remote IP address + */ +void +osmo_stream_cli_set_addr(struct osmo_stream_cli *cli, const char *addr) +{ + osmo_stream_cli_set_addrs(cli, &addr, 1); +} + +/*! Set the remote address set to which we connect. + * Useful for protocols allowing connecting to more than one address (such as SCTP) + * Any changes to this setting will only become active upon next (re)connect. + * \param[in] cli Stream Client to modify + * \param[in] addr Remote IP address set + * \return negative on error, 0 on success + */ +int osmo_stream_cli_set_addrs(struct osmo_stream_cli *cli, const char **addr, size_t addrcnt) +{ + int i = 0; + + if (addrcnt > OSMO_STREAM_MAX_ADDRS) + return -EINVAL; + + for (; i < addrcnt; i++) + osmo_talloc_replace_string(cli, &cli->addr[i], addr[i]); + for (; i < cli->addrcnt; i++) { + talloc_free(cli->addr[i]); + cli->addr[i] = NULL; + } + + cli->addrcnt = addrcnt; + cli->flags |= OSMO_STREAM_CLI_F_RECONF; + return 0; +} + +/*! Set the remote port number to which we connect. + * Any changes to this setting will only become active upon next (re)connect. + * \param[in] cli Stream Client to modify + * \param[in] port Remote port number + */ +void +osmo_stream_cli_set_port(struct osmo_stream_cli *cli, uint16_t port) +{ + cli->port = port; + cli->flags |= OSMO_STREAM_CLI_F_RECONF; +} + +/*! Set the local port number for the socket (to be bound to). + * Any changes to this setting will only become active upon next (re)connect. + * \param[in] cli Stream Client to modify + * \param[in] port Local port number + */ +void +osmo_stream_cli_set_local_port(struct osmo_stream_cli *cli, uint16_t port) +{ + cli->local_port = port; + cli->flags |= OSMO_STREAM_CLI_F_RECONF; +} + +/*! Set the local address for the socket (to be bound to). + * Any changes to this setting will only become active upon next (re)connect. + * \param[in] cli Stream Client to modify + * \param[in] port Local host name + */ +void +osmo_stream_cli_set_local_addr(struct osmo_stream_cli *cli, const char *addr) +{ + osmo_stream_cli_set_local_addrs(cli, &addr, 1); +} + +/*! Set the local address set to which we bind. + * Useful for protocols allowing bind to more than one address (such as SCTP) + * Any changes to this setting will only become active upon next (re)connect. + * \param[in] cli Stream Client to modify + * \param[in] addr Local IP address set + * \return negative on error, 0 on success + */ +int osmo_stream_cli_set_local_addrs(struct osmo_stream_cli *cli, const char **addr, size_t addrcnt) +{ + int i = 0; + + if (addrcnt > OSMO_STREAM_MAX_ADDRS) + return -EINVAL; + + for (; i < addrcnt; i++) + osmo_talloc_replace_string(cli, &cli->local_addr[i], addr[i]); + for (; i < cli->local_addrcnt; i++) { + talloc_free(cli->local_addr[i]); + cli->local_addr[i] = NULL; + } + + cli->local_addrcnt = addrcnt; + cli->flags |= OSMO_STREAM_CLI_F_RECONF; + return 0; +} + +/*! Set the protocol for the stream client socket. + * Any changes to this setting will only become active upon next (re)connect. + * \param[in] cli Stream Client to modify + * \param[in] proto Protocol (like IPPROTO_TCP (default), IPPROTO_SCTP, ...) + */ +void +osmo_stream_cli_set_proto(struct osmo_stream_cli *cli, uint16_t proto) +{ + cli->proto = proto; + cli->flags |= OSMO_STREAM_CLI_F_RECONF; +} + +/* Configure client side segmentation for the iofd */ +static void configure_cli_segmentation_cb(struct osmo_stream_cli *cli, + osmo_stream_cli_segmentation_cb_t segmentation_cb) +{ + /* Copy default settings */ + struct osmo_io_ops client_ops; + osmo_iofd_get_ioops(cli->iofd, &client_ops); + /* Set segmentation cb for this client */ + client_ops.segmentation_cb = segmentation_cb; + osmo_iofd_set_ioops(cli->iofd, &client_ops); +} + +/*! Set the segmentation callback for the client. + * \param[in,out] cli Stream Client to modify + * \param[in] segmentation_cb Target segmentation callback + */ +void osmo_stream_cli_set_segmentation_cb(struct osmo_stream_cli *cli, + osmo_stream_cli_segmentation_cb_t segmentation_cb) +{ + cli->segmentation_cb = segmentation_cb; + if (cli->iofd) /* Otherwise, this will be done in osmo_stream_cli_open() */ + configure_cli_segmentation_cb(cli, segmentation_cb); +} + +/*! Set the socket type for the stream server link. + * Any changes to this setting will only become active upon next (re)connect. + * \param[in] cli Stream Client to modify + * \param[in] type Socket Type (like SOCK_STREAM (default), SOCK_SEQPACKET, ...) + * \returns zero on success, negative -errno on error. + */ +int osmo_stream_cli_set_type(struct osmo_stream_cli *cli, int type) +{ + switch (type) { + case SOCK_STREAM: + case SOCK_SEQPACKET: + break; + default: + return -ENOTSUP; + } + cli->sk_type = type; + cli->flags |= OSMO_STREAM_CLI_F_RECONF; + return 0; +} + +/*! Set the socket type for the stream server link. + * Any changes to this setting will only become active upon next (re)connect. + * \param[in] cli Stream Client to modify + * \param[in] type Socket Domain (like AF_UNSPEC (default for IP), AF_UNIX, AF_INET, ...) + * \returns zero on success, negative -errno on error. + */ +int osmo_stream_cli_set_domain(struct osmo_stream_cli *cli, int domain) +{ + switch (domain) { + case AF_UNSPEC: + case AF_INET: + case AF_INET6: + case AF_UNIX: + break; + default: + return -ENOTSUP; + } + cli->sk_domain = domain; + cli->flags |= OSMO_STREAM_CLI_F_RECONF; + return 0; +} + +/*! Set the reconnect time of the stream client socket. + * \param[in] cli Stream Client to modify + * \param[in] timeout Re-connect timeout in seconds or negative value to disable auto-reconnection */ +void +osmo_stream_cli_set_reconnect_timeout(struct osmo_stream_cli *cli, int timeout) +{ + cli->reconnect_timeout = timeout; +} + +/*! Set application private data of the stream client socket. + * \param[in] cli Stream Client to modify + * \param[in] data User-specific data (available in call-back functions) */ +void +osmo_stream_cli_set_data(struct osmo_stream_cli *cli, void *data) +{ + cli->data = data; +} + +/*! Retrieve application private data of the stream client socket. + * \param[in] cli Stream Client to modify + * \returns Application private data, as set by \ref osmo_stream_cli_set_data() */ +void *osmo_stream_cli_get_data(struct osmo_stream_cli *cli) +{ + return cli->data; +} + +/*! Retrieve the stream client socket description. + * Calling this function will build a string that describes the socket in terms of its local/remote + * address/port. The returned name is stored in a static buffer; it is hence not re-entrant or thread-safe. + * \param[in] cli Stream Client to examine + * \returns Socket description or NULL in case of error */ +char *osmo_stream_cli_get_sockname(const struct osmo_stream_cli *cli) +{ + static char buf[OSMO_STREAM_MAX_ADDRS * OSMO_SOCK_NAME_MAXLEN]; + + osmo_sock_multiaddr_get_name_buf(buf, sizeof(buf), + osmo_stream_cli_get_fd(cli), cli->proto); + + return buf; +} + +/*! Retrieve Osmocom File Descriptor of the stream client socket. + * This function only works in case you operate osmo_stream_cli in osmo_fd mode! + * \param[in] cli Stream Client to modify + * \returns Pointer to \ref osmo_fd */ +struct osmo_fd * +osmo_stream_cli_get_ofd(struct osmo_stream_cli *cli) +{ + OSMO_ASSERT(cli->mode == OSMO_STREAM_MODE_OSMO_FD); + return &cli->ofd; +} + +/*! Set the call-back function called on connect of the stream client socket. + * The call-back function registered via this function will be called upon completion of the non-blocking + * outbound connect operation. + * \param[in] cli Stream Client to modify + * \param[in] connect_cb Call-back function to be called upon connect */ +void +osmo_stream_cli_set_connect_cb(struct osmo_stream_cli *cli, + osmo_stream_cli_connect_cb_t connect_cb) +{ + cli->connect_cb = connect_cb; +} + +/*! Set the call-back function called on disconnect of the stream client socket. + * \param[in] cli Stream Client to modify + * \param[in] disconnect_cb Call-back function to be called upon disconnect */ +void osmo_stream_cli_set_disconnect_cb(struct osmo_stream_cli *cli, + osmo_stream_cli_disconnect_cb_t disconnect_cb) +{ + cli->disconnect_cb = disconnect_cb; +} + +/*! Set the call-back function called to read from the stream client socket. + * This function will implicitly configure osmo_stream_cli to use legacy osmo_ofd mode. + * \param[in] cli Stream Client to modify + * \param[in] read_cb Call-back function to be called when we want to read */ +void +osmo_stream_cli_set_read_cb(struct osmo_stream_cli *cli, + osmo_stream_cli_read_cb_t read_cb) +{ + OSMO_ASSERT(cli->mode != OSMO_STREAM_MODE_OSMO_IO); + cli->mode = OSMO_STREAM_MODE_OSMO_FD; + cli->read_cb = read_cb; +} + +/*! Set the call-back function called to read from the stream client socket. + * This function will implicitly configure osmo_stream_cli to use osmo_iofd mode. + * \param[in] cli Stream Client to modify + * \param[in] read_cb Call-back function to be called when data was read from the socket */ +void +osmo_stream_cli_set_read_cb2(struct osmo_stream_cli *cli, + osmo_stream_cli_read_cb2_t read_cb) +{ + OSMO_ASSERT(cli->mode != OSMO_STREAM_MODE_OSMO_FD); + cli->mode = OSMO_STREAM_MODE_OSMO_IO; + cli->iofd_read_cb = read_cb; +} + +/*! Destroy a Osmocom stream client (includes close). + * \param[in] cli Stream Client to destroy */ +void osmo_stream_cli_destroy(struct osmo_stream_cli *cli) +{ + osmo_stream_cli_close(cli); + osmo_timer_del(&cli->timer); + msgb_queue_free(&cli->tx_queue); + talloc_free(cli); +} + +/*! DEPRECATED: use osmo_stream_cli_set_reconnect_timeout() or osmo_stream_cli_reconnect() instead! + * Open connection of an Osmocom stream client + * \param[in] cli Stream Client to connect + * \param[in] reconect 1 if we should not automatically reconnect + * \return negative on error, 0 on success + */ +int osmo_stream_cli_open2(struct osmo_stream_cli *cli, int reconnect) +{ + int ret; + + /* we are reconfiguring this socket, close existing first. */ + if ((cli->flags & OSMO_STREAM_CLI_F_RECONF) && cli->ofd.fd >= 0) + osmo_stream_cli_close(cli); + + cli->flags &= ~OSMO_STREAM_CLI_F_RECONF; + + switch (cli->proto) { +#ifdef HAVE_LIBSCTP + case IPPROTO_SCTP: + ret = osmo_sock_init2_multiaddr2(AF_UNSPEC, SOCK_STREAM, cli->proto, + (const char **)cli->local_addr, cli->local_addrcnt, cli->local_port, + (const char **)cli->addr, cli->addrcnt, cli->port, + OSMO_SOCK_F_CONNECT|OSMO_SOCK_F_BIND|OSMO_SOCK_F_NONBLOCK, + &cli->ma_pars); + break; +#endif + default: + ret = osmo_sock_init2(AF_UNSPEC, SOCK_STREAM, cli->proto, + cli->local_addr[0], cli->local_port, + cli->addr[0], cli->port, + OSMO_SOCK_F_CONNECT|OSMO_SOCK_F_BIND|OSMO_SOCK_F_NONBLOCK); + } + + if (ret < 0) { + if (reconnect) + osmo_stream_cli_reconnect(cli); + return ret; + } + osmo_fd_setup(&cli->ofd, ret, OSMO_FD_READ | OSMO_FD_WRITE, osmo_stream_cli_fd_cb, cli, 0); + + if (cli->flags & OSMO_STREAM_CLI_F_NODELAY) { + ret = stream_setsockopt_nodelay(cli->ofd.fd, cli->proto, 1); + if (ret < 0) + goto error_close_socket; + } + + if (osmo_fd_register(&cli->ofd) < 0) + goto error_close_socket; + + cli->state = STREAM_CLI_STATE_CONNECTING; + return 0; + +error_close_socket: + close(cli->ofd.fd); + cli->ofd.fd = -1; + return -EIO; +} + +/*! Set the NODELAY socket option to avoid Nagle-like behavior. + * Setting this to nodelay=true will automatically set the NODELAY + * socket option on any socket established via \ref osmo_stream_cli_open + * or any re-connect. You have to set this _before_ opening the + * socket. + * \param[in] cli Stream client whose sockets are to be configured + * \param[in] nodelay whether to set (true) NODELAY before connect() + */ +void osmo_stream_cli_set_nodelay(struct osmo_stream_cli *cli, bool nodelay) +{ + if (nodelay) + cli->flags |= OSMO_STREAM_CLI_F_NODELAY; + else + cli->flags &= ~OSMO_STREAM_CLI_F_NODELAY; +} + +/*! Open connection of an Osmocom stream client. + * This will initiate an non-blocking outbound connect to the configured destination (server) address. + * By default the client will automatically attempt to reconnect after default timeout. + * To disable this, use osmo_stream_cli_set_reconnect_timeout() before calling this function. + * \param[in] cli Stream Client to connect + * \return negative on error, 0 on success */ +int osmo_stream_cli_open(struct osmo_stream_cli *cli) +{ + int ret, flags; + int fd = -1; + unsigned int local_addrcnt; + + /* we are reconfiguring this socket, close existing first. */ + if ((cli->flags & OSMO_STREAM_CLI_F_RECONF) && osmo_stream_cli_get_fd(cli) >= 0) + osmo_stream_cli_close(cli); + + cli->flags &= ~OSMO_STREAM_CLI_F_RECONF; + + switch (cli->sk_domain) { + case AF_UNIX: + ret = osmo_sock_unix_init(cli->sk_type, 0, cli->addr[0], OSMO_SOCK_F_CONNECT|OSMO_SOCK_F_BIND|OSMO_SOCK_F_NONBLOCK); + break; + case AF_INET: + case AF_INET6: + case AF_UNSPEC: + switch (cli->proto) { +#ifdef HAVE_LIBSCTP + case IPPROTO_SCTP: + local_addrcnt = cli->local_addrcnt; + flags = OSMO_SOCK_F_CONNECT|OSMO_SOCK_F_NONBLOCK; + if (cli->local_addrcnt > 0 || cli->local_port > 0) { /* explicit bind required? */ + flags |= OSMO_SOCK_F_BIND; + /* If no local addr configured, use local_addr[0]=NULL by default when creating the socket. */ + if (cli->local_addrcnt == 0) + local_addrcnt = 1; + } + ret = osmo_sock_init2_multiaddr2(cli->sk_domain, cli->sk_type, cli->proto, + (const char **)cli->local_addr, local_addrcnt, cli->local_port, + (const char **)cli->addr, cli->addrcnt, cli->port, + flags, &cli->ma_pars); + break; +#endif + default: + ret = osmo_sock_init2(cli->sk_domain, cli->sk_type, cli->proto, + cli->local_addr[0], cli->local_port, + cli->addr[0], cli->port, + OSMO_SOCK_F_CONNECT|OSMO_SOCK_F_BIND|OSMO_SOCK_F_NONBLOCK); + } + break; + default: + return -ENOTSUP; + } + + if (ret < 0) { + osmo_stream_cli_reconnect(cli); + return ret; + } + + fd = ret; + + if (cli->flags & OSMO_STREAM_CLI_F_NODELAY) { + ret = stream_setsockopt_nodelay(fd, cli->proto, 1); + if (ret < 0) + goto error_close_socket; + } + + switch (cli->mode) { + case OSMO_STREAM_MODE_OSMO_FD: + osmo_fd_setup(&cli->ofd, fd, OSMO_FD_READ | OSMO_FD_WRITE, osmo_stream_cli_fd_cb, cli, 0); + if (osmo_fd_register(&cli->ofd) < 0) + goto error_close_socket; + break; + case OSMO_STREAM_MODE_OSMO_IO: + /* Be sure that previous osmo_io instance is freed before creating a new one. */ + osmo_stream_cli_close_iofd(cli); +#ifdef HAVE_LIBSCTP + if (cli->proto == IPPROTO_SCTP) { + cli->iofd = osmo_iofd_setup(cli, fd, cli->name, OSMO_IO_FD_MODE_RECVMSG_SENDMSG, + &osmo_stream_cli_ioops_sctp, cli); + if (cli->iofd) + osmo_iofd_set_cmsg_size(cli->iofd, CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))); + } else { +#else + if (true) { +#endif + cli->iofd = osmo_iofd_setup(cli, fd, cli->name, OSMO_IO_FD_MODE_READ_WRITE, + &osmo_stream_cli_ioops, cli); + } + if (!cli->iofd) + goto error_close_socket; + + osmo_iofd_notify_connected(cli->iofd); + + configure_cli_segmentation_cb(cli, cli->segmentation_cb); + + if (osmo_iofd_register(cli->iofd, fd) < 0) + goto error_close_socket; + break; + default: + OSMO_ASSERT(false); + } + + cli->state = STREAM_CLI_STATE_CONNECTING; + return 0; + +error_close_socket: + cli->state = STREAM_CLI_STATE_CLOSED; + close(fd); + if (cli->mode == OSMO_STREAM_MODE_OSMO_FD) + cli->ofd.fd = -1; + return -EIO; +} + +static void cli_timer_cb(void *data) +{ + struct osmo_stream_cli *cli = data; + + LOGSCLI(cli, LOGL_DEBUG, "reconnecting\n"); + osmo_stream_cli_open(cli); +} + +/*! Enqueue data to be sent via an Osmocom stream client.. + * This is the function you use for writing/sending/transmitting data via the osmo_stream_cli. + * \param[in] cli Stream Client through which we want to send + * \param[in] msg Message buffer to enqueue in transmit queue */ +void osmo_stream_cli_send(struct osmo_stream_cli *cli, struct msgb *msg) +{ + int rc; + + OSMO_ASSERT(cli); + OSMO_ASSERT(msg); + + if (!osmo_stream_cli_is_connected(cli)) { + LOGSCLI(cli, LOGL_ERROR, "send: not connected, dropping data!\n"); + msgb_free(msg); + return; + } + + switch (cli->mode) { + case OSMO_STREAM_MODE_OSMO_FD: + msgb_enqueue(&cli->tx_queue, msg); + osmo_fd_write_enable(&cli->ofd); + break; + case OSMO_STREAM_MODE_OSMO_IO: + /* whenever osmo_stream_cli_is_connected() [see above check], we should have an iofd */ + OSMO_ASSERT(cli->iofd); + if (cli->proto == IPPROTO_SCTP) + rc = stream_iofd_sctp_send_msgb(cli->iofd, msg, MSG_NOSIGNAL); + else + rc = osmo_iofd_write_msgb(cli->iofd, msg); + if (rc < 0) + msgb_free(msg); + break; + default: + OSMO_ASSERT(false); + } +} + +/*! Receive data via an Osmocom stream client in osmo_fd mode. + * \param[in] cli Stream Client through which we want to send + * \param msg pre-allocate message buffer to which received data is appended + * \returns number of bytes read; <=0 in case of error + * + * Application programs using the legacy osmo_fd mode of osmo_stream_cli will use + * this function to read/receive from a stream client socket after they have been notified that + * it is readable (via select/poll). + * + * If conn is an SCTP connection, additional specific considerations shall be taken: + * - msg->cb is always filled with SCTP ppid, and SCTP stream values, see msgb_sctp_*() APIs. + * - If an SCTP notification was received when reading from the SCTP socket, + * msgb_sctp_msg_flags(msg) will contain bit flag + * OSMO_STREAM_SCTP_MSG_FLAGS_NOTIFICATION set, and the msgb will + * contain a "union sctp_notification" instead of user data. In this case the + * return code will be either 0 (if conn is considered dead after the + * notification) or -EAGAIN (if conn is considered still alive after the + * notification) resembling the standard recv() API. + */ +int osmo_stream_cli_recv(struct osmo_stream_cli *cli, struct msgb *msg) +{ + int ret; + OSMO_ASSERT(cli); + OSMO_ASSERT(msg); + OSMO_ASSERT(cli->mode == OSMO_STREAM_MODE_OSMO_FD); + + switch (cli->sk_domain) { + case AF_UNIX: + ret = recv(cli->ofd.fd, msg->tail, msgb_tailroom(msg), 0); + break; + case AF_INET: + case AF_INET6: + case AF_UNSPEC: + switch (cli->proto) { +#ifdef HAVE_LIBSCTP + case IPPROTO_SCTP: + { + char log_pfx[128]; + snprintf(log_pfx, sizeof(log_pfx), "CLICONN(%s,%s)", cli->name ? : "", cli->sockname); + ret = stream_sctp_recvmsg_wrapper(cli->ofd.fd, msg, log_pfx); + break; + } +#endif + case IPPROTO_TCP: + default: + ret = recv(cli->ofd.fd, msg->tail, msgb_tailroom(msg), 0); + break; + } + break; + default: + ret = -ENOTSUP; + } + + if (ret < 0) { + if (ret == -EAGAIN) + return ret; + if (errno == EPIPE || errno == ECONNRESET) + LOGSCLI(cli, LOGL_ERROR, "lost connection with srv\n"); + osmo_stream_cli_reconnect(cli); + return ret; + } else if (ret == 0) { + LOGSCLI(cli, LOGL_ERROR, "connection closed with srv\n"); + osmo_stream_cli_reconnect(cli); + return ret; + } + msgb_put(msg, ret); + LOGSCLI(cli, LOGL_DEBUG, "received %d bytes from srv\n", ret); + return ret; +} + +/*! Clear the transmit queue of the stream client. + * Calling this function wil clear (delete) any pending, not-yet transmitted data from the transmit queue. */ +void osmo_stream_cli_clear_tx_queue(struct osmo_stream_cli *cli) +{ + switch (cli->mode) { + case OSMO_STREAM_MODE_OSMO_FD: + msgb_queue_free(&cli->tx_queue); + /* If in state 'connecting', keep WRITE flag up to receive + * socket connection signal and then transition to STATE_CONNECTED: */ + if (cli->state == STREAM_CLI_STATE_CONNECTED) + osmo_fd_write_disable(&cli->ofd); + break; + case OSMO_STREAM_MODE_OSMO_IO: + osmo_iofd_txqueue_clear(cli->iofd); + break; + default: + OSMO_ASSERT(false); + } +} + +/*! Set given parameter of stream client to given value. + * \param[in] cli stream client on which to set parameter. + * \param[in] par identifier of the parameter to be set. + * \param[in] val value of the parameter to be set. + * \param[in] val_len length of the parameter value. + * \returns 0 in success; negative -errno on error. */ +int osmo_stream_cli_set_param(struct osmo_stream_cli *cli, enum osmo_stream_cli_param par, void *val, size_t val_len) +{ + OSMO_ASSERT(cli); + uint8_t val8; + + switch (par) { + case OSMO_STREAM_CLI_PAR_SCTP_SOCKOPT_AUTH_SUPPORTED: + if (!val || val_len != sizeof(uint8_t)) + return -EINVAL; + val8 = *(uint8_t *)val; + cli->ma_pars.sctp.sockopt_auth_supported.set = true; + cli->ma_pars.sctp.sockopt_auth_supported.abort_on_failure = val8 > 1; + cli->ma_pars.sctp.sockopt_auth_supported.value = (val8 == 1 || val8 == 3) ? 1 : 0; + break; + case OSMO_STREAM_CLI_PAR_SCTP_SOCKOPT_ASCONF_SUPPORTED: + if (!val || val_len != sizeof(uint8_t)) + return -EINVAL; + val8 = *(uint8_t *)val; + cli->ma_pars.sctp.sockopt_asconf_supported.set = true; + cli->ma_pars.sctp.sockopt_asconf_supported.abort_on_failure = val8 > 1; + cli->ma_pars.sctp.sockopt_asconf_supported.value = (val8 == 1 || val8 == 3) ? 1 : 0; + break; + case OSMO_STREAM_CLI_PAR_SCTP_INIT_NUM_OSTREAMS: + if (!val || val_len != sizeof(uint16_t)) + return -EINVAL; + cli->ma_pars.sctp.sockopt_initmsg.set = true; + cli->ma_pars.sctp.sockopt_initmsg.num_ostreams_present = true; + cli->ma_pars.sctp.sockopt_initmsg.num_ostreams_value = *(uint16_t *)val; + break; + case OSMO_STREAM_CLI_PAR_SCTP_INIT_MAX_INSTREAMS: + if (!val || val_len != sizeof(uint16_t)) + return -EINVAL; + cli->ma_pars.sctp.sockopt_initmsg.set = true; + cli->ma_pars.sctp.sockopt_initmsg.max_instreams_present = true; + cli->ma_pars.sctp.sockopt_initmsg.max_instreams_value = *(uint16_t *)val; + break; + case OSMO_STREAM_CLI_PAR_SCTP_INIT_MAX_ATTEMPTS: + if (!val || val_len != sizeof(uint16_t)) + return -EINVAL; + cli->ma_pars.sctp.sockopt_initmsg.set = true; + cli->ma_pars.sctp.sockopt_initmsg.max_attempts_present = true; + cli->ma_pars.sctp.sockopt_initmsg.max_attempts_value = *(uint16_t *)val; + break; + case OSMO_STREAM_CLI_PAR_SCTP_INIT_TIMEOUT: + if (!val || val_len != sizeof(uint16_t)) + return -EINVAL; + cli->ma_pars.sctp.sockopt_initmsg.set = true; + cli->ma_pars.sctp.sockopt_initmsg.max_init_timeo_present = true; + cli->ma_pars.sctp.sockopt_initmsg.max_init_timeo_value = *(uint16_t *)val; + break; + default: + return -ENOENT; + }; + return 0; +} + +/*! @} */ diff --git a/src/stream_srv.c b/src/stream_srv.c new file mode 100644 index 0000000..dad6b7a --- /dev/null +++ b/src/stream_srv.c @@ -0,0 +1,1215 @@ +/* (C) 2011 by Pablo Neira Ayuso <pablo@gnumonks.org> + * (C) 2015-2016 by Harald Welte <laforge@gnumonks.org> + * (C) 2023 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de> + * All Rights Reserved. + * + * SPDX-License-Identifier: GPL-2.0+ + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + */ + +#include <stdio.h> +#include <unistd.h> +#include <stdlib.h> +#include <errno.h> +#include <string.h> +#include <time.h> +#include <sys/fcntl.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <sys/ioctl.h> +#include <arpa/inet.h> +#include <netinet/in.h> +#include <netinet/tcp.h> + +#include <osmocom/core/timer.h> +#include <osmocom/core/select.h> +#include <osmocom/core/utils.h> +#include <osmocom/gsm/tlv.h> +#include <osmocom/core/msgb.h> +#include <osmocom/core/osmo_io.h> +#include <osmocom/core/panic.h> +#include <osmocom/core/logging.h> +#include <osmocom/core/talloc.h> +#include <osmocom/core/socket.h> + +#include <osmocom/netif/stream.h> +#include <osmocom/netif/stream_private.h> + +#include "config.h" + +#include <osmocom/netif/sctp.h> + +/*! \file stream_srv.c */ + +#define LOGSLNK(link, level, fmt, args...) \ + LOGP(DLINP, level, "SRV(%s,%s) " fmt, \ + link->name ? : "", \ + link->sockname, \ + ## args) + +#define LOGSSRV(srv, level, fmt, args...) \ + LOGP(DLINP, level, "SRVCONN(%s,%s) " fmt, \ + srv->name ? : "", \ + srv->sockname, \ + ## args) +/* + * Server side. + */ + +#define OSMO_STREAM_SRV_F_RECONF (1 << 0) +#define OSMO_STREAM_SRV_F_NODELAY (1 << 1) + +struct osmo_stream_srv_link { + struct osmo_fd ofd; + char *name; + char sockname[OSMO_SOCK_MULTIADDR_PEER_STR_MAXLEN]; + char *addr[OSMO_STREAM_MAX_ADDRS]; + uint8_t addrcnt; + uint16_t port; + int sk_domain; + int sk_type; + uint16_t proto; + osmo_stream_srv_link_accept_cb_t accept_cb; + void *data; + int flags; + struct osmo_sock_init2_multiaddr_pars ma_pars; +}; + +static int _setsockopt_nosigpipe(struct osmo_stream_srv_link *link, int new_fd) +{ +#ifdef SO_NOSIGPIPE + int ret; + int val = 1; + ret = setsockopt(new_fd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&val, sizeof(val)); + if (ret < 0) + LOGSLNK(link, LOGL_ERROR, "Failed setting SO_NOSIGPIPE: %s\n", strerror(errno)); + return ret; +#else + return 0; +#endif +} + +static int osmo_stream_srv_link_ofd_cb(struct osmo_fd *ofd, unsigned int what) +{ + int ret; + int sock_fd; + struct osmo_sockaddr osa; + socklen_t sa_len = sizeof(osa.u.sas); + struct osmo_stream_srv_link *link = ofd->data; + + ret = accept(ofd->fd, &osa.u.sa, &sa_len); + if (ret < 0) { + LOGSLNK(link, LOGL_ERROR, "failed to accept from origin peer, reason=`%s'\n", + strerror(errno)); + return ret; + } + sock_fd = ret; + + switch (osa.u.sa.sa_family) { + case AF_UNIX: + LOGSLNK(link, LOGL_INFO, "accept()ed new link on fd %d\n", + sock_fd); + _setsockopt_nosigpipe(link, sock_fd); + break; + case AF_INET6: + case AF_INET: + LOGSLNK(link, LOGL_INFO, "accept()ed new link from %s\n", + osmo_sockaddr_to_str(&osa)); + + if (link->proto == IPPROTO_SCTP) { + _setsockopt_nosigpipe(link, sock_fd); + ret = stream_sctp_sock_activate_events(sock_fd); + if (ret < 0) + goto error_close_socket; + } + break; + default: + LOGSLNK(link, LOGL_ERROR, "accept()ed unexpected address family %d\n", + osa.u.sa.sa_family); + goto error_close_socket; + } + + if (link->flags & OSMO_STREAM_SRV_F_NODELAY) { + ret = stream_setsockopt_nodelay(sock_fd, link->proto, 1); + if (ret < 0) + goto error_close_socket; + } + + if (!link->accept_cb) { + ret = -ENOTSUP; + goto error_close_socket; + } + + ret = link->accept_cb(link, sock_fd); + if (ret) + goto error_close_socket; + return 0; + +error_close_socket: + close(sock_fd); + return ret; +} + +/*! \addtogroup stream_srv + * @{ + */ + +/*! Create an Osmocom Stream Server Link. + * A Stream Server Link is the listen()+accept() "parent" to individual connections from remote clients. + * \param[in] ctx talloc allocation context + * \returns Stream Server Link with default values (AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP) + */ +struct osmo_stream_srv_link *osmo_stream_srv_link_create(void *ctx) +{ + struct osmo_stream_srv_link *link; + + link = talloc_zero(ctx, struct osmo_stream_srv_link); + if (!link) + return NULL; + + link->sk_domain = AF_UNSPEC; + link->sk_type = SOCK_STREAM; + link->proto = IPPROTO_TCP; + osmo_fd_setup(&link->ofd, -1, OSMO_FD_READ | OSMO_FD_WRITE, osmo_stream_srv_link_ofd_cb, link, 0); + + link->ma_pars.sctp.version = 0; + + return link; +} + +/*! Set a name on the srv_link object (used during logging). + * \param[in] link server link whose name is to be set. The name is copied into the osmo_stream_srv_link, so + * the caller memory is not required to be valid beyond the call of this function. + * \param[in] name the name to be set on link + */ +void osmo_stream_srv_link_set_name(struct osmo_stream_srv_link *link, const char *name) +{ + osmo_talloc_replace_string(link, &link->name, name); +} + +/*! Retrieve name previously set on the srv_link object (see osmo_stream_srv_link_set_name()). + * \param[in] link server link whose name is to be retrieved + * \returns The name to be set on link; NULL if never set + */ +const char *osmo_stream_srv_link_get_name(const struct osmo_stream_srv_link *link) +{ + return link->name; +} + +/*! Set the NODELAY socket option to avoid Nagle-like behavior. + * Setting this to nodelay=true will automatically set the NODELAY + * socket option on any socket established via this server link, before + * calling the accept_cb() + * \param[in] link server link whose sockets are to be configured + * \param[in] nodelay whether to set (true) NODELAY after accept + */ +void osmo_stream_srv_link_set_nodelay(struct osmo_stream_srv_link *link, bool nodelay) +{ + if (nodelay) + link->flags |= OSMO_STREAM_SRV_F_NODELAY; + else + link->flags &= ~OSMO_STREAM_SRV_F_NODELAY; +} + +/*! Set the local address to which we bind. + * Any changes to this setting will only become active upon next (re)connect. + * \param[in] link Stream Server Link to modify + * \param[in] addr Local IP address + */ +void osmo_stream_srv_link_set_addr(struct osmo_stream_srv_link *link, + const char *addr) +{ + osmo_stream_srv_link_set_addrs(link, &addr, 1); +} + +/*! Set the local address set to which we bind. + * Useful for protocols allowing bind on more than one address (such as SCTP) + * Any changes to this setting will only become active upon next (re)connect. + * \param[in] link Stream Server Link to modify + * \param[in] addr Local IP address + * \return negative on error, 0 on success + */ +int osmo_stream_srv_link_set_addrs(struct osmo_stream_srv_link *link, const char **addr, size_t addrcnt) +{ + int i = 0; + + if (addrcnt > OSMO_STREAM_MAX_ADDRS) + return -EINVAL; + + for (; i < addrcnt; i++) + osmo_talloc_replace_string(link, &link->addr[i], addr[i]); + for (; i < link->addrcnt; i++) { + talloc_free(link->addr[i]); + link->addr[i] = NULL; + } + + link->addrcnt = addrcnt; + link->flags |= OSMO_STREAM_SRV_F_RECONF; + return 0; +} + +/*! Set the local port number to which we bind. + * Any changes to this setting will only become active upon next (re)connect. + * \param[in] link Stream Server Link to modify + * \param[in] port Local port number + */ +void osmo_stream_srv_link_set_port(struct osmo_stream_srv_link *link, + uint16_t port) +{ + link->port = port; + link->flags |= OSMO_STREAM_SRV_F_RECONF; +} + +/*! Set the protocol for the stream server link. + * Any changes to this setting will only become active upon next (re)connect. + * \param[in] link Stream Server Link to modify + * \param[in] proto Protocol (like IPPROTO_TCP (default), IPPROTO_SCTP, ...) + */ +void +osmo_stream_srv_link_set_proto(struct osmo_stream_srv_link *link, + uint16_t proto) +{ + link->proto = proto; + link->flags |= OSMO_STREAM_SRV_F_RECONF; +} + + +/*! Set the socket type for the stream server link. + * Any changes to this setting will only become active upon next (re)connect. + * \param[in] link Stream Server Link to modify + * \param[in] type Socket Type (like SOCK_STREAM (default), SOCK_SEQPACKET, ...) + * \returns zero on success, negative on error. + */ +int osmo_stream_srv_link_set_type(struct osmo_stream_srv_link *link, int type) +{ + switch (type) { + case SOCK_STREAM: + case SOCK_SEQPACKET: + break; + default: + return -ENOTSUP; + } + link->sk_type = type; + link->flags |= OSMO_STREAM_SRV_F_RECONF; + return 0; +} + +/*! Set the socket type for the stream server link. + * Any changes to this setting will only become active upon next (re)connect. + * \param[in] link Stream Server Link to modify + * \param[in] type Socket Domain (like AF_UNSPEC (default for IP), AF_UNIX, AF_INET, ...) + * \returns zero on success, negative on error. + */ +int osmo_stream_srv_link_set_domain(struct osmo_stream_srv_link *link, int domain) +{ + switch (domain) { + case AF_UNSPEC: + case AF_INET: + case AF_INET6: + case AF_UNIX: + break; + default: + return -ENOTSUP; + } + link->sk_domain = domain; + link->flags |= OSMO_STREAM_SRV_F_RECONF; + return 0; +} + +/*! Set application private data of the stream server link. + * \param[in] link Stream Server Link to modify + * \param[in] data User-specific data (available in call-back functions) */ +void +osmo_stream_srv_link_set_data(struct osmo_stream_srv_link *link, + void *data) +{ + link->data = data; +} + +/*! Retrieve application private data of the stream server link. + * \param[in] link Stream Server Link to modify + * \returns Application private data, as set by \ref osmo_stream_cli_set_data() */ +void *osmo_stream_srv_link_get_data(struct osmo_stream_srv_link *link) +{ + return link->data; +} + +/* Similar to osmo_sock_multiaddr_get_name_buf(), but aimed at listening sockets (only local part): */ +static char *get_local_sockname_buf(char *buf, size_t buf_len, const struct osmo_stream_srv_link *link) +{ + struct osmo_strbuf sb = { .buf = buf, .len = buf_len }; + int rc; + + if (buf_len > 0) + buf[0] = '\0'; + + switch (link->sk_domain) { + case AF_UNSPEC: + /* we assume INET(6) by default upon link creation: */ + case AF_INET: + case AF_INET6: + { + char hostbuf[OSMO_STREAM_MAX_ADDRS][INET6_ADDRSTRLEN]; + size_t num_hostbuf = ARRAY_SIZE(hostbuf); + char portbuf[6]; + bool need_more_bufs; + rc = osmo_sock_multiaddr_get_ip_and_port(link->ofd.fd, link->proto, &hostbuf[0][0], + &num_hostbuf, sizeof(hostbuf[0]), + portbuf, sizeof(portbuf), true); + if (rc < 0) + return NULL; + need_more_bufs = num_hostbuf > ARRAY_SIZE(hostbuf); + if (need_more_bufs) + num_hostbuf = ARRAY_SIZE(hostbuf); + OSMO_STRBUF_APPEND(sb, osmo_multiaddr_ip_and_port_snprintf, + &hostbuf[0][0], num_hostbuf, sizeof(hostbuf[0]), portbuf); + if (need_more_bufs) + OSMO_STRBUF_PRINTF(sb, "<need-more-bufs!>"); + return buf; + } + case AF_UNIX: + { + struct osmo_sockaddr osa; + struct sockaddr_un *sun; + socklen_t len = sizeof(osa.u.sas); + rc = getsockname(link->ofd.fd, &osa.u.sa, &len); + if (rc < 0) { + OSMO_STRBUF_PRINTF(sb, "<error-in-getsockname>"); + return buf; + } + /* Make sure sun_path is NULL terminated: */ + sun = (struct sockaddr_un *)&osa.u.sa; + sun->sun_path[sizeof(sun->sun_path) - 1] = '\0'; + OSMO_STRBUF_PRINTF(sb, "%s", sun->sun_path); + return buf; + } + default: + return NULL; + } +} + +/*! Retrieve description of the stream server link e. g. 127.0.0.1:1234. + * Calling this function will build a string that describes the socket in terms of its local/remote + * address/port. The returned name is stored in a static buffer; it is hence not re-entrant or thread-safe. + * \param[in] link Stream Server Link to examine + * \returns Link description or NULL in case of error */ +char *osmo_stream_srv_link_get_sockname(const struct osmo_stream_srv_link *link) +{ + static char buf[sizeof(link->sockname)]; + + if (!get_local_sockname_buf(buf, sizeof(buf), link)) + return NULL; + return buf; +} + +/*! Retrieve Osmocom File Descriptor of the stream server link. + * \param[in] link Stream Server Link + * \returns Pointer to \ref osmo_fd */ +struct osmo_fd * +osmo_stream_srv_link_get_ofd(struct osmo_stream_srv_link *link) +{ + return &link->ofd; +} + +/*! Retrieve File Descriptor of the stream server link. + * \param[in] conn Stream Server Link + * \returns file descriptor or negative on error */ +int osmo_stream_srv_link_get_fd(const struct osmo_stream_srv_link *link) +{ + return link->ofd.fd; +} + +/*! Set the accept() call-back of the stream server link. + * The provided call-back will be called whenever a new inbound connection + * is accept()ed. The call-back then typically creates a new osmo_stream_srv. + * If the call-back returns a negative value, the file descriptor will be closed. + * \param[in] link Stream Server Link + * \param[in] accept_cb Call-back function executed upon accept() */ +void osmo_stream_srv_link_set_accept_cb(struct osmo_stream_srv_link *link, + int (*accept_cb)(struct osmo_stream_srv_link *link, int fd)) + +{ + link->accept_cb = accept_cb; +} + +/*! Destroy the stream server link. Closes + Releases Memory. + * \param[in] link Stream Server Link */ +void osmo_stream_srv_link_destroy(struct osmo_stream_srv_link *link) +{ + osmo_stream_srv_link_close(link); + talloc_free(link); +} + +/*! Open the stream server link. This actually initializes the + * underlying socket and binds it to the configured ip/port. + * \param[in] link Stream Server Link to open + * \return negative on error, 0 on success */ +int osmo_stream_srv_link_open(struct osmo_stream_srv_link *link) +{ + int ret; + + if (link->ofd.fd >= 0) { + /* No reconfigure needed for existing socket, we are fine */ + if (!(link->flags & OSMO_STREAM_SRV_F_RECONF)) + return 0; + /* we are reconfiguring this socket, close existing first. */ + osmo_stream_srv_link_close(link); + } + + link->flags &= ~OSMO_STREAM_SRV_F_RECONF; + + switch (link->sk_domain) { + case AF_UNIX: + ret = osmo_sock_unix_init(link->sk_type, 0, link->addr[0], OSMO_SOCK_F_BIND); + break; + case AF_UNSPEC: + case AF_INET: + case AF_INET6: + switch (link->proto) { +#ifdef HAVE_LIBSCTP + case IPPROTO_SCTP: + ret = osmo_sock_init2_multiaddr2(link->sk_domain, link->sk_type, link->proto, + (const char **)link->addr, link->addrcnt, link->port, + NULL, 0, 0, OSMO_SOCK_F_BIND, &link->ma_pars); + break; +#endif + default: + ret = osmo_sock_init(link->sk_domain, link->sk_type, link->proto, + link->addr[0], link->port, OSMO_SOCK_F_BIND); + } + break; + default: + ret = -ENOTSUP; + } + if (ret < 0) + return ret; + + link->ofd.fd = ret; + if (osmo_fd_register(&link->ofd) < 0) { + close(ret); + link->ofd.fd = -1; + return -EIO; + } + + get_local_sockname_buf(link->sockname, sizeof(link->sockname), link); + return 0; +} + +/*! Check whether the stream server link is opened. + * \param[in] link Stream Server Link to check */ +bool osmo_stream_srv_link_is_opened(const struct osmo_stream_srv_link *link) +{ + if (!link) + return false; + + if (link->ofd.fd == -1) + return false; + + return true; +} + +/*! Close the stream server link and unregister from select loop. + * Does not destroy the server link, merely closes it! + * \param[in] link Stream Server Link to close */ +void osmo_stream_srv_link_close(struct osmo_stream_srv_link *link) +{ + if (!osmo_stream_srv_link_is_opened(link)) + return; + + osmo_fd_unregister(&link->ofd); + close(link->ofd.fd); + link->ofd.fd = -1; +} + +/*! Set given parameter of stream_srv_link to given value. + * \param[in] cli stream client on which to set parameter. + * \param[in] par identifier of the parameter to be set. + * \param[in] val value of the parameter to be set. + * \param[in] val_len length of the parameter value. + * \returns 0 in success; negative -errno on error. */ +int osmo_stream_srv_link_set_param(struct osmo_stream_srv_link *link, enum osmo_stream_srv_link_param par, + void *val, size_t val_len) +{ + OSMO_ASSERT(link); + uint8_t val8; + + switch (par) { + case OSMO_STREAM_SRV_LINK_PAR_SCTP_SOCKOPT_AUTH_SUPPORTED: + if (!val || val_len != sizeof(uint8_t)) + return -EINVAL; + val8 = *(uint8_t *)val; + link->ma_pars.sctp.sockopt_auth_supported.set = true; + link->ma_pars.sctp.sockopt_auth_supported.abort_on_failure = val8 > 1; + link->ma_pars.sctp.sockopt_auth_supported.value = (val8 == 1 || val8 == 3) ? 1 : 0; + break; + case OSMO_STREAM_SRV_LINK_PAR_SCTP_SOCKOPT_ASCONF_SUPPORTED: + if (!val || val_len != sizeof(uint8_t)) + return -EINVAL; + val8 = *(uint8_t *)val; + link->ma_pars.sctp.sockopt_asconf_supported.set = true; + link->ma_pars.sctp.sockopt_asconf_supported.abort_on_failure = val8 > 1; + link->ma_pars.sctp.sockopt_asconf_supported.value = (val8 == 1 || val8 == 3) ? 1 : 0; + break; + case OSMO_STREAM_SRV_LINK_PAR_SCTP_INIT_NUM_OSTREAMS: + if (!val || val_len != sizeof(uint16_t)) + return -EINVAL; + link->ma_pars.sctp.sockopt_initmsg.set = true; + link->ma_pars.sctp.sockopt_initmsg.num_ostreams_present = true; + link->ma_pars.sctp.sockopt_initmsg.num_ostreams_value = *(uint16_t *)val; + break; + case OSMO_STREAM_SRV_LINK_PAR_SCTP_INIT_MAX_INSTREAMS: + if (!val || val_len != sizeof(uint16_t)) + return -EINVAL; + link->ma_pars.sctp.sockopt_initmsg.set = true; + link->ma_pars.sctp.sockopt_initmsg.max_instreams_present = true; + link->ma_pars.sctp.sockopt_initmsg.max_instreams_value = *(uint16_t *)val; + break; + default: + return -ENOENT; + }; + return 0; +} + +/*! @} */ + +#define OSMO_STREAM_SRV_F_FLUSH_DESTROY (1 << 0) + +struct osmo_stream_srv { + struct osmo_stream_srv_link *srv; + char *name; + char sockname[OSMO_SOCK_NAME_MAXLEN]; + enum osmo_stream_mode mode; + union { + struct osmo_fd ofd; + struct osmo_io_fd *iofd; + }; + struct llist_head tx_queue; + osmo_stream_srv_closed_cb_t closed_cb; + osmo_stream_srv_read_cb_t read_cb; + osmo_stream_srv_read_cb2_t iofd_read_cb; + void *data; + int flags; +}; + +/*! \addtogroup stream_srv + * @{ + */ + +static void stream_srv_iofd_read_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg) +{ + struct osmo_stream_srv *conn = osmo_iofd_get_data(iofd); + + switch (res) { + case -EPIPE: + case -ECONNRESET: + LOGSSRV(conn, LOGL_ERROR, "lost connection with client (%d)\n", res); + break; + case 0: + LOGSSRV(conn, LOGL_NOTICE, "connection closed with client\n"); + break; + default: + LOGSSRV(conn, LOGL_DEBUG, "received %d bytes from client\n", res); + break; + } + if (OSMO_UNLIKELY(conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY)) { + LOGSSRV(conn, LOGL_INFO, "Connection is being flushed and closed; ignoring received message\n"); + msgb_free(msg); + if (osmo_iofd_txqueue_len(iofd) == 0) + osmo_stream_srv_destroy(conn); + return; + } + + if (conn->iofd_read_cb) + conn->iofd_read_cb(conn, res, msg); + else + msgb_free(msg); +} + +static void stream_srv_iofd_write_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg) +{ + struct osmo_stream_srv *conn = osmo_iofd_get_data(iofd); + LOGSSRV(conn, LOGL_DEBUG, "connected write\n"); + + if (res < 0) + LOGSSRV(conn, LOGL_ERROR, "error to send: %s\n", strerror(errno)); + + if (osmo_iofd_txqueue_len(iofd) == 0) + if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY) + osmo_stream_srv_destroy(conn); +} + +static const struct osmo_io_ops srv_ioops = { + .read_cb = stream_srv_iofd_read_cb, + .write_cb = stream_srv_iofd_write_cb, +}; + +#ifdef HAVE_LIBSCTP +static void stream_srv_iofd_recvmsg_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg, const struct msghdr *msgh) +{ + struct osmo_stream_srv *conn = osmo_iofd_get_data(iofd); + LOGSSRV(conn, LOGL_DEBUG, "message received (res=%d)\n", res); + + res = stream_iofd_sctp_recvmsg_trailer(iofd, msg, res, msgh); + + switch (res) { + case -EPIPE: + case -ECONNRESET: + LOGSSRV(conn, LOGL_ERROR, "lost connection with client (%d)\n", res); + break; + case 0: + LOGSSRV(conn, LOGL_NOTICE, "connection closed with client\n"); + break; + default: + if (OSMO_LIKELY(res > 0)) + LOGSSRV(conn, LOGL_DEBUG, "received %u bytes from client\n", res); + break; + } + if (OSMO_UNLIKELY(conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY)) { + LOGSSRV(conn, LOGL_INFO, "Connection is being flushed and closed; ignoring received message\n"); + msgb_free(msg); + if (osmo_iofd_txqueue_len(iofd) == 0) + osmo_stream_srv_destroy(conn); + return; + } + + if (conn->iofd_read_cb) + conn->iofd_read_cb(conn, res, msg); + else + msgb_free(msg); +} + +static const struct osmo_io_ops srv_ioops_sctp = { + .recvmsg_cb = stream_srv_iofd_recvmsg_cb, + .sendmsg_cb = stream_srv_iofd_write_cb, +}; +#endif + +static int osmo_stream_srv_read(struct osmo_stream_srv *conn) +{ + int rc = 0; + + LOGSSRV(conn, LOGL_DEBUG, "message received\n"); + + if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY) { + LOGSSRV(conn, LOGL_INFO, "Connection is being flushed and closed; ignoring received message\n"); + return 0; + } + + if (conn->read_cb) + rc = conn->read_cb(conn); + + return rc; +} + +static void osmo_stream_srv_write(struct osmo_stream_srv *conn) +{ +#ifdef HAVE_LIBSCTP + struct sctp_sndrcvinfo sinfo; +#endif + struct msgb *msg; + int ret; + + if (llist_empty(&conn->tx_queue)) { + osmo_fd_write_disable(&conn->ofd); + return; + } + msg = llist_first_entry(&conn->tx_queue, struct msgb, list); + llist_del(&msg->list); + + LOGSSRV(conn, LOGL_DEBUG, "sending %u bytes of data\n", msg->len); + + switch (conn->srv->sk_domain) { + case AF_UNIX: + ret = send(conn->ofd.fd, msgb_data(msg), msgb_length(msg), 0); + break; + case AF_INET: + case AF_INET6: + case AF_UNSPEC: + switch (conn->srv->proto) { +#ifdef HAVE_LIBSCTP + case IPPROTO_SCTP: + memset(&sinfo, 0, sizeof(sinfo)); + sinfo.sinfo_ppid = htonl(msgb_sctp_ppid(msg)); + sinfo.sinfo_stream = msgb_sctp_stream(msg); + ret = sctp_send(conn->ofd.fd, msgb_data(msg), msgb_length(msg), + &sinfo, MSG_NOSIGNAL); + break; +#endif + case IPPROTO_TCP: + default: + ret = send(conn->ofd.fd, msgb_data(msg), msgb_length(msg), 0); + break; + } + break; + default: + ret = -1; + errno = ENOTSUP; + } + + if (ret >= 0 && ret < msgb_length(msg)) { + LOGSSRV(conn, LOGL_ERROR, "short send: %d < exp %u\n", ret, msgb_length(msg)); + /* Update msgb and re-add it at the start of the queue: */ + msgb_pull(msg, ret); + llist_add(&msg->list, &conn->tx_queue); + return; + } + + if (ret == -1) {/* send(): On error -1 is returned, and errno is set appropriately */ + int err = errno; + LOGSSRV(conn, LOGL_ERROR, "send(len=%u) error: %s\n", msgb_length(msg), strerror(err)); + if (err == EAGAIN) { + /* Re-add at the start of the queue to re-attempt: */ + llist_add(&msg->list, &conn->tx_queue); + return; + } + msgb_free(msg); + osmo_stream_srv_destroy(conn); + return; + } + + msgb_free(msg); + + if (llist_empty(&conn->tx_queue)) { + osmo_fd_write_disable(&conn->ofd); + if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY) + osmo_stream_srv_destroy(conn); + } +} + +static int osmo_stream_srv_cb(struct osmo_fd *ofd, unsigned int what) +{ + struct osmo_stream_srv *conn = ofd->data; + int rc = 0; + + LOGSSRV(conn, LOGL_DEBUG, "connected read/write (what=0x%x)\n", what); + if (what & OSMO_FD_READ) + rc = osmo_stream_srv_read(conn); + if (rc != -EBADF && (what & OSMO_FD_WRITE)) + osmo_stream_srv_write(conn); + + return rc; +} + + +/*! Create a legacy osmo_fd mode Stream Server inside the specified link. + * + * This is the function an application traditionally calls from within the + * accept_cb call-back of the osmo_stream_srv_link. It creates a new + * osmo_stream_srv within that link. + * + * New users/programs should use osmo_stream_srv_create2 to operate in osmo_io + * mode instead. + * + * \param[in] ctx talloc allocation context from which to allocate + * \param[in] link Stream Server Link to which we belong + * \param[in] fd system file descriptor of the new connection + * \param[in] read_cb Call-back to call when the socket is readable + * \param[in] closed_cb Call-back to call when the connection is closed + * \param[in] data User data to save in the new Stream Server struct + * \returns Stream Server in case of success; NULL on error */ +struct osmo_stream_srv * +osmo_stream_srv_create(void *ctx, struct osmo_stream_srv_link *link, int fd, + osmo_stream_srv_read_cb_t read_cb, + osmo_stream_srv_closed_cb_t closed_cb, + void *data) +{ + struct osmo_stream_srv *conn; + + OSMO_ASSERT(link); + + conn = talloc_zero(ctx, struct osmo_stream_srv); + if (conn == NULL) + return NULL; + + conn->mode = OSMO_STREAM_MODE_OSMO_FD; + conn->srv = link; + osmo_fd_setup(&conn->ofd, fd, OSMO_FD_READ, osmo_stream_srv_cb, conn, 0); + conn->read_cb = read_cb; + conn->closed_cb = closed_cb; + conn->data = data; + INIT_LLIST_HEAD(&conn->tx_queue); + + osmo_sock_get_name_buf(conn->sockname, sizeof(conn->sockname), fd); + + if (osmo_fd_register(&conn->ofd) < 0) { + LOGSSRV(conn, LOGL_ERROR, "could not register FD\n"); + talloc_free(conn); + return NULL; + } + return conn; +} + +/*! Create an osmo_iofd mode Stream Server inside the specified link. + * + * This is the function an application typically calls from within the + * accept_cb call-back of the osmo_stream_srv_link. It creates a new + * osmo_stream_srv in osmo_io mode within that link. + * + * \param[in] ctx talloc allocation context from which to allocate + * \param[in] link Stream Server Link to which we belong + * \param[in] fd system file descriptor of the new connection + * \param[in] data User data to save in the new Stream Server struct + * \returns Stream Server in case of success; NULL on error */ +struct osmo_stream_srv * +osmo_stream_srv_create2(void *ctx, struct osmo_stream_srv_link *link, int fd, void *data) +{ + struct osmo_stream_srv *conn; + + OSMO_ASSERT(link); + + conn = talloc_zero(ctx, struct osmo_stream_srv); + if (conn == NULL) + return NULL; + + conn->mode = OSMO_STREAM_MODE_OSMO_IO; + conn->srv = link; + + osmo_sock_get_name_buf(conn->sockname, sizeof(conn->sockname), fd); + + if (link->proto == IPPROTO_SCTP) { + conn->iofd = osmo_iofd_setup(conn, fd, conn->sockname, OSMO_IO_FD_MODE_RECVMSG_SENDMSG, + &srv_ioops_sctp, conn); + if (conn->iofd) + osmo_iofd_set_cmsg_size(conn->iofd, CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))); + } else { + conn->iofd = osmo_iofd_setup(conn, fd, conn->sockname, OSMO_IO_FD_MODE_READ_WRITE, + &srv_ioops, conn); + } + if (!conn->iofd) { + talloc_free(conn); + return NULL; + } + conn->data = data; + + if (osmo_iofd_register(conn->iofd, fd) < 0) { + LOGSSRV(conn, LOGL_ERROR, "could not register FD %d\n", fd); + talloc_free(conn); + return NULL; + } + + return conn; +} + +/*! Set a name on the srv object (used during logging). + * \param[in] conn server whose name is to be set. The name is copied into the osmo_stream_srv_link, so + * the caller memory is not required to be valid beyond the call of this function. + * \param[in] name the name to be set on conn + */ +void osmo_stream_srv_set_name(struct osmo_stream_srv *conn, const char *name) +{ + osmo_talloc_replace_string(conn, &conn->name, name); + if (conn->mode == OSMO_STREAM_MODE_OSMO_IO && conn->iofd) + osmo_iofd_set_name(conn->iofd, name); +} + +/*! Retrieve name previously set on the srv object (see osmo_stream_srv_set_name()). + * \param[in] conn server whose name is to be retrieved + * \returns The name to be set on conn; NULL if never set + */ +const char *osmo_stream_srv_get_name(const struct osmo_stream_srv *conn) +{ + return conn->name; +} + +/*! Set the call-back function for incoming data on an osmo_io stream_srv. + * + * This function only works with osmo_stream_srv in osmo_io mode, created by osmo_stream_srv_create2()! + * + * Whenever data is received on the osmo_stram_srv, the read_cb call-back function of the user application is + * called. + * + * \param[in] conn Stream Server to modify + * \param[in] read_cb Call-back function to be called when data was read */ +void osmo_stream_srv_set_read_cb(struct osmo_stream_srv *conn, + osmo_stream_srv_read_cb2_t read_cb) +{ + OSMO_ASSERT(conn && conn->mode == OSMO_STREAM_MODE_OSMO_IO); + conn->iofd_read_cb = read_cb; +} + +/*! Set the call-back function called when the stream server socket was closed. + * Whenever the socket was closed (network error, client disconnect, etc.), the user-provided + * call-back function given here is called. This is typically used by the application to clean up any of its + * internal state related to this specific client/connection. + * \param[in] conn Stream Server to modify + * \param[in] closed_cb Call-back function to be called when the connection was closed */ +void osmo_stream_srv_set_closed_cb(struct osmo_stream_srv *conn, + osmo_stream_srv_closed_cb_t closed_cb) +{ + OSMO_ASSERT(conn); + conn->closed_cb = closed_cb; +} + +/*! Prepare to send out all pending messages on the connection's Tx queue. + * and then automatically destroy the stream with osmo_stream_srv_destroy(). + * This function disables queuing of new messages on the connection and also + * disables reception of new messages on the connection. + * \param[in] conn Stream Server to modify */ +void osmo_stream_srv_set_flush_and_destroy(struct osmo_stream_srv *conn) +{ + conn->flags |= OSMO_STREAM_SRV_F_FLUSH_DESTROY; +} + +/*! Set application private data of the stream server. + * \param[in] conn Stream Server to modify + * \param[in] data User-specific data (available in call-back functions) */ +void +osmo_stream_srv_set_data(struct osmo_stream_srv *conn, + void *data) +{ + conn->data = data; +} + +/*! Set the segmentation callback for target osmo_stream_srv structure. + * + * A segmentation call-back can optionally be used when a packet based protocol (like TCP) is used within a + * STREAM style socket that does not preserve message boundaries within the stream. If a segmentation + * call-back is given, the osmo_stream_srv library code will makes sure that the read_cb called only for + * complete single messages, and not arbitrary segments of the stream. + * + * This function only works with osmo_stream_srv in osmo_io mode, created by osmo_stream_srv_create2()! + * The connection has to have been established prior to calling this function. + * + * \param[in,out] conn Target Stream Server to modify + * \param[in] segmentation_cb Segmentation callback to be set */ +void osmo_stream_srv_set_segmentation_cb(struct osmo_stream_srv *conn, + osmo_stream_srv_segmentation_cb_t segmentation_cb) +{ + /* Note that the following implies that iofd != NULL, since + * osmo_stream_srv_create2() creates the iofd member, too */ + OSMO_ASSERT(conn->mode == OSMO_STREAM_MODE_OSMO_IO); + /* Copy default settings */ + struct osmo_io_ops conn_ops; + osmo_iofd_get_ioops(conn->iofd, &conn_ops); + /* Set segmentation cb for this connection */ + conn_ops.segmentation_cb = segmentation_cb; + osmo_iofd_set_ioops(conn->iofd, &conn_ops); +} + +/*! Retrieve application private data of the stream server + * \param[in] conn Stream Server + * \returns Application private data, as set by \ref osmo_stream_srv_set_data() */ +void *osmo_stream_srv_get_data(struct osmo_stream_srv *conn) +{ + return conn->data; +} + +/*! Retrieve the stream server socket description. + * The returned name is stored in a static buffer; it is hence not re-entrant or thread-safe! + * \param[in] cli Stream Server to examine + * \returns Socket description or NULL in case of error */ +const char *osmo_stream_srv_get_sockname(const struct osmo_stream_srv *conn) +{ + static char buf[OSMO_STREAM_MAX_ADDRS * OSMO_SOCK_NAME_MAXLEN]; + + osmo_sock_multiaddr_get_name_buf(buf, sizeof(buf), + osmo_stream_srv_get_fd(conn), conn->srv->proto); + + return buf; +} + +/*! Retrieve Osmocom File Descriptor of a stream server in osmo_fd mode. + * \param[in] conn Stream Server + * \returns Pointer to \ref osmo_fd */ +struct osmo_fd * +osmo_stream_srv_get_ofd(struct osmo_stream_srv *conn) +{ + OSMO_ASSERT(conn->mode == OSMO_STREAM_MODE_OSMO_FD); + return &conn->ofd; +} + +/*! Retrieve File Descriptor of the stream server + * \param[in] conn Stream Server + * \returns file descriptor or negative on error */ +int +osmo_stream_srv_get_fd(const struct osmo_stream_srv *conn) +{ + switch (conn->mode) { + case OSMO_STREAM_MODE_OSMO_FD: + return conn->ofd.fd; + case OSMO_STREAM_MODE_OSMO_IO: + if (conn->iofd) + return osmo_iofd_get_fd(conn->iofd); + default: + break; + } + return -EINVAL; +} + +/*! Retrieve osmo_io descriptor of the stream server socket. + * This function must not be called on a stream server in legacy osmo_fd mode! + * \param[in] srv Stream Server of which we want to obtain the osmo_io descriptor + * \returns osmo_io_fd of stream server. */ +struct osmo_io_fd * +osmo_stream_srv_get_iofd(const struct osmo_stream_srv *srv) +{ + OSMO_ASSERT(srv->mode == OSMO_STREAM_MODE_OSMO_IO); + return srv->iofd; +} + +/*! Retrieve the master (Link) from a Stream Server. + * \param[in] conn Stream Server of which we want to know the Link + * \returns Link through which the given Stream Server is established */ +struct osmo_stream_srv_link *osmo_stream_srv_get_master(struct osmo_stream_srv *conn) +{ + return conn->srv; +} + +/*! Destroy given Stream Server. + * This function closes the Stream Server socket, unregisters from the underlying I/O mechanism, invokes the + * connection's closed_cb() callback to allow API users to clean up any associated state they have for this + * connection, and then de-allocates associated memory. + * \param[in] conn Stream Server to be destroyed */ +void osmo_stream_srv_destroy(struct osmo_stream_srv *conn) +{ + switch (conn->mode) { + case OSMO_STREAM_MODE_OSMO_FD: + osmo_fd_unregister(&conn->ofd); + close(conn->ofd.fd); + msgb_queue_free(&conn->tx_queue); + conn->ofd.fd = -1; + break; + case OSMO_STREAM_MODE_OSMO_IO: + osmo_iofd_free(conn->iofd); + conn->iofd = NULL; + break; + default: + OSMO_ASSERT(false); + } + if (conn->closed_cb) + conn->closed_cb(conn); + talloc_free(conn); +} + +/*! Enqueue data to be sent via an Osmocom stream server. + * \param[in] conn Stream Server through which we want to send + * \param[in] msg Message buffer to enqueue in transmit queue */ +void osmo_stream_srv_send(struct osmo_stream_srv *conn, struct msgb *msg) +{ + int rc; + + OSMO_ASSERT(conn); + OSMO_ASSERT(msg); + if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY) { + LOGSSRV(conn, LOGL_DEBUG, "Connection is being flushed and closed; ignoring new outgoing message\n"); + msgb_free(msg); + return; + } + + switch (conn->mode) { + case OSMO_STREAM_MODE_OSMO_FD: + msgb_enqueue(&conn->tx_queue, msg); + osmo_fd_write_enable(&conn->ofd); + break; + case OSMO_STREAM_MODE_OSMO_IO: + if (conn->srv->proto == IPPROTO_SCTP) + rc = stream_iofd_sctp_send_msgb(conn->iofd, msg, MSG_NOSIGNAL); + else + rc = osmo_iofd_write_msgb(conn->iofd, msg); + if (rc < 0) + msgb_free(msg); + break; + default: + OSMO_ASSERT(false); + } +} + +/*! Receive data via an Osmocom stream server in osmo_fd mode. + * \param[in] conn Stream Server from which to receive + * \param msg pre-allocate message buffer to which received data is appended + * \returns number of bytes read, negative on error. + * + * Application programs using the legacy osmo_fd mode of osmo_stream_srv will use + * this function to read/receive from a stream socket after they have been notified that + * it is readable (via select/poll). + * + * If conn is an SCTP connection, additional specific considerations shall be taken: + * - msg->cb is always filled with SCTP ppid, and SCTP stream values, see msgb_sctp_*() APIs. + * - If an SCTP notification was received when reading from the SCTP socket, + * msgb_sctp_msg_flags(msg) will contain bit flag + * OSMO_STREAM_SCTP_MSG_FLAGS_NOTIFICATION set, and the msgb will + * contain a "union sctp_notification" instead of user data. In this case the + * return code will be either 0 (if conn is considered dead after the + * notification) or -EAGAIN (if conn is considered still alive after the + * notification) resembling the standard recv() API. + */ +int osmo_stream_srv_recv(struct osmo_stream_srv *conn, struct msgb *msg) +{ + int ret; + OSMO_ASSERT(conn); + OSMO_ASSERT(msg); + OSMO_ASSERT(conn->mode == OSMO_STREAM_MODE_OSMO_FD); + + switch (conn->srv->sk_domain) { + case AF_UNIX: + ret = recv(conn->ofd.fd, msg->tail, msgb_tailroom(msg), 0); + break; + case AF_INET: + case AF_INET6: + case AF_UNSPEC: + switch (conn->srv->proto) { +#ifdef HAVE_LIBSCTP + case IPPROTO_SCTP: + { + char log_pfx[128]; + snprintf(log_pfx, sizeof(log_pfx), "SRV(%s,%s)", conn->name ? : "", conn->sockname); + ret = stream_sctp_recvmsg_wrapper(conn->ofd.fd, msg, log_pfx); + break; + } +#endif + case IPPROTO_TCP: + default: + ret = recv(conn->ofd.fd, msg->tail, msgb_tailroom(msg), 0); + break; + } + break; + default: + ret = -ENOTSUP; + } + + if (ret < 0) { + if (errno == EPIPE || errno == ECONNRESET) + LOGSSRV(conn, LOGL_ERROR, "lost connection with client\n"); + return ret; + } else if (ret == 0) { + LOGSSRV(conn, LOGL_ERROR, "connection closed with client\n"); + return ret; + } + msgb_put(msg, ret); + LOGSSRV(conn, LOGL_DEBUG, "received %d bytes from client\n", ret); + return ret; +} + +void osmo_stream_srv_clear_tx_queue(struct osmo_stream_srv *conn) +{ + switch (conn->mode) { + case OSMO_STREAM_MODE_OSMO_FD: + msgb_queue_free(&conn->tx_queue); + osmo_fd_write_disable(&conn->ofd); + break; + case OSMO_STREAM_MODE_OSMO_IO: + osmo_iofd_txqueue_clear(conn->iofd); + break; + case OSMO_STREAM_MODE_UNKNOWN: + default: + break; + } + + if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY) + osmo_stream_srv_destroy(conn); +} + +/*! @} */ |