aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am13
-rw-r--r--src/amr.c207
-rw-r--r--src/datagram.c46
-rw-r--r--src/ipa.c82
-rw-r--r--src/jibuf.c9
-rw-r--r--src/osmux.c998
-rw-r--r--src/osmux_input.c857
-rw-r--r--src/osmux_output.c396
-rw-r--r--src/prim.c473
-rw-r--r--src/rs232.c32
-rw-r--r--src/rtp.c6
-rw-r--r--src/sctp.c95
-rw-r--r--src/stream.c1232
-rw-r--r--src/stream_cli.c1237
-rw-r--r--src/stream_srv.c1215
15 files changed, 4810 insertions, 2088 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 4d8b900..7b38ac0 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1,6 +1,6 @@
# This is _NOT_ the library release version, it's an API version.
# Please read Chapter 6 "Library interface versions" of the libtool documentation before making any modification
-LIBVERSION=7:0:1
+LIBVERSION=12:0:1
AM_CPPFLAGS = -I$(top_srcdir)/include -I$(top_builddir)
AM_CFLAGS= -fPIC -Wall $(LIBOSMOCORE_CFLAGS) $(LIBOSMOGSM_CFLAGS) $(LIBOSMOABIS_CFLAGS) $(COVERAGE_CFLAGS) $(LIBSCTP_CFLAGS)
@@ -9,7 +9,7 @@ AM_LDFLAGS = $(COVERAGE_LDFLAGS)
lib_LTLIBRARIES = libosmonetif.la
libosmonetif_la_LIBADD = $(LIBOSMOCORE_LIBS) $(LIBOSMOGSM_LIBS) $(LIBSCTP_LIBS)
-libosmonetif_la_LDFLAGS = -version-info $(LIBVERSION) -no-undefined
+libosmonetif_la_LDFLAGS = $(AM_LDFLAGS) -version-info $(LIBVERSION) -no-undefined
libosmonetif_la_SOURCES = amr.c \
datagram.c \
@@ -17,6 +17,15 @@ libosmonetif_la_SOURCES = amr.c \
ipa_unit.c \
jibuf.c \
osmux.c \
+ osmux_input.c \
+ osmux_output.c \
+ prim.c \
rs232.c \
rtp.c \
+ stream_cli.c \
+ stream_srv.c \
stream.c
+
+if ENABLE_LIBSCTP
+libosmonetif_la_SOURCES += sctp.c
+endif
diff --git a/src/amr.c b/src/amr.c
index 5609c46..f201e8a 100644
--- a/src/amr.c
+++ b/src/amr.c
@@ -14,9 +14,11 @@
#include <unistd.h>
#include <string.h>
#include <stdbool.h>
+
+#include <osmocom/core/utils.h>
#include <osmocom/netif/amr.h>
-/* According to TS 26.101:
+/* According to TS 26.101 Table A.1b:
*
* Frame type AMR code bits bytes
* 0 4.75 95 12
@@ -27,8 +29,35 @@
* 5 7.95 159 20
* 6 10.20 204 26
* 7 12.20 244 31
+ * 8 AMR SID 39 5
+ * 9 GSM-EFR SID 43 5
+ * 10 TDMA-EFR SID 38 5
+ * 11 PDC-EFR SID 37 5
+ * 12 NOT USED
+ * 13 NOT USED
+ * 14 NOT USED
+ * 15 NO DATA 0 0
*/
+static int amr_ft_to_bits[AMR_FT_MAX] = {
+ [AMR_FT_0] = AMR_FT_0_LEN_BITS,
+ [AMR_FT_1] = AMR_FT_1_LEN_BITS,
+ [AMR_FT_2] = AMR_FT_2_LEN_BITS,
+ [AMR_FT_3] = AMR_FT_3_LEN_BITS,
+ [AMR_FT_4] = AMR_FT_4_LEN_BITS,
+ [AMR_FT_5] = AMR_FT_5_LEN_BITS,
+ [AMR_FT_6] = AMR_FT_6_LEN_BITS,
+ [AMR_FT_7] = AMR_FT_7_LEN_BITS,
+ [AMR_FT_SID] = AMR_FT_SID_LEN_BITS,
+ [AMR_FT_GSM_EFR_SID] = AMR_FT_GSM_EFR_SID_LEN_BITS,
+ [AMR_FT_TDMA_EFR_SID] = AMR_FT_TDMA_EFR_SID_LEN_BITS,
+ [AMR_FT_PDC_EFR_SID] = AMR_FT_PDC_EFR_SID_LEN_BITS,
+ [12] = 0, /* for future use */
+ [13] = 0, /* for future use */
+ [14] = 0, /* for future use */
+ [AMR_FT_NO_DATA] = AMR_FT_NO_DATA_LEN_BITS,
+};
+
static size_t amr_ft_to_bytes[AMR_FT_MAX] = {
[AMR_FT_0] = AMR_FT_0_LEN,
[AMR_FT_1] = AMR_FT_1_LEN,
@@ -39,44 +68,59 @@ static size_t amr_ft_to_bytes[AMR_FT_MAX] = {
[AMR_FT_6] = AMR_FT_6_LEN,
[AMR_FT_7] = AMR_FT_7_LEN,
[AMR_FT_SID] = AMR_FT_SID_LEN,
+ [AMR_FT_GSM_EFR_SID] = AMR_FT_GSM_EFR_SID_LEN,
+ [AMR_FT_TDMA_EFR_SID] = AMR_FT_TDMA_EFR_SID_LEN,
+ [AMR_FT_PDC_EFR_SID] = AMR_FT_PDC_EFR_SID_LEN,
+ [12] = 0, /* for future use */
+ [13] = 0, /* for future use */
+ [14] = 0, /* for future use */
+ [AMR_FT_NO_DATA] = AMR_FT_NO_DATA_LEN,
};
+size_t osmo_amr_bits(uint8_t amr_ft)
+{
+ OSMO_ASSERT(amr_ft < AMR_FT_MAX);
+ return amr_ft_to_bits[amr_ft];
+}
+
size_t osmo_amr_bytes(uint8_t amr_ft)
{
+ OSMO_ASSERT(amr_ft < AMR_FT_MAX);
return amr_ft_to_bytes[amr_ft];
}
+int osmo_amr_bytes_to_ft(size_t bytes)
+{
+ int ft;
+
+ for (ft = 0; ft < AMR_FT_PDC_EFR_SID; ft++) {
+ if (amr_ft_to_bytes[ft] == bytes)
+ return ft;
+ }
+ /* 12-14 not used, jump to 15 (AMR_FT_NO_DATA): */
+ if (amr_ft_to_bytes[AMR_FT_NO_DATA] == bytes)
+ return AMR_FT_NO_DATA;
+
+ return -1;
+}
+
int osmo_amr_ft_valid(uint8_t amr_ft)
{
- /*
- * Extracted from RFC3267:
- *
- * "... with a FT value in the range 9-14 for AMR ... the whole packet
- * SHOULD be discarded."
- *
- * "... packets containing only NO_DATA frames (FT=15) SHOULD NOT be
- * transmitted."
- *
- * So, let's discard frames with a AMR FT >= 9.
- */
- if (amr_ft >= AMR_FT_MAX)
- return 0;
-
- return 1;
+ return amr_ft <= AMR_FT_PDC_EFR_SID || amr_ft == AMR_FT_NO_DATA;
}
/*! Check if an AMR frame is octet aligned by looking at the padding bits.
* \param[inout] payload user provided memory containing the AMR payload.
* \param[in] payload_len overall length of the AMR payload.
* \returns true when the payload is octet aligned. */
-bool osmo_amr_is_oa(uint8_t *payload, unsigned int payload_len)
+bool osmo_amr_is_oa(const uint8_t *payload, unsigned int payload_len)
{
/* NOTE: The distinction between octet-aligned and bandwith-efficient
* mode normally relys on out of band methods that explicitly select
* one of the two modes. (See also RFC 3267, chapter 3.8). However the
* A interface in GSM does not provide ways to communicate which mode
- * is used exactly used. The following functions uses some heuristics
- * to check if an AMR payload is octet aligned or not. */
+ * is exactly used. The following functions uses some heuristics to
+ * check if an AMR payload is octet aligned or not. */
struct amr_hdr *oa_hdr = (struct amr_hdr *)payload;
unsigned int frame_len;
@@ -103,7 +147,7 @@ bool osmo_amr_is_oa(uint8_t *payload, unsigned int payload_len)
/* Match the length of the received payload against the expected frame
* length that is defined by the frame type. */
- if(!osmo_amr_ft_valid(oa_hdr->ft))
+ if (!osmo_amr_ft_valid(oa_hdr->ft))
return false;
frame_len = osmo_amr_bytes(oa_hdr->ft);
if (frame_len != payload_len - sizeof(struct amr_hdr))
@@ -119,8 +163,10 @@ bool osmo_amr_is_oa(uint8_t *payload, unsigned int payload_len)
int osmo_amr_oa_to_bwe(uint8_t *payload, unsigned int payload_len)
{
struct amr_hdr *oa_hdr = (struct amr_hdr *)payload;
+ unsigned int ft = oa_hdr->ft;
unsigned int frame_len = payload_len - sizeof(struct amr_hdr);
unsigned int i;
+ int bwe_payload_len;
/* This implementation is not capable to handle multi-frame
* packets, so we need to make sure that the frame we operate on
@@ -128,8 +174,12 @@ int osmo_amr_oa_to_bwe(uint8_t *payload, unsigned int payload_len)
if (oa_hdr->f != 0)
return -1;
+ /* Check for valid FT (AMR mode) value */
+ if (!osmo_amr_ft_valid(oa_hdr->ft))
+ return -1;
+
/* Move TOC close to CMR */
- payload[0] |= (payload[1] >> 4) & 0x0f;
+ payload[0] = (payload[0] & 0xf0) | ((payload[1] >> 4) & 0x0f);
payload[1] = (payload[1] << 4) & 0xf0;
for (i = 0; i < frame_len; i++) {
@@ -137,8 +187,10 @@ int osmo_amr_oa_to_bwe(uint8_t *payload, unsigned int payload_len)
payload[i + 2] = payload[i + 2] << 6;
}
- /* The overall saving is one byte! */
- return payload_len - 1;
+ /* Calculate new payload length */
+ bwe_payload_len = OSMO_BYTES_FOR_BITS(AMR_HDR_BWE_LEN_BITS + osmo_amr_bits(ft));
+
+ return bwe_payload_len;
}
/*! Convert an AMR frame from bandwith-efficient mode to octet-aligned mode.
@@ -150,10 +202,15 @@ int osmo_amr_bwe_to_oa(uint8_t *payload, unsigned int payload_len,
unsigned int payload_maxlen)
{
uint8_t buf[256];
- unsigned int frame_len = payload_len - 1;
+ /* The header is only valid after shifting first two bytes to OA mode */
+ struct amr_hdr_bwe *bwe_hdr = (struct amr_hdr_bwe *)payload;
+ struct amr_hdr *oa_hdr;
unsigned int i;
+ unsigned int oa_payload_len;
+ uint8_t ft;
- memset(buf, 0, sizeof(buf));
+ if (payload_len < sizeof(struct amr_hdr_bwe))
+ return -1;
if (payload_len + 1 > payload_maxlen)
return -1;
@@ -161,16 +218,98 @@ int osmo_amr_bwe_to_oa(uint8_t *payload, unsigned int payload_len,
if (payload_len + 1 > sizeof(buf))
return -1;
- buf[0] = payload[0] & 0xf0;
- buf[1] = payload[0] << 4;
- buf[1] |= (payload[1] >> 4) & 0x0c;
+ ft = (bwe_hdr->ft_hi << 1) | bwe_hdr->ft_lo;
+ if (!osmo_amr_ft_valid(ft))
+ return -1;
+ if (OSMO_BYTES_FOR_BITS(AMR_HDR_BWE_LEN_BITS + osmo_amr_bits(ft)) > payload_len)
+ return -1;
+
+ memset(buf, 0, sizeof(buf));
+ oa_hdr = (struct amr_hdr *)buf;
+ oa_hdr->cmr = bwe_hdr->cmr;
+ oa_hdr->f = bwe_hdr->f;
+ oa_hdr->ft = ft;
+ oa_hdr->q = bwe_hdr->q;
+
+ /* Calculate new payload length */
+ oa_payload_len = 2 + osmo_amr_bytes(ft);
+
+ for (i = 2; i < oa_payload_len - 1; i++) {
+ buf[i] = payload[i - 1] << 2;
+ buf[i] |= payload[i] >> 6;
+ }
+ buf[i] = payload[i - 1] << 2;
+
+ memcpy(payload, buf, oa_payload_len);
+ return oa_payload_len;
+}
+
+/*! Convert an AMR frame from bandwith-efficient mode to IuuP/IuFP payload.
+ * The IuuP/IuPF payload only contains the class a, b, c bits. No header.
+ * \param[inout] payload user provided memory containing the AMR payload.
+ * \param[in] payload_len overall length of the AMR payload.
+ * \param[in] payload_maxlen maximum length of the user provided memory.
+ * \returns resulting payload length, negative on error. */
+int osmo_amr_bwe_to_iuup(uint8_t *payload, unsigned int payload_len)
+{
+ /* The header is only valid after shifting first two bytes to OA mode */
+ unsigned int i, required_len_bits;
+ unsigned int amr_speech_len_bytes, amr_speech_len_bits;
+ uint8_t ft;
+
+ if (payload_len < 2)
+ return -1;
+
+ /* Calculate new payload length */
+ ft = ((payload[0] & 0x07) << 1) | ((payload[1] & 0x80) >> 7);
+ if (!osmo_amr_ft_valid(ft))
+ return -1;
+
+ amr_speech_len_bits = osmo_amr_bits(ft);
+ amr_speech_len_bytes = osmo_amr_bytes(ft);
- for (i = 0; i < frame_len - 1; i++) {
- buf[i + 2] = payload[i + 1] << 2;
- buf[i + 2] |= payload[i + 2] >> 6;
+ /* shift of AMR_HDR_BWE_LEN_BITS (10) bits, aka remove BWE Hdr + ToC: */
+ required_len_bits = AMR_HDR_BWE_LEN_BITS + amr_speech_len_bits;
+ if (payload_len < OSMO_BYTES_FOR_BITS(required_len_bits))
+ return -1;
+
+ for (i = 0; i < amr_speech_len_bytes; i++) {
+ /* we have to shift the payload by 10 bits to get only the Class A, B, C bits */
+ payload[i] = (payload[i + 1] << 2) | ((payload[i + 2]) >> 6);
}
- buf[i + 2] = payload[i + 1] << 2;
- memcpy(payload, buf, payload_len + 1);
- return payload_len + 1;
+ return amr_speech_len_bytes;
+}
+
+/*! Convert an AMR frame from IuuP/IuFP payload to bandwith-efficient mode.
+ * The IuuP/IuPF payload only contains the class a, b, c bits. No header.
+ * The resulting buffer has space at the start prepared to be filled by CMR, TOC.
+ * \param[inout] payload user provided memory containing the AMR payload.
+ * \param[in] payload_len overall length of the AMR payload.
+ * \param[in] payload_maxlen maximum length of the user provided memory (payload_len + 2 required).
+ * \returns resulting payload length, negative on error. */
+int osmo_amr_iuup_to_bwe(uint8_t *payload, unsigned int payload_len,
+ unsigned int payload_maxlen)
+{
+ /* shift all bits by AMR_HDR_BWE_LEN_BITS (10) */
+ unsigned int i, required_len_bits, required_len_bytes;
+
+ int ft = osmo_amr_bytes_to_ft(payload_len);
+ if (ft < 0)
+ return ft;
+
+ required_len_bits = osmo_amr_bits(ft) + 10;
+ required_len_bytes = OSMO_BYTES_FOR_BITS(required_len_bits);
+ if (payload_maxlen < required_len_bytes)
+ return -1;
+
+ i = payload_len + 1;
+ payload[i] = (payload[i - 2] << 6);
+ for (i = payload_len; i >= 2; i--) {
+ /* we have to shift the payload by AMR_HDR_BWE_LEN_BITS (10) bits to prepend BWE Hdr + ToC */
+ payload[i] = (payload[i - 1] >> 2) | (payload[i - 2] << 6);
+ }
+ payload[i] = (payload[i - 1] >> 2);
+ payload[0] = 0;
+ return required_len_bytes;
}
diff --git a/src/datagram.c b/src/datagram.c
index 634e702..9df0630 100644
--- a/src/datagram.c
+++ b/src/datagram.c
@@ -41,13 +41,12 @@
#include <osmocom/netif/datagram.h>
-/*! \addtogroup datagram Osmocom Datagram Socket
+#define OSMO_DGRAM_CLI_F_RECONF (1 << 0)
+
+#define OSMO_DGRAM_RX_F_RECONF (1 << 0)
+
+/*! \addtogroup datagram
* @{
- *
- * This code is intended to abstract any use of datagram type sockets,
- * such as UDP. It offers both transmitter and receiver side
- * implementations, fully integrated with the libosmocore select loop
- * abstraction.
*/
/*! \file datagram.c
@@ -59,8 +58,6 @@
* Client side.
*/
-#define OSMO_DGRAM_CLI_F_RECONF (1 << 0)
-
struct osmo_dgram_tx {
struct osmo_fd ofd;
struct llist_head tx_queue;
@@ -95,7 +92,7 @@ static int osmo_dgram_tx_write(struct osmo_dgram_tx *conn)
LOGP(DLINP, LOGL_DEBUG, "sending data\n");
if (llist_empty(&conn->tx_queue)) {
- conn->ofd.when &= ~BSC_FD_WRITE;
+ osmo_fd_write_disable(&conn->ofd);
return 0;
}
lh = conn->tx_queue.next;
@@ -115,11 +112,11 @@ static int osmo_dgram_tx_fd_cb(struct osmo_fd *ofd, unsigned int what)
{
struct osmo_dgram_tx *conn = ofd->data;
- if (what & BSC_FD_WRITE) {
+ if (what & OSMO_FD_WRITE) {
LOGP(DLINP, LOGL_DEBUG, "write\n");
osmo_dgram_tx_write(conn);
}
- return 0;
+ return 0;
}
/*! \brief Create an Osmocom datagram transmitter
@@ -135,11 +132,7 @@ struct osmo_dgram_tx *osmo_dgram_tx_create(void *ctx)
if (!conn)
return NULL;
- conn->ofd.fd = -1;
- conn->ofd.when |= BSC_FD_READ;
- conn->ofd.priv_nr = 0; /* XXX */
- conn->ofd.cb = osmo_dgram_tx_fd_cb;
- conn->ofd.data = conn;
+ osmo_fd_setup(&conn->ofd, -1, OSMO_FD_READ, osmo_dgram_tx_fd_cb, conn, 0);
INIT_LLIST_HEAD(&conn->tx_queue);
return conn;
@@ -240,22 +233,20 @@ void osmo_dgram_tx_send(struct osmo_dgram_tx *conn,
struct msgb *msg)
{
msgb_enqueue(&conn->tx_queue, msg);
- conn->ofd.when |= BSC_FD_WRITE;
+ osmo_fd_write_enable(&conn->ofd);
}
/*
* Server side.
*/
-#define OSMO_DGRAM_RX_F_RECONF (1 << 0)
-
struct osmo_dgram_rx {
- struct osmo_fd ofd;
- char *addr;
- uint16_t port;
+ struct osmo_fd ofd;
+ char *addr;
+ uint16_t port;
int (*cb)(struct osmo_dgram_rx *conn);
- void *data;
- unsigned int flags;
+ void *data;
+ unsigned int flags;
};
/*! \brief Receive data via Osmocom datagram receiver
@@ -290,7 +281,7 @@ static int osmo_dgram_rx_cb(struct osmo_fd *ofd, unsigned int what)
struct osmo_dgram_rx *conn = ofd->data;
LOGP(DLINP, LOGL_DEBUG, "read\n");
- if (what & BSC_FD_READ)
+ if (what & OSMO_FD_READ)
osmo_dgram_rx_read(conn);
return 0;
@@ -309,10 +300,7 @@ struct osmo_dgram_rx *osmo_dgram_rx_create(void *ctx)
if (!conn)
return NULL;
- conn->ofd.fd = -1;
- conn->ofd.when |= BSC_FD_READ;
- conn->ofd.cb = osmo_dgram_rx_cb;
- conn->ofd.data = conn;
+ osmo_fd_setup(&conn->ofd, -1, OSMO_FD_READ, osmo_dgram_rx_cb, conn, 0);
return conn;
}
diff --git a/src/ipa.c b/src/ipa.c
index 197a47f..8720427 100644
--- a/src/ipa.c
+++ b/src/ipa.c
@@ -97,6 +97,11 @@ struct msgb *osmo_ipa_msg_alloc(int headroom)
return msg;
}
+struct msgb *osmo_ipa_ext_msg_alloc(size_t headroom)
+{
+ return osmo_ipa_msg_alloc(sizeof(struct ipa_head_ext) + headroom);
+}
+
void osmo_ipa_msg_push_header(struct msgb *msg, uint8_t proto)
{
struct ipa_head *hh;
@@ -366,3 +371,80 @@ osmo_ipa_parse_msg_id_resp(struct msgb *msg, struct ipaccess_unit *unit_data)
return 0;
}
+
+#define MSG_CB_IPA_INFO_OFFSET 0
+
+/* Check and remove headers (in case of p == IPAC_PROTO_OSMO, also the IPA extension header).
+ * Returns a negative number on error, otherwise the number of octets removed */
+static inline int ipa_check_pull_headers(struct msgb *msg)
+{
+ int ret;
+ size_t octets_removed = 0;
+ msg->l1h = msg->data;
+ struct ipa_head *ih = (struct ipa_head *)msg->data;
+ osmo_ipa_msgb_cb_proto(msg) = ih->proto;
+
+ if ((ret = osmo_ipa_process_msg(msg)) < 0) {
+ LOGP(DLINP, LOGL_ERROR, "Error processing IPA message\n");
+ return -EIO;
+ }
+ msgb_pull(msg, sizeof(struct ipa_head));
+ octets_removed += sizeof(struct ipa_head);
+ msg->l2h = msg->data;
+ if (ih->proto != IPAC_PROTO_OSMO)
+ return octets_removed;
+
+ osmo_ipa_msgb_cb_proto_ext(msg) = msg->data[0];
+ msgb_pull(msg, sizeof(struct ipa_head_ext));
+ octets_removed += sizeof(struct ipa_head_ext);
+ return octets_removed;
+}
+
+/*! Segmentation callback used by libosmo-netif streaming backend
+ * See definition of `struct osmo_io_ops` for callback semantics
+ * \param[out] msg Original `struct msgb` received via osmo_io
+ * \returns The total packet length indicated by the first header,
+ * otherwise negative number on error. Constants:
+ * -EAGAIN, if the header has not been read yet,
+ * -ENOBUFS, if the header declares a payload too large
+ */
+int osmo_ipa_segmentation_cb(struct msgb *msg)
+{
+ const struct ipa_head *hh = (const struct ipa_head *) msg->data;
+ size_t payload_len, total_len;
+ size_t available = msgb_length(msg) + msgb_tailroom(msg);
+ int removed_octets = 0;
+
+ if (msgb_length(msg) < sizeof(*hh)) {
+ /* Haven't even read the entire header */
+ return -EAGAIN;
+ }
+ payload_len = osmo_ntohs(hh->len);
+ total_len = sizeof(*hh) + payload_len;
+ if (OSMO_UNLIKELY(available < total_len)) {
+ LOGP(DLINP, LOGL_ERROR, "Not enough space left in message buffer. "
+ "Have %zu octets, but need %zu\n",
+ available, total_len);
+ return -ENOBUFS;
+ }
+ if (total_len <= msgb_length(msg)) {
+ removed_octets = ipa_check_pull_headers(msg);
+ if (removed_octets < 0) {
+ LOGP(DLINP, LOGL_ERROR, "Error pulling IPA headers\n");
+ return removed_octets;
+ }
+ }
+ return total_len;
+}
+
+/*! Push IPA headers to a message
+ * If we have IPAC_PROTO_OSMO this also takes care of the extension header
+ * \param[out] msg Target message
+ * \param p Target IPA protocol
+ * \param pe Target IPA protocol extension. Ignored, unless p equals IPAC_PROTO_OSMO. */
+void osmo_ipa_msg_push_headers(struct msgb *msg, enum ipaccess_proto p, enum ipaccess_proto_ext pe)
+{
+ if (p == IPAC_PROTO_OSMO)
+ ipa_prepend_header_ext(msg, pe);
+ osmo_ipa_msg_push_header(msg, p);
+}
diff --git a/src/jibuf.c b/src/jibuf.c
index 502f6e5..a351248 100644
--- a/src/jibuf.c
+++ b/src/jibuf.c
@@ -26,10 +26,6 @@
#include <arpa/inet.h>
-/*! \addtogroup jibuf Osmocom Jitter Buffer
- * @{
- */
-
/*! \file jibuf.c
* \brief Osmocom Jitter Buffer helpers
*/
@@ -290,6 +286,11 @@ static void recalc_threshold_delay(struct osmo_jibuf *jb)
}
+/*! \addtogroup jibuf Osmocom Jitter Buffer
+ * @{
+ */
+
+
//----------------------------------
/*! \brief Allocate a new jitter buffer instance
diff --git a/src/osmux.c b/src/osmux.c
index 8b6a115..999aedd 100644
--- a/src/osmux.c
+++ b/src/osmux.c
@@ -1,7 +1,7 @@
/*
* (C) 2012-2017 by Pablo Neira Ayuso <pablo@gnumonks.org>
* (C) 2012 by On Waves ehf <http://www.on-waves.com>
- * (C) 2015-2017 by sysmocom - s.f.m.c. GmbH
+ * (C) 2015-2022 by sysmocom - s.f.m.c. GmbH
*
* SPDX-License-Identifier: GPL-2.0+
*
@@ -27,6 +27,14 @@
#include <arpa/inet.h>
+#define SNPRINTF_BUFFER_SIZE(ret, remain, offset) \
+ if (ret < 0) \
+ ret = 0; \
+ offset += ret; \
+ if (ret > remain) \
+ ret = remain; \
+ remain -= ret;
+
/*! \addtogroup osmux Osmocom Multiplex Protocol
* @{
*
@@ -44,997 +52,22 @@
* \brief Osmocom multiplex protocol helpers
*/
-
-/* This allows you to debug timing reconstruction in the output path */
-#if 0
-#define DEBUG_TIMING 0
-#endif
-
-/* This allows you to debug osmux message transformations (spamming) */
-#if 0
-#define DEBUG_MSG 0
-#endif
-
-/* delta time between two RTP messages (in microseconds) */
-#define DELTA_RTP_MSG 20000
-/* delta time between two RTP messages (in samples, 8kHz) */
-#define DELTA_RTP_TIMESTAMP 160
-
-static void *osmux_ctx;
-
static uint32_t osmux_get_payload_len(struct osmux_hdr *osmuxh)
{
return osmo_amr_bytes(osmuxh->amr_ft) * (osmuxh->ctr+1);
}
-static uint32_t osmux_ft_dummy_size(uint8_t amr_ft, uint8_t batch_factor)
-{
- return sizeof(struct osmux_hdr) + (osmo_amr_bytes(amr_ft) * batch_factor);
-}
-
-struct osmux_hdr *osmux_xfrm_output_pull(struct msgb *msg)
-{
- struct osmux_hdr *osmuxh;
-next:
- osmuxh = NULL;
- if (msg->len > sizeof(struct osmux_hdr)) {
- size_t len;
-
- osmuxh = (struct osmux_hdr *)msg->data;
-
- switch (osmuxh->ft) {
- case OSMUX_FT_VOICE_AMR:
- break;
- case OSMUX_FT_DUMMY:
- len = osmux_ft_dummy_size(osmuxh->amr_ft, osmuxh->ctr + 1);
- if (msgb_length(msg) < len) {
- LOGP(DLMUX, LOGL_ERROR, "Discarding bad Dummy FT: %s\n",
- osmo_hexdump(msg->data, msgb_length(msg)));
- return NULL;
- }
- msgb_pull(msg, len);
- goto next;
- default:
- LOGP(DLMUX, LOGL_ERROR, "Discarding unsupported Osmux FT %d\n",
- osmuxh->ft);
- return NULL;
- }
- if (!osmo_amr_ft_valid(osmuxh->amr_ft)) {
- LOGP(DLMUX, LOGL_ERROR, "Discarding bad AMR FT %d\n",
- osmuxh->amr_ft);
- return NULL;
- }
-
- len = osmo_amr_bytes(osmuxh->amr_ft) * (osmuxh->ctr+1) +
- sizeof(struct osmux_hdr);
-
- if (msgb_length(msg) < len) {
- LOGP(DLMUX, LOGL_ERROR,
- "Discarding malformed OSMUX message: %s\n",
- osmo_hexdump(msg->data, msgb_length(msg)));
- return NULL;
- }
-
- msgb_pull(msg, len);
- } else if (msg->len > 0) {
- LOGP(DLMUX, LOGL_ERROR,
- "remaining %d bytes, broken osmuxhdr?\n", msg->len);
- }
-
- return osmuxh;
-}
-
-static struct msgb *
-osmux_rebuild_rtp(struct osmux_out_handle *h, struct osmux_hdr *osmuxh,
- void *payload, int payload_len, bool first_pkt)
-{
- struct msgb *prev_msg, *out_msg;
- struct timespec *prev_ts, *out_ts;
- struct rtp_hdr *rtph;
- struct amr_hdr *amrh;
- struct timespec delta = { .tv_sec = 0, .tv_nsec = DELTA_RTP_MSG*1000 };
-
- out_msg = msgb_alloc(sizeof(struct rtp_hdr) +
- sizeof(struct amr_hdr) +
- osmo_amr_bytes(osmuxh->amr_ft),
- "OSMUX test");
- if (out_msg == NULL)
- return NULL;
-
- /* Reconstruct RTP header */
- rtph = (struct rtp_hdr *)out_msg->data;
- rtph->csrc_count = 0;
- rtph->extension = 0;
- rtph->version = RTP_VERSION;
- rtph->payload_type = h->rtp_payload_type;
- /* ... emulate timestamp and ssrc */
- rtph->timestamp = htonl(h->rtp_timestamp);
- rtph->sequence = htons(h->rtp_seq);
- rtph->ssrc = htonl(h->rtp_ssrc);
- /* rtp packet with the marker bit is always guaranteed to be the first
- * one. We want to notify with marker in 2 scenarios:
- * 1- Sender told us through osmux frame rtp_m.
- * 2- Sntermediate osmux frame lost (seq gap), otherwise rtp receiver only sees
- * steady increase of delay
- */
- rtph->marker = first_pkt &&
- (osmuxh->rtp_m || (osmuxh->seq != h->osmux_seq_ack + 1));
-
- msgb_put(out_msg, sizeof(struct rtp_hdr));
-
- /* Reconstruct AMR header */
- amrh = (struct amr_hdr *)out_msg->tail;
- amrh->cmr = osmuxh->amr_cmr;
- amrh->f = osmuxh->amr_f;
- amrh->ft = osmuxh->amr_ft;
- amrh->q = osmuxh->amr_q;
-
- msgb_put(out_msg, sizeof(struct amr_hdr));
-
- /* add AMR speech data */
- memcpy(out_msg->tail, payload, payload_len);
- msgb_put(out_msg, payload_len);
-
- /* bump last RTP sequence number and timestamp that has been used */
- h->rtp_seq++;
- h->rtp_timestamp += DELTA_RTP_TIMESTAMP;
-
- out_ts = ((struct timespec *)&((out_msg)->cb[0]));
- if (first_pkt || llist_empty(&h->list)) {
- osmo_clock_gettime(CLOCK_MONOTONIC, out_ts);
- } else {
- prev_msg = llist_last_entry(&h->list, struct msgb, list);
- prev_ts = ((struct timespec *)&((prev_msg)->cb[0]));
- timespecadd(prev_ts, &delta, out_ts);
- }
-
- return out_msg;
-}
-
-int osmux_xfrm_output(struct osmux_hdr *osmuxh, struct osmux_out_handle *h,
- struct llist_head *list)
-{
- struct msgb *msg;
- int i;
-
- INIT_LLIST_HEAD(list);
-
- for (i=0; i<osmuxh->ctr+1; i++) {
- struct rtp_hdr *rtph;
-
- msg = osmux_rebuild_rtp(h, osmuxh,
- osmux_get_payload(osmuxh) +
- i * osmo_amr_bytes(osmuxh->amr_ft),
- osmo_amr_bytes(osmuxh->amr_ft), !i);
- if (msg == NULL)
- continue;
-
- rtph = osmo_rtp_get_hdr(msg);
- if (rtph == NULL)
- continue;
-
-#ifdef DEBUG_MSG
- {
- char buf[4096];
-
- osmo_rtp_snprintf(buf, sizeof(buf), msg);
- buf[sizeof(buf)-1] = '\0';
- LOGP(DLMUX, LOGL_DEBUG, "to BTS: %s\n", buf);
- }
-#endif
- llist_add_tail(&msg->list, list);
- }
-
- /* Update last seen seq number: */
- h->osmux_seq_ack = osmuxh->seq;
-
- return i;
-}
-
-static void osmux_xfrm_output_trigger(void *data)
-{
- struct osmux_out_handle *h = data;
- struct timespec delay_ts, now;
- struct msgb *msg, *next;
-
- llist_for_each_entry_safe(msg, next, &h->list, list) {
- osmo_clock_gettime(CLOCK_MONOTONIC, &now);
- struct timespec *msg_ts = ((struct timespec *)&((msg)->cb[0]));
- if (timespeccmp(msg_ts, &now, >)) {
- timespecsub(msg_ts, &now, &delay_ts);
- osmo_timer_schedule(&h->timer,
- delay_ts.tv_sec, delay_ts.tv_nsec / 1000);
- return;
- }
-
- /* Transmit the rtp packet */
- llist_del(&msg->list);
- if (h->tx_cb)
- h->tx_cb(msg, h->data);
- else
- msgb_free(msg);
- }
-}
-
-/*! \brief Generate RTP packets from osmux frame AMR payload set and schedule
- * them for transmission at appropiate time.
- * \param[in] h the osmux out handle handling a specific CID
- * \param[in] osmuxh Buffer pointing to osmux frame header structure and AMR payload
- * \return Number of generated RTP packets
- *
- * The osmux frame passed to this function must be of the type OSMUX_FT_VOICE_AMR.
- * The generated RTP packets are kept into h's internal list and sent to the
- * callback configured through osmux_xfrm_output_set_tx_cb when are ready to be
- * transmitted according to schedule.
- */
-int osmux_xfrm_output_sched(struct osmux_out_handle *h, struct osmux_hdr *osmuxh)
-{
- struct timespec now, *msg_ts;
- struct msgb *msg;
- int i;
- bool was_empty = llist_empty(&h->list);
-
- if (!was_empty) {
- /* If we received new data it means we are behind schedule and
- * we should flush all previous quickly */
- osmo_clock_gettime(CLOCK_MONOTONIC, &now);
- llist_for_each_entry(msg, &h->list, list) {
- msg_ts = ((struct timespec *)&((msg)->cb[0]));
- *msg_ts = now;
- }
- osmo_timer_schedule(&h->timer, 0, 0);
- }
-
- for (i=0; i<osmuxh->ctr+1; i++) {
- struct rtp_hdr *rtph;
-
- msg = osmux_rebuild_rtp(h, osmuxh,
- osmux_get_payload(osmuxh) +
- i * osmo_amr_bytes(osmuxh->amr_ft),
- osmo_amr_bytes(osmuxh->amr_ft), !i);
- if (msg == NULL)
- continue;
-
- rtph = osmo_rtp_get_hdr(msg);
- if (rtph == NULL)
- continue;
-
- llist_add_tail(&msg->list, &h->list);
- }
-
- /* Update last seen seq number: */
- h->osmux_seq_ack = osmuxh->seq;
-
- /* In case list is still empty after parsing messages, no need to rearm */
- if(was_empty && !llist_empty(&h->list))
- osmux_xfrm_output_trigger(h);
- return i;
-}
-
-/*! \brief Flush all scheduled RTP packets still pending to be transmitted
- * \param[in] h the osmux out handle to flush
- *
- * This function will immediately call the transmit callback for all queued RTP
- * packets, making sure the list ends up empty. It will also stop all internal
- * timers to make sure the osmux_out_handle can be dropped or re-used by calling
- * osmux_xfrm_output on it.
- */
-void osmux_xfrm_output_flush(struct osmux_out_handle *h)
-{
- struct msgb *msg, *next;
-
- if (osmo_timer_pending(&h->timer))
- osmo_timer_del(&h->timer);
-
- llist_for_each_entry_safe(msg, next, &h->list, list) {
- llist_del(&msg->list);
- if (h->tx_cb)
- h->tx_cb(msg, h->data);
- else
- msgb_free(msg);
- }
-}
-
-struct osmux_batch {
- struct osmo_timer_list timer;
- struct osmux_hdr *osmuxh;
- struct llist_head circuit_list;
- unsigned int remaining_bytes;
- uint8_t seq;
- uint32_t nmsgs;
- int ndummy;
-};
-
-struct osmux_circuit {
- struct llist_head head;
- int ccid;
- struct llist_head msg_list;
- int nmsgs;
- int dummy;
-};
-
-static int osmux_batch_enqueue(struct msgb *msg, struct osmux_circuit *circuit,
- uint8_t batch_factor)
-{
- /* Validate amount of messages per batch. The counter field of the
- * osmux header is just 3 bits long, so make sure it doesn't overflow.
- */
- if (circuit->nmsgs >= batch_factor || circuit->nmsgs >= 8) {
- struct rtp_hdr *rtph;
-
- rtph = osmo_rtp_get_hdr(msg);
- if (rtph == NULL)
- return -1;
-
- LOGP(DLMUX, LOGL_DEBUG, "Batch is full for RTP "
- "ssrc=%u\n", rtph->ssrc);
- return -1;
- }
-
- llist_add_tail(&msg->list, &circuit->msg_list);
- circuit->nmsgs++;
- return 0;
-}
-
-static void osmux_batch_dequeue(struct msgb *msg, struct osmux_circuit *circuit)
-{
- llist_del(&msg->list);
- circuit->nmsgs--;
-}
-
-static void osmux_circuit_del_msgs(struct osmux_batch *batch, struct osmux_circuit *circuit)
-{
- struct msgb *cur, *tmp;
- llist_for_each_entry_safe(cur, tmp, &circuit->msg_list, list) {
- osmux_batch_dequeue(cur, circuit);
- msgb_free(cur);
- batch->nmsgs--;
- }
-}
-
-struct osmux_input_state {
- struct msgb *out_msg;
- struct msgb *msg;
- struct rtp_hdr *rtph;
- struct amr_hdr *amrh;
- uint32_t amr_payload_len;
- int ccid;
- int add_osmux_hdr;
-};
-
-static int osmux_batch_put(struct osmux_batch *batch,
- struct osmux_input_state *state)
-{
- struct osmux_hdr *osmuxh;
-
- if (state->add_osmux_hdr) {
- osmuxh = (struct osmux_hdr *)state->out_msg->tail;
- osmuxh->ft = OSMUX_FT_VOICE_AMR;
- osmuxh->ctr = 0;
- osmuxh->rtp_m = osmuxh->rtp_m || state->rtph->marker;
- osmuxh->amr_f = state->amrh->f;
- osmuxh->amr_q= state->amrh->q;
- osmuxh->seq = batch->seq++;
- osmuxh->circuit_id = state->ccid;
- osmuxh->amr_cmr = state->amrh->cmr;
- osmuxh->amr_ft = state->amrh->ft;
- msgb_put(state->out_msg, sizeof(struct osmux_hdr));
-
- /* annotate current osmux header */
- batch->osmuxh = osmuxh;
- } else {
- if (batch->osmuxh->ctr == 0x7) {
- LOGP(DLMUX, LOGL_ERROR, "cannot add msg=%p, "
- "too many messages for this RTP ssrc=%u\n",
- state->msg, state->rtph->ssrc);
- return 0;
- }
- batch->osmuxh->ctr++;
- }
-
- memcpy(state->out_msg->tail, osmo_amr_get_payload(state->amrh),
- state->amr_payload_len);
- msgb_put(state->out_msg, state->amr_payload_len);
-
- return 0;
-}
-
-static int osmux_xfrm_encode_amr(struct osmux_batch *batch,
- struct osmux_input_state *state)
-{
- uint32_t amr_len;
-
- state->amrh = osmo_rtp_get_payload(state->rtph, state->msg, &amr_len);
- if (state->amrh == NULL)
- return -1;
-
- state->amr_payload_len = amr_len - sizeof(struct amr_hdr);
-
- if (osmux_batch_put(batch, state) < 0)
- return -1;
-
- return 0;
-}
-
-static void osmux_encode_dummy(struct osmux_batch *batch, uint8_t batch_factor,
- struct osmux_input_state *state)
-{
- struct osmux_hdr *osmuxh;
- /* TODO: This should be configurable at some point. */
- uint32_t payload_size = osmux_ft_dummy_size(AMR_FT_3, batch_factor) -
- sizeof(struct osmux_hdr);
-
- osmuxh = (struct osmux_hdr *)state->out_msg->tail;
- osmuxh->ft = OSMUX_FT_DUMMY;
- osmuxh->ctr = batch_factor - 1;
- osmuxh->amr_f = 0;
- osmuxh->amr_q= 0;
- osmuxh->seq = 0;
- osmuxh->circuit_id = state->ccid;
- osmuxh->amr_cmr = 0;
- osmuxh->amr_ft = AMR_FT_3;
- msgb_put(state->out_msg, sizeof(struct osmux_hdr));
-
- memset(state->out_msg->tail, 0xff, payload_size);
- msgb_put(state->out_msg, payload_size);
-}
-
-static struct msgb *osmux_build_batch(struct osmux_batch *batch,
- uint32_t batch_size, uint8_t batch_factor)
-{
- struct msgb *batch_msg;
- struct osmux_circuit *circuit;
-
-#ifdef DEBUG_MSG
- LOGP(DLMUX, LOGL_DEBUG, "Now building batch\n");
-#endif
-
- batch_msg = msgb_alloc(batch_size, "osmux");
- if (batch_msg == NULL) {
- LOGP(DLMUX, LOGL_ERROR, "Not enough memory\n");
- return NULL;
- }
-
- llist_for_each_entry(circuit, &batch->circuit_list, head) {
- struct msgb *cur, *tmp;
- int ctr = 0;
-
- if (circuit->dummy) {
- struct osmux_input_state state = {
- .out_msg = batch_msg,
- .ccid = circuit->ccid,
- };
- osmux_encode_dummy(batch, batch_factor, &state);
- continue;
- }
-
- llist_for_each_entry_safe(cur, tmp, &circuit->msg_list, list) {
- struct osmux_input_state state = {
- .msg = cur,
- .out_msg = batch_msg,
- .ccid = circuit->ccid,
- };
-#ifdef DEBUG_MSG
- char buf[4096];
-
- osmo_rtp_snprintf(buf, sizeof(buf), cur);
- buf[sizeof(buf)-1] = '\0';
- LOGP(DLMUX, LOGL_DEBUG, "to BSC-NAT: %s\n", buf);
-#endif
-
- state.rtph = osmo_rtp_get_hdr(cur);
- if (state.rtph == NULL)
- return NULL;
-
- if (ctr == 0) {
-#ifdef DEBUG_MSG
- LOGP(DLMUX, LOGL_DEBUG, "add osmux header\n");
-#endif
- state.add_osmux_hdr = 1;
- }
-
- osmux_xfrm_encode_amr(batch, &state);
- osmux_batch_dequeue(cur, circuit);
- msgb_free(cur);
- ctr++;
- batch->nmsgs--;
- }
- }
- return batch_msg;
-}
-
-void osmux_xfrm_input_deliver(struct osmux_in_handle *h)
-{
- struct msgb *batch_msg;
- struct osmux_batch *batch = (struct osmux_batch *)h->internal_data;
-
-#ifdef DEBUG_MSG
- LOGP(DLMUX, LOGL_DEBUG, "invoking delivery function\n");
-#endif
- batch_msg = osmux_build_batch(batch, h->batch_size, h->batch_factor);
- if (!batch_msg)
- return;
- h->stats.output_osmux_msgs++;
- h->stats.output_osmux_bytes += batch_msg->len;
-
- h->deliver(batch_msg, h->data);
- osmo_timer_del(&batch->timer);
- batch->remaining_bytes = h->batch_size;
-
- if (batch->ndummy) {
- osmo_timer_schedule(&batch->timer, 0,
- h->batch_factor * DELTA_RTP_MSG);
- }
-}
-
-static void osmux_batch_timer_expired(void *data)
-{
- struct osmux_in_handle *h = data;
-
-#ifdef DEBUG_MSG
- LOGP(DLMUX, LOGL_DEBUG, "osmux_batch_timer_expired\n");
-#endif
- osmux_xfrm_input_deliver(h);
-}
-
-static int osmux_rtp_amr_payload_len(struct msgb *msg, struct rtp_hdr *rtph)
-{
- struct amr_hdr *amrh;
- unsigned int amr_len;
- int amr_payload_len;
-
- amrh = osmo_rtp_get_payload(rtph, msg, &amr_len);
- if (amrh == NULL)
- return -1;
-
- if (!osmo_amr_ft_valid(amrh->ft))
- return -1;
-
- amr_payload_len = amr_len - sizeof(struct amr_hdr);
-
- /* The AMR payload does not fit with what we expect */
- if (osmo_amr_bytes(amrh->ft) != amr_payload_len) {
- LOGP(DLMUX, LOGL_ERROR,
- "Bad AMR frame, expected %zd bytes, got %d bytes\n",
- osmo_amr_bytes(amrh->ft), amr_len);
- return -1;
- }
- return amr_payload_len;
-}
-
-static void osmux_replay_lost_packets(struct osmux_circuit *circuit,
- struct rtp_hdr *cur_rtph, int batch_factor)
-{
- int16_t diff;
- struct msgb *last;
- struct rtp_hdr *rtph;
- int i;
-
- /* Have we see any RTP packet in this batch before? */
- if (llist_empty(&circuit->msg_list))
- return;
-
- /* Get last RTP packet seen in this batch */
- last = llist_entry(circuit->msg_list.prev, struct msgb, list);
- rtph = osmo_rtp_get_hdr(last);
- if (rtph == NULL)
- return;
-
- diff = ntohs(cur_rtph->sequence) - ntohs(rtph->sequence);
-
- /* Lifesaver: make sure bugs don't spawn lots of clones */
- if (diff > 16)
- diff = 16;
-
- /* If diff between last RTP packet seen and this one is > 1,
- * then we lost several RTP packets, let's replay them.
- */
- for (i=1; i<diff; i++) {
- struct msgb *clone;
-
- /* Clone last RTP packet seen */
- clone = msgb_alloc(last->data_len, "RTP clone");
- if (!clone)
- continue;
-
- memcpy(clone->data, last->data, last->len);
- msgb_put(clone, last->len);
-
- /* The original RTP message has been already sanity check. */
- rtph = osmo_rtp_get_hdr(clone);
-
- /* Adjust sequence number and timestamp */
- rtph->sequence = htons(ntohs(rtph->sequence) + i);
- rtph->timestamp = htonl(ntohl(rtph->timestamp) +
- DELTA_RTP_TIMESTAMP);
-
- /* No more room in this batch, skip padding with more clones */
- if (osmux_batch_enqueue(clone, circuit, batch_factor) < 0) {
- msgb_free(clone);
- break;
- }
-
- LOGP(DLMUX, LOGL_ERROR, "adding cloned RTP\n");
- }
-}
-
-static struct osmux_circuit *
-osmux_batch_find_circuit(struct osmux_batch *batch, int ccid)
-{
- struct osmux_circuit *circuit;
-
- llist_for_each_entry(circuit, &batch->circuit_list, head) {
- if (circuit->ccid == ccid)
- return circuit;
- }
- return NULL;
-}
-
-static struct osmux_circuit *
-osmux_batch_add_circuit(struct osmux_batch *batch, int ccid, int dummy,
- uint8_t batch_factor)
-{
- struct osmux_circuit *circuit;
-
- circuit = osmux_batch_find_circuit(batch, ccid);
- if (circuit != NULL) {
- LOGP(DLMUX, LOGL_ERROR, "circuit %u already exists!\n", ccid);
- return NULL;
- }
-
- circuit = talloc_zero(osmux_ctx, struct osmux_circuit);
- if (circuit == NULL) {
- LOGP(DLMUX, LOGL_ERROR, "OOM on circuit %u\n", ccid);
- return NULL;
- }
-
- circuit->ccid = ccid;
- INIT_LLIST_HEAD(&circuit->msg_list);
- llist_add_tail(&circuit->head, &batch->circuit_list);
-
- if (dummy) {
- circuit->dummy = dummy;
- batch->ndummy++;
- if (!osmo_timer_pending(&batch->timer))
- osmo_timer_schedule(&batch->timer, 0,
- batch_factor * DELTA_RTP_MSG);
- }
- return circuit;
-}
-
-static void osmux_batch_del_circuit(struct osmux_batch *batch, struct osmux_circuit *circuit)
-{
- if (circuit->dummy)
- batch->ndummy--;
- llist_del(&circuit->head);
- osmux_circuit_del_msgs(batch, circuit);
- talloc_free(circuit);
-}
-
-static int
-osmux_batch_add(struct osmux_batch *batch, uint32_t batch_factor, struct msgb *msg,
- struct rtp_hdr *rtph, int ccid)
-{
- int bytes = 0, amr_payload_len;
- struct osmux_circuit *circuit;
- struct msgb *cur;
-
- circuit = osmux_batch_find_circuit(batch, ccid);
- if (!circuit)
- return -1;
-
- /* We've seen the first RTP message, disable dummy padding */
- if (circuit->dummy) {
- circuit->dummy = 0;
- batch->ndummy--;
- }
- amr_payload_len = osmux_rtp_amr_payload_len(msg, rtph);
- if (amr_payload_len < 0)
- return -1;
-
- /* First check if there is room for this message in the batch */
- bytes += amr_payload_len;
- if (circuit->nmsgs == 0)
- bytes += sizeof(struct osmux_hdr);
-
- /* No room, sorry. You'll have to retry */
- if (bytes > batch->remaining_bytes)
- return 1;
-
- /* Init of talkspurt (RTP M marker bit) needs to be in the first AMR slot
- * of the OSMUX packet, enforce sending previous batch if required:
- */
- if (rtph->marker && circuit->nmsgs != 0)
- return 1;
-
-
- /* Extra validation: check if this message already exists, should not
- * happen but make sure we don't propagate duplicated messages.
- */
- llist_for_each_entry(cur, &circuit->msg_list, list) {
- struct rtp_hdr *rtph2 = osmo_rtp_get_hdr(cur);
- if (rtph2 == NULL)
- return -1;
-
- /* Already exists message with this sequence, skip */
- if (rtph2->sequence == rtph->sequence) {
- LOGP(DLMUX, LOGL_ERROR, "already exists "
- "message with seq=%u, skip it\n",
- rtph->sequence);
- return -1;
- }
- }
- /* Handle RTP packet loss scenario */
- osmux_replay_lost_packets(circuit, rtph, batch_factor);
-
- /* This batch is full, force batch delivery */
- if (osmux_batch_enqueue(msg, circuit, batch_factor) < 0)
- return 1;
-
-#ifdef DEBUG_MSG
- LOGP(DLMUX, LOGL_DEBUG, "adding msg with ssrc=%u to batch\n",
- rtph->ssrc);
-#endif
-
- /* Update remaining room in this batch */
- batch->remaining_bytes -= bytes;
-
- if (batch->nmsgs == 0) {
-#ifdef DEBUG_MSG
- LOGP(DLMUX, LOGL_DEBUG, "osmux start timer batch\n");
-#endif
- osmo_timer_schedule(&batch->timer, 0,
- batch_factor * DELTA_RTP_MSG);
- }
- batch->nmsgs++;
-
- return 0;
-}
-
-/**
- * osmux_xfrm_input - add RTP message to OSmux batch
- * \param msg: RTP message that you want to batch into one OSmux message
- *
- * If 0 is returned, this indicates that the message has been batched or that
- * an error occured and we have skipped the message. If 1 is returned, you
- * have to invoke osmux_xfrm_input_deliver and try again.
- *
- * The function takes care of releasing the messages in case of error and
- * when building the batch.
- */
-int osmux_xfrm_input(struct osmux_in_handle *h, struct msgb *msg, int ccid)
-{
- int ret;
- struct rtp_hdr *rtph;
- struct osmux_batch *batch = (struct osmux_batch *)h->internal_data;
-
- /* Ignore too big RTP/RTCP messages, most likely forged. Sanity check
- * to avoid a possible forever loop in the caller.
- */
- if (msg->len > h->batch_size - sizeof(struct osmux_hdr)) {
- msgb_free(msg);
- return 0;
- }
-
- rtph = osmo_rtp_get_hdr(msg);
- if (rtph == NULL) {
- msgb_free(msg);
- return 0;
- }
-
- switch(rtph->payload_type) {
- case RTP_PT_RTCP:
- msgb_free(msg);
- return 0;
- default:
- /* The RTP payload type is dynamically allocated,
- * although we use static ones. Assume that we always
- * receive AMR traffic.
- */
-
- /* Add this RTP to the OSMUX batch */
- ret = osmux_batch_add(batch, h->batch_factor,
- msg, rtph, ccid);
- if (ret < 0) {
- /* Cannot put this message into the batch.
- * Malformed, duplicated, OOM. Drop it and tell
- * the upper layer that we have digest it.
- */
- msgb_free(msg);
- return 0;
- }
-
- h->stats.input_rtp_msgs++;
- h->stats.input_rtp_bytes += msg->len;
- break;
- }
- return ret;
-}
-
-void osmux_xfrm_input_init(struct osmux_in_handle *h)
-{
- struct osmux_batch *batch;
-
- /* Default to osmux packet size if not specified */
- if (h->batch_size == 0)
- h->batch_size = OSMUX_BATCH_DEFAULT_MAX;
-
- batch = talloc_zero(osmux_ctx, struct osmux_batch);
- if (batch == NULL)
- return;
-
- INIT_LLIST_HEAD(&batch->circuit_list);
- batch->remaining_bytes = h->batch_size;
- osmo_timer_setup(&batch->timer, osmux_batch_timer_expired, h);
-
- h->internal_data = (void *)batch;
-
- LOGP(DLMUX, LOGL_DEBUG, "initialized osmux input converter\n");
-}
-
-/*! \brief Set transmission callback to call when a generated RTP packet is to be transmitted
- * \param[in] h the osmux out handle handling a specific CID
- * \param[in] osmuxh Buffer pointing to osmux frame header structure and AMR payload
- * \return Number of generated RTP packets
- *
- * This Function sets the callback called by the interal timer set by
- * osmux_xfrm_out_sched function.
- */
-void osmux_xfrm_output_set_tx_cb(struct osmux_out_handle *h,
- void (*tx_cb)(struct msgb *msg, void *data),
- void *data)
-{
- h->tx_cb = tx_cb;
- h->data = data;
-}
-
-int osmux_xfrm_input_open_circuit(struct osmux_in_handle *h, int ccid,
- int dummy)
-{
- struct osmux_batch *batch = (struct osmux_batch *)h->internal_data;
-
- return osmux_batch_add_circuit(batch, ccid, dummy, h->batch_factor) ? 0 : -1;
-}
-
-void osmux_xfrm_input_close_circuit(struct osmux_in_handle *h, int ccid)
-{
- struct osmux_batch *batch = (struct osmux_batch *)h->internal_data;
- struct osmux_circuit *circuit;
-
- circuit = osmux_batch_find_circuit(batch, ccid);
- if (circuit == NULL)
- return;
-
- osmux_batch_del_circuit(batch, circuit);
-}
-
-void osmux_xfrm_input_fini(struct osmux_in_handle *h)
-{
- struct osmux_batch *batch = (struct osmux_batch *)h->internal_data;
- struct osmux_circuit *circuit, *next;
-
- llist_for_each_entry_safe(circuit, next, &batch->circuit_list, head)
- osmux_batch_del_circuit(batch, circuit);
-
- osmo_timer_del(&batch->timer);
- talloc_free(batch);
-}
-
-struct osmux_tx_handle {
- struct osmo_timer_list timer;
- struct msgb *msg;
- void (*tx_cb)(struct msgb *msg, void *data);
- void *data;
-#ifdef DEBUG_TIMING
- struct timeval start;
- struct timeval when;
-#endif
-};
-
-static void osmux_tx_cb(void *data)
-{
- struct osmux_tx_handle *h = data;
-#ifdef DEBUG_TIMING
- struct timeval now, diff;
-
- osmo_gettimeofday(&now, NULL);
- timersub(&now, &h->start, &diff);
- timersub(&diff,&h->when, &diff);
- LOGP(DLMUX, LOGL_DEBUG, "we are lagging %lu.%.6lu in scheduled "
- "transmissions\n", diff.tv_sec, diff.tv_usec);
-#endif
-
- h->tx_cb(h->msg, h->data);
-
- talloc_free(h);
-}
-
-static void
-osmux_tx(struct msgb *msg, struct timeval *when,
- void (*tx_cb)(struct msgb *msg, void *data), void *data)
-{
- struct osmux_tx_handle *h;
-
- h = talloc_zero(osmux_ctx, struct osmux_tx_handle);
- if (h == NULL)
- return;
-
- h->msg = msg;
- h->tx_cb = tx_cb;
- h->data = data;
- osmo_timer_setup(&h->timer, osmux_tx_cb, h);
-
-#ifdef DEBUG_TIMING
- osmo_gettimeofday(&h->start, NULL);
- h->when.tv_sec = when->tv_sec;
- h->when.tv_usec = when->tv_usec;
-#endif
- /* send it now */
- if (when->tv_sec == 0 && when->tv_usec == 0) {
- osmux_tx_cb(h);
- return;
- }
- osmo_timer_schedule(&h->timer, when->tv_sec, when->tv_usec);
-}
-
-void
-osmux_tx_sched(struct llist_head *list,
- void (*tx_cb)(struct msgb *msg, void *data), void *data)
-{
- struct msgb *cur, *tmp;
- struct timeval delta = { .tv_sec = 0, .tv_usec = DELTA_RTP_MSG };
- struct timeval when;
-
- timerclear(&when);
-
- llist_for_each_entry_safe(cur, tmp, list, list) {
-
-#ifdef DEBUG_MSG
- LOGP(DLMUX, LOGL_DEBUG, "scheduled transmision in %lu.%6lu "
- "seconds, msg=%p\n", when.tv_sec, when.tv_usec, cur);
-#endif
- llist_del(&cur->list);
- osmux_tx(cur, &when, tx_cb, data);
- timeradd(&when, &delta, &when);
- }
-}
-
-void osmux_xfrm_output_init2(struct osmux_out_handle *h, uint32_t rtp_ssrc, uint8_t rtp_payload_type)
-{
- memset(h, 0, sizeof(*h));
- h->rtp_seq = (uint16_t)random();
- h->rtp_timestamp = (uint32_t)random();
- h->rtp_ssrc = rtp_ssrc;
- h->rtp_payload_type = rtp_payload_type;
- INIT_LLIST_HEAD(&h->list);
- osmo_timer_setup(&h->timer, osmux_xfrm_output_trigger, h);
-}
-
-void osmux_xfrm_output_init(struct osmux_out_handle *h, uint32_t rtp_ssrc)
-{
- /* backward compatibility with old users, where 98 was harcoded in osmux_rebuild_rtp() */
- osmux_xfrm_output_init2(h, rtp_ssrc, 98);
-}
-
-#define SNPRINTF_BUFFER_SIZE(ret, remain, offset) \
- if (ret < 0) \
- ret = 0; \
- offset += ret; \
- if (ret > remain) \
- ret = remain; \
- remain -= ret;
-
static int osmux_snprintf_header(char *buf, size_t size, struct osmux_hdr *osmuxh)
{
unsigned int remain = size, offset = 0;
int ret;
ret = snprintf(buf, remain, "OSMUX seq=%03u ccid=%03u "
- "ft=%01u ctr=%01u "
+ "ft=%01u rtp_m=%01u ctr=%01u "
"amr_f=%01u amr_q=%01u "
- "amr_ft=%02u amr_cmr=%02u ",
+ "amr_ft=%02u amr_cmr=%02u",
osmuxh->seq, osmuxh->circuit_id,
- osmuxh->ft, osmuxh->ctr,
+ osmuxh->ft, osmuxh->rtp_m, osmuxh->ctr,
osmuxh->amr_f, osmuxh->amr_q,
osmuxh->amr_ft, osmuxh->amr_cmr);
SNPRINTF_BUFFER_SIZE(ret, remain, offset);
@@ -1093,7 +126,10 @@ int osmux_snprintf(char *buf, size_t size, struct msgb *msg)
return -1;
}
osmuxh = (struct osmux_hdr *)((uint8_t *)msg->data + msg_off);
-
+ if (msg_off) {
+ ret = snprintf(buf + offset, remain, ", ");
+ SNPRINTF_BUFFER_SIZE(ret, remain, offset);
+ }
ret = osmux_snprintf_header(buf + offset, remain, osmuxh);
SNPRINTF_BUFFER_SIZE(ret, remain, offset);
@@ -1123,6 +159,8 @@ int osmux_snprintf(char *buf, size_t size, struct msgb *msg)
}
if (osmuxh->ft == OSMUX_FT_VOICE_AMR) {
+ ret = snprintf(buf + offset, remain, " ");
+ SNPRINTF_BUFFER_SIZE(ret, remain, offset);
ret = osmux_snprintf_payload(buf + offset, remain,
osmux_get_payload(osmuxh),
payload_len);
diff --git a/src/osmux_input.c b/src/osmux_input.c
new file mode 100644
index 0000000..2184a08
--- /dev/null
+++ b/src/osmux_input.c
@@ -0,0 +1,857 @@
+/*
+ * (C) 2012-2017 by Pablo Neira Ayuso <pablo@gnumonks.org>
+ * (C) 2012 by On Waves ehf <http://www.on-waves.com>
+ * (C) 2015-2022 by sysmocom - s.f.m.c. GmbH
+ *
+ * SPDX-License-Identifier: GPL-2.0+
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <inttypes.h>
+
+#include <osmocom/core/msgb.h>
+#include <osmocom/core/timer.h>
+#include <osmocom/core/timer_compat.h>
+#include <osmocom/core/select.h>
+#include <osmocom/core/talloc.h>
+#include <osmocom/core/logging.h>
+
+#include <osmocom/netif/amr.h>
+#include <osmocom/netif/rtp.h>
+#include <osmocom/netif/osmux.h>
+
+#include <arpa/inet.h>
+
+/* This allows you to debug osmux message transformations (spamming) */
+#if 0
+#define DEBUG_MSG 0
+#endif
+
+/* delta time between two RTP messages (in microseconds) */
+#define DELTA_RTP_MSG 20000
+/* delta time between two RTP messages (in samples, 8kHz) */
+#define DELTA_RTP_TIMESTAMP 160
+
+#define LOGMUXLK_(link, lvl, fmt, args ...) \
+ LOGP(DLMUX, lvl, "[%s,%u/%" PRIu16 "]" fmt, \
+ (link)->name, (link)->h->batch_size - (link)->remaining_bytes, \
+ (link)->h->batch_size, \
+ ## args)
+
+#define LOGMUXLK(link, lvl, fmt, args ...) \
+ LOGMUXLK_(link, lvl, " " fmt, ## args)
+
+#define LOGMUXCID(link, circuit, lvl, fmt, args ...) \
+ LOGMUXLK_(link, lvl, "[CID=%" PRIu8 ",batched=%u/%u] " fmt, \
+ (circuit)->ccid, (circuit)->nmsgs, (link)->h->batch_factor, ## args)
+
+/*! \addtogroup osmux Osmocom Multiplex Protocol
+ * @{
+ *
+ * This code implements a variety of utility functions related to the
+ * OSMUX user-plane multiplexing protocol, an efficient alternative to
+ * plain UDP/RTP streams for voice transport in back-haul of cellular
+ * networks.
+ *
+ * For information about the OSMUX protocol design, please see the
+ * OSMUX reference manual at
+ * http://ftp.osmocom.org/docs/latest/osmux-reference.pdf
+ */
+
+/*! \file osmux_input.c
+ * \brief Osmocom multiplex protocol helpers (input)
+ */
+
+static void *osmux_ctx;
+
+static uint32_t osmux_ft_dummy_size(uint8_t amr_ft, uint8_t batch_factor)
+{
+ return sizeof(struct osmux_hdr) + (osmo_amr_bytes(amr_ft) * batch_factor);
+}
+
+/* This is (struct osmux_in_handle)->internal_data.
+ * TODO: API have been defined to access all fields of osmux_in_handle
+ * (deprecated osmux_xfrm_input_init()), hence at some point we remove struct
+ * osmux_in_handle definition from osmux.h and we move it here internally and
+ * merge it with struct osmux_link.
+ */
+struct osmux_link {
+ struct osmo_timer_list timer;
+ struct osmux_hdr *osmuxh;
+ struct llist_head circuit_list;
+ unsigned int remaining_bytes;
+ uint32_t nmsgs;
+ int ndummy;
+ char *name;
+ struct osmux_in_handle *h; /* backpointer to parent object */
+};
+
+struct osmux_circuit {
+ struct llist_head head;
+ int ccid;
+ struct llist_head msg_list;
+ int nmsgs;
+ int dummy;
+ uint8_t seq;
+ int32_t last_transmitted_rtp_seq; /* -1 = unset */
+ uint32_t last_transmitted_rtp_ts; /* Check last_transmitted_rtp_seq = -1 to detect unset */
+};
+
+/* Used internally to temporarily cache all parsed content of an RTP pkt from user to be transmitted as Osmux */
+struct osmux_in_req {
+ struct osmux_circuit *circuit;
+ struct msgb *msg;
+ struct rtp_hdr *rtph;
+ uint32_t rtp_payload_len;
+ struct amr_hdr *amrh;
+ int amr_payload_len;
+};
+
+/* returns: 1 if batch is full, 0 if batch still not full, negative on error. */
+static int osmux_circuit_enqueue(struct osmux_link *link, struct osmux_circuit *circuit, struct msgb *msg)
+{
+ /* Validate amount of messages per batch. The counter field of the
+ * osmux header is just 3 bits long, so make sure it doesn't overflow.
+ */
+ OSMO_ASSERT(link->h->batch_factor <= 8);
+ if (circuit->nmsgs >= link->h->batch_factor) {
+ struct rtp_hdr *rtph;
+
+ rtph = osmo_rtp_get_hdr(msg);
+ if (rtph == NULL)
+ return -1;
+
+ LOGMUXCID(link, circuit, LOGL_DEBUG, "Batch is full for RTP sssrc=%u\n", rtph->ssrc);
+ return 1;
+ }
+
+ llist_add_tail(&msg->list, &circuit->msg_list);
+ circuit->nmsgs++;
+ return 0;
+}
+
+static void osmux_circuit_dequeue(struct osmux_circuit *circuit, struct msgb *msg)
+{
+ llist_del(&msg->list);
+ circuit->nmsgs--;
+}
+
+static void osmux_circuit_del_msgs(struct osmux_link *link, struct osmux_circuit *circuit)
+{
+ struct msgb *cur, *tmp;
+ llist_for_each_entry_safe(cur, tmp, &circuit->msg_list, list) {
+ osmux_circuit_dequeue(circuit, cur);
+ msgb_free(cur);
+ link->nmsgs--;
+ }
+}
+
+struct osmux_input_state {
+ struct msgb *out_msg;
+ struct msgb *msg;
+ struct rtp_hdr *rtph;
+ struct amr_hdr *amrh;
+ uint32_t amr_payload_len;
+ struct osmux_circuit *circuit;
+ int add_osmux_hdr;
+};
+
+static int osmux_link_put(struct osmux_link *link, struct osmux_input_state *state)
+{
+ uint16_t rtp_seqnum = ntohs(state->rtph->sequence);
+
+ if (state->add_osmux_hdr) {
+ bool seq_jump = state->circuit->last_transmitted_rtp_seq != -1 &&
+ ((state->circuit->last_transmitted_rtp_seq + 1) & 0xffff) != rtp_seqnum;
+ struct osmux_hdr *osmuxh;
+ osmuxh = (struct osmux_hdr *)msgb_put(state->out_msg,
+ sizeof(struct osmux_hdr));
+ osmuxh->ft = OSMUX_FT_VOICE_AMR;
+ osmuxh->ctr = 0;
+ osmuxh->rtp_m = state->rtph->marker || seq_jump;
+ osmuxh->seq = state->circuit->seq++;
+ osmuxh->circuit_id = state->circuit->ccid;
+ osmuxh->amr_ft = state->amrh->ft;
+
+ /* annotate current osmux header */
+ link->osmuxh = osmuxh;
+ } else {
+ if (link->osmuxh->ctr == 0x7) {
+ LOGMUXCID(link, state->circuit, LOGL_ERROR,
+ "Cannot encode RTP pkt ssrc=%u into osmux batch, too many packets\n",
+ state->rtph->ssrc);
+ return 0;
+ }
+ link->osmuxh->ctr++;
+ }
+ /* For fields below, we only use the last included in batch and ignore any previous: */
+ link->osmuxh->amr_cmr = state->amrh->cmr;
+ link->osmuxh->amr_f = state->amrh->f;
+ link->osmuxh->amr_q = state->amrh->q;
+
+ if (state->amr_payload_len > 0) {
+ memcpy(state->out_msg->tail, osmo_amr_get_payload(state->amrh),
+ state->amr_payload_len);
+ msgb_put(state->out_msg, state->amr_payload_len);
+ }
+
+ /* Update circuit state of last transmitted incoming RTP seqnum/ts: */
+ state->circuit->last_transmitted_rtp_seq = rtp_seqnum;
+ state->circuit->last_transmitted_rtp_ts = ntohl(state->rtph->timestamp);
+ return 0;
+}
+
+static void osmux_encode_dummy(struct osmux_link *link, struct osmux_input_state *state)
+{
+ struct osmux_hdr *osmuxh;
+ /* TODO: This should be configurable at some point. */
+ uint32_t payload_size = osmux_ft_dummy_size(AMR_FT_3, link->h->batch_factor) -
+ sizeof(struct osmux_hdr);
+
+ osmuxh = (struct osmux_hdr *)state->out_msg->tail;
+ osmuxh->ft = OSMUX_FT_DUMMY;
+ osmuxh->ctr = link->h->batch_factor - 1;
+ osmuxh->amr_f = 0;
+ osmuxh->amr_q = 0;
+ osmuxh->seq = 0;
+ osmuxh->circuit_id = state->circuit->ccid;
+ osmuxh->amr_cmr = 0;
+ osmuxh->amr_ft = AMR_FT_3;
+ msgb_put(state->out_msg, sizeof(struct osmux_hdr));
+
+ memset(state->out_msg->tail, 0xff, payload_size);
+ msgb_put(state->out_msg, payload_size);
+}
+
+static struct msgb *osmux_build_batch(struct osmux_link *link)
+{
+ struct msgb *batch_msg;
+ struct osmux_circuit *circuit;
+
+#ifdef DEBUG_MSG
+ LOGMUXLK(link, LOGL_DEBUG, "Now building batch\n");
+#endif
+
+ batch_msg = msgb_alloc(link->h->batch_size, "osmux");
+ if (batch_msg == NULL) {
+ LOGMUXLK(link, LOGL_ERROR, "Not enough memory\n");
+ return NULL;
+ }
+
+ llist_for_each_entry(circuit, &link->circuit_list, head) {
+ struct msgb *cur, *tmp;
+ int ctr = 0;
+ int prev_amr_ft;
+
+ if (circuit->dummy) {
+ struct osmux_input_state state = {
+ .out_msg = batch_msg,
+ .circuit = circuit,
+ };
+ osmux_encode_dummy(link, &state);
+ continue;
+ }
+
+ llist_for_each_entry_safe(cur, tmp, &circuit->msg_list, list) {
+ struct osmux_input_state state = {
+ .msg = cur,
+ .out_msg = batch_msg,
+ .circuit = circuit,
+ };
+ uint32_t amr_len;
+#ifdef DEBUG_MSG
+ char buf[4096];
+
+ osmo_rtp_snprintf(buf, sizeof(buf), cur);
+ buf[sizeof(buf)-1] = '\0';
+ LOGMUXCID(link, circuit, LOGL_DEBUG, "to BSC-NAT: %s\n", buf);
+#endif
+
+ state.rtph = osmo_rtp_get_hdr(cur);
+ if (!state.rtph)
+ return NULL;
+ state.amrh = osmo_rtp_get_payload(state.rtph, state.msg, &amr_len);
+ if (!state.amrh)
+ return NULL;
+ state.amr_payload_len = amr_len - sizeof(struct amr_hdr);
+
+ if (ctr == 0) {
+#ifdef DEBUG_MSG
+ LOGMUXCID(link, circuit, LOGL_DEBUG, "Add osmux header (First in batch)\n");
+#endif
+ state.add_osmux_hdr = 1;
+ } else if (prev_amr_ft != state.amrh->ft) {
+ /* If AMR FT changed, we have to generate an extra batch osmux header: */
+#ifdef DEBUG_MSG
+ LOGMUXCID(link, circuit, LOGL_DEBUG, "Add osmux header (New AMR FT)\n");
+#endif
+ state.add_osmux_hdr = 1;
+ }
+
+ osmux_link_put(link, &state);
+ osmux_circuit_dequeue(circuit, cur);
+ prev_amr_ft = state.amrh->ft;
+ ctr++;
+ msgb_free(cur);
+ link->nmsgs--;
+ }
+ }
+ return batch_msg;
+}
+
+void osmux_xfrm_input_deliver(struct osmux_in_handle *h)
+{
+ struct msgb *batch_msg;
+ struct osmux_link *link = (struct osmux_link *)h->internal_data;
+
+#ifdef DEBUG_MSG
+ LOGMUXLK(link, LOGL_DEBUG, "Invoking delivery function\n");
+#endif
+ batch_msg = osmux_build_batch(link);
+ if (!batch_msg)
+ return;
+ h->stats.output_osmux_msgs++;
+ h->stats.output_osmux_bytes += batch_msg->len;
+
+ h->deliver(batch_msg, h->data);
+ osmo_timer_del(&link->timer);
+ link->remaining_bytes = h->batch_size;
+
+ if (link->ndummy)
+ osmo_timer_schedule(&link->timer, 0, h->batch_factor * DELTA_RTP_MSG);
+}
+
+static void osmux_link_timer_expired(void *data)
+{
+ struct osmux_in_handle *h = data;
+
+#ifdef DEBUG_MSG
+ const struct osmux_link *link = (struct osmux_link *)h->internal_data;
+ LOGMUXLK(link, LOGL_DEBUG, "Batch delivery timer timeout\n");
+#endif
+ osmux_xfrm_input_deliver(h);
+}
+
+static int osmux_rtp_amr_payload_len(struct amr_hdr *amrh, uint32_t amr_len)
+{
+ int amr_payload_len;
+
+ if (!osmo_amr_ft_valid(amrh->ft))
+ return -1;
+
+ amr_payload_len = amr_len - sizeof(struct amr_hdr);
+
+ /* The AMR payload does not fit with what we expect */
+ if (osmo_amr_bytes(amrh->ft) != amr_payload_len) {
+ LOGP(DLMUX, LOGL_ERROR,
+ "Bad AMR frame FT=%u, expected %zd bytes, got %d bytes\n",
+ amrh->ft, osmo_amr_bytes(amrh->ft), amr_len);
+ return -1;
+ }
+ return amr_payload_len;
+}
+
+/* Last stored AMR FT to be added in the current osmux batch. -1 if none stored yet */
+static int osmux_circuit_get_last_stored_amr_ft(struct osmux_circuit *circuit)
+{
+ struct msgb *last;
+ struct rtp_hdr *rtph;
+ struct amr_hdr *amrh;
+ uint32_t amr_len;
+ /* Have we seen any RTP packet in this batch before? */
+ if (llist_empty(&circuit->msg_list))
+ return -1;
+ OSMO_ASSERT(circuit->nmsgs > 0);
+
+ /* Get last RTP packet seen in this batch */
+ last = llist_entry(circuit->msg_list.prev, struct msgb, list);
+ rtph = osmo_rtp_get_hdr(last);
+ amrh = osmo_rtp_get_payload(rtph, last, &amr_len);
+ if (amrh == NULL)
+ return -1;
+ return amrh->ft;
+
+}
+
+static struct osmux_circuit *
+osmux_link_find_circuit(struct osmux_link *link, int ccid)
+{
+ struct osmux_circuit *circuit;
+
+ llist_for_each_entry(circuit, &link->circuit_list, head) {
+ if (circuit->ccid == ccid)
+ return circuit;
+ }
+ return NULL;
+}
+
+static void osmux_link_del_circuit(struct osmux_link *link, struct osmux_circuit *circuit)
+{
+ if (circuit->dummy)
+ link->ndummy--;
+ llist_del(&circuit->head);
+ osmux_circuit_del_msgs(link, circuit);
+ talloc_free(circuit);
+}
+
+/* returns: 1 if batch is full, 0 if batch still not full, negative on error. */
+static int osmux_link_add(struct osmux_link *link, const struct osmux_in_req *req)
+{
+ unsigned int needed_bytes = 0;
+ int rc;
+ /* Init of talkspurt (RTP M marker bit) needs to be in the first AMR slot
+ * of the OSMUX packet, enforce sending previous batch if required:
+ */
+ if (req->rtph->marker && req->circuit->nmsgs != 0)
+ return 1;
+
+ /* First check if there is room for this message in the batch */
+ /* First in batch comes after the batch header: */
+ if (req->circuit->nmsgs == 0)
+ needed_bytes += sizeof(struct osmux_hdr);
+ /* If AMR FT changes in the middle of current batch a new header is
+ * required to adapt to size change: */
+ else if (osmux_circuit_get_last_stored_amr_ft(req->circuit) != req->amrh->ft)
+ needed_bytes += sizeof(struct osmux_hdr);
+ needed_bytes += req->amr_payload_len;
+
+ /* No room, sorry. You'll have to retry */
+ if (needed_bytes > link->remaining_bytes)
+ return 1;
+
+ /* This batch is full, force batch delivery */
+ rc = osmux_circuit_enqueue(link, req->circuit, req->msg);
+ if (rc != 0)
+ return rc;
+
+#ifdef DEBUG_MSG
+ LOGMUXCID(link, req->circuit, LOGL_DEBUG, "Adding msg with ssrc=%u to batch\n",
+ req->rtph->ssrc);
+#endif
+
+ /* Update remaining room in this batch */
+ link->remaining_bytes -= needed_bytes;
+
+ if (link->nmsgs == 0) {
+#ifdef DEBUG_MSG
+ LOGMUXLK(link, LOGL_DEBUG, "Osmux start batch delivery timer\n");
+#endif
+ osmo_timer_schedule(&link->timer, 0,
+ link->h->batch_factor * DELTA_RTP_MSG);
+ }
+ link->nmsgs++;
+
+ return 0;
+};
+
+/* returns: 1 if batch is full, 0 if batch still not full, negative on error. */
+static int osmux_replay_lost_packets(struct osmux_link *link, const struct osmux_in_req *req)
+{
+ int16_t diff;
+ uint16_t lost_pkts;
+ struct msgb *copy_from;
+ uint16_t last_seq, cur_seq;
+ uint32_t last_ts;
+ int i, rc;
+ struct osmux_in_req clone_req;
+
+ /* If M bit is set, this is a sync point, so any sort of seq jump is expected and has no real meaning. */
+ if (req->rtph->marker)
+ return 0;
+
+ /* Have we seen any RTP packet in this batch before? */
+ if (llist_empty(&req->circuit->msg_list)) {
+ /* Since current batch is empty, it can be assumed:
+ * 1- circuit->last_transmitted_rtp_seq is either unset or really contains the last RTP enqueued
+ * 2- This pkt will generate an osmuxhdr and hence there's no
+ * restriction on the FT, as opposite to the case where the batch
+ * is half full
+ * Conclusion:
+ * 1- It is fine using circuit->last_transmitted_rtp_seq as last enqueued RTP header to detect seqnum jumps.
+ * 2- It is fine filling holes at the start of the batch by using current req->rtph.
+ */
+ if (req->circuit->last_transmitted_rtp_seq == -1)
+ return 0; /* first message in circuit, do nothing */
+ copy_from = req->msg;
+ last_seq = req->circuit->last_transmitted_rtp_seq;
+ last_ts = req->circuit->last_transmitted_rtp_ts;
+ } else {
+ /* Get last RTP packet seen in this batch, so that we simply keep filling with same osmuxhdr */
+ struct rtp_hdr *last_rtph;
+ copy_from = llist_entry(req->circuit->msg_list.prev, struct msgb, list);
+ last_rtph = osmo_rtp_get_hdr(copy_from);
+ if (last_rtph == NULL)
+ return -1;
+ last_seq = ntohs(last_rtph->sequence);
+ last_ts = ntohl(last_rtph->timestamp);
+ }
+ cur_seq = ntohs(req->rtph->sequence);
+ diff = cur_seq - last_seq;
+
+ /* If diff between last RTP packet seen and this one is > 1,
+ * then we lost several RTP packets, let's replay them.
+ */
+ if (diff <= 1)
+ return 0;
+ lost_pkts = diff - 1;
+
+ LOGMUXCID(link, req->circuit, LOGL_INFO,
+ "RTP seq jump detected: %" PRIu16 " -> %" PRIu16 " (%" PRId16
+ " lost packets)\n",
+ last_seq, cur_seq, lost_pkts);
+
+ /* We know we can feed only up to batch_factor before osmux_link_add()
+ * returning 1 signalling "transmission needed, call deliver() and retry".
+ * Hence, it doesn't make sense to even attempt recreating a big number of
+ * RTP packets (>batch_factor).
+ */
+ if (lost_pkts > link->h->batch_factor - req->circuit->nmsgs) {
+ if (llist_empty(&req->circuit->msg_list)) {
+ /* If we are starting a batch, it doesn't make sense to keep filling entire
+ * batches with lost packets, since it could potentially end up in a loop if
+ * the lost_pkts value is huge. Instead, avoid recreating packets and let the
+ * osmux encoder set an M bit on the osmuxhdr when acting upon current req->rtph.
+ */
+ return 0;
+ }
+ lost_pkts = link->h->batch_factor - req->circuit->nmsgs;
+ }
+
+ rc = 0;
+ clone_req = *req;
+ for (i = 0; i < lost_pkts; i++) {
+ /* Clone last (or new if last not available) RTP packet seen */
+ clone_req.msg = msgb_copy(copy_from, "RTP clone");
+ if (!clone_req.msg)
+ continue;
+
+ /* The original RTP message has been already sanity checked. */
+ clone_req.rtph = osmo_rtp_get_hdr(clone_req.msg);
+ clone_req.amrh = osmo_rtp_get_payload(clone_req.rtph, clone_req.msg, &clone_req.rtp_payload_len);
+ clone_req.amr_payload_len = osmux_rtp_amr_payload_len(clone_req.amrh, clone_req.rtp_payload_len);
+
+ /* Faking a follow up RTP pkt here, so no Marker bit: */
+ clone_req.rtph->marker = false;
+ /* Adjust sequence number and timestamp */
+ clone_req.rtph->sequence = htons(last_seq + 1 + i);
+ clone_req.rtph->timestamp = last_ts + ((1 + i) * DELTA_RTP_TIMESTAMP);
+ rc = osmux_link_add(link, &clone_req);
+ /* No more room in this batch, skip padding with more clones */
+ if (rc != 0) {
+ msgb_free(clone_req.msg);
+ return rc;
+ }
+ }
+ return rc;
+}
+
+/* returns: 1 if batch is full, 0 if batch still not full, negative on error. */
+static int osmux_link_handle_rtp_req(struct osmux_link *link, struct osmux_in_req *req)
+{
+ struct msgb *cur, *next;
+ int rc;
+
+ /* We've seen the first RTP message, disable dummy padding */
+ if (req->circuit->dummy) {
+ req->circuit->dummy = 0;
+ link->ndummy--;
+ }
+
+ /* Extra validation: check if this message already exists, should not
+ * happen but make sure we don't propagate duplicated messages.
+ */
+ llist_for_each_entry_safe(cur, next, &req->circuit->msg_list, list) {
+ struct rtp_hdr *rtph2 = osmo_rtp_get_hdr(cur);
+ OSMO_ASSERT(rtph2);
+
+ /* Already exists message with this sequence. Let's copy over
+ * the new RTP, since there's the chance that the existing one may
+ * be a forged copy we did when we detected a hole. */
+ if (rtph2->sequence == req->rtph->sequence) {
+ if (msgb_length(cur) != msgb_length(req->msg)) {
+ /* Different packet size, AMR FT may have changed. Let's avoid changing it to
+ * break accounted size to be written (would need new osmux_hdr, etc.) */
+ LOGMUXCID(link, req->circuit, LOGL_NOTICE,
+ "RTP pkt with seq=%u and different len %u != %u already exists, skip it\n",
+ ntohs(req->rtph->sequence), msgb_length(cur), msgb_length(req->msg));
+ return -1;
+ }
+ LOGMUXCID(link, req->circuit, LOGL_INFO,
+ "RTP pkt with seq=%u already exists, replace it\n",
+ ntohs(req->rtph->sequence));
+ __llist_add(&req->msg->list, &cur->list, cur->list.next);
+ llist_del(&cur->list);
+ msgb_free(cur);
+ return 0;
+ }
+ }
+
+ /* Handle RTP packet loss scenario */
+ rc = osmux_replay_lost_packets(link, req);
+ if (rc != 0)
+ return rc;
+
+ return osmux_link_add(link, req);
+}
+
+/**
+ * osmux_xfrm_input - add RTP message to OSmux batch
+ * \param msg: RTP message that you want to batch into one OSmux message
+ *
+ * If 0 is returned, this indicates that the message has been batched and the
+ * msgb is now owned by the osmux layer.
+ * If negative value is returned, an error occurred and the message has been
+ * dropped (and freed).
+ * If 1 is returned, you have to invoke osmux_xfrm_input_deliver and try again.
+ *
+ * The function takes care of releasing the messages in case of error and
+ * when building the batch.
+ */
+int osmux_xfrm_input(struct osmux_in_handle *h, struct msgb *msg, int ccid)
+{
+ int ret;
+ struct osmux_link *link = (struct osmux_link *)h->internal_data;
+ struct osmux_in_req req = {
+ .msg = msg,
+ .rtph = osmo_rtp_get_hdr(msg),
+ .circuit = osmux_link_find_circuit(link, ccid),
+ };
+
+ if (!req.circuit) {
+ LOGMUXLK(link, LOGL_INFO, "Couldn't find circuit CID=%u\n", ccid);
+ goto err_free;
+ }
+
+ if (!req.rtph) {
+ LOGMUXCID(link, req.circuit, LOGL_NOTICE, "msg not containing an RTP header\n");
+ goto err_free;
+ }
+
+ /* Ignore too big RTP/RTCP messages, most likely forged. Sanity check
+ * to avoid a possible forever loop in the caller.
+ */
+ if (msg->len > h->batch_size - sizeof(struct osmux_hdr)) {
+ LOGMUXCID(link, req.circuit, LOGL_NOTICE,
+ "RTP payload too big (%u) for configured batch size (%u)\n",
+ msg->len, h->batch_size);
+ goto err_free;
+ }
+
+ switch (req.rtph->payload_type) {
+ case RTP_PT_RTCP:
+ LOGMUXCID(link, req.circuit, LOGL_INFO, "Dropping RTCP packet\n");
+ msgb_free(msg);
+ return 0;
+ default:
+ /* The RTP payload type is dynamically allocated,
+ * although we use static ones. Assume that we always
+ * receive AMR traffic.
+ */
+ req.amrh = osmo_rtp_get_payload(req.rtph, req.msg, &req.rtp_payload_len);
+ if (req.amrh == NULL)
+ goto err_free;
+ req.amr_payload_len = osmux_rtp_amr_payload_len(req.amrh, req.rtp_payload_len);
+ if (req.amr_payload_len < 0) {
+ LOGMUXCID(link, req.circuit, LOGL_NOTICE, "AMR payload invalid\n");
+ goto err_free;
+ }
+
+ /* Add this RTP to the OSMUX batch */
+ ret = osmux_link_handle_rtp_req(link, &req);
+ if (ret < 0) {
+ /* Cannot put this message into the batch.
+ * Malformed, duplicated, OOM. Drop it and tell
+ * the upper layer that we have digest it.
+ */
+ LOGMUXCID(link, req.circuit, LOGL_DEBUG, "Dropping RTP packet instead of adding to batch\n");
+ goto err_free;
+ }
+
+ h->stats.input_rtp_msgs++;
+ h->stats.input_rtp_bytes += msg->len;
+ break;
+ }
+ return ret;
+
+err_free:
+ msgb_free(msg);
+ return -1;
+}
+
+static int osmux_xfrm_input_talloc_destructor(struct osmux_in_handle *h)
+{
+ struct osmux_link *link = (struct osmux_link *)h->internal_data;
+ struct osmux_circuit *circuit, *next;
+
+ llist_for_each_entry_safe(circuit, next, &link->circuit_list, head)
+ osmux_link_del_circuit(link, circuit);
+
+ osmo_timer_del(&link->timer);
+ talloc_free(link);
+ return 0;
+}
+
+static unsigned int next_default_name_idx = 0;
+/*! \brief Allocate a new osmux in handle (osmux source, tx side)
+ * \param[in] ctx talloc context to use when allocating the returned struct
+ * \return Allocated osmux in handle
+ *
+ * This object contains configuration and state to handle a group of circuits (trunk),
+ * receiving RTP packets from the upper layer (API user) and sending batched &
+ * trunked Osmux messages containing all the data of those circuits down the
+ * stack outgoing network Osmux messages.
+ * Returned pointer can be freed with regular talloc_free, all pending messages
+ * in queue and all internal data will be freed. */
+struct osmux_in_handle *osmux_xfrm_input_alloc(void *ctx)
+{
+ struct osmux_in_handle *h;
+ struct osmux_link *link;
+
+ h = talloc_zero(ctx, struct osmux_in_handle);
+ OSMO_ASSERT(h);
+
+ h->batch_size = OSMUX_BATCH_DEFAULT_MAX;
+
+ link = talloc_zero(h, struct osmux_link);
+ OSMO_ASSERT(link);
+ INIT_LLIST_HEAD(&link->circuit_list);
+ link->h = h;
+ link->remaining_bytes = h->batch_size;
+ link->name = talloc_asprintf(link, "input-%u", next_default_name_idx++);
+ osmo_timer_setup(&link->timer, osmux_link_timer_expired, h);
+
+ h->internal_data = (void *)link;
+
+ LOGMUXLK(link, LOGL_DEBUG, "Initialized osmux input converter\n");
+
+ talloc_set_destructor(h, osmux_xfrm_input_talloc_destructor);
+ return h;
+}
+
+/*! \deprecated: Use osmux_xfrm_input_alloc() instead */
+void osmux_xfrm_input_init(struct osmux_in_handle *h)
+{
+ struct osmux_link *link;
+
+ /* Default to osmux packet size if not specified */
+ if (h->batch_size == 0)
+ h->batch_size = OSMUX_BATCH_DEFAULT_MAX;
+
+ link = talloc_zero(osmux_ctx, struct osmux_link);
+ if (link == NULL)
+ return;
+ INIT_LLIST_HEAD(&link->circuit_list);
+ link->h = h;
+ link->remaining_bytes = h->batch_size;
+ link->name = talloc_asprintf(link, "%u", next_default_name_idx++);
+ osmo_timer_setup(&link->timer, osmux_link_timer_expired, h);
+
+ h->internal_data = (void *)link;
+
+ LOGMUXLK(link, LOGL_DEBUG, "Initialized osmux input converter\n");
+}
+
+int osmux_xfrm_input_set_batch_factor(struct osmux_in_handle *h, uint8_t batch_factor)
+{
+ if (batch_factor > 8)
+ return -1;
+ h->batch_factor = batch_factor;
+ return 0;
+}
+
+void osmux_xfrm_input_set_batch_size(struct osmux_in_handle *h, uint16_t batch_size)
+{
+ if (batch_size == 0)
+ h->batch_size = OSMUX_BATCH_DEFAULT_MAX;
+ else
+ h->batch_size = batch_size;
+}
+
+void osmux_xfrm_input_set_initial_seqnum(struct osmux_in_handle *h, uint8_t osmux_seqnum)
+{
+ h->osmux_seq = osmux_seqnum;
+}
+
+void osmux_xfrm_input_set_deliver_cb(struct osmux_in_handle *h,
+ void (*deliver_cb)(struct msgb *msg, void *data), void *data)
+{
+ h->deliver = deliver_cb;
+ h->data = data;
+}
+
+void *osmux_xfrm_input_get_deliver_cb_data(struct osmux_in_handle *h)
+{
+ return h->data;
+}
+
+void osmux_xfrm_input_set_name(struct osmux_in_handle *h, const char *name)
+{
+ struct osmux_link *link = (struct osmux_link *)h->internal_data;
+ osmo_talloc_replace_string(link, &link->name, name);
+}
+
+int osmux_xfrm_input_open_circuit(struct osmux_in_handle *h, int ccid,
+ int dummy)
+{
+ struct osmux_link *link = (struct osmux_link *)h->internal_data;
+ struct osmux_circuit *circuit;
+
+ circuit = osmux_link_find_circuit(link, ccid);
+ if (circuit != NULL) {
+ LOGMUXLK(link, LOGL_ERROR, "circuit %u already exists!\n", ccid);
+ return -1;
+ }
+
+ circuit = talloc_zero(osmux_ctx, struct osmux_circuit);
+ if (circuit == NULL) {
+ LOGMUXLK(link, LOGL_ERROR, "OOM on circuit %u\n", ccid);
+ return -1;
+ }
+
+ circuit->ccid = ccid;
+ circuit->seq = h->osmux_seq;
+ circuit->last_transmitted_rtp_seq = -1; /* field unset */
+ INIT_LLIST_HEAD(&circuit->msg_list);
+ llist_add_tail(&circuit->head, &link->circuit_list);
+
+ if (dummy) {
+ circuit->dummy = dummy;
+ link->ndummy++;
+ if (!osmo_timer_pending(&link->timer))
+ osmo_timer_schedule(&link->timer, 0,
+ h->batch_factor * DELTA_RTP_MSG);
+ }
+ LOGMUXCID(link, circuit, LOGL_INFO, "Circuit opened successfully\n");
+ return 0;
+}
+
+void osmux_xfrm_input_close_circuit(struct osmux_in_handle *h, int ccid)
+{
+ struct osmux_link *link = (struct osmux_link *)h->internal_data;
+ struct osmux_circuit *circuit;
+
+ circuit = osmux_link_find_circuit(link, ccid);
+ if (circuit == NULL) {
+ LOGMUXLK(link, LOGL_NOTICE, "Unable to close circuit %d: Not found\n",
+ ccid);
+ return;
+ }
+
+ LOGMUXCID(link, circuit, LOGL_INFO, "Closing circuit\n");
+
+ osmux_link_del_circuit(link, circuit);
+}
+
+/*! \deprecated: Use talloc_free() instead (will call osmux_xfrm_input_talloc_destructor()) */
+void osmux_xfrm_input_fini(struct osmux_in_handle *h)
+{
+ (void)osmux_xfrm_input_talloc_destructor(h);
+}
+
+/*! @} */
diff --git a/src/osmux_output.c b/src/osmux_output.c
new file mode 100644
index 0000000..664ed60
--- /dev/null
+++ b/src/osmux_output.c
@@ -0,0 +1,396 @@
+/*
+ * (C) 2012-2017 by Pablo Neira Ayuso <pablo@gnumonks.org>
+ * (C) 2012 by On Waves ehf <http://www.on-waves.com>
+ * (C) 2015-2022 by sysmocom - s.f.m.c. GmbH
+ *
+ * SPDX-License-Identifier: GPL-2.0+
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ */
+
+#include <stdio.h>
+#include <string.h>
+
+#include <osmocom/core/msgb.h>
+#include <osmocom/core/timer.h>
+#include <osmocom/core/timer_compat.h>
+#include <osmocom/core/select.h>
+#include <osmocom/core/talloc.h>
+#include <osmocom/core/logging.h>
+
+#include <osmocom/netif/amr.h>
+#include <osmocom/netif/rtp.h>
+#include <osmocom/netif/osmux.h>
+
+#include <arpa/inet.h>
+
+/* delta time between two RTP messages (in microseconds) */
+#define DELTA_RTP_MSG 20000
+/* delta time between two RTP messages (in samples, 8kHz) */
+#define DELTA_RTP_TIMESTAMP 160
+
+
+/*! \addtogroup osmux Osmocom Multiplex Protocol
+ * @{
+ *
+ * This code implements a variety of utility functions related to the
+ * OSMUX user-plane multiplexing protocol, an efficient alternative to
+ * plain UDP/RTP streams for voice transport in back-haul of cellular
+ * networks.
+ *
+ * For information about the OSMUX protocol design, please see the
+ * OSMUX reference manual at
+ * http://ftp.osmocom.org/docs/latest/osmux-reference.pdf
+ */
+
+/*! \file osmux_output.c
+ * \brief Osmocom multiplex protocol helpers (output)
+ */
+static uint32_t osmux_ft_dummy_size(uint8_t amr_ft, uint8_t batch_factor)
+{
+ return sizeof(struct osmux_hdr) + (osmo_amr_bytes(amr_ft) * batch_factor);
+}
+
+struct osmux_hdr *osmux_xfrm_output_pull(struct msgb *msg)
+{
+ struct osmux_hdr *osmuxh;
+ size_t len;
+
+next:
+ if (msgb_length(msg) == 0)
+ return NULL; /* base case, we drained the msg successfully, tell user it is done. */
+
+ if (msgb_length(msg) < sizeof(struct osmux_hdr)) {
+ LOGP(DLMUX, LOGL_ERROR, "remaining %d bytes, broken osmuxhdr?\n", msgb_length(msg));
+ return NULL;
+ }
+
+ osmuxh = (struct osmux_hdr *)msgb_data(msg);
+ switch (osmuxh->ft) {
+ case OSMUX_FT_VOICE_AMR:
+ if (!osmo_amr_ft_valid(osmuxh->amr_ft)) {
+ LOGP(DLMUX, LOGL_ERROR, "Discarding bad AMR FT %d\n", osmuxh->amr_ft);
+ return NULL;
+ }
+ len = osmo_amr_bytes(osmuxh->amr_ft) * (osmuxh->ctr + 1) + sizeof(struct osmux_hdr);
+ if (msgb_length(msg) < len) {
+ LOGP(DLMUX, LOGL_ERROR,
+ "Discarding malformed OSMUX message: %s\n",
+ osmo_hexdump(msgb_data(msg), msgb_length(msg)));
+ return NULL;
+ }
+ msgb_pull(msg, len);
+ return osmuxh;
+
+ case OSMUX_FT_DUMMY:
+ if (!osmo_amr_ft_valid(osmuxh->amr_ft)) {
+ LOGP(DLMUX, LOGL_ERROR, "Discarding bad Dummy FT: amr_ft=%u\n", osmuxh->amr_ft);
+ return NULL;
+ }
+ len = osmux_ft_dummy_size(osmuxh->amr_ft, osmuxh->ctr + 1);
+ if (msgb_length(msg) < len) {
+ LOGP(DLMUX, LOGL_ERROR, "Discarding bad Dummy FT: %s\n",
+ osmo_hexdump(msgb_data(msg), msgb_length(msg)));
+ return NULL;
+ }
+ msgb_pull(msg, len);
+ goto next;
+
+ default:
+ LOGP(DLMUX, LOGL_ERROR, "Discarding unsupported Osmux FT %d\n",
+ osmuxh->ft);
+ return NULL;
+ }
+}
+
+static struct msgb *
+osmux_rebuild_rtp(struct osmux_out_handle *h, struct osmux_hdr *osmuxh,
+ void *payload, int payload_len, bool first_pkt)
+{
+ struct msgb *prev_msg, *out_msg;
+ struct timespec *prev_ts, *out_ts;
+ struct rtp_hdr *rtph;
+ struct amr_hdr *amrh;
+ struct timespec delta = { .tv_sec = 0, .tv_nsec = DELTA_RTP_MSG*1000 };
+ unsigned int msg_len = sizeof(struct rtp_hdr) +
+ sizeof(struct amr_hdr) +
+ payload_len;
+
+ if (h->rtp_msgb_alloc_cb) {
+ out_msg = h->rtp_msgb_alloc_cb(h->rtp_msgb_alloc_cb_data, msg_len);
+ } else {
+ out_msg = msgb_alloc(msg_len, "osmux-rtp");
+ }
+ if (out_msg == NULL)
+ return NULL;
+
+ /* Reconstruct RTP header */
+ rtph = (struct rtp_hdr *)out_msg->data;
+ rtph->csrc_count = 0;
+ rtph->extension = 0;
+ rtph->version = RTP_VERSION;
+ rtph->payload_type = h->rtp_payload_type;
+ /* ... emulate timestamp and ssrc */
+ rtph->timestamp = htonl(h->rtp_timestamp);
+ rtph->sequence = htons(h->rtp_seq);
+ rtph->ssrc = htonl(h->rtp_ssrc);
+ /* rtp packet with the marker bit is always guaranteed to be the first
+ * one. We want to notify with marker in 2 scenarios:
+ * 1- Sender told us through osmux frame rtp_m.
+ * 2- Intermediate osmux frame lost (seq gap), otherwise rtp receiver only sees
+ * steady increase of delay
+ */
+ rtph->marker = first_pkt &&
+ (osmuxh->rtp_m || (osmuxh->seq != ((h->osmux_seq_ack + 1) & 0xff)));
+
+ msgb_put(out_msg, sizeof(struct rtp_hdr));
+
+ /* Reconstruct AMR header */
+ amrh = (struct amr_hdr *)out_msg->tail;
+ amrh->cmr = osmuxh->amr_cmr;
+ amrh->f = osmuxh->amr_f;
+ amrh->ft = osmuxh->amr_ft;
+ amrh->q = osmuxh->amr_q;
+
+ msgb_put(out_msg, sizeof(struct amr_hdr));
+
+ /* add AMR speech data */
+ if (payload_len > 0) {
+ memcpy(out_msg->tail, payload, payload_len);
+ msgb_put(out_msg, payload_len);
+ }
+
+ /* bump last RTP sequence number and timestamp that has been used */
+ h->rtp_seq++;
+ h->rtp_timestamp += DELTA_RTP_TIMESTAMP;
+
+ out_ts = ((struct timespec *)&((out_msg)->cb[0]));
+ if (first_pkt || llist_empty(&h->list)) {
+ osmo_clock_gettime(CLOCK_MONOTONIC, out_ts);
+ } else {
+ prev_msg = llist_last_entry(&h->list, struct msgb, list);
+ prev_ts = ((struct timespec *)&((prev_msg)->cb[0]));
+ timespecadd(prev_ts, &delta, out_ts);
+ }
+
+ return out_msg;
+}
+
+static void osmux_xfrm_output_trigger(void *data)
+{
+ struct osmux_out_handle *h = data;
+ struct timespec delay_ts, now;
+ struct msgb *msg, *next;
+
+ llist_for_each_entry_safe(msg, next, &h->list, list) {
+ osmo_clock_gettime(CLOCK_MONOTONIC, &now);
+ struct timespec *msg_ts = ((struct timespec *)&((msg)->cb[0]));
+ if (timespeccmp(msg_ts, &now, >)) {
+ timespecsub(msg_ts, &now, &delay_ts);
+ osmo_timer_schedule(&h->timer,
+ delay_ts.tv_sec, delay_ts.tv_nsec / 1000);
+ return;
+ }
+
+ /* Transmit the rtp packet */
+ llist_del(&msg->list);
+ if (h->tx_cb)
+ h->tx_cb(msg, h->data);
+ else
+ msgb_free(msg);
+ }
+}
+
+/*! \brief Generate RTP packets from osmux frame AMR payload set and schedule
+ * them for transmission at appropriate time.
+ * \param[in] h the osmux out handle handling a specific CID
+ * \param[in] osmuxh Buffer pointing to osmux frame header structure and AMR payload
+ * \return Number of generated RTP packets
+ *
+ * The osmux frame passed to this function must be of the type OSMUX_FT_VOICE_AMR.
+ * The generated RTP packets are kept into h's internal list and sent to the
+ * callback configured through osmux_xfrm_output_set_tx_cb when are ready to be
+ * transmitted according to schedule.
+ */
+int osmux_xfrm_output_sched(struct osmux_out_handle *h, struct osmux_hdr *osmuxh)
+{
+ struct timespec now, *msg_ts;
+ struct msgb *msg;
+ int i;
+ bool was_empty = llist_empty(&h->list);
+
+ if (!was_empty) {
+ /* If we received new data it means we are behind schedule and
+ * we should flush all previous quickly */
+ osmo_clock_gettime(CLOCK_MONOTONIC, &now);
+ llist_for_each_entry(msg, &h->list, list) {
+ msg_ts = ((struct timespec *)&((msg)->cb[0]));
+ *msg_ts = now;
+ }
+ osmo_timer_schedule(&h->timer, 0, 0);
+ }
+
+ for (i = 0; i < osmuxh->ctr + 1; i++) {
+ struct rtp_hdr *rtph;
+
+ msg = osmux_rebuild_rtp(h, osmuxh,
+ osmux_get_payload(osmuxh) +
+ i * osmo_amr_bytes(osmuxh->amr_ft),
+ osmo_amr_bytes(osmuxh->amr_ft), !i);
+ if (msg == NULL)
+ continue;
+
+ rtph = osmo_rtp_get_hdr(msg);
+ if (rtph == NULL)
+ continue;
+
+ llist_add_tail(&msg->list, &h->list);
+ }
+
+ /* Update last seen seq number: */
+ h->osmux_seq_ack = osmuxh->seq;
+
+ /* In case list is still empty after parsing messages, no need to rearm */
+ if (was_empty && !llist_empty(&h->list))
+ osmux_xfrm_output_trigger(h);
+ return i;
+}
+
+/*! \brief Flush all scheduled RTP packets still pending to be transmitted
+ * \param[in] h the osmux out handle to flush
+ *
+ * This function will immediately call the transmit callback for all queued RTP
+ * packets, making sure the list ends up empty. It will also stop all internal
+ * timers to make sure the osmux_out_handle can be dropped or re-used by calling
+ * osmux_xfrm_output on it.
+ */
+void osmux_xfrm_output_flush(struct osmux_out_handle *h)
+{
+ struct msgb *msg, *next;
+
+ if (osmo_timer_pending(&h->timer))
+ osmo_timer_del(&h->timer);
+
+ llist_for_each_entry_safe(msg, next, &h->list, list) {
+ llist_del(&msg->list);
+ if (h->tx_cb)
+ h->tx_cb(msg, h->data);
+ else
+ msgb_free(msg);
+ }
+}
+
+struct osmux_tx_handle {
+ struct osmo_timer_list timer;
+ struct msgb *msg;
+ void (*tx_cb)(struct msgb *msg, void *data);
+ void *data;
+};
+
+static int osmux_xfrm_output_talloc_destructor(struct osmux_out_handle *h)
+{
+ osmux_xfrm_output_flush(h);
+ return 0;
+}
+
+/* Placeholder to avoid init code duplication while keeping backward
+ * compatilbility with deprecated osmux_xfrm_output_init{2}() APIs. */
+static void _osmux_xfrm_output_init(struct osmux_out_handle *h, uint32_t rtp_ssrc, uint8_t rtp_payload_type)
+{
+ h->rtp_seq = (uint16_t)random();
+ h->rtp_timestamp = (uint32_t)random();
+ h->rtp_ssrc = rtp_ssrc;
+ h->rtp_payload_type = rtp_payload_type;
+ INIT_LLIST_HEAD(&h->list);
+ osmo_timer_setup(&h->timer, osmux_xfrm_output_trigger, h);
+}
+
+/*! \brief Allocate a new osmux out handle
+ * \param[in] ctx talloc context to use when allocating the returned struct
+ * \return Allocated osmux out handle
+ *
+ * This object contains configuration and state to handle a specific CID in
+ * incoming network Osmux messages, repackaging the frames for that CID as RTP
+ * packets and pushing them up the protocol stack.
+ * Returned pointer can be freed with regular talloc_free, queue will be flushed
+ * and all internal data will be freed. */
+struct osmux_out_handle *osmux_xfrm_output_alloc(void *ctx)
+{
+ struct osmux_out_handle *h;
+
+ h = talloc_zero(ctx, struct osmux_out_handle);
+ OSMO_ASSERT(h);
+
+ _osmux_xfrm_output_init(h, (uint32_t)random(), 98);
+
+ talloc_set_destructor(h, osmux_xfrm_output_talloc_destructor);
+ return h;
+}
+
+/*! \deprecated: Use osmux_xfrm_output_alloc() and osmux_xfrm_output_set_rtp_*() instead */
+void osmux_xfrm_output_init2(struct osmux_out_handle *h, uint32_t rtp_ssrc, uint8_t rtp_payload_type)
+{
+ memset(h, 0, sizeof(*h));
+ _osmux_xfrm_output_init(h, rtp_ssrc, rtp_payload_type);
+}
+
+/*! \deprecated: Use osmux_xfrm_output_alloc() and osmux_xfrm_output_set_rtp_*() instead */
+void osmux_xfrm_output_init(struct osmux_out_handle *h, uint32_t rtp_ssrc)
+{
+ /* backward compatibility with old users, where 98 was harcoded in osmux_rebuild_rtp() */
+ memset(h, 0, sizeof(*h));
+ _osmux_xfrm_output_init(h, rtp_ssrc, 98);
+}
+
+/*! \brief Set transmission callback to call when a generated RTP packet is to be transmitted
+ * \param[in] h the osmux out handle handling a specific CID
+ * \param[in] osmuxh Buffer pointing to osmux frame header structure and AMR payload
+ * \return Number of generated RTP packets
+ *
+ * This Function sets the callback called by the interal timer set by
+ * osmux_xfrm_out_sched function.
+ */
+void osmux_xfrm_output_set_tx_cb(struct osmux_out_handle *h,
+ void (*tx_cb)(struct msgb *msg, void *data),
+ void *data)
+{
+ h->tx_cb = tx_cb;
+ h->data = data;
+}
+
+/*! \brief Set callback to call when an RTP packet to be generated is to be allocated
+ * \param[in] h the osmux out handle handling a specific CID
+ * \param[in] cb User defined msgb alloc function for generated RTP pkts
+ * \param[in] cb_data Opaque data pointer set by user and passed in \ref cb
+ * \return msgb structure to be used to fill in generated RTP pkt content
+ */
+void osmux_xfrm_output_set_rtp_msgb_alloc_cb(struct osmux_out_handle *h,
+ rtp_msgb_alloc_cb_t cb,
+ void *cb_data)
+{
+ h->rtp_msgb_alloc_cb = cb;
+ h->rtp_msgb_alloc_cb_data = cb_data;
+}
+
+/*! \brief Set SSRC of generated RTP packets from Osmux frames
+ * \param[in] h the osmux out handle handling a specific CID
+ * \param[in] rtp_ssrc the RTP SSRC to set
+ */
+void osmux_xfrm_output_set_rtp_ssrc(struct osmux_out_handle *h, uint32_t rtp_ssrc)
+{
+ h->rtp_ssrc = rtp_ssrc;
+}
+
+/*! \brief Set Payload Type of generated RTP packets from Osmux frames
+ * \param[in] h the osmux out handle handling a specific CID
+ * \param[in] rtp_payload_type the RTP Payload Type to set
+ */
+void osmux_xfrm_output_set_rtp_pl_type(struct osmux_out_handle *h, uint32_t rtp_payload_type)
+{
+ h->rtp_payload_type = rtp_payload_type;
+}
+
+/*! @} */
diff --git a/src/prim.c b/src/prim.c
new file mode 100644
index 0000000..eae1064
--- /dev/null
+++ b/src/prim.c
@@ -0,0 +1,473 @@
+/* (C) 2021 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
+ * Author: Pau Espin Pedrol <pespin@sysmocom.de>
+ * All Rights Reserved
+ *
+ * SPDX-License-Identifier: GPL-2.0+
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <inttypes.h>
+
+#include <osmocom/core/talloc.h>
+#include <osmocom/core/select.h>
+#include <osmocom/core/socket.h>
+#include <osmocom/core/logging.h>
+
+#include <osmocom/netif/prim.h>
+#include <osmocom/netif/stream.h>
+
+struct osmo_prim_pkt_hdr {
+ uint32_t sap; /*!< Service Access Point Identifier */
+ uint16_t primitive; /*!< Primitive number */
+ uint16_t operation; /*! Primitive Operation (enum osmo_prim_operation) */
+} __attribute__ ((packed));
+
+/* Here we take advantage of the fact that sizeof(struct
+ * osmo_prim_pkt_hdr) <= sizeof(struct osmo_prim_hdr), so we don't need
+ * to allocate headroom when serializing later.
+ */
+osmo_static_assert(sizeof(struct osmo_prim_pkt_hdr) <= sizeof(struct osmo_prim_hdr),
+ osmo_prim_msgb_alloc_validate_headroom);
+
+/*! Allocate a primitive of given type and its associated msgb.
+* \param[in] sap Service Access Point
+* \param[in] primitive Primitive Number
+* \param[in] operation Primitive Operation (REQ/RESP/IND/CONF)
+* \param[in] alloc_len Total length (including struct osmo_prim_hdr) to allocate for the primitive
+* \returns Pointer to allocated prim_hdr inisde its own msgb. The osmo_prim_hdr
+* is pre-alocated & pre-filled.
+*/
+struct osmo_prim_hdr *osmo_prim_msgb_alloc(unsigned int sap, unsigned int primitive,
+ enum osmo_prim_operation operation, size_t alloc_len)
+{
+ struct msgb *msg;
+ struct osmo_prim_hdr *oph;
+
+ if (alloc_len < sizeof(*oph))
+ return NULL;
+
+ msg = msgb_alloc(alloc_len, "osmo_prim_msgb_alloc");
+ oph = (struct osmo_prim_hdr *)msgb_put(msg, sizeof(*oph));
+ osmo_prim_init(oph, sap, primitive, operation, msg);
+ msg->l2h = msg->tail;
+
+ return oph;
+}
+
+struct osmo_prim_srv_link {
+ void *priv;
+ char *addr;
+ int log_cat; /* Defaults to DLGLOBAL */
+ struct osmo_stream_srv_link *stream;
+ osmo_prim_srv_conn_cb opened_conn_cb;
+ osmo_prim_srv_conn_cb closed_conn_cb;
+ osmo_prim_srv_rx_sapi_version rx_sapi_version_cb;
+ osmo_prim_srv_rx_cb rx_cb;
+ size_t rx_msgb_alloc_len;
+};
+
+struct osmo_prim_srv {
+ void *priv;
+ struct osmo_prim_srv_link *link; /* backpointer */
+ struct osmo_stream_srv *stream;
+};
+
+/******************************
+ * CONTROL SAP
+ ******************************/
+#define OSMO_PRIM_CTL_SAPI 0xffffffff
+#define OSMO_PRIM_CTL_API_VERSION 0
+
+enum sap_ctl_prim_type {
+ SAP_CTL_PRIM_HELLO,
+ _SAP_CTL_PRIM_MAX
+};
+
+const struct value_string sap_ctl_prim_type_names[] = {
+ OSMO_VALUE_STRING(SAP_CTL_PRIM_HELLO),
+ { 0, NULL }
+};
+
+/* HNB_CTL_PRIM_HELLO.ind, UL */
+struct sap_ctl_hello_param {
+ uint32_t sapi; /* SAPI for which we negotiate version */
+ uint16_t api_version; /* The intended version */
+} __attribute__ ((packed));
+
+struct sap_ctl_prim {
+ struct osmo_prim_hdr hdr;
+ union {
+ struct sap_ctl_hello_param hello_req;
+ struct sap_ctl_hello_param hello_cnf;
+ } u;
+} __attribute__ ((packed));
+
+static struct sap_ctl_prim *_sap_ctl_makeprim_hello_cnf(uint32_t sapi, uint16_t api_version)
+{
+ struct sap_ctl_prim *ctl_prim;
+
+ ctl_prim = (struct sap_ctl_prim *)osmo_prim_msgb_alloc(
+ OSMO_PRIM_CTL_SAPI, SAP_CTL_PRIM_HELLO, PRIM_OP_CONFIRM,
+ sizeof(struct osmo_prim_hdr) + sizeof(struct sap_ctl_hello_param));
+ msgb_put(ctl_prim->hdr.msg, sizeof(struct sap_ctl_hello_param));
+ ctl_prim->u.hello_cnf.sapi = sapi;
+ ctl_prim->u.hello_cnf.api_version = api_version;
+
+ return ctl_prim;
+}
+
+/******************************
+ * osmo_prim_srv
+ ******************************/
+#define LOGSRV(srv, lvl, fmt, args...) LOGP((srv)->link->log_cat, lvl, fmt, ## args)
+
+static int _srv_sap_ctl_rx_hello_req(struct osmo_prim_srv *prim_srv, struct sap_ctl_hello_param *hello_ind)
+{
+ struct sap_ctl_prim *prim_resp;
+ int rc;
+
+ LOGSRV(prim_srv, LOGL_INFO, "Rx CTL-HELLO.req SAPI=%u API_VERSION=%u\n", hello_ind->sapi, hello_ind->api_version);
+
+ if (hello_ind->sapi == OSMO_PRIM_CTL_SAPI)
+ rc = hello_ind->api_version == OSMO_PRIM_CTL_API_VERSION ? OSMO_PRIM_CTL_API_VERSION : -1;
+ else if (prim_srv->link->rx_sapi_version_cb)
+ rc = prim_srv->link->rx_sapi_version_cb(prim_srv, hello_ind->sapi, hello_ind->api_version);
+ else /* Accept whatever version by default: */
+ rc = hello_ind->api_version;
+
+ if (rc < 0) {
+ LOGSRV(prim_srv, LOGL_ERROR,
+ "SAPI=%u API_VERSION=%u not supported! destroying connection\n",
+ hello_ind->sapi, hello_ind->api_version);
+ osmo_stream_srv_set_flush_and_destroy(prim_srv->stream);
+ return rc;
+ }
+ prim_resp = _sap_ctl_makeprim_hello_cnf(hello_ind->sapi, (uint16_t)rc);
+ LOGSRV(prim_srv, LOGL_INFO, "Tx CTL-HELLO.cnf SAPI=%u API_VERSION=%u\n",
+ hello_ind->sapi, prim_resp->u.hello_cnf.api_version);
+ rc = osmo_prim_srv_send(prim_srv, prim_resp->hdr.msg);
+ return rc;
+}
+
+static int _srv_sap_ctl_rx(struct osmo_prim_srv *prim_srv, struct osmo_prim_hdr *oph)
+{
+ switch (oph->operation) {
+ case PRIM_OP_REQUEST:
+ switch (oph->primitive) {
+ case SAP_CTL_PRIM_HELLO:
+ return _srv_sap_ctl_rx_hello_req(prim_srv, (struct sap_ctl_hello_param *)msgb_data(oph->msg));
+ default:
+ LOGSRV(prim_srv, LOGL_ERROR, "Rx unknown CTL SAP primitive %u (len=%u)\n",
+ oph->primitive, msgb_length(oph->msg));
+ return -EINVAL;
+ }
+ break;
+ case PRIM_OP_RESPONSE:
+ case PRIM_OP_INDICATION:
+ case PRIM_OP_CONFIRM:
+ default:
+ LOGSRV(prim_srv, LOGL_ERROR, "Rx CTL SAP unexpected primitive operation %s-%s (len=%u)\n",
+ get_value_string(sap_ctl_prim_type_names, oph->primitive),
+ get_value_string(osmo_prim_op_names, oph->operation),
+ msgb_length(oph->msg));
+ return -EINVAL;
+ }
+}
+
+static int _osmo_prim_srv_read_cb(struct osmo_stream_srv *srv)
+{
+ struct osmo_prim_srv *prim_srv = osmo_stream_srv_get_data(srv);
+ struct osmo_prim_pkt_hdr *pkth;
+ struct msgb *msg;
+ struct osmo_prim_hdr oph;
+ int rc;
+
+ msg = msgb_alloc_c(prim_srv, sizeof(*pkth) + prim_srv->link->rx_msgb_alloc_len,
+ "osmo_prim_srv_link_rx");
+ if (!msg)
+ return -ENOMEM;
+ rc = osmo_stream_srv_recv(srv, msg);
+ if (rc == 0)
+ goto close;
+
+ if (rc < 0) {
+ if (errno == EAGAIN) {
+ msgb_free(msg);
+ return 0;
+ }
+ goto close;
+ }
+
+ if (rc < sizeof(*pkth)) {
+ LOGSRV(prim_srv, LOGL_ERROR, "Received %d bytes on UD Socket, but primitive hdr size "
+ "is %zu, discarding\n", rc, sizeof(*pkth));
+ msgb_free(msg);
+ return 0;
+ }
+ pkth = (struct osmo_prim_pkt_hdr *)msgb_data(msg);
+
+ /* De-serialize message: */
+ osmo_prim_init(&oph, pkth->sap, pkth->primitive, pkth->operation, msg);
+ msgb_pull(msg, sizeof(*pkth));
+
+ switch (oph.sap) {
+ case OSMO_PRIM_CTL_SAPI:
+ rc = _srv_sap_ctl_rx(prim_srv, &oph);
+ break;
+ default:
+ if (prim_srv->link->rx_cb)
+ rc = prim_srv->link->rx_cb(prim_srv, &oph);
+ break;
+ }
+ /* as we always synchronously process the message in _osmo_prim_srv_link_rx() and
+ * its callbacks, we can free the message here. */
+ msgb_free(msg);
+
+ return rc;
+
+close:
+ msgb_free(msg);
+ osmo_prim_srv_close(prim_srv);
+ return -1;
+}
+
+static void osmo_prim_srv_free(struct osmo_prim_srv *prim_srv);
+static int _osmo_prim_srv_closed_cb(struct osmo_stream_srv *srv)
+{
+ struct osmo_prim_srv *prim_srv = osmo_stream_srv_get_data(srv);
+ struct osmo_prim_srv_link *prim_link = prim_srv->link;
+ if (prim_link->closed_conn_cb)
+ return prim_link->closed_conn_cb(prim_srv);
+ osmo_prim_srv_free(prim_srv);
+ return 0;
+}
+
+/*! Allocate a primitive of given type and its associated msgb.
+* \param[in] srv The osmo_prim_srv_link instance where message is to be sent through
+* \param[in] msg msgb containing osmo_prim_hdr plus extra content, allocated through \ref osmo_prim_msgb_alloc()
+* \returns zero on success, negative on error */
+int osmo_prim_srv_send(struct osmo_prim_srv *prim_srv, struct msgb *msg)
+{
+ struct osmo_prim_hdr *oph;
+ struct osmo_prim_pkt_hdr *pkth;
+ unsigned int sap;
+ unsigned int primitive;
+ enum osmo_prim_operation operation;
+
+ OSMO_ASSERT(prim_srv);
+
+ /* Serialize the oph: */
+ oph = (struct osmo_prim_hdr *)msgb_data(msg);
+ OSMO_ASSERT(oph && msgb_length(msg) >= sizeof(*oph));
+ sap = oph->sap;
+ primitive = oph->primitive;
+ operation = oph->operation;
+ msgb_pull(msg, sizeof(*oph));
+ pkth = (struct osmo_prim_pkt_hdr *)msgb_push(msg, sizeof(*pkth));
+ pkth->sap = sap;
+ pkth->primitive = primitive;
+ pkth->operation = operation;
+
+ /* Finally enqueue the msg */
+ osmo_stream_srv_send(prim_srv->stream, msg);
+
+ return 0;
+}
+
+static struct osmo_prim_srv *osmo_prim_srv_alloc(struct osmo_prim_srv_link *prim_link, int fd)
+{
+ struct osmo_prim_srv *prim_srv;
+ prim_srv = talloc_zero(prim_link, struct osmo_prim_srv);
+ if (!prim_srv)
+ return NULL;
+ prim_srv->link = prim_link;
+ prim_srv->stream = osmo_stream_srv_create(prim_link, prim_link->stream, fd,
+ _osmo_prim_srv_read_cb,
+ _osmo_prim_srv_closed_cb,
+ prim_srv);
+ if (!prim_srv->stream) {
+ talloc_free(prim_srv);
+ return NULL;
+ }
+ /* Inherit link priv pointer by default, user can later set it through API: */
+ prim_srv->priv = prim_link->priv;
+ return prim_srv;
+}
+
+static void osmo_prim_srv_free(struct osmo_prim_srv *prim_srv)
+{
+ talloc_free(prim_srv);
+}
+
+void osmo_prim_srv_set_name(struct osmo_prim_srv *prim_srv, const char *name)
+{
+ osmo_stream_srv_set_name(prim_srv->stream, name);
+}
+
+struct osmo_prim_srv_link *osmo_prim_srv_get_link(struct osmo_prim_srv *prim_srv)
+{
+ return prim_srv->link;
+}
+
+void osmo_prim_srv_set_priv(struct osmo_prim_srv *prim_srv, void *priv)
+{
+ prim_srv->priv = priv;
+}
+
+void *osmo_prim_srv_get_priv(const struct osmo_prim_srv *prim_srv)
+{
+ return prim_srv->priv;
+}
+
+void osmo_prim_srv_close(struct osmo_prim_srv *prim_srv)
+{
+ osmo_stream_srv_destroy(prim_srv->stream);
+ /* we free prim_srv in _osmo_prim_srv_closed_cb() */
+}
+
+/******************************
+ * osmo_prim_srv_link
+ ******************************/
+
+#define LOGSRVLINK(srv, lvl, fmt, args...) LOGP((srv)->log_cat, lvl, fmt, ## args)
+
+/* accept connection coming from PCU */
+static int _osmo_prim_srv_link_accept(struct osmo_stream_srv_link *link, int fd)
+{
+ struct osmo_prim_srv *prim_srv;
+ struct osmo_prim_srv_link *prim_link = osmo_stream_srv_link_get_data(link);
+
+ prim_srv = osmo_prim_srv_alloc(prim_link, fd);
+
+ if (prim_link->opened_conn_cb)
+ return prim_link->opened_conn_cb(prim_srv);
+
+ return 0;
+}
+
+struct osmo_prim_srv_link *osmo_prim_srv_link_alloc(void *ctx)
+{
+ struct osmo_prim_srv_link *prim_link;
+ prim_link = talloc_zero(ctx, struct osmo_prim_srv_link);
+ if (!prim_link)
+ return NULL;
+ prim_link->stream = osmo_stream_srv_link_create(prim_link);
+ if (!prim_link->stream) {
+ talloc_free(prim_link);
+ return NULL;
+ }
+ osmo_stream_srv_link_set_data(prim_link->stream, prim_link);
+ osmo_stream_srv_link_set_domain(prim_link->stream, AF_UNIX);
+ osmo_stream_srv_link_set_type(prim_link->stream, SOCK_SEQPACKET);
+ osmo_stream_srv_link_set_accept_cb(prim_link->stream, _osmo_prim_srv_link_accept);
+
+ prim_link->log_cat = DLGLOBAL;
+ prim_link->rx_msgb_alloc_len = 1600 - sizeof(struct osmo_prim_pkt_hdr);
+ return prim_link;
+}
+
+void osmo_prim_srv_link_free(struct osmo_prim_srv_link *prim_link)
+{
+ if (!prim_link)
+ return;
+
+ if (prim_link->stream) {
+ osmo_stream_srv_link_close(prim_link->stream);
+ osmo_stream_srv_link_destroy(prim_link->stream);
+ prim_link->stream = NULL;
+ }
+ talloc_free(prim_link);
+}
+
+void osmo_prim_srv_link_set_name(struct osmo_prim_srv_link *prim_link, const char *name)
+{
+ osmo_stream_srv_link_set_name(prim_link->stream, name);
+}
+
+int osmo_prim_srv_link_set_addr(struct osmo_prim_srv_link *prim_link, const char *path)
+{
+ osmo_talloc_replace_string(prim_link, &prim_link->addr, path);
+ osmo_stream_srv_link_set_addr(prim_link->stream, path);
+ return 0;
+}
+
+const char *osmo_prim_srv_link_get_addr(struct osmo_prim_srv_link *prim_link)
+{
+ return prim_link->addr;
+}
+
+void osmo_prim_srv_link_set_priv(struct osmo_prim_srv_link *prim_link, void *priv)
+{
+ prim_link->priv = priv;
+}
+
+void *osmo_prim_srv_link_get_priv(const struct osmo_prim_srv_link *prim_link)
+{
+ return prim_link->priv;
+}
+
+void osmo_prim_srv_link_set_log_category(struct osmo_prim_srv_link *prim_link, int log_cat)
+{
+ prim_link->log_cat = log_cat;
+}
+
+void osmo_prim_srv_link_set_opened_conn_cb(struct osmo_prim_srv_link *prim_link, osmo_prim_srv_conn_cb opened_conn_cb)
+{
+ prim_link->opened_conn_cb = opened_conn_cb;
+}
+void osmo_prim_srv_link_set_closed_conn_cb(struct osmo_prim_srv_link *prim_link, osmo_prim_srv_conn_cb closed_conn_cb)
+{
+ prim_link->closed_conn_cb = closed_conn_cb;
+}
+
+void osmo_prim_srv_link_set_rx_sapi_version_cb(struct osmo_prim_srv_link *prim_link, osmo_prim_srv_rx_sapi_version rx_sapi_version_cb)
+{
+ prim_link->rx_sapi_version_cb = rx_sapi_version_cb;
+}
+
+void osmo_prim_srv_link_set_rx_cb(struct osmo_prim_srv_link *prim_link, osmo_prim_srv_rx_cb rx_cb)
+{
+ prim_link->rx_cb = rx_cb;
+}
+
+void osmo_prim_srv_link_set_rx_msgb_alloc_len(struct osmo_prim_srv_link *prim_link, size_t alloc_len)
+{
+ prim_link->rx_msgb_alloc_len = alloc_len;
+}
+
+int osmo_prim_srv_link_open(struct osmo_prim_srv_link *prim_link)
+{
+ int rc;
+
+ if (!prim_link->addr) {
+ LOGSRVLINK(prim_link, LOGL_ERROR, "Cannot open, Address not configured\n");
+ return -1;
+ }
+
+ rc = osmo_stream_srv_link_open(prim_link->stream);
+
+ LOGSRVLINK(prim_link, LOGL_INFO, "Started listening on Lower Layer Unix Domain Socket: %s\n", prim_link->addr);
+
+ return rc;
+}
diff --git a/src/rs232.c b/src/rs232.c
index 28f1ba0..b20c111 100644
--- a/src/rs232.c
+++ b/src/rs232.c
@@ -62,7 +62,7 @@ void rs232_tx_timer_cb(void *ptr)
struct osmo_rs232 *r = ptr;
/* we're again ready to transmit. */
- r->ofd.when |= BSC_FD_WRITE;
+ osmo_fd_write_enable(&r->ofd);
}
static int handle_ser_write(struct osmo_fd *bfd)
@@ -72,15 +72,15 @@ static int handle_ser_write(struct osmo_fd *bfd)
struct msgb *msg;
int written;
- LOGP(DLINP, LOGL_DEBUG, "writing data to rs232\n");
+ LOGP(DLINP, LOGL_DEBUG, "writing data to rs232\n");
- if (llist_empty(&r->tx_queue)) {
- r->ofd.when &= ~BSC_FD_WRITE;
- return 0;
- }
- lh = r->tx_queue.next;
- llist_del(lh);
- msg = llist_entry(lh, struct msgb, list);
+ if (llist_empty(&r->tx_queue)) {
+ osmo_fd_write_disable(&r->ofd);
+ return 0;
+ }
+ lh = r->tx_queue.next;
+ llist_del(lh);
+ msg = llist_entry(lh, struct msgb, list);
written = write(bfd->fd, msg->data, msg->len);
if (written < msg->len) {
@@ -92,7 +92,7 @@ static int handle_ser_write(struct osmo_fd *bfd)
/* We've got more data to write, but we have to wait to make it. */
if (!llist_empty(&r->tx_queue) && r->cfg.delay_us) {
- r->ofd.when &= ~BSC_FD_WRITE;
+ osmo_fd_write_disable(&r->ofd);
osmo_timer_schedule(&r->tx_timer, 0, r->cfg.delay_us);
}
return 0;
@@ -114,13 +114,13 @@ static int serial_fd_cb(struct osmo_fd *bfd, unsigned int what)
{
int rc = 0;
- if (what & BSC_FD_READ)
+ if (what & OSMO_FD_READ)
rc = handle_ser_read(bfd);
if (rc < 0)
return rc;
- if (what & BSC_FD_WRITE)
+ if (what & OSMO_FD_WRITE)
rc = handle_ser_write(bfd);
return rc;
@@ -223,9 +223,7 @@ int osmo_rs232_open(struct osmo_rs232 *r)
return rc;
}
- bfd->when = BSC_FD_READ;
- bfd->cb = serial_fd_cb;
- bfd->data = r;
+ osmo_fd_setup(bfd, bfd->fd, OSMO_FD_READ, serial_fd_cb, r, 0);
rc = osmo_fd_register(bfd);
if (rc < 0) {
@@ -256,8 +254,8 @@ int osmo_rs232_read(struct osmo_rs232 *r, struct msgb *msg)
void osmo_rs232_write(struct osmo_rs232 *r, struct msgb *msg)
{
- msgb_enqueue(&r->tx_queue, msg);
- r->ofd.when |= BSC_FD_WRITE;
+ msgb_enqueue(&r->tx_queue, msg);
+ osmo_fd_write_enable(&r->ofd);
}
void osmo_rs232_close(struct osmo_rs232 *r)
diff --git a/src/rtp.c b/src/rtp.c
index 5718c5f..f4b0ada 100644
--- a/src/rtp.c
+++ b/src/rtp.c
@@ -89,13 +89,14 @@ int osmo_rtp_handle_tx_set_timestamp(struct osmo_rtp_handle *h, uint32_t timesta
struct rtp_hdr *osmo_rtp_get_hdr(struct msgb *msg)
{
- struct rtp_hdr *rtph = (struct rtp_hdr *)msg->data;
+ struct rtp_hdr *rtph;
if (msg->len < sizeof(struct rtp_hdr)) {
DEBUGPC(DLMUX, "received RTP frame too short (len = %d)\n",
msg->len);
return NULL;
}
+ rtph = (struct rtp_hdr *)msg->data;
if (rtph->version != RTP_VERSION) {
DEBUGPC(DLMUX, "received RTP version %d not supported.\n",
rtph->version);
@@ -198,7 +199,8 @@ osmo_rtp_build(struct osmo_rtp_handle *h, uint8_t payload_type,
rtph->timestamp = htonl(h->tx.timestamp);
h->tx.timestamp += duration;
rtph->ssrc = htonl(h->tx.ssrc);
- memcpy(msg->data + sizeof(struct rtp_hdr), data, payload_len);
+ if (payload_len > 0)
+ memcpy(msg->data + sizeof(struct rtp_hdr), data, payload_len);
msgb_put(msg, sizeof(struct rtp_hdr) + payload_len);
return msg;
diff --git a/src/sctp.c b/src/sctp.c
new file mode 100644
index 0000000..807bdac
--- /dev/null
+++ b/src/sctp.c
@@ -0,0 +1,95 @@
+#include <netinet/sctp.h>
+#include <osmocom/netif/sctp.h>
+
+const struct value_string osmo_sctp_assoc_chg_strs[] = {
+ { SCTP_COMM_UP, "COMM_UP" },
+ { SCTP_COMM_LOST, "COMM_LOST" },
+ { SCTP_RESTART, "RESTART" },
+ { SCTP_SHUTDOWN_COMP, "SHUTDOWN_COMP" },
+ { SCTP_CANT_STR_ASSOC, "CANT_STR_ASSOC" },
+ { 0, NULL }
+};
+
+const struct value_string osmo_sctp_paddr_chg_strs[] = {
+ { SCTP_ADDR_AVAILABLE, "ADDR_AVAILABLE" },
+ { SCTP_ADDR_UNREACHABLE, "ADDR_UNREACHABLE" },
+ { SCTP_ADDR_REMOVED, "ADDR_REMOVED" },
+ { SCTP_ADDR_ADDED, "ADDR_ADDED" },
+ { SCTP_ADDR_MADE_PRIM, "ADDR_MADE_PRIM" },
+ { SCTP_ADDR_CONFIRMED, "ADDR_CONFIRMED" },
+#ifdef SCTP_ADDR_PF
+ { SCTP_ADDR_PF, "ADDR_POTENTIALLY_FAILED" },
+#endif
+ { 0, NULL }
+};
+
+const struct value_string osmo_sctp_sn_type_strs[] = {
+ { SCTP_ASSOC_CHANGE, "ASSOC_CHANGE" },
+ { SCTP_PEER_ADDR_CHANGE, "PEER_ADDR_CHANGE" },
+ { SCTP_SHUTDOWN_EVENT, "SHUTDOWN_EVENT" },
+ { SCTP_SEND_FAILED, "SEND_FAILED" },
+ { SCTP_REMOTE_ERROR, "REMOTE_ERROR" },
+ { SCTP_PARTIAL_DELIVERY_EVENT, "PARTIAL_DELIVERY_EVENT" },
+ { SCTP_ADAPTATION_INDICATION, "ADAPTATION_INDICATION" },
+#ifdef SCTP_AUTHENTICATION_INDICATION
+ { SCTP_AUTHENTICATION_INDICATION, "AUTHENTICATION_INDICATION" },
+#endif
+#ifdef SCTP_SENDER_DRY_EVENT
+ { SCTP_SENDER_DRY_EVENT, "SENDER_DRY_EVENT" },
+#endif
+ { 0, NULL }
+};
+
+
+const struct value_string osmo_sctp_sn_error_strs[] = {
+ { SCTP_FAILED_THRESHOLD, "FAILED_THRESHOLD" },
+ { SCTP_RECEIVED_SACK, "RECEIVED_SACK" },
+ { SCTP_HEARTBEAT_SUCCESS, "HEARTBEAT_SUCCESS" },
+ { SCTP_RESPONSE_TO_USER_REQ, "RESPONSE_TO_USER_REQ" },
+ { SCTP_INTERNAL_ERROR, "INTERNAL_ERROR" },
+ { SCTP_SHUTDOWN_GUARD_EXPIRES, "SHUTDOWN_GUARD_EXPIRES" },
+ { SCTP_PEER_FAULTY, "PEER_FAULTY" },
+ { 0, NULL }
+};
+
+/* rfc4960 section 3.3.10 "Operation Error", in host byte order */
+const struct value_string osmo_sctp_op_error_strs[] = {
+ { OSMO_SCTP_OP_ERR_INVALID_STREAM_ID, "Invalid Stream Identifier" },
+ { OSMO_SCTP_OP_ERR_MISS_MAND_PARAM, "Missing Mandatory Parameter" },
+ { OSMO_SCTP_OP_ERR_STALE_COOKIE, "Stale Cookie Error" },
+ { OSMO_SCTP_OP_ERR_NO_RESOURCES, "Out of Resource" },
+ { OSMO_SCTP_OP_ERR_UNRESOLV_ADDR, "Unresolvable Address" },
+ { OSMO_SCTP_OP_ERR_UNKN_CHUNK_TYPE, "Unrecognized Chunk Type" },
+ { OSMO_SCTP_OP_ERR_INVALID_MAND_PARAM, "Invalid Mandatory Parameter" },
+ { OSMO_SCTP_OP_ERR_UNKN_PARAM, "Unrecognized Parameters" },
+ { OSMO_SCTP_OP_ERR_NO_USER_DATA, "No User Data" },
+ { OSMO_SCTP_OP_ERR_COOKIE_RX_WHILE_SHUTDOWN, "Cookie Received While Shutting Down" },
+ { OSMO_SCTP_OP_ERR_RESTART_ASSC_NEW_ADDR, "Restart of an Association with New Addresses" },
+ { OSMO_SCTP_OP_ERR_UNER_INITED_ABORT, "User Initiated Abort" },
+ { OSMO_SCTP_OP_ERR_PROTO_VERSION, "Protocol Violation" },
+ { 0, NULL }
+};
+
+/* linux/sctp.h enum sctp_spinfo_state */
+const struct value_string osmo_sctp_spinfo_state_strs[] = {
+ { SCTP_INACTIVE, "INACTIVE" },
+ { SCTP_PF, "POTENTIALLY_FAILED" },
+ { SCTP_ACTIVE, "ACTIVE" },
+ { SCTP_UNCONFIRMED, "UNCONFIRMED" },
+ { SCTP_UNKNOWN, "UNKNOWN" },
+ { 0, NULL }
+};
+
+/* linux/sctp.h enum sctp_sstat_state */
+const struct value_string osmo_sctp_sstat_state_strs[] = {
+ { SCTP_EMPTY, "EMPTY" },
+ { SCTP_CLOSED, "CLOSED" },
+ { SCTP_COOKIE_WAIT, "COOKIE_WAIT" },
+ { SCTP_COOKIE_ECHOED, "COOKIE_ECHOED" },
+ { SCTP_ESTABLISHED, "ESTABLISHED" },
+ { SCTP_SHUTDOWN_PENDING, "SHUTDOWN_PENDING" },
+ { SCTP_SHUTDOWN_SENT, "SHUTDOWN_SENT" },
+ { SCTP_SHUTDOWN_RECEIVED, "SHUTDOWN_RECEIVED" },
+ { SCTP_SHUTDOWN_ACK_SENT, "SHUTDOWN_ACK_SENT" },
+ { 0, NULL }
+};
diff --git a/src/stream.c b/src/stream.c
index 3d0b665..f8cbed6 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -1,5 +1,6 @@
/* (C) 2011 by Pablo Neira Ayuso <pablo@gnumonks.org>
* (C) 2015-2016 by Harald Welte <laforge@gnumonks.org>
+ * (C) 2023 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
* All Rights Reserved.
*
* SPDX-License-Identifier: GPL-2.0+
@@ -37,43 +38,117 @@
#include <osmocom/core/utils.h>
#include <osmocom/gsm/tlv.h>
#include <osmocom/core/msgb.h>
+#include <osmocom/core/osmo_io.h>
+#include <osmocom/core/panic.h>
#include <osmocom/core/logging.h>
#include <osmocom/core/talloc.h>
#include <osmocom/core/socket.h>
#include <osmocom/netif/stream.h>
+#include <osmocom/netif/stream_private.h>
#include "config.h"
+#include <osmocom/netif/sctp.h>
+
+/*! \cond private */
+
#ifdef HAVE_LIBSCTP
-#include <netinet/sctp.h>
-#endif
-#define LOGSCLI(cli, level, fmt, args...) \
- LOGP(DLINP, level, "[%s] %s(): " fmt, get_value_string(stream_cli_state_names, (cli)->state), __func__, ## args)
+/* is any of the bytes from offset .. u8_size in 'u8' non-zero? return offset or -1 if all zero */
+static int byte_nonzero(const uint8_t *u8, unsigned int offset, unsigned int u8_size)
+{
+ int j;
+
+ for (j = offset; j < u8_size; j++) {
+ if (u8[j] != 0)
+ return j;
+ }
+
+ return -1;
+}
+
+static unsigned int sctp_sockopt_event_subscribe_size = 0;
+
+static int determine_sctp_sockopt_event_subscribe_size(void)
+{
+ uint8_t buf[256];
+ socklen_t buf_len = sizeof(buf);
+ int sd, rc;
+
+ /* only do this once */
+ if (sctp_sockopt_event_subscribe_size > 0)
+ return 0;
+
+ sd = socket(AF_INET, SOCK_STREAM, IPPROTO_SCTP);
+ if (sd < 0)
+ return sd;
+
+ rc = getsockopt(sd, IPPROTO_SCTP, SCTP_EVENTS, buf, &buf_len);
+ close(sd);
+ if (rc < 0)
+ return rc;
+
+ sctp_sockopt_event_subscribe_size = (unsigned int)buf_len;
-/*! \addtogroup stream Osmocom Stream Socket
- * @{
+ LOGP(DLINP, LOGL_INFO, "sizes of 'struct sctp_event_subscribe': compile-time %zu, kernel: %u\n",
+ sizeof(struct sctp_event_subscribe), sctp_sockopt_event_subscribe_size);
+ return 0;
+}
+
+/* Attempt to work around Linux kernel ABI breakage
*
- * This code is intended to abstract any use of stream-type sockets,
- * such as TCP and SCTP. It offers both server and client side
- * implementations, fully integrated with the libosmocore select loop
- * abstraction.
- */
+ * The Linux kernel ABI for the SCTP_EVENTS socket option has been broken repeatedly.
+ * - until commit 35ea82d611da59f8bea44a37996b3b11bb1d3fd7 ( kernel < 4.11), the size is 10 bytes
+ * - in 4.11 it is 11 bytes
+ * - in 4.12 .. 5.4 it is 13 bytes
+ * - in kernels >= 5.5 it is 14 bytes
+ *
+ * This wouldn't be a problem if the kernel didn't have a "stupid" assumption that the structure
+ * size passed by userspace will match 1:1 the length of the structure at kernel compile time. In
+ * an ideal world, it would just use the known first bytes and assume the remainder is all zero.
+ * But as it doesn't do that, let's try to work around this */
+static int sctp_setsockopt_events_linux_workaround(int fd, const struct sctp_event_subscribe *event)
+{
-/*! \file stream.c
- * \brief Osmocom stream socket helpers
- */
+ const unsigned int compiletime_size = sizeof(*event);
+ int rc;
-/*
- * Platforms that don't have MSG_NOSIGNAL (which disables SIGPIPE)
- * usually have SO_NOSIGPIPE (set via setsockopt).
- */
-#ifndef MSG_NOSIGNAL
-#define MSG_NOSIGNAL 0
-#endif
+ if (determine_sctp_sockopt_event_subscribe_size() < 0) {
+ LOGP(DLINP, LOGL_ERROR, "Cannot determine SCTP_EVENTS socket option size\n");
+ return -1;
+ }
+
+ if (compiletime_size == sctp_sockopt_event_subscribe_size) {
+ /* no kernel workaround needed */
+ return setsockopt(fd, IPPROTO_SCTP, SCTP_EVENTS, event, compiletime_size);
+ } else if (compiletime_size < sctp_sockopt_event_subscribe_size) {
+ /* we are using an older userspace with a more modern kernel and hence need
+ * to pad the data */
+ uint8_t buf[sctp_sockopt_event_subscribe_size];
+
+ memcpy(buf, event, compiletime_size);
+ memset(buf + sizeof(*event), 0, sctp_sockopt_event_subscribe_size - compiletime_size);
+ return setsockopt(fd, IPPROTO_SCTP, SCTP_EVENTS, buf, sctp_sockopt_event_subscribe_size);
+ } else /* if (compiletime_size > sctp_sockopt_event_subscribe_size) */ {
+ /* we are using a newer userspace with an older kernel and hence need to truncate
+ * the data - but only if the caller didn't try to enable any of the events of the
+ * truncated portion */
+ rc = byte_nonzero((const uint8_t *)event, sctp_sockopt_event_subscribe_size,
+ compiletime_size);
+ if (rc >= 0) {
+ LOGP(DLINP, LOGL_ERROR, "Kernel only supports sctp_event_subscribe of %u bytes, "
+ "but caller tried to enable more modern event at offset %u\n",
+ sctp_sockopt_event_subscribe_size, rc);
+ return -1;
+ }
+
+ return setsockopt(fd, IPPROTO_SCTP, SCTP_EVENTS, event, sctp_sockopt_event_subscribe_size);
+ }
+}
+#endif // HAVE_LIBSCTP
-static int sctp_sock_activate_events(int fd)
+int stream_sctp_sock_activate_events(int fd)
{
#ifdef HAVE_LIBSCTP
struct sctp_event_subscribe event;
@@ -84,32 +159,31 @@ static int sctp_sock_activate_events(int fd)
event.sctp_data_io_event = 1;
event.sctp_association_event = 1;
event.sctp_address_event = 1;
- event.sctp_address_event = 1;
event.sctp_send_failure_event = 1;
event.sctp_peer_error_event = 1;
event.sctp_shutdown_event = 1;
/* IMPORTANT: Do NOT enable sender_dry_event here, see
* https://bugzilla.redhat.com/show_bug.cgi?id=1442784 */
- rc = setsockopt(fd, IPPROTO_SCTP, SCTP_EVENTS,
- &event, sizeof(event));
+ rc = sctp_setsockopt_events_linux_workaround(fd, &event);
if (rc < 0)
- LOGP(DLINP, LOGL_ERROR, "couldn't activate SCTP events "
- "on FD %u\n", fd);
+ LOGP(DLINP, LOGL_ERROR, "couldn't activate SCTP events on FD %u\n", fd);
return rc;
#else
return -1;
#endif
}
-static int setsockopt_nodelay(int fd, int proto, int on)
+int stream_setsockopt_nodelay(int fd, int proto, int on)
{
int rc;
switch (proto) {
+#ifdef HAVE_LIBSCTP
case IPPROTO_SCTP:
rc = setsockopt(fd, IPPROTO_SCTP, SCTP_NODELAY, &on, sizeof(on));
break;
+#endif
case IPPROTO_TCP:
rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on));
break;
@@ -122,1027 +196,145 @@ static int setsockopt_nodelay(int fd, int proto, int on)
return rc;
}
-
-/*
- * Client side.
- */
-
-enum osmo_stream_cli_state {
- STREAM_CLI_STATE_NONE = 0,
- STREAM_CLI_STATE_CONNECTING = 1,
- STREAM_CLI_STATE_CONNECTED = 2,
- STREAM_CLI_STATE_MAX
-};
-
-static const struct value_string stream_cli_state_names[] = {
- { STREAM_CLI_STATE_NONE, " NONE" },
- { STREAM_CLI_STATE_CONNECTING, "CONNECTING" },
- { STREAM_CLI_STATE_CONNECTED, " CONNECTED" },
- { 0, NULL }
-};
-
-#define OSMO_STREAM_CLI_F_RECONF (1 << 0)
-#define OSMO_STREAM_CLI_F_NODELAY (1 << 1)
-
-struct osmo_stream_cli {
- struct osmo_fd ofd;
- struct llist_head tx_queue;
- struct osmo_timer_list timer;
- enum osmo_stream_cli_state state;
- char *addr;
- uint16_t port;
- char *local_addr;
- uint16_t local_port;
- uint16_t proto;
- int (*connect_cb)(struct osmo_stream_cli *srv);
- int (*disconnect_cb)(struct osmo_stream_cli *srv);
- int (*read_cb)(struct osmo_stream_cli *srv);
- int (*write_cb)(struct osmo_stream_cli *srv);
- void *data;
- int flags;
- int reconnect_timeout;
-};
-
-void osmo_stream_cli_close(struct osmo_stream_cli *cli);
-
-/*! \brief Re-connect an Osmocom Stream Client
- * If re-connection is enabled for this client
- * (which is the case unless negative timeout was explicitly set via osmo_stream_cli_set_reconnect_timeout() call),
- * we close any existing connection (if any) and schedule a re-connect timer */
-void osmo_stream_cli_reconnect(struct osmo_stream_cli *cli)
-{
- osmo_stream_cli_close(cli);
-
- if (cli->reconnect_timeout < 0) {
- LOGSCLI(cli, LOGL_INFO, "not reconnecting, disabled.\n");
- return;
- }
-
- LOGSCLI(cli, LOGL_INFO, "retrying in %d seconds...\n",
- cli->reconnect_timeout);
- osmo_timer_schedule(&cli->timer, cli->reconnect_timeout, 0);
- cli->state = STREAM_CLI_STATE_CONNECTING;
-}
-
-/*! \brief Close an Osmocom Stream Client
- * \param[in] cli Osmocom Stream Client to be closed
- * We unregister the socket fd from the osmocom select() loop
- * abstraction and close the socket */
-void osmo_stream_cli_close(struct osmo_stream_cli *cli)
-{
- if (cli->ofd.fd == -1)
- return;
- osmo_fd_unregister(&cli->ofd);
- close(cli->ofd.fd);
- cli->ofd.fd = -1;
-
- if (cli->state == STREAM_CLI_STATE_CONNECTED) {
- LOGSCLI(cli, LOGL_DEBUG, "connection closed\n");
- if (cli->disconnect_cb)
- cli->disconnect_cb(cli);
- }
-
- cli->state = STREAM_CLI_STATE_NONE;
-}
-
-static void osmo_stream_cli_read(struct osmo_stream_cli *cli)
-{
- LOGSCLI(cli, LOGL_DEBUG, "message received\n");
-
- if (cli->read_cb)
- cli->read_cb(cli);
-}
-
-static int osmo_stream_cli_write(struct osmo_stream_cli *cli)
-{
#ifdef HAVE_LIBSCTP
- struct sctp_sndrcvinfo sinfo;
-#endif
- struct msgb *msg;
- struct llist_head *lh;
- int ret;
-
- LOGSCLI(cli, LOGL_DEBUG, "sending data\n");
-
- if (llist_empty(&cli->tx_queue)) {
- cli->ofd.when &= ~BSC_FD_WRITE;
- return 0;
- }
- lh = cli->tx_queue.next;
- llist_del(lh);
- msg = llist_entry(lh, struct msgb, list);
-
- if (cli->state == STREAM_CLI_STATE_CONNECTING) {
- LOGSCLI(cli, LOGL_ERROR, "not connected, dropping data!\n");
- return 0;
- }
-
- switch (cli->proto) {
-#ifdef HAVE_LIBSCTP
- case IPPROTO_SCTP:
- memset(&sinfo, 0, sizeof(sinfo));
- sinfo.sinfo_ppid = htonl(msgb_sctp_ppid(msg));
- sinfo.sinfo_stream = msgb_sctp_stream(msg);
- ret = sctp_send(cli->ofd.fd, msg->data, msgb_length(msg),
- &sinfo, MSG_NOSIGNAL);
- break;
-#endif
- case IPPROTO_TCP:
- default:
- ret = send(cli->ofd.fd, msg->data, msg->len, 0);
- break;
- }
- if (ret < 0) {
- if (errno == EPIPE || errno == ENOTCONN) {
- osmo_stream_cli_reconnect(cli);
- }
- LOGSCLI(cli, LOGL_ERROR, "error to send\n");
- }
- msgb_free(msg);
- return 0;
-}
-
-static int osmo_stream_cli_fd_cb(struct osmo_fd *ofd, unsigned int what)
+static int stream_sctp_recvmsg_trailer(const char *log_pfx, struct msgb *msg, int ret, const struct sctp_sndrcvinfo *sinfo, int flags)
{
- struct osmo_stream_cli *cli = ofd->data;
- int error, ret;
- socklen_t len = sizeof(error);
-
- switch(cli->state) {
- case STREAM_CLI_STATE_CONNECTING:
- ret = getsockopt(ofd->fd, SOL_SOCKET, SO_ERROR, &error, &len);
- if (ret >= 0 && error > 0) {
- osmo_stream_cli_reconnect(cli);
- return 0;
- }
- ofd->when &= ~BSC_FD_WRITE;
- LOGSCLI(cli, LOGL_DEBUG, "connection done.\n");
- cli->state = STREAM_CLI_STATE_CONNECTED;
- if (cli->proto == IPPROTO_SCTP) {
-#ifdef SO_NOSIGPIPE
- int val = 1;
-
- ret = setsockopt(ofd->fd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&val, sizeof(val));
- if (ret < 0)
- LOGSCLI(cli, LOGL_DEBUG, "Failed setting SO_NOSIGPIPE: %s\n", strerror(errno));
-#endif
- sctp_sock_activate_events(ofd->fd);
- }
- if (cli->connect_cb)
- cli->connect_cb(cli);
- break;
- case STREAM_CLI_STATE_CONNECTED:
- if (what & BSC_FD_READ) {
- LOGSCLI(cli, LOGL_DEBUG, "connected read\n");
- osmo_stream_cli_read(cli);
- }
- if (what & BSC_FD_WRITE) {
- LOGSCLI(cli, LOGL_DEBUG, "connected write\n");
- osmo_stream_cli_write(cli);
- }
- break;
- default:
- break;
- }
- return 0;
-}
-
-static void cli_timer_cb(void *data);
-
-/*! \brief Create an Osmocom stream client
- * \param[in] ctx talloc context from which to allocate memory
- * This function allocates a new \ref osmo_stream_cli and initializes
- * it with default values (5s reconnect timer, TCP protocol) */
-struct osmo_stream_cli *osmo_stream_cli_create(void *ctx)
-{
- struct osmo_stream_cli *cli;
-
- cli = talloc_zero(ctx, struct osmo_stream_cli);
- if (!cli)
- return NULL;
-
- cli->proto = IPPROTO_TCP;
- cli->ofd.fd = -1;
- cli->ofd.when |= BSC_FD_READ | BSC_FD_WRITE;
- cli->ofd.priv_nr = 0; /* XXX */
- cli->ofd.cb = osmo_stream_cli_fd_cb;
- cli->ofd.data = cli;
- cli->state = STREAM_CLI_STATE_CONNECTING;
- osmo_timer_setup(&cli->timer, cli_timer_cb, cli);
- cli->reconnect_timeout = 5; /* default is 5 seconds. */
- INIT_LLIST_HEAD(&cli->tx_queue);
-
- return cli;
-}
-
-/*! \brief Set the remote address to which we connect
- * \param[in] cli Stream Client to modify
- * \param[in] addr Remote IP address
- */
-void
-osmo_stream_cli_set_addr(struct osmo_stream_cli *cli, const char *addr)
-{
- osmo_talloc_replace_string(cli, &cli->addr, addr);
- cli->flags |= OSMO_STREAM_CLI_F_RECONF;
-}
-
-/*! \brief Set the remote port number to which we connect
- * \param[in] cli Stream Client to modify
- * \param[in] port Remote port number
- */
-void
-osmo_stream_cli_set_port(struct osmo_stream_cli *cli, uint16_t port)
-{
- cli->port = port;
- cli->flags |= OSMO_STREAM_CLI_F_RECONF;
-}
-
-/*! \brief Set the local port number for the socket (to be bound to)
- * \param[in] cli Stream Client to modify
- * \param[in] port Local port number
- */
-void
-osmo_stream_cli_set_local_port(struct osmo_stream_cli *cli, uint16_t port)
-{
- cli->local_port = port;
- cli->flags |= OSMO_STREAM_CLI_F_RECONF;
-}
-
-/*! \brief Set the local address for the socket (to be bound to)
- * \param[in] cli Stream Client to modify
- * \param[in] port Local host name
- */
-void
-osmo_stream_cli_set_local_addr(struct osmo_stream_cli *cli, const char *addr)
-{
- osmo_talloc_replace_string(cli, &cli->local_addr, addr);
- cli->flags |= OSMO_STREAM_CLI_F_RECONF;
-}
-
-/*! \brief Set the protocol for the stream client socket
- * \param[in] cli Stream Client to modify
- * \param[in] proto Protocol (like IPPROTO_TCP (default), IPPROTO_SCTP, ...)
- */
-void
-osmo_stream_cli_set_proto(struct osmo_stream_cli *cli, uint16_t proto)
-{
- cli->proto = proto;
- cli->flags |= OSMO_STREAM_CLI_F_RECONF;
-}
-
-/*! \brief Set the reconnect time of the stream client socket
- * \param[in] cli Stream Client to modify
- * \param[in] timeout Re-connect timeout in seconds or negative value to disable auto-reconnection */
-void
-osmo_stream_cli_set_reconnect_timeout(struct osmo_stream_cli *cli, int timeout)
-{
- cli->reconnect_timeout = timeout;
-}
-
-/*! \brief Set application private data of the stream client socket
- * \param[in] cli Stream Client to modify
- * \param[in] data User-specific data (available in call-back functions) */
-void
-osmo_stream_cli_set_data(struct osmo_stream_cli *cli, void *data)
-{
- cli->data = data;
-}
-
-/*! \brief Get application private data of the stream client socket
- * \param[in] cli Stream Client to modify
- * \returns Application private data, as set by \ref osmo_stream_cli_set_data() */
-void *osmo_stream_cli_get_data(struct osmo_stream_cli *cli)
-{
- return cli->data;
-}
-
-/*! \brief Get the stream client socket description.
- * \param[in] cli Stream Client to examine
- * \returns Socket description or NULL in case of error */
-char *osmo_stream_cli_get_sockname(const struct osmo_stream_cli *cli)
-{
- static char buf[OSMO_SOCK_NAME_MAXLEN];
-
- osmo_sock_get_name_buf(buf, OSMO_SOCK_NAME_MAXLEN, cli->ofd.fd);
-
- return buf;
-}
-
-/*! \brief Get Osmocom File Descriptor of the stream client socket
- * \param[in] cli Stream Client to modify
- * \returns Pointer to \ref osmo_fd */
-struct osmo_fd *
-osmo_stream_cli_get_ofd(struct osmo_stream_cli *cli)
-{
- return &cli->ofd;
-}
-
-/*! \brief Set the call-back function called on connect of the stream client socket
- * \param[in] cli Stream Client to modify
- * \param[in] connect_cb Call-back function to be called upon connect */
-void
-osmo_stream_cli_set_connect_cb(struct osmo_stream_cli *cli,
- int (*connect_cb)(struct osmo_stream_cli *cli))
-{
- cli->connect_cb = connect_cb;
-}
-
-/*! \brief Set the call-back function called on disconnect of the stream client socket
- * \param[in] cli Stream Client to modify
- * \param[in] disconnect_cb Call-back function to be called upon disconnect */
-void osmo_stream_cli_set_disconnect_cb(struct osmo_stream_cli *cli,
- int (*disconnect_cb)(struct osmo_stream_cli *cli))
-{
- cli->disconnect_cb = disconnect_cb;
-}
-
-/*! \brief Set the call-back function called to read from the stream client socket
- * \param[in] cli Stream Client to modify
- * \param[in] read_cb Call-back function to be called when we want to read */
-void
-osmo_stream_cli_set_read_cb(struct osmo_stream_cli *cli,
- int (*read_cb)(struct osmo_stream_cli *cli))
-{
- cli->read_cb = read_cb;
-}
-
-/*! \brief Destroy a Osmocom stream client (includes close)
- * \param[in] cli Stream Client to destroy */
-void osmo_stream_cli_destroy(struct osmo_stream_cli *cli)
-{
- osmo_stream_cli_close(cli);
- osmo_timer_del(&cli->timer);
- msgb_queue_free(&cli->tx_queue);
- talloc_free(cli);
-}
-
-/*! \brief DEPRECATED: use osmo_stream_cli_set_reconnect_timeout() or osmo_stream_cli_reconnect() instead!
- * Open connection of an Osmocom stream client
- * \param[in] cli Stream Client to connect
- * \param[in] reconect 1 if we should not automatically reconnect
- */
-int osmo_stream_cli_open2(struct osmo_stream_cli *cli, int reconnect)
-{
- int ret;
-
- /* we are reconfiguring this socket, close existing first. */
- if ((cli->flags & OSMO_STREAM_CLI_F_RECONF) && cli->ofd.fd >= 0)
- osmo_stream_cli_close(cli);
-
- cli->flags &= ~OSMO_STREAM_CLI_F_RECONF;
-
- ret = osmo_sock_init2(AF_INET, SOCK_STREAM, cli->proto,
- cli->local_addr, cli->local_port,
- cli->addr, cli->port,
- OSMO_SOCK_F_CONNECT|OSMO_SOCK_F_BIND|OSMO_SOCK_F_NONBLOCK);
- if (ret < 0) {
- if (reconnect)
- osmo_stream_cli_reconnect(cli);
- return ret;
+ msgb_sctp_msg_flags(msg) = 0;
+ if (OSMO_LIKELY(sinfo)) {
+ msgb_sctp_ppid(msg) = ntohl(sinfo->sinfo_ppid);
+ msgb_sctp_stream(msg) = sinfo->sinfo_stream;
}
- cli->ofd.fd = ret;
-
- if (cli->flags & OSMO_STREAM_CLI_F_NODELAY) {
- ret = setsockopt_nodelay(cli->ofd.fd, cli->proto, 1);
- if (ret < 0)
- goto error_close_socket;
- }
-
- if (osmo_fd_register(&cli->ofd) < 0)
- goto error_close_socket;
-
- return 0;
-
-error_close_socket:
- close(ret);
- cli->ofd.fd = -1;
- return -EIO;
-}
-
-/*! \brief Set the NODELAY socket option to avoid Nagle-like behavior
- * Setting this to nodelay=true will automatically set the NODELAY
- * socket option on any socket established via \ref osmo_stream_cli_open
- * or any re-connect. You have to set this _before_ opening the
- * socket.
- * \param[in] cli Stream client whose sockets are to be configured
- * \param[in] nodelay whether to set (true) NODELAY before connect()
- */
-void osmo_stream_cli_set_nodelay(struct osmo_stream_cli *cli, bool nodelay)
-{
- if (nodelay)
- cli->flags |= OSMO_STREAM_CLI_F_NODELAY;
- else
- cli->flags &= ~OSMO_STREAM_CLI_F_NODELAY;
-}
-
-/*! \brief Open connection of an Osmocom stream client
- * By default the client will automatically reconnect after default timeout.
- * To disable this, use osmo_stream_cli_set_reconnect_timeout() before calling this function.
- * \param[in] cli Stream Client to connect */
-int osmo_stream_cli_open(struct osmo_stream_cli *cli)
-{
- int ret;
-
- /* we are reconfiguring this socket, close existing first. */
- if ((cli->flags & OSMO_STREAM_CLI_F_RECONF) && cli->ofd.fd >= 0)
- osmo_stream_cli_close(cli);
-
- cli->flags &= ~OSMO_STREAM_CLI_F_RECONF;
-
- ret = osmo_sock_init2(AF_INET, SOCK_STREAM, cli->proto,
- cli->local_addr, cli->local_port,
- cli->addr, cli->port,
- OSMO_SOCK_F_CONNECT|OSMO_SOCK_F_BIND|OSMO_SOCK_F_NONBLOCK);
- if (ret < 0) {
- osmo_stream_cli_reconnect(cli);
- return ret;
- }
- cli->ofd.fd = ret;
-
- if (cli->flags & OSMO_STREAM_CLI_F_NODELAY) {
- ret = setsockopt_nodelay(cli->ofd.fd, cli->proto, 1);
- if (ret < 0)
- goto error_close_socket;
- }
-
- if (osmo_fd_register(&cli->ofd) < 0)
- goto error_close_socket;
-
- return 0;
-
-error_close_socket:
- close(ret);
- cli->ofd.fd = -1;
- return -EIO;
-}
-static void cli_timer_cb(void *data)
-{
- struct osmo_stream_cli *cli = data;
-
- LOGSCLI(cli, LOGL_DEBUG, "reconnecting.\n");
-
- switch(cli->state) {
- case STREAM_CLI_STATE_CONNECTING:
- cli->ofd.when |= BSC_FD_READ | BSC_FD_WRITE;
- osmo_stream_cli_open(cli);
- break;
- default:
- break;
- }
-}
-
-/*! \brief Enqueue data to be sent via an Osmocom stream client
- * \param[in] cli Stream Client through which we want to send
- * \param[in] msg Message buffer to enqueue in transmit queue */
-void osmo_stream_cli_send(struct osmo_stream_cli *cli, struct msgb *msg)
-{
- msgb_enqueue(&cli->tx_queue, msg);
- cli->ofd.when |= BSC_FD_WRITE;
-}
-
-/*! \brief Receive data via an Osmocom stream client
- * \param[in] cli Stream Client through which we want to send
- * \param msg pre-allocate message buffer to which received data is appended
- * \returns number of bytes read; <=0 in case of error */
-int osmo_stream_cli_recv(struct osmo_stream_cli *cli, struct msgb *msg)
-{
- int ret;
-
- ret = recv(cli->ofd.fd, msg->data, msg->data_len, 0);
- if (ret < 0) {
- if (errno == EPIPE || errno == ECONNRESET) {
- LOGSCLI(cli, LOGL_ERROR, "lost connection with srv\n");
+ if (flags & MSG_NOTIFICATION) {
+ char buf[512];
+ struct osmo_strbuf sb = { .buf = buf, .len = sizeof(buf) };
+ int logl = LOGL_INFO;
+ union sctp_notification *notif = (union sctp_notification *) msg->data;
+
+ OSMO_STRBUF_PRINTF(sb, "%s NOTIFICATION %s flags=0x%x", log_pfx,
+ osmo_sctp_sn_type_str(notif->sn_header.sn_type), notif->sn_header.sn_flags);
+ msgb_put(msg, sizeof(union sctp_notification));
+ msgb_sctp_msg_flags(msg) = OSMO_STREAM_SCTP_MSG_FLAGS_NOTIFICATION;
+ ret = -EAGAIN;
+
+ switch (notif->sn_header.sn_type) {
+ case SCTP_ASSOC_CHANGE:
+ OSMO_STRBUF_PRINTF(sb, " %s", osmo_sctp_assoc_chg_str(notif->sn_assoc_change.sac_state));
+ switch (notif->sn_assoc_change.sac_state) {
+ case SCTP_COMM_UP:
+ break;
+ case SCTP_COMM_LOST:
+ OSMO_STRBUF_PRINTF(sb, " (err: %s)",
+ osmo_sctp_sn_error_str(notif->sn_assoc_change.sac_error));
+ /* Handle this like a regular disconnect */
+ ret = 0;
+ break;
+ case SCTP_RESTART:
+ case SCTP_SHUTDOWN_COMP:
+ logl = LOGL_NOTICE;
+ break;
+ case SCTP_CANT_STR_ASSOC:
+ break;
+ }
+ break;
+ case SCTP_SEND_FAILED:
+ logl = LOGL_ERROR;
+ break;
+ case SCTP_PEER_ADDR_CHANGE:
+ {
+ char addr_str[INET6_ADDRSTRLEN + 10];
+ struct sockaddr_storage sa = notif->sn_paddr_change.spc_aaddr;
+ osmo_sockaddr_to_str_buf(addr_str, sizeof(addr_str),
+ (const struct osmo_sockaddr *)&sa);
+ OSMO_STRBUF_PRINTF(sb, " %s %s err=%s",
+ osmo_sctp_paddr_chg_str(notif->sn_paddr_change.spc_state), addr_str,
+ (notif->sn_paddr_change.spc_state == SCTP_ADDR_UNREACHABLE) ?
+ osmo_sctp_sn_error_str(notif->sn_paddr_change.spc_error) : "None");
+ }
+ break;
+ case SCTP_SHUTDOWN_EVENT:
+ logl = LOGL_NOTICE;
+ /* RFC6458 3.1.4: Any attempt to send more data will cause sendmsg()
+ * to return with an ESHUTDOWN error. */
+ break;
+ case SCTP_REMOTE_ERROR:
+ logl = LOGL_NOTICE;
+ OSMO_STRBUF_PRINTF(sb, " %s", osmo_sctp_op_error_str(ntohs(notif->sn_remote_error.sre_error)));
+ break;
}
- osmo_stream_cli_reconnect(cli);
- return ret;
- } else if (ret == 0) {
- LOGSCLI(cli, LOGL_ERROR, "connection closed with srv\n");
- osmo_stream_cli_reconnect(cli);
+ LOGP(DLINP, logl, "%s\n", buf);
return ret;
}
- msgb_put(msg, ret);
- LOGSCLI(cli, LOGL_DEBUG, "received %d bytes from srv\n", ret);
- return ret;
-}
-/*
- * Server side.
- */
-
-#define OSMO_STREAM_SRV_F_RECONF (1 << 0)
-#define OSMO_STREAM_SRV_F_NODELAY (1 << 1)
-
-struct osmo_stream_srv_link {
- struct osmo_fd ofd;
- char *addr;
- uint16_t port;
- uint16_t proto;
- int (*accept_cb)(struct osmo_stream_srv_link *srv, int fd);
- void *data;
- int flags;
-};
-
-static int osmo_stream_srv_fd_cb(struct osmo_fd *ofd, unsigned int what)
-{
- int ret;
- int sock_fd;
- struct sockaddr_in sa;
- socklen_t sa_len = sizeof(sa);
- struct osmo_stream_srv_link *link = ofd->data;
+ if (OSMO_UNLIKELY(ret > 0 && !sinfo))
+ LOGP(DLINP, LOGL_ERROR, "%s sctp_recvmsg without SNDRCV cmsg?!?\n", log_pfx);
- ret = accept(ofd->fd, (struct sockaddr *)&sa, &sa_len);
- if (ret < 0) {
- LOGP(DLINP, LOGL_ERROR, "failed to accept from origin "
- "peer, reason=`%s'\n", strerror(errno));
- return ret;
- }
- LOGP(DLINP, LOGL_DEBUG, "accept()ed new link from %s to port %u\n",
- inet_ntoa(sa.sin_addr), link->port);
- sock_fd = ret;
-
- if (link->proto == IPPROTO_SCTP) {
- ret = sctp_sock_activate_events(sock_fd);
- if (ret < 0)
- goto error_close_socket;
- }
-
- if (link->flags & OSMO_STREAM_SRV_F_NODELAY) {
- ret = setsockopt_nodelay(sock_fd, link->proto, 1);
- if (ret < 0)
- goto error_close_socket;
- }
-
- if (!link->accept_cb) {
- ret = -ENOTSUP;
- goto error_close_socket;
- }
-
- ret = link->accept_cb(link, sock_fd);
- if (ret)
- goto error_close_socket;
- return 0;
-
-error_close_socket:
- close(sock_fd);
return ret;
}
-/*! \brief Create an Osmocom Stream Server Link
- * A Stream Server Link is the listen()+accept() "parent" to individual
- * Stream Servers
- * \param[in] ctx talloc allocation context
- * \returns Stream Server Link with default values (TCP)
- */
-struct osmo_stream_srv_link *osmo_stream_srv_link_create(void *ctx)
-{
- struct osmo_stream_srv_link *link;
-
- link = talloc_zero(ctx, struct osmo_stream_srv_link);
- if (!link)
- return NULL;
-
- link->proto = IPPROTO_TCP;
- link->ofd.fd = -1;
- link->ofd.when |= BSC_FD_READ | BSC_FD_WRITE;
- link->ofd.cb = osmo_stream_srv_fd_cb;
- link->ofd.data = link;
-
- return link;
-}
-
-/*! \brief Set the NODELAY socket option to avoid Nagle-like behavior
- * Setting this to nodelay=true will automatically set the NODELAY
- * socket option on any socket established via this server link, before
- * calling the accept_cb()
- * \param[in] link server link whose sockets are to be configured
- * \param[in] nodelay whether to set (true) NODELAY after accept
- */
-void osmo_stream_srv_link_set_nodelay(struct osmo_stream_srv_link *link, bool nodelay)
-{
- if (nodelay)
- link->flags |= OSMO_STREAM_SRV_F_NODELAY;
- else
- link->flags &= ~OSMO_STREAM_SRV_F_NODELAY;
-}
-
-/*! \brief Set the local address to which we bind
- * \param[in] link Stream Server Link to modify
- * \param[in] addr Local IP address
- */
-void osmo_stream_srv_link_set_addr(struct osmo_stream_srv_link *link,
- const char *addr)
-{
- osmo_talloc_replace_string(link, &link->addr, addr);
- link->flags |= OSMO_STREAM_SRV_F_RECONF;
-}
-
-/*! \brief Set the local port number to which we bind
- * \param[in] link Stream Server Link to modify
- * \param[in] port Local port number
- */
-void osmo_stream_srv_link_set_port(struct osmo_stream_srv_link *link,
- uint16_t port)
-{
- link->port = port;
- link->flags |= OSMO_STREAM_SRV_F_RECONF;
-}
-
-/*! \brief Set the protocol for the stream server link
- * \param[in] link Stream Server Link to modify
- * \param[in] proto Protocol (like IPPROTO_TCP (default), IPPROTO_SCTP, ...)
- */
-void
-osmo_stream_srv_link_set_proto(struct osmo_stream_srv_link *link,
- uint16_t proto)
-{
- link->proto = proto;
- link->flags |= OSMO_STREAM_SRV_F_RECONF;
-}
-
-/*! \brief Set application private data of the stream server link
- * \param[in] link Stream Server Link to modify
- * \param[in] data User-specific data (available in call-back functions) */
-void
-osmo_stream_srv_link_set_data(struct osmo_stream_srv_link *link,
- void *data)
-{
- link->data = data;
-}
-
-/*! \brief Get application private data of the stream server link
- * \param[in] link Stream Server Link to modify
- * \returns Application private data, as set by \ref osmo_stream_cli_set_data() */
-void *osmo_stream_srv_link_get_data(struct osmo_stream_srv_link *link)
-{
- return link->data;
-}
-
-/*! \brief Get description of the stream server link e. g. 127.0.0.1:1234
- * \param[in] link Stream Server Link to examine
- * \returns Link description or NULL in case of error */
-char *osmo_stream_srv_link_get_sockname(const struct osmo_stream_srv_link *link)
-{
- static char buf[INET6_ADDRSTRLEN + 6];
- int rc = osmo_sock_get_local_ip(link->ofd.fd, buf, INET6_ADDRSTRLEN);
- if (rc < 0)
- return NULL;
-
- buf[strnlen(buf, INET6_ADDRSTRLEN + 6)] = ':';
-
- rc = osmo_sock_get_local_ip_port(link->ofd.fd, buf + strnlen(buf, INET6_ADDRSTRLEN + 6), 6);
- if (rc < 0)
- return NULL;
-
- return buf;
-}
-
-/*! \brief Get Osmocom File Descriptor of the stream server link
- * \param[in] link Stream Server Link
- * \returns Pointer to \ref osmo_fd */
-struct osmo_fd *
-osmo_stream_srv_link_get_ofd(struct osmo_stream_srv_link *link)
-{
- return &link->ofd;
-}
-
-/*! \brief Set the accept() call-back of the stream server link
- * \param[in] link Stream Server Link
- * \param[in] accept_cb Call-back function executed upon accept() */
-void osmo_stream_srv_link_set_accept_cb(struct osmo_stream_srv_link *link,
- int (*accept_cb)(struct osmo_stream_srv_link *link, int fd))
-
-{
- link->accept_cb = accept_cb;
-}
-
-/*! \brief Destroy the stream server link. Closes + Releases Memory.
- * \param[in] link Stream Server Link */
-void osmo_stream_srv_link_destroy(struct osmo_stream_srv_link *link)
-{
- osmo_stream_srv_link_close(link);
- talloc_free(link);
-}
-
-/*! \brief Open the stream server link. This actually initializes the
- * underlying socket and binds it to the configured ip/port
- * \param[in] link Stream Server Link to open */
-int osmo_stream_srv_link_open(struct osmo_stream_srv_link *link)
-{
- int ret;
-
- if (link->ofd.fd >= 0) {
- /* No reconfigure needed for existing socket, we are fine */
- if (!(link->flags & OSMO_STREAM_SRV_F_RECONF))
- return 0;
- /* we are reconfiguring this socket, close existing first. */
- osmo_stream_srv_link_close(link);
- }
-
- link->flags &= ~OSMO_STREAM_SRV_F_RECONF;
-
- ret = osmo_sock_init(AF_INET, SOCK_STREAM, link->proto,
- link->addr, link->port, OSMO_SOCK_F_BIND);
- if (ret < 0)
- return ret;
-
- link->ofd.fd = ret;
- if (osmo_fd_register(&link->ofd) < 0) {
- close(ret);
- link->ofd.fd = -1;
- return -EIO;
- }
- return 0;
-}
-
-/*! \brief Close the stream server link and unregister from select loop
- * Does not destroy the server link, merely closes it!
- * \param[in] link Stream Server Link to close */
-void osmo_stream_srv_link_close(struct osmo_stream_srv_link *link)
-{
- if (link->ofd.fd == -1)
- return;
- osmo_fd_unregister(&link->ofd);
- close(link->ofd.fd);
- link->ofd.fd = -1;
-}
-
-#define OSMO_STREAM_SRV_F_FLUSH_DESTROY (1 << 0)
-
-struct osmo_stream_srv {
- struct osmo_stream_srv_link *srv;
- struct osmo_fd ofd;
- struct llist_head tx_queue;
- int (*closed_cb)(struct osmo_stream_srv *peer);
- int (*cb)(struct osmo_stream_srv *peer);
- void *data;
- int flags;
-};
-
-static int osmo_stream_srv_read(struct osmo_stream_srv *conn)
-{
- int rc = 0;
-
- LOGP(DLINP, LOGL_DEBUG, "message received\n");
-
- if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY) {
- LOGP(DLINP, LOGL_DEBUG, "Connection is being flushed and closed; ignoring received message\n");
- return 0;
- }
-
- if (conn->cb)
- rc = conn->cb(conn);
-
- return rc;
-}
-
-static void osmo_stream_srv_write(struct osmo_stream_srv *conn)
+/*! wrapper for regular synchronous sctp_recvmsg(3) */
+int stream_sctp_recvmsg_wrapper(int fd, struct msgb *msg, const char *log_pfx)
{
-#ifdef HAVE_LIBSCTP
struct sctp_sndrcvinfo sinfo;
-#endif
- struct msgb *msg;
- struct llist_head *lh;
+ int flags = 0;
int ret;
- LOGP(DLINP, LOGL_DEBUG, "sending data\n");
-
- if (llist_empty(&conn->tx_queue)) {
- conn->ofd.when &= ~BSC_FD_WRITE;
- return;
- }
- lh = conn->tx_queue.next;
- llist_del(lh);
- msg = llist_entry(lh, struct msgb, list);
-
- switch (conn->srv->proto) {
-#ifdef HAVE_LIBSCTP
- case IPPROTO_SCTP:
- memset(&sinfo, 0, sizeof(sinfo));
- sinfo.sinfo_ppid = htonl(msgb_sctp_ppid(msg));
- sinfo.sinfo_stream = msgb_sctp_stream(msg);
- ret = sctp_send(conn->ofd.fd, msg->data, msgb_length(msg),
- &sinfo, MSG_NOSIGNAL);
- break;
-#endif
- case IPPROTO_TCP:
- default:
- ret = send(conn->ofd.fd, msg->data, msg->len, 0);
- break;
- }
- if (ret < 0) {
- LOGP(DLINP, LOGL_ERROR, "error to send\n");
- }
- msgb_free(msg);
-
- if (llist_empty(&conn->tx_queue) && (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY))
- osmo_stream_srv_destroy(conn);
+ ret = sctp_recvmsg(fd, msg->tail, msgb_tailroom(msg), NULL, NULL, &sinfo, &flags);
+ return stream_sctp_recvmsg_trailer(log_pfx, msg, ret, &sinfo, flags);
}
-static int osmo_stream_srv_cb(struct osmo_fd *ofd, unsigned int what)
+/*! wrapper for osmo_io asynchronous recvmsg response */
+int stream_iofd_sctp_recvmsg_trailer(struct osmo_io_fd *iofd, struct msgb *msg, int ret, const struct msghdr *msgh)
{
- struct osmo_stream_srv *conn = ofd->data;
- int rc = 0;
-
- LOGP(DLINP, LOGL_DEBUG, "connected read/write\n");
- if (what & BSC_FD_READ)
- rc = osmo_stream_srv_read(conn);
- if (rc != -EBADF && (what & BSC_FD_WRITE))
- osmo_stream_srv_write(conn);
-
- return rc;
-}
+ const struct sctp_sndrcvinfo *sinfo = NULL;
+ struct cmsghdr *cmsg = NULL;
-/*! \brief Create a Stream Server inside the specified link
- * \param[in] ctx talloc allocation context from which to allocate
- * \param[in] link Stream Server Link to which we belong
- * \returns Stream Server in case of success; NULL on error */
-struct osmo_stream_srv *
-osmo_stream_srv_create(void *ctx, struct osmo_stream_srv_link *link,
- int fd,
- int (*cb)(struct osmo_stream_srv *conn),
- int (*closed_cb)(struct osmo_stream_srv *conn), void *data)
-{
- struct osmo_stream_srv *conn;
-
- conn = talloc_zero(ctx, struct osmo_stream_srv);
- if (conn == NULL) {
- LOGP(DLINP, LOGL_ERROR, "cannot allocate new peer in srv, "
- "reason=`%s'\n", strerror(errno));
- return NULL;
- }
- conn->srv = link;
- conn->ofd.fd = fd;
- conn->ofd.data = conn;
- conn->ofd.cb = osmo_stream_srv_cb;
- conn->ofd.when = BSC_FD_READ;
- conn->cb = cb;
- conn->closed_cb = closed_cb;
- conn->data = data;
- INIT_LLIST_HEAD(&conn->tx_queue);
-
- if (osmo_fd_register(&conn->ofd) < 0) {
- LOGP(DLINP, LOGL_ERROR, "could not register FD\n");
- talloc_free(conn);
- return NULL;
+ for (cmsg = CMSG_FIRSTHDR((struct msghdr *) msgh); cmsg != NULL;
+ cmsg = CMSG_NXTHDR((struct msghdr *) msgh, cmsg)) {
+ if (cmsg->cmsg_level == IPPROTO_SCTP && cmsg->cmsg_type == SCTP_SNDRCV) {
+ sinfo = (const struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
+ break;
+ }
}
- return conn;
-}
-/*! \brief Prepare to send out all pending messages on the connection's Tx queue
- * and then automatically destroy the stream with osmo_stream_srv_destroy().
- * This function disables queuing of new messages on the connection and also
- * disables reception of new messages on the connection.
- * \param[in] conn Stream Server to modify */
-void osmo_stream_srv_set_flush_and_destroy(struct osmo_stream_srv *conn)
-{
- conn->flags |= OSMO_STREAM_SRV_F_FLUSH_DESTROY;
-}
-
-/*! \brief Set application private data of the stream server
- * \param[in] conn Stream Server to modify
- * \param[in] data User-specific data (available in call-back functions) */
-void
-osmo_stream_srv_set_data(struct osmo_stream_srv *conn,
- void *data)
-{
- conn->data = data;
-}
-
-/*! \brief Get application private data of the stream server
- * \param[in] conn Stream Server
- * \returns Application private data, as set by \ref osmo_stream_srv_set_data() */
-void *osmo_stream_srv_get_data(struct osmo_stream_srv *conn)
-{
- return conn->data;
+ return stream_sctp_recvmsg_trailer(osmo_iofd_get_name(iofd), msg, ret, sinfo, msgh->msg_flags);
}
-/*! \brief Get Osmocom File Descriptor of the stream server
- * \param[in] conn Stream Server
- * \returns Pointer to \ref osmo_fd */
-struct osmo_fd *
-osmo_stream_srv_get_ofd(struct osmo_stream_srv *conn)
+/*! Send a message through a connected SCTP socket, similar to sctp_sendmsg().
+ *
+ * Appends the message to the internal transmit queue.
+ * If the function returns success (0), it will take ownership of the msgb and
+ * internally call msgb_free() after the write request completes.
+ * In case of an error the msgb needs to be freed by the caller.
+ *
+ * \param[in] iofd file descriptor to write to
+ * \param[in] msg message buffer to send; uses msgb_sctp_ppid/msg_sctp_stream
+ * \param[in] sendmsg_flags Flags to pass to the send call
+ * \returns 0 in case of success; a negative value in case of error
+ */
+int stream_iofd_sctp_send_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int sendmsg_flags)
{
- return &conn->ofd;
-}
+ struct msghdr outmsg = {};
+ char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
+ struct sctp_sndrcvinfo *sinfo;
+ struct cmsghdr *cmsg;
-/*! \brief Get the master (Link) from a Stream Server
- * \param[in] conn Stream Server of which we want to know the Link
- * \returns Link through which the given Stream Server is established */
-struct osmo_stream_srv_link *osmo_stream_srv_get_master(struct osmo_stream_srv *conn)
-{
- return conn->srv;
-}
+ outmsg.msg_control = outcmsg;
+ outmsg.msg_controllen = sizeof(outcmsg);
-/*! \brief Destroy given Stream Server
- * This function closes the Stream Server socket, unregisters from
- * select loop, invokes the connection's closed_cb() callback to allow API
- * users to clean up any associated state they have for this connection,
- * and then de-allocates associated memory.
- * \param[in] conn Stream Server to be destroyed */
-void osmo_stream_srv_destroy(struct osmo_stream_srv *conn)
-{
- close(conn->ofd.fd);
- osmo_fd_unregister(&conn->ofd);
- if (conn->closed_cb)
- conn->closed_cb(conn);
- msgb_queue_free(&conn->tx_queue);
- talloc_free(conn);
-}
+ cmsg = CMSG_FIRSTHDR(&outmsg);
+ cmsg->cmsg_level = IPPROTO_SCTP;
+ cmsg->cmsg_type = SCTP_SNDRCV;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
-/*! \brief Enqueue data to be sent via an Osmocom stream server
- * \param[in] conn Stream Server through which we want to send
- * \param[in] msg Message buffer to enqueue in transmit queue */
-void osmo_stream_srv_send(struct osmo_stream_srv *conn, struct msgb *msg)
-{
- if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY) {
- LOGP(DLINP, LOGL_DEBUG, "Connection is being flushed and closed; ignoring new outgoing message\n");
- return;
- }
+ outmsg.msg_controllen = cmsg->cmsg_len;
+ sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
+ memset(sinfo, 0, sizeof(struct sctp_sndrcvinfo));
+ sinfo->sinfo_ppid = htonl(msgb_sctp_ppid(msg));
+ sinfo->sinfo_stream = msgb_sctp_stream(msg);
- msgb_enqueue(&conn->tx_queue, msg);
- conn->ofd.when |= BSC_FD_WRITE;
+ return osmo_iofd_sendmsg_msgb(iofd, msg, sendmsg_flags, &outmsg);
}
-
-/*! \brief Receive data via Osmocom stream server
- * \param[in] conn Stream Server from which to receive
- * \param msg pre-allocate message buffer to which received data is appended
- * \returns number of bytes read, negative on error.
- */
-int osmo_stream_srv_recv(struct osmo_stream_srv *conn, struct msgb *msg)
-{
-#ifdef HAVE_LIBSCTP
- struct sctp_sndrcvinfo sinfo;
- int flags = 0;
#endif
- int ret;
-
- if (!msg)
- return -EINVAL;
-
- switch (conn->srv->proto) {
-#ifdef HAVE_LIBSCTP
- case IPPROTO_SCTP:
- ret = sctp_recvmsg(conn->ofd.fd, msgb_data(msg), msgb_tailroom(msg),
- NULL, NULL, &sinfo, &flags);
- if (flags & MSG_NOTIFICATION) {
- union sctp_notification *notif = (union sctp_notification *) msgb_data(msg);
- LOGP(DLINP, LOGL_DEBUG, "NOTIFICATION %u flags=0x%x\n", notif->sn_header.sn_type, notif->sn_header.sn_flags);
- switch (notif->sn_header.sn_type) {
- case SCTP_ASSOC_CHANGE:
- LOGP(DLINP, LOGL_DEBUG, "===> ASSOC CHANGE:");
- switch (notif->sn_assoc_change.sac_state) {
- case SCTP_COMM_UP:
- LOGPC(DLINP, LOGL_DEBUG, " UP\n");
- break;
- case SCTP_COMM_LOST:
- LOGPC(DLINP, LOGL_DEBUG, " LOST\n");
- break;
- case SCTP_RESTART:
- LOGPC(DLINP, LOGL_DEBUG, " RESTART\n");
- break;
- case SCTP_SHUTDOWN_COMP:
- LOGPC(DLINP, LOGL_DEBUG, " SHUTDOWN COMP\n");
- break;
- case SCTP_CANT_STR_ASSOC:
- LOGPC(DLINP, LOGL_DEBUG, " CANT STR ASSOC\n");
- break;
- }
- break;
- case SCTP_PEER_ADDR_CHANGE:
- LOGP(DLINP, LOGL_DEBUG, "===> PEER ADDR CHANGE\n");
- break;
- case SCTP_SHUTDOWN_EVENT:
- LOGP(DLINP, LOGL_DEBUG, "===> SHUTDOWN EVT\n");
- /* Handle this like a regular disconnect */
- return 0;
- break;
- }
- return -EAGAIN;
- }
- msgb_sctp_ppid(msg) = ntohl(sinfo.sinfo_ppid);
- msgb_sctp_stream(msg) = sinfo.sinfo_stream;
- break;
-#endif
- case IPPROTO_TCP:
- default:
- ret = recv(conn->ofd.fd, msgb_data(msg), msgb_tailroom(msg), 0);
- break;
- }
-
- if (ret < 0) {
- if (errno == EPIPE || errno == ECONNRESET) {
- LOGP(DLINP, LOGL_ERROR,
- "lost connection with srv\n");
- }
- return ret;
- } else if (ret == 0) {
- LOGP(DLINP, LOGL_ERROR, "connection closed with srv\n");
- return ret;
- }
- msgb_put(msg, ret);
- LOGP(DLINP, LOGL_DEBUG, "received %d bytes from client\n", ret);
- return ret;
-}
-/*! @} */
+/*! \endccond */
diff --git a/src/stream_cli.c b/src/stream_cli.c
new file mode 100644
index 0000000..d4067d6
--- /dev/null
+++ b/src/stream_cli.c
@@ -0,0 +1,1237 @@
+/* (C) 2011 by Pablo Neira Ayuso <pablo@gnumonks.org>
+ * (C) 2015-2016 by Harald Welte <laforge@gnumonks.org>
+ * (C) 2023 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
+ * All Rights Reserved.
+ *
+ * SPDX-License-Identifier: GPL-2.0+
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <string.h>
+#include <time.h>
+#include <sys/fcntl.h>
+#include <sys/socket.h>
+#include <sys/ioctl.h>
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+
+#include <osmocom/core/timer.h>
+#include <osmocom/core/select.h>
+#include <osmocom/core/utils.h>
+#include <osmocom/gsm/tlv.h>
+#include <osmocom/core/msgb.h>
+#include <osmocom/core/osmo_io.h>
+#include <osmocom/core/panic.h>
+#include <osmocom/core/logging.h>
+#include <osmocom/core/talloc.h>
+#include <osmocom/core/socket.h>
+
+#include <osmocom/netif/stream.h>
+#include <osmocom/netif/stream_private.h>
+
+#include "config.h"
+
+#include <osmocom/netif/sctp.h>
+
+/*! \file stream_cli.c */
+
+#define LOGSCLI(cli, level, fmt, args...) \
+ LOGP(DLINP, level, "CLICONN(%s,%s){%s} " fmt, \
+ cli->name ? : "", \
+ cli->sockname, \
+ get_value_string(stream_cli_state_names, (cli)->state), \
+ ## args)
+
+/*
+ * Client side.
+ */
+
+enum osmo_stream_cli_state {
+ STREAM_CLI_STATE_CLOSED, /* No fd associated, no timer active */
+ STREAM_CLI_STATE_WAIT_RECONNECT, /* No fd associated, has timer active to try to connect again */
+ STREAM_CLI_STATE_CONNECTING, /* Fd associated, but connection not yet confirmed by peer or lower layers */
+ STREAM_CLI_STATE_CONNECTED, /* Fd associated and connection is established */
+ STREAM_CLI_STATE_MAX
+};
+
+static const struct value_string stream_cli_state_names[] = {
+ { STREAM_CLI_STATE_CLOSED, "CLOSED" },
+ { STREAM_CLI_STATE_WAIT_RECONNECT, "WAIT_RECONNECT" },
+ { STREAM_CLI_STATE_CONNECTING, "CONNECTING" },
+ { STREAM_CLI_STATE_CONNECTED, "CONNECTED" },
+ { 0, NULL }
+};
+
+#define OSMO_STREAM_CLI_F_RECONF (1 << 0)
+#define OSMO_STREAM_CLI_F_NODELAY (1 << 1)
+
+struct osmo_stream_cli {
+ char *name;
+ char sockname[OSMO_SOCK_NAME_MAXLEN];
+ enum osmo_stream_mode mode;
+ union {
+ struct osmo_fd ofd;
+ struct osmo_io_fd *iofd;
+ };
+ struct llist_head tx_queue;
+ struct osmo_timer_list timer;
+ enum osmo_stream_cli_state state;
+ char *addr[OSMO_STREAM_MAX_ADDRS];
+ uint8_t addrcnt;
+ uint16_t port;
+ char *local_addr[OSMO_STREAM_MAX_ADDRS];
+ uint8_t local_addrcnt;
+ uint16_t local_port;
+ int sk_domain;
+ int sk_type;
+ uint16_t proto;
+ osmo_stream_cli_connect_cb_t connect_cb;
+ osmo_stream_cli_disconnect_cb_t disconnect_cb;
+ osmo_stream_cli_read_cb_t read_cb;
+ osmo_stream_cli_read_cb2_t iofd_read_cb;
+ osmo_stream_cli_segmentation_cb_t segmentation_cb;
+ void *data;
+ int flags;
+ int reconnect_timeout;
+ struct osmo_sock_init2_multiaddr_pars ma_pars;
+};
+
+void osmo_stream_cli_close(struct osmo_stream_cli *cli);
+
+/*! \addtogroup stream_cli
+ * @{
+ */
+
+/*! Re-connect an Osmocom Stream Client.
+ * If re-connection is enabled for this client
+ * (which is the case unless negative timeout was explicitly set via osmo_stream_cli_set_reconnect_timeout() call),
+ * we close any existing connection (if any) and schedule a re-connect timer */
+void osmo_stream_cli_reconnect(struct osmo_stream_cli *cli)
+{
+ osmo_stream_cli_close(cli);
+
+ if (cli->reconnect_timeout < 0) {
+ LOGSCLI(cli, LOGL_INFO, "not reconnecting, disabled\n");
+ return;
+ }
+
+ cli->state = STREAM_CLI_STATE_WAIT_RECONNECT;
+ LOGSCLI(cli, LOGL_INFO, "retrying reconnect in %d seconds...\n",
+ cli->reconnect_timeout);
+ osmo_timer_schedule(&cli->timer, cli->reconnect_timeout, 0);
+}
+
+/*! Check if Osmocom Stream Client is in connected state.
+ * \param[in] cli Osmocom Stream Client
+ * \return true if connected, false otherwise
+ */
+bool osmo_stream_cli_is_connected(struct osmo_stream_cli *cli)
+{
+ return cli->state == STREAM_CLI_STATE_CONNECTED;
+}
+
+static void osmo_stream_cli_close_iofd(struct osmo_stream_cli *cli)
+{
+ if (!cli->iofd)
+ return;
+
+ osmo_iofd_free(cli->iofd);
+ cli->iofd = NULL;
+}
+
+static void osmo_stream_cli_close_ofd(struct osmo_stream_cli *cli)
+{
+ if (cli->ofd.fd == -1)
+ return;
+ osmo_fd_unregister(&cli->ofd);
+ close(cli->ofd.fd);
+ cli->ofd.fd = -1;
+}
+
+/*! Close an Osmocom Stream Client.
+ * \param[in] cli Osmocom Stream Client to be closed
+ * We unregister the socket fd from the osmocom select() loop
+ * abstraction and close the socket */
+void osmo_stream_cli_close(struct osmo_stream_cli *cli)
+{
+ int old_state = cli->state;
+
+ if (cli->state == STREAM_CLI_STATE_CLOSED)
+ return;
+ if (cli->state == STREAM_CLI_STATE_WAIT_RECONNECT) {
+ osmo_timer_del(&cli->timer);
+ cli->state = STREAM_CLI_STATE_CLOSED;
+ return;
+ }
+
+
+ switch (cli->mode) {
+ case OSMO_STREAM_MODE_OSMO_FD:
+ osmo_stream_cli_close_ofd(cli);
+ break;
+ case OSMO_STREAM_MODE_OSMO_IO:
+ osmo_stream_cli_close_iofd(cli);
+ break;
+ default:
+ OSMO_ASSERT(false);
+ }
+
+ cli->state = STREAM_CLI_STATE_CLOSED;
+
+ if (old_state == STREAM_CLI_STATE_CONNECTED) {
+ LOGSCLI(cli, LOGL_DEBUG, "connection closed\n");
+ if (cli->disconnect_cb)
+ cli->disconnect_cb(cli);
+ }
+}
+
+/*! Retrieve file descriptor of the stream client socket.
+ * \param[in] cli Stream Client of which we want to obtain the file descriptor
+ * \returns File descriptor or negative in case of error */
+int
+osmo_stream_cli_get_fd(const struct osmo_stream_cli *cli)
+{
+ switch (cli->mode) {
+ case OSMO_STREAM_MODE_OSMO_FD:
+ return cli->ofd.fd;
+ case OSMO_STREAM_MODE_OSMO_IO:
+ if (cli->iofd)
+ return osmo_iofd_get_fd(cli->iofd);
+ default:
+ break;
+ }
+ return -EINVAL;
+}
+
+/*! Retrieve osmo_io descriptor of the stream client socket.
+ * This function must not be called on a stream client in legacy osmo_fd mode!
+ * The iofd is only valid once/after osmo_stream_cli_open() has successfully returned.
+ * \param[in] cli Stream Client of which we want to obtain the file descriptor
+ * \returns osmo_io_fd of stream client, or NULL if stream not yet opened. */
+struct osmo_io_fd *
+osmo_stream_cli_get_iofd(const struct osmo_stream_cli *cli)
+{
+ OSMO_ASSERT(cli->mode == OSMO_STREAM_MODE_OSMO_IO);
+ return cli->iofd;
+}
+
+static void osmo_stream_cli_read(struct osmo_stream_cli *cli)
+{
+ LOGSCLI(cli, LOGL_DEBUG, "message received\n");
+
+ if (cli->read_cb)
+ cli->read_cb(cli);
+}
+
+static int osmo_stream_cli_write(struct osmo_stream_cli *cli)
+{
+#ifdef HAVE_LIBSCTP
+ struct sctp_sndrcvinfo sinfo;
+#endif
+ struct msgb *msg;
+ int ret;
+
+ if (llist_empty(&cli->tx_queue)) {
+ osmo_fd_write_disable(&cli->ofd);
+ return 0;
+ }
+ msg = llist_first_entry(&cli->tx_queue, struct msgb, list);
+ llist_del(&msg->list);
+
+ if (!osmo_stream_cli_is_connected(cli)) {
+ LOGSCLI(cli, LOGL_ERROR, "send: not connected, dropping data!\n");
+ return 0;
+ }
+
+ LOGSCLI(cli, LOGL_DEBUG, "sending %u bytes of data\n", msgb_length(msg));
+
+ switch (cli->sk_domain) {
+ case AF_UNIX:
+ ret = send(cli->ofd.fd, msgb_data(msg), msgb_length(msg), 0);
+ break;
+ case AF_UNSPEC:
+ case AF_INET:
+ case AF_INET6:
+ switch (cli->proto) {
+#ifdef HAVE_LIBSCTP
+ case IPPROTO_SCTP:
+ memset(&sinfo, 0, sizeof(sinfo));
+ sinfo.sinfo_ppid = htonl(msgb_sctp_ppid(msg));
+ sinfo.sinfo_stream = msgb_sctp_stream(msg);
+ ret = sctp_send(cli->ofd.fd, msgb_data(msg), msgb_length(msg),
+ &sinfo, MSG_NOSIGNAL);
+ break;
+#endif
+ case IPPROTO_TCP:
+ default:
+ ret = send(cli->ofd.fd, msgb_data(msg), msgb_length(msg), 0);
+ break;
+ }
+ break;
+ default:
+ ret = -ENOTSUP;
+ }
+
+ if (ret >= 0 && ret < msgb_length(msg)) {
+ LOGSCLI(cli, LOGL_ERROR, "short send: %d < exp %u\n", ret, msgb_length(msg));
+ /* Update msgb and re-add it at the start of the queue: */
+ msgb_pull(msg, ret);
+ llist_add(&msg->list, &cli->tx_queue);
+ return 0;
+ }
+
+ if (ret < 0) {
+ int err = errno;
+ LOGSCLI(cli, LOGL_ERROR, "send(len=%u) error: %s\n", msgb_length(msg), strerror(err));
+ if (err == EAGAIN) {
+ /* Re-add at the start of the queue to re-attempt: */
+ llist_add(&msg->list, &cli->tx_queue);
+ return 0;
+ }
+ msgb_free(msg);
+ osmo_stream_cli_reconnect(cli);
+ return 0;
+ }
+
+ msgb_free(msg);
+
+ if (llist_empty(&cli->tx_queue))
+ osmo_fd_write_disable(&cli->ofd);
+
+ return 0;
+}
+
+static int _setsockopt_nosigpipe(struct osmo_stream_cli *cli)
+{
+#ifdef SO_NOSIGPIPE
+ int ret;
+ int val = 1;
+ ret = setsockopt(osmo_stream_cli_get_fd(cli), SOL_SOCKET, SO_NOSIGPIPE, (void *)&val, sizeof(val));
+ if (ret < 0)
+ LOGSCLI(cli, LOGL_ERROR, "Failed setting SO_NOSIGPIPE: %s\n", strerror(errno));
+ return ret;
+#else
+ return 0;
+#endif
+}
+
+static void stream_cli_handle_connecting(struct osmo_stream_cli *cli, int res)
+{
+ int error, ret = res;
+ socklen_t len = sizeof(error);
+
+ int fd = osmo_stream_cli_get_fd(cli);
+ OSMO_ASSERT(fd >= 0);
+
+ if (ret < 0) {
+ osmo_stream_cli_reconnect(cli);
+ return;
+ }
+ ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len);
+ if (ret >= 0 && error > 0) {
+ osmo_stream_cli_reconnect(cli);
+ return;
+ }
+
+ /* If messages got enqueued while 'connecting', keep WRITE flag
+ up to dispatch them upon next main loop step */
+ if (cli->mode == OSMO_STREAM_MODE_OSMO_FD && llist_empty(&cli->tx_queue))
+ osmo_fd_write_disable(&cli->ofd);
+
+ /* Update sockname based on socket info: */
+ osmo_sock_get_name_buf(cli->sockname, sizeof(cli->sockname), osmo_stream_cli_get_fd(cli));
+
+ LOGSCLI(cli, LOGL_INFO, "connection established\n");
+ cli->state = STREAM_CLI_STATE_CONNECTED;
+ switch (cli->sk_domain) {
+ case AF_UNIX:
+ _setsockopt_nosigpipe(cli);
+ break;
+ case AF_UNSPEC:
+ case AF_INET:
+ case AF_INET6:
+ if (cli->proto == IPPROTO_SCTP) {
+ _setsockopt_nosigpipe(cli);
+ stream_sctp_sock_activate_events(fd);
+ }
+ break;
+ default:
+ break;
+ }
+ if (cli->connect_cb)
+ cli->connect_cb(cli);
+}
+
+static int osmo_stream_cli_fd_cb(struct osmo_fd *ofd, unsigned int what)
+{
+ struct osmo_stream_cli *cli = ofd->data;
+
+ switch (cli->state) {
+ case STREAM_CLI_STATE_CONNECTING:
+ stream_cli_handle_connecting(cli, 0);
+ break;
+ case STREAM_CLI_STATE_CONNECTED:
+ if (what & OSMO_FD_READ) {
+ LOGSCLI(cli, LOGL_DEBUG, "connected read\n");
+ osmo_stream_cli_read(cli);
+ }
+ if (what & OSMO_FD_WRITE) {
+ LOGSCLI(cli, LOGL_DEBUG, "connected write\n");
+ osmo_stream_cli_write(cli);
+ }
+ break;
+ default:
+ /* Only CONNECTING and CONNECTED states are expected, since they are the only states
+ * where FD exists: */
+ osmo_panic("%s() called with unexpected state %d\n", __func__, cli->state);
+ }
+ return 0;
+}
+
+static void cli_timer_cb(void *data);
+
+/*! Create an Osmocom stream client.
+ * \param[in] ctx talloc context from which to allocate memory
+ * This function allocates a new \ref osmo_stream_cli and initializes
+ * it with default values (5s reconnect timer, TCP protocol)
+ * \return allocated stream client, or NULL in case of error
+ */
+struct osmo_stream_cli *osmo_stream_cli_create(void *ctx)
+{
+ struct osmo_stream_cli *cli;
+
+ cli = talloc_zero(ctx, struct osmo_stream_cli);
+ if (!cli)
+ return NULL;
+
+ cli->mode = OSMO_STREAM_MODE_UNKNOWN;
+ cli->sk_domain = AF_UNSPEC;
+ cli->sk_type = SOCK_STREAM;
+ cli->proto = IPPROTO_TCP;
+
+ cli->state = STREAM_CLI_STATE_CLOSED;
+ osmo_timer_setup(&cli->timer, cli_timer_cb, cli);
+ cli->reconnect_timeout = 5; /* default is 5 seconds. */
+ cli->segmentation_cb = NULL;
+ INIT_LLIST_HEAD(&cli->tx_queue);
+
+ cli->ma_pars.sctp.version = 0;
+
+ return cli;
+}
+
+static void stream_cli_iofd_read_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg)
+{
+ struct osmo_stream_cli *cli = osmo_iofd_get_data(iofd);
+
+ switch (cli->state) {
+ case STREAM_CLI_STATE_CONNECTING:
+ msgb_free(msg);
+ stream_cli_handle_connecting(cli, res);
+ break;
+ case STREAM_CLI_STATE_CONNECTED:
+ switch (res) {
+ case -EPIPE:
+ case -ECONNRESET:
+ LOGSCLI(cli, LOGL_ERROR, "lost connection with srv (%d)\n", res);
+ osmo_stream_cli_reconnect(cli);
+ break;
+ case 0:
+ LOGSCLI(cli, LOGL_NOTICE, "connection closed with srv\n");
+ osmo_stream_cli_reconnect(cli);
+ break;
+ default:
+ LOGSCLI(cli, LOGL_DEBUG, "received %d bytes from srv\n", res);
+ break;
+ }
+ /* Notify user of new data or error: */
+ if (cli->iofd_read_cb)
+ cli->iofd_read_cb(cli, res, msg);
+ else
+ msgb_free(msg);
+ break;
+ default:
+ osmo_panic("%s() called with unexpected state %d\n", __func__, cli->state);
+ }
+}
+
+static void stream_cli_iofd_write_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg)
+{
+ struct osmo_stream_cli *cli = osmo_iofd_get_data(iofd);
+
+ switch (cli->state) {
+ case STREAM_CLI_STATE_CONNECTING:
+ stream_cli_handle_connecting(cli, res);
+ break;
+ case STREAM_CLI_STATE_CONNECTED:
+ if (msg && res <= 0) {
+ osmo_stream_cli_reconnect(cli);
+ LOGSCLI(cli, LOGL_ERROR, "received error %d in response to send\n", res);
+ }
+ break;
+ default:
+ osmo_panic("%s() called with unexpected state %d\n", __func__, cli->state);
+ }
+}
+
+static const struct osmo_io_ops osmo_stream_cli_ioops = {
+ .read_cb = stream_cli_iofd_read_cb,
+ .write_cb = stream_cli_iofd_write_cb,
+
+ .segmentation_cb = NULL,
+};
+
+#ifdef HAVE_LIBSCTP
+static void stream_cli_iofd_recvmsg_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg, const struct msghdr *msgh)
+{
+ struct osmo_stream_cli *cli = osmo_iofd_get_data(iofd);
+
+ res = stream_iofd_sctp_recvmsg_trailer(iofd, msg, res, msgh);
+
+ switch (cli->state) {
+ case STREAM_CLI_STATE_CONNECTING:
+ msgb_free(msg);
+ stream_cli_handle_connecting(cli, res);
+ break;
+ case STREAM_CLI_STATE_CONNECTED:
+ switch (res) {
+ case -EPIPE:
+ case -ECONNRESET:
+ LOGSCLI(cli, LOGL_ERROR, "lost connection with srv (%d)\n", res);
+ osmo_stream_cli_reconnect(cli);
+ break;
+ case 0:
+ LOGSCLI(cli, LOGL_NOTICE, "connection closed with srv\n");
+ osmo_stream_cli_reconnect(cli);
+ break;
+ default:
+ break;
+ }
+ /* Notify user of new data or error: */
+ if (cli->iofd_read_cb)
+ cli->iofd_read_cb(cli, res, msg);
+ else
+ msgb_free(msg);
+ break;
+ default:
+ osmo_panic("%s() called with unexpected state %d\n", __func__, cli->state);
+ }
+}
+
+static const struct osmo_io_ops osmo_stream_cli_ioops_sctp = {
+ .recvmsg_cb = stream_cli_iofd_recvmsg_cb,
+ .sendmsg_cb = stream_cli_iofd_write_cb,
+
+ .segmentation_cb = NULL,
+};
+#endif
+
+
+/*! Set a name on the cli object (used during logging).
+ * \param[in] cli stream_cli whose name is to be set
+ * \param[in] name the name to be set on cli
+ */
+void osmo_stream_cli_set_name(struct osmo_stream_cli *cli, const char *name)
+{
+ osmo_talloc_replace_string(cli, &cli->name, name);
+ if (cli->mode == OSMO_STREAM_MODE_OSMO_IO && cli->iofd)
+ osmo_iofd_set_name(cli->iofd, name);
+}
+
+/*! Retrieve name previously set on the cli object (see osmo_stream_cli_set_name()).
+ * \param[in] cli stream_cli whose name is to be retrieved
+ * \returns The name to be set on cli; NULL if never set
+ */
+const char *osmo_stream_cli_get_name(const struct osmo_stream_cli *cli)
+{
+ return cli->name;
+}
+
+/*! Set the remote address to which we connect.
+ * Any changes to this setting will only become active upon next (re)connect.
+ * \param[in] cli Stream Client to modify
+ * \param[in] addr Remote IP address
+ */
+void
+osmo_stream_cli_set_addr(struct osmo_stream_cli *cli, const char *addr)
+{
+ osmo_stream_cli_set_addrs(cli, &addr, 1);
+}
+
+/*! Set the remote address set to which we connect.
+ * Useful for protocols allowing connecting to more than one address (such as SCTP)
+ * Any changes to this setting will only become active upon next (re)connect.
+ * \param[in] cli Stream Client to modify
+ * \param[in] addr Remote IP address set
+ * \return negative on error, 0 on success
+ */
+int osmo_stream_cli_set_addrs(struct osmo_stream_cli *cli, const char **addr, size_t addrcnt)
+{
+ int i = 0;
+
+ if (addrcnt > OSMO_STREAM_MAX_ADDRS)
+ return -EINVAL;
+
+ for (; i < addrcnt; i++)
+ osmo_talloc_replace_string(cli, &cli->addr[i], addr[i]);
+ for (; i < cli->addrcnt; i++) {
+ talloc_free(cli->addr[i]);
+ cli->addr[i] = NULL;
+ }
+
+ cli->addrcnt = addrcnt;
+ cli->flags |= OSMO_STREAM_CLI_F_RECONF;
+ return 0;
+}
+
+/*! Set the remote port number to which we connect.
+ * Any changes to this setting will only become active upon next (re)connect.
+ * \param[in] cli Stream Client to modify
+ * \param[in] port Remote port number
+ */
+void
+osmo_stream_cli_set_port(struct osmo_stream_cli *cli, uint16_t port)
+{
+ cli->port = port;
+ cli->flags |= OSMO_STREAM_CLI_F_RECONF;
+}
+
+/*! Set the local port number for the socket (to be bound to).
+ * Any changes to this setting will only become active upon next (re)connect.
+ * \param[in] cli Stream Client to modify
+ * \param[in] port Local port number
+ */
+void
+osmo_stream_cli_set_local_port(struct osmo_stream_cli *cli, uint16_t port)
+{
+ cli->local_port = port;
+ cli->flags |= OSMO_STREAM_CLI_F_RECONF;
+}
+
+/*! Set the local address for the socket (to be bound to).
+ * Any changes to this setting will only become active upon next (re)connect.
+ * \param[in] cli Stream Client to modify
+ * \param[in] port Local host name
+ */
+void
+osmo_stream_cli_set_local_addr(struct osmo_stream_cli *cli, const char *addr)
+{
+ osmo_stream_cli_set_local_addrs(cli, &addr, 1);
+}
+
+/*! Set the local address set to which we bind.
+ * Useful for protocols allowing bind to more than one address (such as SCTP)
+ * Any changes to this setting will only become active upon next (re)connect.
+ * \param[in] cli Stream Client to modify
+ * \param[in] addr Local IP address set
+ * \return negative on error, 0 on success
+ */
+int osmo_stream_cli_set_local_addrs(struct osmo_stream_cli *cli, const char **addr, size_t addrcnt)
+{
+ int i = 0;
+
+ if (addrcnt > OSMO_STREAM_MAX_ADDRS)
+ return -EINVAL;
+
+ for (; i < addrcnt; i++)
+ osmo_talloc_replace_string(cli, &cli->local_addr[i], addr[i]);
+ for (; i < cli->local_addrcnt; i++) {
+ talloc_free(cli->local_addr[i]);
+ cli->local_addr[i] = NULL;
+ }
+
+ cli->local_addrcnt = addrcnt;
+ cli->flags |= OSMO_STREAM_CLI_F_RECONF;
+ return 0;
+}
+
+/*! Set the protocol for the stream client socket.
+ * Any changes to this setting will only become active upon next (re)connect.
+ * \param[in] cli Stream Client to modify
+ * \param[in] proto Protocol (like IPPROTO_TCP (default), IPPROTO_SCTP, ...)
+ */
+void
+osmo_stream_cli_set_proto(struct osmo_stream_cli *cli, uint16_t proto)
+{
+ cli->proto = proto;
+ cli->flags |= OSMO_STREAM_CLI_F_RECONF;
+}
+
+/* Configure client side segmentation for the iofd */
+static void configure_cli_segmentation_cb(struct osmo_stream_cli *cli,
+ osmo_stream_cli_segmentation_cb_t segmentation_cb)
+{
+ /* Copy default settings */
+ struct osmo_io_ops client_ops;
+ osmo_iofd_get_ioops(cli->iofd, &client_ops);
+ /* Set segmentation cb for this client */
+ client_ops.segmentation_cb = segmentation_cb;
+ osmo_iofd_set_ioops(cli->iofd, &client_ops);
+}
+
+/*! Set the segmentation callback for the client.
+ * \param[in,out] cli Stream Client to modify
+ * \param[in] segmentation_cb Target segmentation callback
+ */
+void osmo_stream_cli_set_segmentation_cb(struct osmo_stream_cli *cli,
+ osmo_stream_cli_segmentation_cb_t segmentation_cb)
+{
+ cli->segmentation_cb = segmentation_cb;
+ if (cli->iofd) /* Otherwise, this will be done in osmo_stream_cli_open() */
+ configure_cli_segmentation_cb(cli, segmentation_cb);
+}
+
+/*! Set the socket type for the stream server link.
+ * Any changes to this setting will only become active upon next (re)connect.
+ * \param[in] cli Stream Client to modify
+ * \param[in] type Socket Type (like SOCK_STREAM (default), SOCK_SEQPACKET, ...)
+ * \returns zero on success, negative -errno on error.
+ */
+int osmo_stream_cli_set_type(struct osmo_stream_cli *cli, int type)
+{
+ switch (type) {
+ case SOCK_STREAM:
+ case SOCK_SEQPACKET:
+ break;
+ default:
+ return -ENOTSUP;
+ }
+ cli->sk_type = type;
+ cli->flags |= OSMO_STREAM_CLI_F_RECONF;
+ return 0;
+}
+
+/*! Set the socket type for the stream server link.
+ * Any changes to this setting will only become active upon next (re)connect.
+ * \param[in] cli Stream Client to modify
+ * \param[in] type Socket Domain (like AF_UNSPEC (default for IP), AF_UNIX, AF_INET, ...)
+ * \returns zero on success, negative -errno on error.
+ */
+int osmo_stream_cli_set_domain(struct osmo_stream_cli *cli, int domain)
+{
+ switch (domain) {
+ case AF_UNSPEC:
+ case AF_INET:
+ case AF_INET6:
+ case AF_UNIX:
+ break;
+ default:
+ return -ENOTSUP;
+ }
+ cli->sk_domain = domain;
+ cli->flags |= OSMO_STREAM_CLI_F_RECONF;
+ return 0;
+}
+
+/*! Set the reconnect time of the stream client socket.
+ * \param[in] cli Stream Client to modify
+ * \param[in] timeout Re-connect timeout in seconds or negative value to disable auto-reconnection */
+void
+osmo_stream_cli_set_reconnect_timeout(struct osmo_stream_cli *cli, int timeout)
+{
+ cli->reconnect_timeout = timeout;
+}
+
+/*! Set application private data of the stream client socket.
+ * \param[in] cli Stream Client to modify
+ * \param[in] data User-specific data (available in call-back functions) */
+void
+osmo_stream_cli_set_data(struct osmo_stream_cli *cli, void *data)
+{
+ cli->data = data;
+}
+
+/*! Retrieve application private data of the stream client socket.
+ * \param[in] cli Stream Client to modify
+ * \returns Application private data, as set by \ref osmo_stream_cli_set_data() */
+void *osmo_stream_cli_get_data(struct osmo_stream_cli *cli)
+{
+ return cli->data;
+}
+
+/*! Retrieve the stream client socket description.
+ * Calling this function will build a string that describes the socket in terms of its local/remote
+ * address/port. The returned name is stored in a static buffer; it is hence not re-entrant or thread-safe.
+ * \param[in] cli Stream Client to examine
+ * \returns Socket description or NULL in case of error */
+char *osmo_stream_cli_get_sockname(const struct osmo_stream_cli *cli)
+{
+ static char buf[OSMO_STREAM_MAX_ADDRS * OSMO_SOCK_NAME_MAXLEN];
+
+ osmo_sock_multiaddr_get_name_buf(buf, sizeof(buf),
+ osmo_stream_cli_get_fd(cli), cli->proto);
+
+ return buf;
+}
+
+/*! Retrieve Osmocom File Descriptor of the stream client socket.
+ * This function only works in case you operate osmo_stream_cli in osmo_fd mode!
+ * \param[in] cli Stream Client to modify
+ * \returns Pointer to \ref osmo_fd */
+struct osmo_fd *
+osmo_stream_cli_get_ofd(struct osmo_stream_cli *cli)
+{
+ OSMO_ASSERT(cli->mode == OSMO_STREAM_MODE_OSMO_FD);
+ return &cli->ofd;
+}
+
+/*! Set the call-back function called on connect of the stream client socket.
+ * The call-back function registered via this function will be called upon completion of the non-blocking
+ * outbound connect operation.
+ * \param[in] cli Stream Client to modify
+ * \param[in] connect_cb Call-back function to be called upon connect */
+void
+osmo_stream_cli_set_connect_cb(struct osmo_stream_cli *cli,
+ osmo_stream_cli_connect_cb_t connect_cb)
+{
+ cli->connect_cb = connect_cb;
+}
+
+/*! Set the call-back function called on disconnect of the stream client socket.
+ * \param[in] cli Stream Client to modify
+ * \param[in] disconnect_cb Call-back function to be called upon disconnect */
+void osmo_stream_cli_set_disconnect_cb(struct osmo_stream_cli *cli,
+ osmo_stream_cli_disconnect_cb_t disconnect_cb)
+{
+ cli->disconnect_cb = disconnect_cb;
+}
+
+/*! Set the call-back function called to read from the stream client socket.
+ * This function will implicitly configure osmo_stream_cli to use legacy osmo_ofd mode.
+ * \param[in] cli Stream Client to modify
+ * \param[in] read_cb Call-back function to be called when we want to read */
+void
+osmo_stream_cli_set_read_cb(struct osmo_stream_cli *cli,
+ osmo_stream_cli_read_cb_t read_cb)
+{
+ OSMO_ASSERT(cli->mode != OSMO_STREAM_MODE_OSMO_IO);
+ cli->mode = OSMO_STREAM_MODE_OSMO_FD;
+ cli->read_cb = read_cb;
+}
+
+/*! Set the call-back function called to read from the stream client socket.
+ * This function will implicitly configure osmo_stream_cli to use osmo_iofd mode.
+ * \param[in] cli Stream Client to modify
+ * \param[in] read_cb Call-back function to be called when data was read from the socket */
+void
+osmo_stream_cli_set_read_cb2(struct osmo_stream_cli *cli,
+ osmo_stream_cli_read_cb2_t read_cb)
+{
+ OSMO_ASSERT(cli->mode != OSMO_STREAM_MODE_OSMO_FD);
+ cli->mode = OSMO_STREAM_MODE_OSMO_IO;
+ cli->iofd_read_cb = read_cb;
+}
+
+/*! Destroy a Osmocom stream client (includes close).
+ * \param[in] cli Stream Client to destroy */
+void osmo_stream_cli_destroy(struct osmo_stream_cli *cli)
+{
+ osmo_stream_cli_close(cli);
+ osmo_timer_del(&cli->timer);
+ msgb_queue_free(&cli->tx_queue);
+ talloc_free(cli);
+}
+
+/*! DEPRECATED: use osmo_stream_cli_set_reconnect_timeout() or osmo_stream_cli_reconnect() instead!
+ * Open connection of an Osmocom stream client
+ * \param[in] cli Stream Client to connect
+ * \param[in] reconect 1 if we should not automatically reconnect
+ * \return negative on error, 0 on success
+ */
+int osmo_stream_cli_open2(struct osmo_stream_cli *cli, int reconnect)
+{
+ int ret;
+
+ /* we are reconfiguring this socket, close existing first. */
+ if ((cli->flags & OSMO_STREAM_CLI_F_RECONF) && cli->ofd.fd >= 0)
+ osmo_stream_cli_close(cli);
+
+ cli->flags &= ~OSMO_STREAM_CLI_F_RECONF;
+
+ switch (cli->proto) {
+#ifdef HAVE_LIBSCTP
+ case IPPROTO_SCTP:
+ ret = osmo_sock_init2_multiaddr2(AF_UNSPEC, SOCK_STREAM, cli->proto,
+ (const char **)cli->local_addr, cli->local_addrcnt, cli->local_port,
+ (const char **)cli->addr, cli->addrcnt, cli->port,
+ OSMO_SOCK_F_CONNECT|OSMO_SOCK_F_BIND|OSMO_SOCK_F_NONBLOCK,
+ &cli->ma_pars);
+ break;
+#endif
+ default:
+ ret = osmo_sock_init2(AF_UNSPEC, SOCK_STREAM, cli->proto,
+ cli->local_addr[0], cli->local_port,
+ cli->addr[0], cli->port,
+ OSMO_SOCK_F_CONNECT|OSMO_SOCK_F_BIND|OSMO_SOCK_F_NONBLOCK);
+ }
+
+ if (ret < 0) {
+ if (reconnect)
+ osmo_stream_cli_reconnect(cli);
+ return ret;
+ }
+ osmo_fd_setup(&cli->ofd, ret, OSMO_FD_READ | OSMO_FD_WRITE, osmo_stream_cli_fd_cb, cli, 0);
+
+ if (cli->flags & OSMO_STREAM_CLI_F_NODELAY) {
+ ret = stream_setsockopt_nodelay(cli->ofd.fd, cli->proto, 1);
+ if (ret < 0)
+ goto error_close_socket;
+ }
+
+ if (osmo_fd_register(&cli->ofd) < 0)
+ goto error_close_socket;
+
+ cli->state = STREAM_CLI_STATE_CONNECTING;
+ return 0;
+
+error_close_socket:
+ close(cli->ofd.fd);
+ cli->ofd.fd = -1;
+ return -EIO;
+}
+
+/*! Set the NODELAY socket option to avoid Nagle-like behavior.
+ * Setting this to nodelay=true will automatically set the NODELAY
+ * socket option on any socket established via \ref osmo_stream_cli_open
+ * or any re-connect. You have to set this _before_ opening the
+ * socket.
+ * \param[in] cli Stream client whose sockets are to be configured
+ * \param[in] nodelay whether to set (true) NODELAY before connect()
+ */
+void osmo_stream_cli_set_nodelay(struct osmo_stream_cli *cli, bool nodelay)
+{
+ if (nodelay)
+ cli->flags |= OSMO_STREAM_CLI_F_NODELAY;
+ else
+ cli->flags &= ~OSMO_STREAM_CLI_F_NODELAY;
+}
+
+/*! Open connection of an Osmocom stream client.
+ * This will initiate an non-blocking outbound connect to the configured destination (server) address.
+ * By default the client will automatically attempt to reconnect after default timeout.
+ * To disable this, use osmo_stream_cli_set_reconnect_timeout() before calling this function.
+ * \param[in] cli Stream Client to connect
+ * \return negative on error, 0 on success */
+int osmo_stream_cli_open(struct osmo_stream_cli *cli)
+{
+ int ret, flags;
+ int fd = -1;
+ unsigned int local_addrcnt;
+
+ /* we are reconfiguring this socket, close existing first. */
+ if ((cli->flags & OSMO_STREAM_CLI_F_RECONF) && osmo_stream_cli_get_fd(cli) >= 0)
+ osmo_stream_cli_close(cli);
+
+ cli->flags &= ~OSMO_STREAM_CLI_F_RECONF;
+
+ switch (cli->sk_domain) {
+ case AF_UNIX:
+ ret = osmo_sock_unix_init(cli->sk_type, 0, cli->addr[0], OSMO_SOCK_F_CONNECT|OSMO_SOCK_F_BIND|OSMO_SOCK_F_NONBLOCK);
+ break;
+ case AF_INET:
+ case AF_INET6:
+ case AF_UNSPEC:
+ switch (cli->proto) {
+#ifdef HAVE_LIBSCTP
+ case IPPROTO_SCTP:
+ local_addrcnt = cli->local_addrcnt;
+ flags = OSMO_SOCK_F_CONNECT|OSMO_SOCK_F_NONBLOCK;
+ if (cli->local_addrcnt > 0 || cli->local_port > 0) { /* explicit bind required? */
+ flags |= OSMO_SOCK_F_BIND;
+ /* If no local addr configured, use local_addr[0]=NULL by default when creating the socket. */
+ if (cli->local_addrcnt == 0)
+ local_addrcnt = 1;
+ }
+ ret = osmo_sock_init2_multiaddr2(cli->sk_domain, cli->sk_type, cli->proto,
+ (const char **)cli->local_addr, local_addrcnt, cli->local_port,
+ (const char **)cli->addr, cli->addrcnt, cli->port,
+ flags, &cli->ma_pars);
+ break;
+#endif
+ default:
+ ret = osmo_sock_init2(cli->sk_domain, cli->sk_type, cli->proto,
+ cli->local_addr[0], cli->local_port,
+ cli->addr[0], cli->port,
+ OSMO_SOCK_F_CONNECT|OSMO_SOCK_F_BIND|OSMO_SOCK_F_NONBLOCK);
+ }
+ break;
+ default:
+ return -ENOTSUP;
+ }
+
+ if (ret < 0) {
+ osmo_stream_cli_reconnect(cli);
+ return ret;
+ }
+
+ fd = ret;
+
+ if (cli->flags & OSMO_STREAM_CLI_F_NODELAY) {
+ ret = stream_setsockopt_nodelay(fd, cli->proto, 1);
+ if (ret < 0)
+ goto error_close_socket;
+ }
+
+ switch (cli->mode) {
+ case OSMO_STREAM_MODE_OSMO_FD:
+ osmo_fd_setup(&cli->ofd, fd, OSMO_FD_READ | OSMO_FD_WRITE, osmo_stream_cli_fd_cb, cli, 0);
+ if (osmo_fd_register(&cli->ofd) < 0)
+ goto error_close_socket;
+ break;
+ case OSMO_STREAM_MODE_OSMO_IO:
+ /* Be sure that previous osmo_io instance is freed before creating a new one. */
+ osmo_stream_cli_close_iofd(cli);
+#ifdef HAVE_LIBSCTP
+ if (cli->proto == IPPROTO_SCTP) {
+ cli->iofd = osmo_iofd_setup(cli, fd, cli->name, OSMO_IO_FD_MODE_RECVMSG_SENDMSG,
+ &osmo_stream_cli_ioops_sctp, cli);
+ if (cli->iofd)
+ osmo_iofd_set_cmsg_size(cli->iofd, CMSG_SPACE(sizeof(struct sctp_sndrcvinfo)));
+ } else {
+#else
+ if (true) {
+#endif
+ cli->iofd = osmo_iofd_setup(cli, fd, cli->name, OSMO_IO_FD_MODE_READ_WRITE,
+ &osmo_stream_cli_ioops, cli);
+ }
+ if (!cli->iofd)
+ goto error_close_socket;
+
+ osmo_iofd_notify_connected(cli->iofd);
+
+ configure_cli_segmentation_cb(cli, cli->segmentation_cb);
+
+ if (osmo_iofd_register(cli->iofd, fd) < 0)
+ goto error_close_socket;
+ break;
+ default:
+ OSMO_ASSERT(false);
+ }
+
+ cli->state = STREAM_CLI_STATE_CONNECTING;
+ return 0;
+
+error_close_socket:
+ cli->state = STREAM_CLI_STATE_CLOSED;
+ close(fd);
+ if (cli->mode == OSMO_STREAM_MODE_OSMO_FD)
+ cli->ofd.fd = -1;
+ return -EIO;
+}
+
+static void cli_timer_cb(void *data)
+{
+ struct osmo_stream_cli *cli = data;
+
+ LOGSCLI(cli, LOGL_DEBUG, "reconnecting\n");
+ osmo_stream_cli_open(cli);
+}
+
+/*! Enqueue data to be sent via an Osmocom stream client..
+ * This is the function you use for writing/sending/transmitting data via the osmo_stream_cli.
+ * \param[in] cli Stream Client through which we want to send
+ * \param[in] msg Message buffer to enqueue in transmit queue */
+void osmo_stream_cli_send(struct osmo_stream_cli *cli, struct msgb *msg)
+{
+ int rc;
+
+ OSMO_ASSERT(cli);
+ OSMO_ASSERT(msg);
+
+ if (!osmo_stream_cli_is_connected(cli)) {
+ LOGSCLI(cli, LOGL_ERROR, "send: not connected, dropping data!\n");
+ msgb_free(msg);
+ return;
+ }
+
+ switch (cli->mode) {
+ case OSMO_STREAM_MODE_OSMO_FD:
+ msgb_enqueue(&cli->tx_queue, msg);
+ osmo_fd_write_enable(&cli->ofd);
+ break;
+ case OSMO_STREAM_MODE_OSMO_IO:
+ /* whenever osmo_stream_cli_is_connected() [see above check], we should have an iofd */
+ OSMO_ASSERT(cli->iofd);
+ if (cli->proto == IPPROTO_SCTP)
+ rc = stream_iofd_sctp_send_msgb(cli->iofd, msg, MSG_NOSIGNAL);
+ else
+ rc = osmo_iofd_write_msgb(cli->iofd, msg);
+ if (rc < 0)
+ msgb_free(msg);
+ break;
+ default:
+ OSMO_ASSERT(false);
+ }
+}
+
+/*! Receive data via an Osmocom stream client in osmo_fd mode.
+ * \param[in] cli Stream Client through which we want to send
+ * \param msg pre-allocate message buffer to which received data is appended
+ * \returns number of bytes read; <=0 in case of error
+ *
+ * Application programs using the legacy osmo_fd mode of osmo_stream_cli will use
+ * this function to read/receive from a stream client socket after they have been notified that
+ * it is readable (via select/poll).
+ *
+ * If conn is an SCTP connection, additional specific considerations shall be taken:
+ * - msg->cb is always filled with SCTP ppid, and SCTP stream values, see msgb_sctp_*() APIs.
+ * - If an SCTP notification was received when reading from the SCTP socket,
+ * msgb_sctp_msg_flags(msg) will contain bit flag
+ * OSMO_STREAM_SCTP_MSG_FLAGS_NOTIFICATION set, and the msgb will
+ * contain a "union sctp_notification" instead of user data. In this case the
+ * return code will be either 0 (if conn is considered dead after the
+ * notification) or -EAGAIN (if conn is considered still alive after the
+ * notification) resembling the standard recv() API.
+ */
+int osmo_stream_cli_recv(struct osmo_stream_cli *cli, struct msgb *msg)
+{
+ int ret;
+ OSMO_ASSERT(cli);
+ OSMO_ASSERT(msg);
+ OSMO_ASSERT(cli->mode == OSMO_STREAM_MODE_OSMO_FD);
+
+ switch (cli->sk_domain) {
+ case AF_UNIX:
+ ret = recv(cli->ofd.fd, msg->tail, msgb_tailroom(msg), 0);
+ break;
+ case AF_INET:
+ case AF_INET6:
+ case AF_UNSPEC:
+ switch (cli->proto) {
+#ifdef HAVE_LIBSCTP
+ case IPPROTO_SCTP:
+ {
+ char log_pfx[128];
+ snprintf(log_pfx, sizeof(log_pfx), "CLICONN(%s,%s)", cli->name ? : "", cli->sockname);
+ ret = stream_sctp_recvmsg_wrapper(cli->ofd.fd, msg, log_pfx);
+ break;
+ }
+#endif
+ case IPPROTO_TCP:
+ default:
+ ret = recv(cli->ofd.fd, msg->tail, msgb_tailroom(msg), 0);
+ break;
+ }
+ break;
+ default:
+ ret = -ENOTSUP;
+ }
+
+ if (ret < 0) {
+ if (ret == -EAGAIN)
+ return ret;
+ if (errno == EPIPE || errno == ECONNRESET)
+ LOGSCLI(cli, LOGL_ERROR, "lost connection with srv\n");
+ osmo_stream_cli_reconnect(cli);
+ return ret;
+ } else if (ret == 0) {
+ LOGSCLI(cli, LOGL_ERROR, "connection closed with srv\n");
+ osmo_stream_cli_reconnect(cli);
+ return ret;
+ }
+ msgb_put(msg, ret);
+ LOGSCLI(cli, LOGL_DEBUG, "received %d bytes from srv\n", ret);
+ return ret;
+}
+
+/*! Clear the transmit queue of the stream client.
+ * Calling this function wil clear (delete) any pending, not-yet transmitted data from the transmit queue. */
+void osmo_stream_cli_clear_tx_queue(struct osmo_stream_cli *cli)
+{
+ switch (cli->mode) {
+ case OSMO_STREAM_MODE_OSMO_FD:
+ msgb_queue_free(&cli->tx_queue);
+ /* If in state 'connecting', keep WRITE flag up to receive
+ * socket connection signal and then transition to STATE_CONNECTED: */
+ if (cli->state == STREAM_CLI_STATE_CONNECTED)
+ osmo_fd_write_disable(&cli->ofd);
+ break;
+ case OSMO_STREAM_MODE_OSMO_IO:
+ osmo_iofd_txqueue_clear(cli->iofd);
+ break;
+ default:
+ OSMO_ASSERT(false);
+ }
+}
+
+/*! Set given parameter of stream client to given value.
+ * \param[in] cli stream client on which to set parameter.
+ * \param[in] par identifier of the parameter to be set.
+ * \param[in] val value of the parameter to be set.
+ * \param[in] val_len length of the parameter value.
+ * \returns 0 in success; negative -errno on error. */
+int osmo_stream_cli_set_param(struct osmo_stream_cli *cli, enum osmo_stream_cli_param par, void *val, size_t val_len)
+{
+ OSMO_ASSERT(cli);
+ uint8_t val8;
+
+ switch (par) {
+ case OSMO_STREAM_CLI_PAR_SCTP_SOCKOPT_AUTH_SUPPORTED:
+ if (!val || val_len != sizeof(uint8_t))
+ return -EINVAL;
+ val8 = *(uint8_t *)val;
+ cli->ma_pars.sctp.sockopt_auth_supported.set = true;
+ cli->ma_pars.sctp.sockopt_auth_supported.abort_on_failure = val8 > 1;
+ cli->ma_pars.sctp.sockopt_auth_supported.value = (val8 == 1 || val8 == 3) ? 1 : 0;
+ break;
+ case OSMO_STREAM_CLI_PAR_SCTP_SOCKOPT_ASCONF_SUPPORTED:
+ if (!val || val_len != sizeof(uint8_t))
+ return -EINVAL;
+ val8 = *(uint8_t *)val;
+ cli->ma_pars.sctp.sockopt_asconf_supported.set = true;
+ cli->ma_pars.sctp.sockopt_asconf_supported.abort_on_failure = val8 > 1;
+ cli->ma_pars.sctp.sockopt_asconf_supported.value = (val8 == 1 || val8 == 3) ? 1 : 0;
+ break;
+ case OSMO_STREAM_CLI_PAR_SCTP_INIT_NUM_OSTREAMS:
+ if (!val || val_len != sizeof(uint16_t))
+ return -EINVAL;
+ cli->ma_pars.sctp.sockopt_initmsg.set = true;
+ cli->ma_pars.sctp.sockopt_initmsg.num_ostreams_present = true;
+ cli->ma_pars.sctp.sockopt_initmsg.num_ostreams_value = *(uint16_t *)val;
+ break;
+ case OSMO_STREAM_CLI_PAR_SCTP_INIT_MAX_INSTREAMS:
+ if (!val || val_len != sizeof(uint16_t))
+ return -EINVAL;
+ cli->ma_pars.sctp.sockopt_initmsg.set = true;
+ cli->ma_pars.sctp.sockopt_initmsg.max_instreams_present = true;
+ cli->ma_pars.sctp.sockopt_initmsg.max_instreams_value = *(uint16_t *)val;
+ break;
+ case OSMO_STREAM_CLI_PAR_SCTP_INIT_MAX_ATTEMPTS:
+ if (!val || val_len != sizeof(uint16_t))
+ return -EINVAL;
+ cli->ma_pars.sctp.sockopt_initmsg.set = true;
+ cli->ma_pars.sctp.sockopt_initmsg.max_attempts_present = true;
+ cli->ma_pars.sctp.sockopt_initmsg.max_attempts_value = *(uint16_t *)val;
+ break;
+ case OSMO_STREAM_CLI_PAR_SCTP_INIT_TIMEOUT:
+ if (!val || val_len != sizeof(uint16_t))
+ return -EINVAL;
+ cli->ma_pars.sctp.sockopt_initmsg.set = true;
+ cli->ma_pars.sctp.sockopt_initmsg.max_init_timeo_present = true;
+ cli->ma_pars.sctp.sockopt_initmsg.max_init_timeo_value = *(uint16_t *)val;
+ break;
+ default:
+ return -ENOENT;
+ };
+ return 0;
+}
+
+/*! @} */
diff --git a/src/stream_srv.c b/src/stream_srv.c
new file mode 100644
index 0000000..dad6b7a
--- /dev/null
+++ b/src/stream_srv.c
@@ -0,0 +1,1215 @@
+/* (C) 2011 by Pablo Neira Ayuso <pablo@gnumonks.org>
+ * (C) 2015-2016 by Harald Welte <laforge@gnumonks.org>
+ * (C) 2023 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
+ * All Rights Reserved.
+ *
+ * SPDX-License-Identifier: GPL-2.0+
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <string.h>
+#include <time.h>
+#include <sys/fcntl.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <sys/ioctl.h>
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+
+#include <osmocom/core/timer.h>
+#include <osmocom/core/select.h>
+#include <osmocom/core/utils.h>
+#include <osmocom/gsm/tlv.h>
+#include <osmocom/core/msgb.h>
+#include <osmocom/core/osmo_io.h>
+#include <osmocom/core/panic.h>
+#include <osmocom/core/logging.h>
+#include <osmocom/core/talloc.h>
+#include <osmocom/core/socket.h>
+
+#include <osmocom/netif/stream.h>
+#include <osmocom/netif/stream_private.h>
+
+#include "config.h"
+
+#include <osmocom/netif/sctp.h>
+
+/*! \file stream_srv.c */
+
+#define LOGSLNK(link, level, fmt, args...) \
+ LOGP(DLINP, level, "SRV(%s,%s) " fmt, \
+ link->name ? : "", \
+ link->sockname, \
+ ## args)
+
+#define LOGSSRV(srv, level, fmt, args...) \
+ LOGP(DLINP, level, "SRVCONN(%s,%s) " fmt, \
+ srv->name ? : "", \
+ srv->sockname, \
+ ## args)
+/*
+ * Server side.
+ */
+
+#define OSMO_STREAM_SRV_F_RECONF (1 << 0)
+#define OSMO_STREAM_SRV_F_NODELAY (1 << 1)
+
+struct osmo_stream_srv_link {
+ struct osmo_fd ofd;
+ char *name;
+ char sockname[OSMO_SOCK_MULTIADDR_PEER_STR_MAXLEN];
+ char *addr[OSMO_STREAM_MAX_ADDRS];
+ uint8_t addrcnt;
+ uint16_t port;
+ int sk_domain;
+ int sk_type;
+ uint16_t proto;
+ osmo_stream_srv_link_accept_cb_t accept_cb;
+ void *data;
+ int flags;
+ struct osmo_sock_init2_multiaddr_pars ma_pars;
+};
+
+static int _setsockopt_nosigpipe(struct osmo_stream_srv_link *link, int new_fd)
+{
+#ifdef SO_NOSIGPIPE
+ int ret;
+ int val = 1;
+ ret = setsockopt(new_fd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&val, sizeof(val));
+ if (ret < 0)
+ LOGSLNK(link, LOGL_ERROR, "Failed setting SO_NOSIGPIPE: %s\n", strerror(errno));
+ return ret;
+#else
+ return 0;
+#endif
+}
+
+static int osmo_stream_srv_link_ofd_cb(struct osmo_fd *ofd, unsigned int what)
+{
+ int ret;
+ int sock_fd;
+ struct osmo_sockaddr osa;
+ socklen_t sa_len = sizeof(osa.u.sas);
+ struct osmo_stream_srv_link *link = ofd->data;
+
+ ret = accept(ofd->fd, &osa.u.sa, &sa_len);
+ if (ret < 0) {
+ LOGSLNK(link, LOGL_ERROR, "failed to accept from origin peer, reason=`%s'\n",
+ strerror(errno));
+ return ret;
+ }
+ sock_fd = ret;
+
+ switch (osa.u.sa.sa_family) {
+ case AF_UNIX:
+ LOGSLNK(link, LOGL_INFO, "accept()ed new link on fd %d\n",
+ sock_fd);
+ _setsockopt_nosigpipe(link, sock_fd);
+ break;
+ case AF_INET6:
+ case AF_INET:
+ LOGSLNK(link, LOGL_INFO, "accept()ed new link from %s\n",
+ osmo_sockaddr_to_str(&osa));
+
+ if (link->proto == IPPROTO_SCTP) {
+ _setsockopt_nosigpipe(link, sock_fd);
+ ret = stream_sctp_sock_activate_events(sock_fd);
+ if (ret < 0)
+ goto error_close_socket;
+ }
+ break;
+ default:
+ LOGSLNK(link, LOGL_ERROR, "accept()ed unexpected address family %d\n",
+ osa.u.sa.sa_family);
+ goto error_close_socket;
+ }
+
+ if (link->flags & OSMO_STREAM_SRV_F_NODELAY) {
+ ret = stream_setsockopt_nodelay(sock_fd, link->proto, 1);
+ if (ret < 0)
+ goto error_close_socket;
+ }
+
+ if (!link->accept_cb) {
+ ret = -ENOTSUP;
+ goto error_close_socket;
+ }
+
+ ret = link->accept_cb(link, sock_fd);
+ if (ret)
+ goto error_close_socket;
+ return 0;
+
+error_close_socket:
+ close(sock_fd);
+ return ret;
+}
+
+/*! \addtogroup stream_srv
+ * @{
+ */
+
+/*! Create an Osmocom Stream Server Link.
+ * A Stream Server Link is the listen()+accept() "parent" to individual connections from remote clients.
+ * \param[in] ctx talloc allocation context
+ * \returns Stream Server Link with default values (AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP)
+ */
+struct osmo_stream_srv_link *osmo_stream_srv_link_create(void *ctx)
+{
+ struct osmo_stream_srv_link *link;
+
+ link = talloc_zero(ctx, struct osmo_stream_srv_link);
+ if (!link)
+ return NULL;
+
+ link->sk_domain = AF_UNSPEC;
+ link->sk_type = SOCK_STREAM;
+ link->proto = IPPROTO_TCP;
+ osmo_fd_setup(&link->ofd, -1, OSMO_FD_READ | OSMO_FD_WRITE, osmo_stream_srv_link_ofd_cb, link, 0);
+
+ link->ma_pars.sctp.version = 0;
+
+ return link;
+}
+
+/*! Set a name on the srv_link object (used during logging).
+ * \param[in] link server link whose name is to be set. The name is copied into the osmo_stream_srv_link, so
+ * the caller memory is not required to be valid beyond the call of this function.
+ * \param[in] name the name to be set on link
+ */
+void osmo_stream_srv_link_set_name(struct osmo_stream_srv_link *link, const char *name)
+{
+ osmo_talloc_replace_string(link, &link->name, name);
+}
+
+/*! Retrieve name previously set on the srv_link object (see osmo_stream_srv_link_set_name()).
+ * \param[in] link server link whose name is to be retrieved
+ * \returns The name to be set on link; NULL if never set
+ */
+const char *osmo_stream_srv_link_get_name(const struct osmo_stream_srv_link *link)
+{
+ return link->name;
+}
+
+/*! Set the NODELAY socket option to avoid Nagle-like behavior.
+ * Setting this to nodelay=true will automatically set the NODELAY
+ * socket option on any socket established via this server link, before
+ * calling the accept_cb()
+ * \param[in] link server link whose sockets are to be configured
+ * \param[in] nodelay whether to set (true) NODELAY after accept
+ */
+void osmo_stream_srv_link_set_nodelay(struct osmo_stream_srv_link *link, bool nodelay)
+{
+ if (nodelay)
+ link->flags |= OSMO_STREAM_SRV_F_NODELAY;
+ else
+ link->flags &= ~OSMO_STREAM_SRV_F_NODELAY;
+}
+
+/*! Set the local address to which we bind.
+ * Any changes to this setting will only become active upon next (re)connect.
+ * \param[in] link Stream Server Link to modify
+ * \param[in] addr Local IP address
+ */
+void osmo_stream_srv_link_set_addr(struct osmo_stream_srv_link *link,
+ const char *addr)
+{
+ osmo_stream_srv_link_set_addrs(link, &addr, 1);
+}
+
+/*! Set the local address set to which we bind.
+ * Useful for protocols allowing bind on more than one address (such as SCTP)
+ * Any changes to this setting will only become active upon next (re)connect.
+ * \param[in] link Stream Server Link to modify
+ * \param[in] addr Local IP address
+ * \return negative on error, 0 on success
+ */
+int osmo_stream_srv_link_set_addrs(struct osmo_stream_srv_link *link, const char **addr, size_t addrcnt)
+{
+ int i = 0;
+
+ if (addrcnt > OSMO_STREAM_MAX_ADDRS)
+ return -EINVAL;
+
+ for (; i < addrcnt; i++)
+ osmo_talloc_replace_string(link, &link->addr[i], addr[i]);
+ for (; i < link->addrcnt; i++) {
+ talloc_free(link->addr[i]);
+ link->addr[i] = NULL;
+ }
+
+ link->addrcnt = addrcnt;
+ link->flags |= OSMO_STREAM_SRV_F_RECONF;
+ return 0;
+}
+
+/*! Set the local port number to which we bind.
+ * Any changes to this setting will only become active upon next (re)connect.
+ * \param[in] link Stream Server Link to modify
+ * \param[in] port Local port number
+ */
+void osmo_stream_srv_link_set_port(struct osmo_stream_srv_link *link,
+ uint16_t port)
+{
+ link->port = port;
+ link->flags |= OSMO_STREAM_SRV_F_RECONF;
+}
+
+/*! Set the protocol for the stream server link.
+ * Any changes to this setting will only become active upon next (re)connect.
+ * \param[in] link Stream Server Link to modify
+ * \param[in] proto Protocol (like IPPROTO_TCP (default), IPPROTO_SCTP, ...)
+ */
+void
+osmo_stream_srv_link_set_proto(struct osmo_stream_srv_link *link,
+ uint16_t proto)
+{
+ link->proto = proto;
+ link->flags |= OSMO_STREAM_SRV_F_RECONF;
+}
+
+
+/*! Set the socket type for the stream server link.
+ * Any changes to this setting will only become active upon next (re)connect.
+ * \param[in] link Stream Server Link to modify
+ * \param[in] type Socket Type (like SOCK_STREAM (default), SOCK_SEQPACKET, ...)
+ * \returns zero on success, negative on error.
+ */
+int osmo_stream_srv_link_set_type(struct osmo_stream_srv_link *link, int type)
+{
+ switch (type) {
+ case SOCK_STREAM:
+ case SOCK_SEQPACKET:
+ break;
+ default:
+ return -ENOTSUP;
+ }
+ link->sk_type = type;
+ link->flags |= OSMO_STREAM_SRV_F_RECONF;
+ return 0;
+}
+
+/*! Set the socket type for the stream server link.
+ * Any changes to this setting will only become active upon next (re)connect.
+ * \param[in] link Stream Server Link to modify
+ * \param[in] type Socket Domain (like AF_UNSPEC (default for IP), AF_UNIX, AF_INET, ...)
+ * \returns zero on success, negative on error.
+ */
+int osmo_stream_srv_link_set_domain(struct osmo_stream_srv_link *link, int domain)
+{
+ switch (domain) {
+ case AF_UNSPEC:
+ case AF_INET:
+ case AF_INET6:
+ case AF_UNIX:
+ break;
+ default:
+ return -ENOTSUP;
+ }
+ link->sk_domain = domain;
+ link->flags |= OSMO_STREAM_SRV_F_RECONF;
+ return 0;
+}
+
+/*! Set application private data of the stream server link.
+ * \param[in] link Stream Server Link to modify
+ * \param[in] data User-specific data (available in call-back functions) */
+void
+osmo_stream_srv_link_set_data(struct osmo_stream_srv_link *link,
+ void *data)
+{
+ link->data = data;
+}
+
+/*! Retrieve application private data of the stream server link.
+ * \param[in] link Stream Server Link to modify
+ * \returns Application private data, as set by \ref osmo_stream_cli_set_data() */
+void *osmo_stream_srv_link_get_data(struct osmo_stream_srv_link *link)
+{
+ return link->data;
+}
+
+/* Similar to osmo_sock_multiaddr_get_name_buf(), but aimed at listening sockets (only local part): */
+static char *get_local_sockname_buf(char *buf, size_t buf_len, const struct osmo_stream_srv_link *link)
+{
+ struct osmo_strbuf sb = { .buf = buf, .len = buf_len };
+ int rc;
+
+ if (buf_len > 0)
+ buf[0] = '\0';
+
+ switch (link->sk_domain) {
+ case AF_UNSPEC:
+ /* we assume INET(6) by default upon link creation: */
+ case AF_INET:
+ case AF_INET6:
+ {
+ char hostbuf[OSMO_STREAM_MAX_ADDRS][INET6_ADDRSTRLEN];
+ size_t num_hostbuf = ARRAY_SIZE(hostbuf);
+ char portbuf[6];
+ bool need_more_bufs;
+ rc = osmo_sock_multiaddr_get_ip_and_port(link->ofd.fd, link->proto, &hostbuf[0][0],
+ &num_hostbuf, sizeof(hostbuf[0]),
+ portbuf, sizeof(portbuf), true);
+ if (rc < 0)
+ return NULL;
+ need_more_bufs = num_hostbuf > ARRAY_SIZE(hostbuf);
+ if (need_more_bufs)
+ num_hostbuf = ARRAY_SIZE(hostbuf);
+ OSMO_STRBUF_APPEND(sb, osmo_multiaddr_ip_and_port_snprintf,
+ &hostbuf[0][0], num_hostbuf, sizeof(hostbuf[0]), portbuf);
+ if (need_more_bufs)
+ OSMO_STRBUF_PRINTF(sb, "<need-more-bufs!>");
+ return buf;
+ }
+ case AF_UNIX:
+ {
+ struct osmo_sockaddr osa;
+ struct sockaddr_un *sun;
+ socklen_t len = sizeof(osa.u.sas);
+ rc = getsockname(link->ofd.fd, &osa.u.sa, &len);
+ if (rc < 0) {
+ OSMO_STRBUF_PRINTF(sb, "<error-in-getsockname>");
+ return buf;
+ }
+ /* Make sure sun_path is NULL terminated: */
+ sun = (struct sockaddr_un *)&osa.u.sa;
+ sun->sun_path[sizeof(sun->sun_path) - 1] = '\0';
+ OSMO_STRBUF_PRINTF(sb, "%s", sun->sun_path);
+ return buf;
+ }
+ default:
+ return NULL;
+ }
+}
+
+/*! Retrieve description of the stream server link e. g. 127.0.0.1:1234.
+ * Calling this function will build a string that describes the socket in terms of its local/remote
+ * address/port. The returned name is stored in a static buffer; it is hence not re-entrant or thread-safe.
+ * \param[in] link Stream Server Link to examine
+ * \returns Link description or NULL in case of error */
+char *osmo_stream_srv_link_get_sockname(const struct osmo_stream_srv_link *link)
+{
+ static char buf[sizeof(link->sockname)];
+
+ if (!get_local_sockname_buf(buf, sizeof(buf), link))
+ return NULL;
+ return buf;
+}
+
+/*! Retrieve Osmocom File Descriptor of the stream server link.
+ * \param[in] link Stream Server Link
+ * \returns Pointer to \ref osmo_fd */
+struct osmo_fd *
+osmo_stream_srv_link_get_ofd(struct osmo_stream_srv_link *link)
+{
+ return &link->ofd;
+}
+
+/*! Retrieve File Descriptor of the stream server link.
+ * \param[in] conn Stream Server Link
+ * \returns file descriptor or negative on error */
+int osmo_stream_srv_link_get_fd(const struct osmo_stream_srv_link *link)
+{
+ return link->ofd.fd;
+}
+
+/*! Set the accept() call-back of the stream server link.
+ * The provided call-back will be called whenever a new inbound connection
+ * is accept()ed. The call-back then typically creates a new osmo_stream_srv.
+ * If the call-back returns a negative value, the file descriptor will be closed.
+ * \param[in] link Stream Server Link
+ * \param[in] accept_cb Call-back function executed upon accept() */
+void osmo_stream_srv_link_set_accept_cb(struct osmo_stream_srv_link *link,
+ int (*accept_cb)(struct osmo_stream_srv_link *link, int fd))
+
+{
+ link->accept_cb = accept_cb;
+}
+
+/*! Destroy the stream server link. Closes + Releases Memory.
+ * \param[in] link Stream Server Link */
+void osmo_stream_srv_link_destroy(struct osmo_stream_srv_link *link)
+{
+ osmo_stream_srv_link_close(link);
+ talloc_free(link);
+}
+
+/*! Open the stream server link. This actually initializes the
+ * underlying socket and binds it to the configured ip/port.
+ * \param[in] link Stream Server Link to open
+ * \return negative on error, 0 on success */
+int osmo_stream_srv_link_open(struct osmo_stream_srv_link *link)
+{
+ int ret;
+
+ if (link->ofd.fd >= 0) {
+ /* No reconfigure needed for existing socket, we are fine */
+ if (!(link->flags & OSMO_STREAM_SRV_F_RECONF))
+ return 0;
+ /* we are reconfiguring this socket, close existing first. */
+ osmo_stream_srv_link_close(link);
+ }
+
+ link->flags &= ~OSMO_STREAM_SRV_F_RECONF;
+
+ switch (link->sk_domain) {
+ case AF_UNIX:
+ ret = osmo_sock_unix_init(link->sk_type, 0, link->addr[0], OSMO_SOCK_F_BIND);
+ break;
+ case AF_UNSPEC:
+ case AF_INET:
+ case AF_INET6:
+ switch (link->proto) {
+#ifdef HAVE_LIBSCTP
+ case IPPROTO_SCTP:
+ ret = osmo_sock_init2_multiaddr2(link->sk_domain, link->sk_type, link->proto,
+ (const char **)link->addr, link->addrcnt, link->port,
+ NULL, 0, 0, OSMO_SOCK_F_BIND, &link->ma_pars);
+ break;
+#endif
+ default:
+ ret = osmo_sock_init(link->sk_domain, link->sk_type, link->proto,
+ link->addr[0], link->port, OSMO_SOCK_F_BIND);
+ }
+ break;
+ default:
+ ret = -ENOTSUP;
+ }
+ if (ret < 0)
+ return ret;
+
+ link->ofd.fd = ret;
+ if (osmo_fd_register(&link->ofd) < 0) {
+ close(ret);
+ link->ofd.fd = -1;
+ return -EIO;
+ }
+
+ get_local_sockname_buf(link->sockname, sizeof(link->sockname), link);
+ return 0;
+}
+
+/*! Check whether the stream server link is opened.
+ * \param[in] link Stream Server Link to check */
+bool osmo_stream_srv_link_is_opened(const struct osmo_stream_srv_link *link)
+{
+ if (!link)
+ return false;
+
+ if (link->ofd.fd == -1)
+ return false;
+
+ return true;
+}
+
+/*! Close the stream server link and unregister from select loop.
+ * Does not destroy the server link, merely closes it!
+ * \param[in] link Stream Server Link to close */
+void osmo_stream_srv_link_close(struct osmo_stream_srv_link *link)
+{
+ if (!osmo_stream_srv_link_is_opened(link))
+ return;
+
+ osmo_fd_unregister(&link->ofd);
+ close(link->ofd.fd);
+ link->ofd.fd = -1;
+}
+
+/*! Set given parameter of stream_srv_link to given value.
+ * \param[in] cli stream client on which to set parameter.
+ * \param[in] par identifier of the parameter to be set.
+ * \param[in] val value of the parameter to be set.
+ * \param[in] val_len length of the parameter value.
+ * \returns 0 in success; negative -errno on error. */
+int osmo_stream_srv_link_set_param(struct osmo_stream_srv_link *link, enum osmo_stream_srv_link_param par,
+ void *val, size_t val_len)
+{
+ OSMO_ASSERT(link);
+ uint8_t val8;
+
+ switch (par) {
+ case OSMO_STREAM_SRV_LINK_PAR_SCTP_SOCKOPT_AUTH_SUPPORTED:
+ if (!val || val_len != sizeof(uint8_t))
+ return -EINVAL;
+ val8 = *(uint8_t *)val;
+ link->ma_pars.sctp.sockopt_auth_supported.set = true;
+ link->ma_pars.sctp.sockopt_auth_supported.abort_on_failure = val8 > 1;
+ link->ma_pars.sctp.sockopt_auth_supported.value = (val8 == 1 || val8 == 3) ? 1 : 0;
+ break;
+ case OSMO_STREAM_SRV_LINK_PAR_SCTP_SOCKOPT_ASCONF_SUPPORTED:
+ if (!val || val_len != sizeof(uint8_t))
+ return -EINVAL;
+ val8 = *(uint8_t *)val;
+ link->ma_pars.sctp.sockopt_asconf_supported.set = true;
+ link->ma_pars.sctp.sockopt_asconf_supported.abort_on_failure = val8 > 1;
+ link->ma_pars.sctp.sockopt_asconf_supported.value = (val8 == 1 || val8 == 3) ? 1 : 0;
+ break;
+ case OSMO_STREAM_SRV_LINK_PAR_SCTP_INIT_NUM_OSTREAMS:
+ if (!val || val_len != sizeof(uint16_t))
+ return -EINVAL;
+ link->ma_pars.sctp.sockopt_initmsg.set = true;
+ link->ma_pars.sctp.sockopt_initmsg.num_ostreams_present = true;
+ link->ma_pars.sctp.sockopt_initmsg.num_ostreams_value = *(uint16_t *)val;
+ break;
+ case OSMO_STREAM_SRV_LINK_PAR_SCTP_INIT_MAX_INSTREAMS:
+ if (!val || val_len != sizeof(uint16_t))
+ return -EINVAL;
+ link->ma_pars.sctp.sockopt_initmsg.set = true;
+ link->ma_pars.sctp.sockopt_initmsg.max_instreams_present = true;
+ link->ma_pars.sctp.sockopt_initmsg.max_instreams_value = *(uint16_t *)val;
+ break;
+ default:
+ return -ENOENT;
+ };
+ return 0;
+}
+
+/*! @} */
+
+#define OSMO_STREAM_SRV_F_FLUSH_DESTROY (1 << 0)
+
+struct osmo_stream_srv {
+ struct osmo_stream_srv_link *srv;
+ char *name;
+ char sockname[OSMO_SOCK_NAME_MAXLEN];
+ enum osmo_stream_mode mode;
+ union {
+ struct osmo_fd ofd;
+ struct osmo_io_fd *iofd;
+ };
+ struct llist_head tx_queue;
+ osmo_stream_srv_closed_cb_t closed_cb;
+ osmo_stream_srv_read_cb_t read_cb;
+ osmo_stream_srv_read_cb2_t iofd_read_cb;
+ void *data;
+ int flags;
+};
+
+/*! \addtogroup stream_srv
+ * @{
+ */
+
+static void stream_srv_iofd_read_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg)
+{
+ struct osmo_stream_srv *conn = osmo_iofd_get_data(iofd);
+
+ switch (res) {
+ case -EPIPE:
+ case -ECONNRESET:
+ LOGSSRV(conn, LOGL_ERROR, "lost connection with client (%d)\n", res);
+ break;
+ case 0:
+ LOGSSRV(conn, LOGL_NOTICE, "connection closed with client\n");
+ break;
+ default:
+ LOGSSRV(conn, LOGL_DEBUG, "received %d bytes from client\n", res);
+ break;
+ }
+ if (OSMO_UNLIKELY(conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY)) {
+ LOGSSRV(conn, LOGL_INFO, "Connection is being flushed and closed; ignoring received message\n");
+ msgb_free(msg);
+ if (osmo_iofd_txqueue_len(iofd) == 0)
+ osmo_stream_srv_destroy(conn);
+ return;
+ }
+
+ if (conn->iofd_read_cb)
+ conn->iofd_read_cb(conn, res, msg);
+ else
+ msgb_free(msg);
+}
+
+static void stream_srv_iofd_write_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg)
+{
+ struct osmo_stream_srv *conn = osmo_iofd_get_data(iofd);
+ LOGSSRV(conn, LOGL_DEBUG, "connected write\n");
+
+ if (res < 0)
+ LOGSSRV(conn, LOGL_ERROR, "error to send: %s\n", strerror(errno));
+
+ if (osmo_iofd_txqueue_len(iofd) == 0)
+ if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY)
+ osmo_stream_srv_destroy(conn);
+}
+
+static const struct osmo_io_ops srv_ioops = {
+ .read_cb = stream_srv_iofd_read_cb,
+ .write_cb = stream_srv_iofd_write_cb,
+};
+
+#ifdef HAVE_LIBSCTP
+static void stream_srv_iofd_recvmsg_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg, const struct msghdr *msgh)
+{
+ struct osmo_stream_srv *conn = osmo_iofd_get_data(iofd);
+ LOGSSRV(conn, LOGL_DEBUG, "message received (res=%d)\n", res);
+
+ res = stream_iofd_sctp_recvmsg_trailer(iofd, msg, res, msgh);
+
+ switch (res) {
+ case -EPIPE:
+ case -ECONNRESET:
+ LOGSSRV(conn, LOGL_ERROR, "lost connection with client (%d)\n", res);
+ break;
+ case 0:
+ LOGSSRV(conn, LOGL_NOTICE, "connection closed with client\n");
+ break;
+ default:
+ if (OSMO_LIKELY(res > 0))
+ LOGSSRV(conn, LOGL_DEBUG, "received %u bytes from client\n", res);
+ break;
+ }
+ if (OSMO_UNLIKELY(conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY)) {
+ LOGSSRV(conn, LOGL_INFO, "Connection is being flushed and closed; ignoring received message\n");
+ msgb_free(msg);
+ if (osmo_iofd_txqueue_len(iofd) == 0)
+ osmo_stream_srv_destroy(conn);
+ return;
+ }
+
+ if (conn->iofd_read_cb)
+ conn->iofd_read_cb(conn, res, msg);
+ else
+ msgb_free(msg);
+}
+
+static const struct osmo_io_ops srv_ioops_sctp = {
+ .recvmsg_cb = stream_srv_iofd_recvmsg_cb,
+ .sendmsg_cb = stream_srv_iofd_write_cb,
+};
+#endif
+
+static int osmo_stream_srv_read(struct osmo_stream_srv *conn)
+{
+ int rc = 0;
+
+ LOGSSRV(conn, LOGL_DEBUG, "message received\n");
+
+ if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY) {
+ LOGSSRV(conn, LOGL_INFO, "Connection is being flushed and closed; ignoring received message\n");
+ return 0;
+ }
+
+ if (conn->read_cb)
+ rc = conn->read_cb(conn);
+
+ return rc;
+}
+
+static void osmo_stream_srv_write(struct osmo_stream_srv *conn)
+{
+#ifdef HAVE_LIBSCTP
+ struct sctp_sndrcvinfo sinfo;
+#endif
+ struct msgb *msg;
+ int ret;
+
+ if (llist_empty(&conn->tx_queue)) {
+ osmo_fd_write_disable(&conn->ofd);
+ return;
+ }
+ msg = llist_first_entry(&conn->tx_queue, struct msgb, list);
+ llist_del(&msg->list);
+
+ LOGSSRV(conn, LOGL_DEBUG, "sending %u bytes of data\n", msg->len);
+
+ switch (conn->srv->sk_domain) {
+ case AF_UNIX:
+ ret = send(conn->ofd.fd, msgb_data(msg), msgb_length(msg), 0);
+ break;
+ case AF_INET:
+ case AF_INET6:
+ case AF_UNSPEC:
+ switch (conn->srv->proto) {
+#ifdef HAVE_LIBSCTP
+ case IPPROTO_SCTP:
+ memset(&sinfo, 0, sizeof(sinfo));
+ sinfo.sinfo_ppid = htonl(msgb_sctp_ppid(msg));
+ sinfo.sinfo_stream = msgb_sctp_stream(msg);
+ ret = sctp_send(conn->ofd.fd, msgb_data(msg), msgb_length(msg),
+ &sinfo, MSG_NOSIGNAL);
+ break;
+#endif
+ case IPPROTO_TCP:
+ default:
+ ret = send(conn->ofd.fd, msgb_data(msg), msgb_length(msg), 0);
+ break;
+ }
+ break;
+ default:
+ ret = -1;
+ errno = ENOTSUP;
+ }
+
+ if (ret >= 0 && ret < msgb_length(msg)) {
+ LOGSSRV(conn, LOGL_ERROR, "short send: %d < exp %u\n", ret, msgb_length(msg));
+ /* Update msgb and re-add it at the start of the queue: */
+ msgb_pull(msg, ret);
+ llist_add(&msg->list, &conn->tx_queue);
+ return;
+ }
+
+ if (ret == -1) {/* send(): On error -1 is returned, and errno is set appropriately */
+ int err = errno;
+ LOGSSRV(conn, LOGL_ERROR, "send(len=%u) error: %s\n", msgb_length(msg), strerror(err));
+ if (err == EAGAIN) {
+ /* Re-add at the start of the queue to re-attempt: */
+ llist_add(&msg->list, &conn->tx_queue);
+ return;
+ }
+ msgb_free(msg);
+ osmo_stream_srv_destroy(conn);
+ return;
+ }
+
+ msgb_free(msg);
+
+ if (llist_empty(&conn->tx_queue)) {
+ osmo_fd_write_disable(&conn->ofd);
+ if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY)
+ osmo_stream_srv_destroy(conn);
+ }
+}
+
+static int osmo_stream_srv_cb(struct osmo_fd *ofd, unsigned int what)
+{
+ struct osmo_stream_srv *conn = ofd->data;
+ int rc = 0;
+
+ LOGSSRV(conn, LOGL_DEBUG, "connected read/write (what=0x%x)\n", what);
+ if (what & OSMO_FD_READ)
+ rc = osmo_stream_srv_read(conn);
+ if (rc != -EBADF && (what & OSMO_FD_WRITE))
+ osmo_stream_srv_write(conn);
+
+ return rc;
+}
+
+
+/*! Create a legacy osmo_fd mode Stream Server inside the specified link.
+ *
+ * This is the function an application traditionally calls from within the
+ * accept_cb call-back of the osmo_stream_srv_link. It creates a new
+ * osmo_stream_srv within that link.
+ *
+ * New users/programs should use osmo_stream_srv_create2 to operate in osmo_io
+ * mode instead.
+ *
+ * \param[in] ctx talloc allocation context from which to allocate
+ * \param[in] link Stream Server Link to which we belong
+ * \param[in] fd system file descriptor of the new connection
+ * \param[in] read_cb Call-back to call when the socket is readable
+ * \param[in] closed_cb Call-back to call when the connection is closed
+ * \param[in] data User data to save in the new Stream Server struct
+ * \returns Stream Server in case of success; NULL on error */
+struct osmo_stream_srv *
+osmo_stream_srv_create(void *ctx, struct osmo_stream_srv_link *link, int fd,
+ osmo_stream_srv_read_cb_t read_cb,
+ osmo_stream_srv_closed_cb_t closed_cb,
+ void *data)
+{
+ struct osmo_stream_srv *conn;
+
+ OSMO_ASSERT(link);
+
+ conn = talloc_zero(ctx, struct osmo_stream_srv);
+ if (conn == NULL)
+ return NULL;
+
+ conn->mode = OSMO_STREAM_MODE_OSMO_FD;
+ conn->srv = link;
+ osmo_fd_setup(&conn->ofd, fd, OSMO_FD_READ, osmo_stream_srv_cb, conn, 0);
+ conn->read_cb = read_cb;
+ conn->closed_cb = closed_cb;
+ conn->data = data;
+ INIT_LLIST_HEAD(&conn->tx_queue);
+
+ osmo_sock_get_name_buf(conn->sockname, sizeof(conn->sockname), fd);
+
+ if (osmo_fd_register(&conn->ofd) < 0) {
+ LOGSSRV(conn, LOGL_ERROR, "could not register FD\n");
+ talloc_free(conn);
+ return NULL;
+ }
+ return conn;
+}
+
+/*! Create an osmo_iofd mode Stream Server inside the specified link.
+ *
+ * This is the function an application typically calls from within the
+ * accept_cb call-back of the osmo_stream_srv_link. It creates a new
+ * osmo_stream_srv in osmo_io mode within that link.
+ *
+ * \param[in] ctx talloc allocation context from which to allocate
+ * \param[in] link Stream Server Link to which we belong
+ * \param[in] fd system file descriptor of the new connection
+ * \param[in] data User data to save in the new Stream Server struct
+ * \returns Stream Server in case of success; NULL on error */
+struct osmo_stream_srv *
+osmo_stream_srv_create2(void *ctx, struct osmo_stream_srv_link *link, int fd, void *data)
+{
+ struct osmo_stream_srv *conn;
+
+ OSMO_ASSERT(link);
+
+ conn = talloc_zero(ctx, struct osmo_stream_srv);
+ if (conn == NULL)
+ return NULL;
+
+ conn->mode = OSMO_STREAM_MODE_OSMO_IO;
+ conn->srv = link;
+
+ osmo_sock_get_name_buf(conn->sockname, sizeof(conn->sockname), fd);
+
+ if (link->proto == IPPROTO_SCTP) {
+ conn->iofd = osmo_iofd_setup(conn, fd, conn->sockname, OSMO_IO_FD_MODE_RECVMSG_SENDMSG,
+ &srv_ioops_sctp, conn);
+ if (conn->iofd)
+ osmo_iofd_set_cmsg_size(conn->iofd, CMSG_SPACE(sizeof(struct sctp_sndrcvinfo)));
+ } else {
+ conn->iofd = osmo_iofd_setup(conn, fd, conn->sockname, OSMO_IO_FD_MODE_READ_WRITE,
+ &srv_ioops, conn);
+ }
+ if (!conn->iofd) {
+ talloc_free(conn);
+ return NULL;
+ }
+ conn->data = data;
+
+ if (osmo_iofd_register(conn->iofd, fd) < 0) {
+ LOGSSRV(conn, LOGL_ERROR, "could not register FD %d\n", fd);
+ talloc_free(conn);
+ return NULL;
+ }
+
+ return conn;
+}
+
+/*! Set a name on the srv object (used during logging).
+ * \param[in] conn server whose name is to be set. The name is copied into the osmo_stream_srv_link, so
+ * the caller memory is not required to be valid beyond the call of this function.
+ * \param[in] name the name to be set on conn
+ */
+void osmo_stream_srv_set_name(struct osmo_stream_srv *conn, const char *name)
+{
+ osmo_talloc_replace_string(conn, &conn->name, name);
+ if (conn->mode == OSMO_STREAM_MODE_OSMO_IO && conn->iofd)
+ osmo_iofd_set_name(conn->iofd, name);
+}
+
+/*! Retrieve name previously set on the srv object (see osmo_stream_srv_set_name()).
+ * \param[in] conn server whose name is to be retrieved
+ * \returns The name to be set on conn; NULL if never set
+ */
+const char *osmo_stream_srv_get_name(const struct osmo_stream_srv *conn)
+{
+ return conn->name;
+}
+
+/*! Set the call-back function for incoming data on an osmo_io stream_srv.
+ *
+ * This function only works with osmo_stream_srv in osmo_io mode, created by osmo_stream_srv_create2()!
+ *
+ * Whenever data is received on the osmo_stram_srv, the read_cb call-back function of the user application is
+ * called.
+ *
+ * \param[in] conn Stream Server to modify
+ * \param[in] read_cb Call-back function to be called when data was read */
+void osmo_stream_srv_set_read_cb(struct osmo_stream_srv *conn,
+ osmo_stream_srv_read_cb2_t read_cb)
+{
+ OSMO_ASSERT(conn && conn->mode == OSMO_STREAM_MODE_OSMO_IO);
+ conn->iofd_read_cb = read_cb;
+}
+
+/*! Set the call-back function called when the stream server socket was closed.
+ * Whenever the socket was closed (network error, client disconnect, etc.), the user-provided
+ * call-back function given here is called. This is typically used by the application to clean up any of its
+ * internal state related to this specific client/connection.
+ * \param[in] conn Stream Server to modify
+ * \param[in] closed_cb Call-back function to be called when the connection was closed */
+void osmo_stream_srv_set_closed_cb(struct osmo_stream_srv *conn,
+ osmo_stream_srv_closed_cb_t closed_cb)
+{
+ OSMO_ASSERT(conn);
+ conn->closed_cb = closed_cb;
+}
+
+/*! Prepare to send out all pending messages on the connection's Tx queue.
+ * and then automatically destroy the stream with osmo_stream_srv_destroy().
+ * This function disables queuing of new messages on the connection and also
+ * disables reception of new messages on the connection.
+ * \param[in] conn Stream Server to modify */
+void osmo_stream_srv_set_flush_and_destroy(struct osmo_stream_srv *conn)
+{
+ conn->flags |= OSMO_STREAM_SRV_F_FLUSH_DESTROY;
+}
+
+/*! Set application private data of the stream server.
+ * \param[in] conn Stream Server to modify
+ * \param[in] data User-specific data (available in call-back functions) */
+void
+osmo_stream_srv_set_data(struct osmo_stream_srv *conn,
+ void *data)
+{
+ conn->data = data;
+}
+
+/*! Set the segmentation callback for target osmo_stream_srv structure.
+ *
+ * A segmentation call-back can optionally be used when a packet based protocol (like TCP) is used within a
+ * STREAM style socket that does not preserve message boundaries within the stream. If a segmentation
+ * call-back is given, the osmo_stream_srv library code will makes sure that the read_cb called only for
+ * complete single messages, and not arbitrary segments of the stream.
+ *
+ * This function only works with osmo_stream_srv in osmo_io mode, created by osmo_stream_srv_create2()!
+ * The connection has to have been established prior to calling this function.
+ *
+ * \param[in,out] conn Target Stream Server to modify
+ * \param[in] segmentation_cb Segmentation callback to be set */
+void osmo_stream_srv_set_segmentation_cb(struct osmo_stream_srv *conn,
+ osmo_stream_srv_segmentation_cb_t segmentation_cb)
+{
+ /* Note that the following implies that iofd != NULL, since
+ * osmo_stream_srv_create2() creates the iofd member, too */
+ OSMO_ASSERT(conn->mode == OSMO_STREAM_MODE_OSMO_IO);
+ /* Copy default settings */
+ struct osmo_io_ops conn_ops;
+ osmo_iofd_get_ioops(conn->iofd, &conn_ops);
+ /* Set segmentation cb for this connection */
+ conn_ops.segmentation_cb = segmentation_cb;
+ osmo_iofd_set_ioops(conn->iofd, &conn_ops);
+}
+
+/*! Retrieve application private data of the stream server
+ * \param[in] conn Stream Server
+ * \returns Application private data, as set by \ref osmo_stream_srv_set_data() */
+void *osmo_stream_srv_get_data(struct osmo_stream_srv *conn)
+{
+ return conn->data;
+}
+
+/*! Retrieve the stream server socket description.
+ * The returned name is stored in a static buffer; it is hence not re-entrant or thread-safe!
+ * \param[in] cli Stream Server to examine
+ * \returns Socket description or NULL in case of error */
+const char *osmo_stream_srv_get_sockname(const struct osmo_stream_srv *conn)
+{
+ static char buf[OSMO_STREAM_MAX_ADDRS * OSMO_SOCK_NAME_MAXLEN];
+
+ osmo_sock_multiaddr_get_name_buf(buf, sizeof(buf),
+ osmo_stream_srv_get_fd(conn), conn->srv->proto);
+
+ return buf;
+}
+
+/*! Retrieve Osmocom File Descriptor of a stream server in osmo_fd mode.
+ * \param[in] conn Stream Server
+ * \returns Pointer to \ref osmo_fd */
+struct osmo_fd *
+osmo_stream_srv_get_ofd(struct osmo_stream_srv *conn)
+{
+ OSMO_ASSERT(conn->mode == OSMO_STREAM_MODE_OSMO_FD);
+ return &conn->ofd;
+}
+
+/*! Retrieve File Descriptor of the stream server
+ * \param[in] conn Stream Server
+ * \returns file descriptor or negative on error */
+int
+osmo_stream_srv_get_fd(const struct osmo_stream_srv *conn)
+{
+ switch (conn->mode) {
+ case OSMO_STREAM_MODE_OSMO_FD:
+ return conn->ofd.fd;
+ case OSMO_STREAM_MODE_OSMO_IO:
+ if (conn->iofd)
+ return osmo_iofd_get_fd(conn->iofd);
+ default:
+ break;
+ }
+ return -EINVAL;
+}
+
+/*! Retrieve osmo_io descriptor of the stream server socket.
+ * This function must not be called on a stream server in legacy osmo_fd mode!
+ * \param[in] srv Stream Server of which we want to obtain the osmo_io descriptor
+ * \returns osmo_io_fd of stream server. */
+struct osmo_io_fd *
+osmo_stream_srv_get_iofd(const struct osmo_stream_srv *srv)
+{
+ OSMO_ASSERT(srv->mode == OSMO_STREAM_MODE_OSMO_IO);
+ return srv->iofd;
+}
+
+/*! Retrieve the master (Link) from a Stream Server.
+ * \param[in] conn Stream Server of which we want to know the Link
+ * \returns Link through which the given Stream Server is established */
+struct osmo_stream_srv_link *osmo_stream_srv_get_master(struct osmo_stream_srv *conn)
+{
+ return conn->srv;
+}
+
+/*! Destroy given Stream Server.
+ * This function closes the Stream Server socket, unregisters from the underlying I/O mechanism, invokes the
+ * connection's closed_cb() callback to allow API users to clean up any associated state they have for this
+ * connection, and then de-allocates associated memory.
+ * \param[in] conn Stream Server to be destroyed */
+void osmo_stream_srv_destroy(struct osmo_stream_srv *conn)
+{
+ switch (conn->mode) {
+ case OSMO_STREAM_MODE_OSMO_FD:
+ osmo_fd_unregister(&conn->ofd);
+ close(conn->ofd.fd);
+ msgb_queue_free(&conn->tx_queue);
+ conn->ofd.fd = -1;
+ break;
+ case OSMO_STREAM_MODE_OSMO_IO:
+ osmo_iofd_free(conn->iofd);
+ conn->iofd = NULL;
+ break;
+ default:
+ OSMO_ASSERT(false);
+ }
+ if (conn->closed_cb)
+ conn->closed_cb(conn);
+ talloc_free(conn);
+}
+
+/*! Enqueue data to be sent via an Osmocom stream server.
+ * \param[in] conn Stream Server through which we want to send
+ * \param[in] msg Message buffer to enqueue in transmit queue */
+void osmo_stream_srv_send(struct osmo_stream_srv *conn, struct msgb *msg)
+{
+ int rc;
+
+ OSMO_ASSERT(conn);
+ OSMO_ASSERT(msg);
+ if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY) {
+ LOGSSRV(conn, LOGL_DEBUG, "Connection is being flushed and closed; ignoring new outgoing message\n");
+ msgb_free(msg);
+ return;
+ }
+
+ switch (conn->mode) {
+ case OSMO_STREAM_MODE_OSMO_FD:
+ msgb_enqueue(&conn->tx_queue, msg);
+ osmo_fd_write_enable(&conn->ofd);
+ break;
+ case OSMO_STREAM_MODE_OSMO_IO:
+ if (conn->srv->proto == IPPROTO_SCTP)
+ rc = stream_iofd_sctp_send_msgb(conn->iofd, msg, MSG_NOSIGNAL);
+ else
+ rc = osmo_iofd_write_msgb(conn->iofd, msg);
+ if (rc < 0)
+ msgb_free(msg);
+ break;
+ default:
+ OSMO_ASSERT(false);
+ }
+}
+
+/*! Receive data via an Osmocom stream server in osmo_fd mode.
+ * \param[in] conn Stream Server from which to receive
+ * \param msg pre-allocate message buffer to which received data is appended
+ * \returns number of bytes read, negative on error.
+ *
+ * Application programs using the legacy osmo_fd mode of osmo_stream_srv will use
+ * this function to read/receive from a stream socket after they have been notified that
+ * it is readable (via select/poll).
+ *
+ * If conn is an SCTP connection, additional specific considerations shall be taken:
+ * - msg->cb is always filled with SCTP ppid, and SCTP stream values, see msgb_sctp_*() APIs.
+ * - If an SCTP notification was received when reading from the SCTP socket,
+ * msgb_sctp_msg_flags(msg) will contain bit flag
+ * OSMO_STREAM_SCTP_MSG_FLAGS_NOTIFICATION set, and the msgb will
+ * contain a "union sctp_notification" instead of user data. In this case the
+ * return code will be either 0 (if conn is considered dead after the
+ * notification) or -EAGAIN (if conn is considered still alive after the
+ * notification) resembling the standard recv() API.
+ */
+int osmo_stream_srv_recv(struct osmo_stream_srv *conn, struct msgb *msg)
+{
+ int ret;
+ OSMO_ASSERT(conn);
+ OSMO_ASSERT(msg);
+ OSMO_ASSERT(conn->mode == OSMO_STREAM_MODE_OSMO_FD);
+
+ switch (conn->srv->sk_domain) {
+ case AF_UNIX:
+ ret = recv(conn->ofd.fd, msg->tail, msgb_tailroom(msg), 0);
+ break;
+ case AF_INET:
+ case AF_INET6:
+ case AF_UNSPEC:
+ switch (conn->srv->proto) {
+#ifdef HAVE_LIBSCTP
+ case IPPROTO_SCTP:
+ {
+ char log_pfx[128];
+ snprintf(log_pfx, sizeof(log_pfx), "SRV(%s,%s)", conn->name ? : "", conn->sockname);
+ ret = stream_sctp_recvmsg_wrapper(conn->ofd.fd, msg, log_pfx);
+ break;
+ }
+#endif
+ case IPPROTO_TCP:
+ default:
+ ret = recv(conn->ofd.fd, msg->tail, msgb_tailroom(msg), 0);
+ break;
+ }
+ break;
+ default:
+ ret = -ENOTSUP;
+ }
+
+ if (ret < 0) {
+ if (errno == EPIPE || errno == ECONNRESET)
+ LOGSSRV(conn, LOGL_ERROR, "lost connection with client\n");
+ return ret;
+ } else if (ret == 0) {
+ LOGSSRV(conn, LOGL_ERROR, "connection closed with client\n");
+ return ret;
+ }
+ msgb_put(msg, ret);
+ LOGSSRV(conn, LOGL_DEBUG, "received %d bytes from client\n", ret);
+ return ret;
+}
+
+void osmo_stream_srv_clear_tx_queue(struct osmo_stream_srv *conn)
+{
+ switch (conn->mode) {
+ case OSMO_STREAM_MODE_OSMO_FD:
+ msgb_queue_free(&conn->tx_queue);
+ osmo_fd_write_disable(&conn->ofd);
+ break;
+ case OSMO_STREAM_MODE_OSMO_IO:
+ osmo_iofd_txqueue_clear(conn->iofd);
+ break;
+ case OSMO_STREAM_MODE_UNKNOWN:
+ default:
+ break;
+ }
+
+ if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY)
+ osmo_stream_srv_destroy(conn);
+}
+
+/*! @} */