aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHarald Welte <laforge@osmocom.org>2024-03-17 12:03:24 +0100
committerHarald Welte <laforge@osmocom.org>2024-04-02 19:40:03 +0200
commit352b967d1bee4b3bc51ecc383306bad0fc5b7fbf (patch)
tree1c0b11e97ca1207dd56bdee0430f00681308c2f3
parented4da25f2f7b45145a894b1b99a373818eb68da8 (diff)
Convert RTP/RTCP/OSMUX I/O from osmo_fd to osmo_ioHEADmaster
Converting from osmo_fd to osmo_io allows us to switch to the new io_uring backend and benefit from related performance benefits. In a benchmark running 200 concurrent bi-directional voice calls with GSM-EFR codec, I am observing: * the code before this patch uses 40..42% of a single core on a Ryzen 5950X at 200 calls (=> 200 endpoints with each two connections) * no increase in CPU utilization before/after this patch, i.e. the osmo_io overhead for the osmo_fd backend is insignificant compared to the direct osmo_fd mode before * an almost exactly 50% reduction of CPU utilization when running the same osmo-mgw build with LIBOSMO_IO_BACKEND=IO_URING - top shows 19..21% for the same workload instead of 40..42% with the OSMO_FD default backend. * An increase of about 4 Megabytes in both RSS and VIRT size when enabling the OSMO_IO backend. This is likely the memory-mapped rings. No memory leakage is observed when using either of the backends. Change-Id: I8471960d5d8088a70cf105f2f40dfa5d5458169a
-rw-r--r--include/osmocom/mgcp/mgcp.h3
-rw-r--r--include/osmocom/mgcp/mgcp_network.h7
-rw-r--r--src/libosmo-mgcp/mgcp_conn.c4
-rw-r--r--src/libosmo-mgcp/mgcp_iuup.c5
-rw-r--r--src/libosmo-mgcp/mgcp_network.c195
-rw-r--r--src/libosmo-mgcp/mgcp_osmux.c148
-rw-r--r--tests/mgcp/mgcp_test.c11
7 files changed, 204 insertions, 169 deletions
diff --git a/include/osmocom/mgcp/mgcp.h b/include/osmocom/mgcp/mgcp.h
index e61ba890c..4dff4d086 100644
--- a/include/osmocom/mgcp/mgcp.h
+++ b/include/osmocom/mgcp/mgcp.h
@@ -24,6 +24,7 @@
#include <osmocom/core/msgb.h>
#include <osmocom/core/socket.h>
+#include <osmocom/core/osmo_io.h>
#include <osmocom/core/write_queue.h>
#include <osmocom/core/timer.h>
#include <osmocom/core/logging.h>
@@ -205,4 +206,4 @@ int mgcp_send_reset_all(struct mgcp_config *cfg);
int mgcp_create_bind(const char *source_addr, int port, uint8_t dscp, uint8_t prio);
-int mgcp_udp_send(int fd, const struct osmo_sockaddr *addr, const char *buf, int len);
+int mgcp_udp_send(struct osmo_io_fd *iofd, const struct osmo_sockaddr *addr, const char *buf, int len);
diff --git a/include/osmocom/mgcp/mgcp_network.h b/include/osmocom/mgcp/mgcp_network.h
index 1ec897936..8f6505c11 100644
--- a/include/osmocom/mgcp/mgcp_network.h
+++ b/include/osmocom/mgcp/mgcp_network.h
@@ -4,6 +4,7 @@
#include <stdbool.h>
#include <osmocom/core/socket.h>
+#include <osmocom/core/osmo_io.h>
#include <osmocom/mgcp/mgcp.h>
@@ -120,8 +121,8 @@ struct mgcp_rtp_end {
bool rfc5993_hr_convert;
/* Each end has a separate socket for RTP and RTCP */
- struct osmo_fd rtp;
- struct osmo_fd rtcp;
+ struct osmo_io_fd *rtp;
+ struct osmo_io_fd *rtcp;
/* local UDP port number of the RTP socket; RTCP is +1 */
int local_port;
@@ -179,7 +180,7 @@ void rtpconn_rate_ctr_add(struct mgcp_conn_rtp *conn_rtp, struct mgcp_endpoint *
int id, int inc);
void rtpconn_rate_ctr_inc(struct mgcp_conn_rtp *conn_rtp, struct mgcp_endpoint *endp,
int id);
-void forward_data_tap(int fd, struct mgcp_rtp_tap *tap, struct msgb *msg);
+void forward_data_tap(struct osmo_io_fd *iofd, struct mgcp_rtp_tap *tap, struct msgb *msg);
uint32_t mgcp_get_current_ts(unsigned codec_rate);
int amr_oa_bwe_convert(struct mgcp_endpoint *endp, struct msgb *msg, bool target_is_oa);
diff --git a/src/libosmo-mgcp/mgcp_conn.c b/src/libosmo-mgcp/mgcp_conn.c
index d9bc57371..5eb48971c 100644
--- a/src/libosmo-mgcp/mgcp_conn.c
+++ b/src/libosmo-mgcp/mgcp_conn.c
@@ -106,8 +106,8 @@ static int mgcp_rtp_conn_init(struct mgcp_conn_rtp *conn_rtp, struct mgcp_conn *
/* backpointer to the generic part of the connection */
conn->u.rtp.conn = conn;
- end->rtp.fd = -1;
- end->rtcp.fd = -1;
+ end->rtp = NULL;
+ end->rtcp = NULL;
memset(&end->addr, 0, sizeof(end->addr));
end->rtcp_port = 0;
diff --git a/src/libosmo-mgcp/mgcp_iuup.c b/src/libosmo-mgcp/mgcp_iuup.c
index 3818d3e23..7531e42ce 100644
--- a/src/libosmo-mgcp/mgcp_iuup.c
+++ b/src/libosmo-mgcp/mgcp_iuup.c
@@ -512,10 +512,9 @@ static int mgcp_send_iuup(struct mgcp_endpoint *endp, struct msgb *msg,
osmo_sockaddr_port(&rtp_end->addr.u.sa), ntohs(rtp_end->rtcp_port));
/* Forward a copy of the RTP data to a debug ip/port */
- forward_data_tap(rtp_end->rtp.fd, &conn_src->tap_out,
- msg);
+ forward_data_tap(rtp_end->rtp, &conn_src->tap_out, msg);
- len = mgcp_udp_send(rtp_end->rtp.fd, &rtp_end->addr, (char *)hdr, buflen);
+ len = mgcp_udp_send(rtp_end->rtp, &rtp_end->addr, (char *)hdr, buflen);
if (len <= 0)
return len;
diff --git a/src/libosmo-mgcp/mgcp_network.c b/src/libosmo-mgcp/mgcp_network.c
index bdf516e70..1fc2c56cd 100644
--- a/src/libosmo-mgcp/mgcp_network.c
+++ b/src/libosmo-mgcp/mgcp_network.c
@@ -4,6 +4,7 @@
/*
* (C) 2009-2012 by Holger Hans Peter Freyther <zecke@selfish.org>
* (C) 2009-2012 by On-Waves
+ * (C) 2013-2024 by sysmocom - s.f.m.c. GmbH
* All Rights Reserved
*
* This program is free software; you can redistribute it and/or modify
@@ -794,16 +795,18 @@ static int amr_oa_check(char *data, int len)
/* Forward data to a debug tap. This is debug function that is intended for
* debugging the voice traffic with tools like gstreamer */
-void forward_data_tap(int fd, struct mgcp_rtp_tap *tap, struct msgb *msg)
+void forward_data_tap(struct osmo_io_fd *iofd, struct mgcp_rtp_tap *tap, struct msgb *msg)
{
int rc;
if (!tap->enabled)
return;
- rc = sendto(fd, msgb_data(msg), msgb_length(msg), 0, (struct sockaddr *)&tap->forward,
- sizeof(tap->forward));
+ struct msgb *msg2 = msgb_copy(msg, "RTP TAP Tx");
+ if (!msg2)
+ return;
+ rc = osmo_iofd_sendto_msgb(iofd, msg2, 0, &tap->forward);
if (rc < 0)
LOGP(DRTP, LOGL_ERROR,
"Forwarding tapped (debug) voice data failed.\n");
@@ -1039,29 +1042,36 @@ static int mgcp_conn_rtp_dispatch_rtp(struct mgcp_conn_rtp *conn_dst, struct msg
return -1;
}
-/*! send udp packet.
- * \param[in] fd associated file descriptor.
+/*! send message buffer via udp socket. If it succeeds, it takes ownership of the msgb and internally calls
+ * msgb_free() after the aynchronous sendto() completes. In case of error, the msgb is still owned by the
+ * caller and must be free'd accordingly.
+ * \param[in] iofd associated file descriptor.
+ * \param[in] addr destination ip-address.
+ * \param[in] msg message buffer that holds the data to be send.
+ * \returns 0 in case of success (takes msgb ownership), -1 on error (doesn't take msgb ownership). */
+static int mgcp_udp_send_msg(struct osmo_io_fd *iofd, const struct osmo_sockaddr *addr, struct msgb *msg)
+{
+ LOGP(DRTP, LOGL_DEBUG, "sending %d bytes length packet to %s ...\n", msgb_length(msg),
+ osmo_sockaddr_to_str(addr));
+
+ return osmo_iofd_sendto_msgb(iofd, msg, 0, addr);
+}
+
+/*! send udp packet from raw buffer/length.
+ * \param[in] iofd associated file descriptor.
* \param[in] addr destination ip-address.
* \param[in] buf buffer that holds the data to be send.
* \param[in] len length of the data to be sent.
* \returns bytes sent, -1 on error. */
-int mgcp_udp_send(int fd, const struct osmo_sockaddr *addr, const char *buf, int len)
+int mgcp_udp_send(struct osmo_io_fd *iofd, const struct osmo_sockaddr *addr, const char *buf, int len)
{
- char ipbuf[INET6_ADDRSTRLEN];
- size_t addr_len;
-
- LOGP(DRTP, LOGL_DEBUG,
- "sending %d bytes length packet to %s:%u ...\n", len,
- osmo_sockaddr_ntop(&addr->u.sa, ipbuf),
- osmo_sockaddr_port(&addr->u.sa));
-
- if (addr->u.sa.sa_family == AF_INET6) {
- addr_len = sizeof(addr->u.sin6);
- } else {
- addr_len = sizeof(addr->u.sin);
- }
+ struct msgb *msg = msgb_alloc_c(iofd, len, "mgcp_udp_send");
+ if (!msg)
+ return -ENOMEM;
+ memcpy(msg->tail, buf, len);
+ msgb_put(msg, len);
- return sendto(fd, buf, len, 0, &addr->u.sa, addr_len);
+ return mgcp_udp_send_msg(iofd, addr, msg);
}
/*! send RTP dummy packet (to keep NAT connection open).
@@ -1089,8 +1099,7 @@ int mgcp_send_dummy(struct mgcp_endpoint *endp, struct mgcp_conn_rtp *conn)
if (mgcp_conn_rtp_is_iuup(conn))
rc = mgcp_conn_iuup_send_dummy(conn);
else
- rc = mgcp_udp_send(conn->end.rtp.fd, &conn->end.addr,
- rtp_dummy_payload, sizeof(rtp_dummy_payload));
+ rc = mgcp_udp_send(conn->end.rtp, &conn->end.addr, rtp_dummy_payload, sizeof(rtp_dummy_payload));
if (rc == -1)
goto failed;
@@ -1101,7 +1110,7 @@ int mgcp_send_dummy(struct mgcp_endpoint *endp, struct mgcp_conn_rtp *conn)
was_rtcp = 1;
rtcp_addr = conn->end.addr;
osmo_sockaddr_set_port(&rtcp_addr.u.sa, ntohs(conn->end.rtcp_port));
- rc = mgcp_udp_send(conn->end.rtcp.fd, &rtcp_addr,
+ rc = mgcp_udp_send(conn->end.rtcp, &rtcp_addr,
rtp_dummy_payload, sizeof(rtp_dummy_payload));
if (rc >= 0)
@@ -1225,22 +1234,21 @@ int mgcp_send(struct mgcp_endpoint *endp, int is_rtp, struct osmo_sockaddr *addr
);
/* Forward a copy of the RTP data to a debug ip/port */
- forward_data_tap(rtp_end->rtp.fd, &conn_src->tap_out,
- msg);
+ forward_data_tap(rtp_end->rtp, &conn_src->tap_out, msg);
+
+ len = msgb_length(msg);
- len = mgcp_udp_send(rtp_end->rtp.fd, &rtp_end->addr,
- (char *)msgb_data(msg), msgb_length(msg));
- if (len <= 0) {
+ rc = mgcp_udp_send_msg(rtp_end->rtp, &rtp_end->addr, msg);
+ if (rc < 0) {
msgb_free(msg);
- return len;
+ return rc;
}
rtpconn_rate_ctr_inc(conn_dst, endp, RTP_PACKETS_TX_CTR);
rtpconn_rate_ctr_add(conn_dst, endp, RTP_OCTETS_TX_CTR, len);
rtp_state->alt_rtp_tx_sequence++;
- msgb_free(msg);
- return len;
+ return 0;
} else if (!trunk->omit_rtcp) {
struct osmo_sockaddr rtcp_addr = rtp_end->addr;
osmo_sockaddr_set_port(&rtcp_addr.u.sa, rtp_end->rtcp_port);
@@ -1251,15 +1259,19 @@ int mgcp_send(struct mgcp_endpoint *endp, int is_rtp, struct osmo_sockaddr *addr
osmo_sockaddr_port(&rtcp_addr.u.sa)
);
- len = mgcp_udp_send(rtp_end->rtcp.fd, &rtcp_addr,
- (char *)msgb_data(msg), msgb_length(msg));
+ len = msgb_length(msg);
+
+ rc = mgcp_udp_send_msg(rtp_end->rtcp, &rtcp_addr, msg);
+ if (rc < 0) {
+ msgb_free(msg);
+ return rc;
+ }
rtpconn_rate_ctr_inc(conn_dst, endp, RTP_PACKETS_TX_CTR);
rtpconn_rate_ctr_add(conn_dst, endp, RTP_OCTETS_TX_CTR, len);
rtp_state->alt_rtp_tx_sequence++;
- msgb_free(msg);
- return len;
+ return 0;
}
msgb_free(msg);
@@ -1461,7 +1473,7 @@ void mgcp_cleanup_e1_bridge_cb(struct mgcp_endpoint *endp, struct mgcp_conn *con
}
/* Handle incoming RTP data from NET */
-static int rtp_data_net(struct osmo_fd *fd, unsigned int what)
+static void rtp_recvfrom_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg, const struct osmo_sockaddr *saddr)
{
/* NOTE: This is a generic implementation. RTP data is received. In
* case of loopback the data is just sent back to its origin. All
@@ -1472,49 +1484,34 @@ static int rtp_data_net(struct osmo_fd *fd, unsigned int what)
struct mgcp_conn_rtp *conn_src;
struct mgcp_endpoint *endp;
- struct osmo_sockaddr addr;
- socklen_t slen = sizeof(addr);
- char ipbuf[INET6_ADDRSTRLEN];
- int ret;
enum rtp_proto proto;
struct osmo_rtp_msg_ctx *mc;
- struct msgb *msg;
- int rc;
- conn_src = (struct mgcp_conn_rtp *)fd->data;
+ conn_src = (struct mgcp_conn_rtp *) osmo_iofd_get_data(iofd);
OSMO_ASSERT(conn_src);
endp = conn_src->conn->endp;
OSMO_ASSERT(endp);
- msg = msgb_alloc_c(endp->trunk, RTP_BUF_SIZE, "RTP-rx");
- proto = (fd == &conn_src->end.rtp) ? MGCP_PROTO_RTP : MGCP_PROTO_RTCP;
+ proto = (iofd == conn_src->end.rtp) ? MGCP_PROTO_RTP : MGCP_PROTO_RTCP;
- ret = recvfrom(fd->fd, msgb_data(msg), msg->data_len, 0, (struct sockaddr *)&addr.u.sa, &slen);
-
- if (ret <= 0) {
- LOG_CONN_RTP(conn_src, LOGL_ERROR, "recvfrom error: %s\n", strerror(errno));
- rc = -1;
- goto out;
+ if (res <= 0) {
+ LOG_CONN_RTP(conn_src, LOGL_ERROR, "recvfrom error: %s\n", strerror(-res));
+ goto out_free;
}
- msgb_put(msg, ret);
-
- LOG_CONN_RTP(conn_src, LOGL_DEBUG, "%s: rx %u bytes from %s:%u\n",
+ LOG_CONN_RTP(conn_src, LOGL_DEBUG, "%s: rx %u bytes from %s\n",
proto == MGCP_PROTO_RTP ? "RTP" : "RTCP",
- msgb_length(msg), osmo_sockaddr_ntop(&addr.u.sa, ipbuf),
- osmo_sockaddr_port(&addr.u.sa));
+ msgb_length(msg), osmo_sockaddr_to_str(saddr));
if ((proto == MGCP_PROTO_RTP && check_rtp(conn_src, msg))
|| (proto == MGCP_PROTO_RTCP && check_rtcp(conn_src, msg))) {
/* Logging happened in the two check_ functions */
- rc = -1;
- goto out;
+ goto out_free;
}
if (mgcp_is_rtp_dummy_payload(msg)) {
LOG_CONN_RTP(conn_src, LOGL_DEBUG, "rx dummy packet (dropped)\n");
- rc = 0;
- goto out;
+ goto out_free;
}
/* Since the msgb remains owned and freed by this function, the msg ctx data struct can just be on the stack and
@@ -1523,7 +1520,7 @@ static int rtp_data_net(struct osmo_fd *fd, unsigned int what)
*mc = (struct osmo_rtp_msg_ctx){
.proto = proto,
.conn_src = conn_src,
- .from_addr = &addr,
+ .from_addr = (struct osmo_sockaddr *) saddr,
};
LOG_CONN_RTP(conn_src, LOGL_DEBUG, "msg ctx: %d %p %s\n",
mc->proto, mc->conn_src,
@@ -1538,12 +1535,13 @@ static int rtp_data_net(struct osmo_fd *fd, unsigned int what)
/* FIXME: count RTP and RTCP separately, also count IuUP payload-less separately */
/* Forward a copy of the RTP data to a debug ip/port */
- forward_data_tap(fd->fd, &conn_src->tap_in, msg);
+ forward_data_tap(iofd, &conn_src->tap_in, msg);
- rc = rx_rtp(msg);
+ rx_rtp(msg);
+ return;
-out:
- return rc;
+out_free:
+ msgb_free(msg);
}
/* Note: This function is able to handle RTP and RTCP. msgb ownership is transferred, so this function or its
@@ -1590,6 +1588,24 @@ out_free:
return -1;
}
+static void rtp_sendto_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg, const struct osmo_sockaddr *daddr)
+{
+ /* nothing; osmo_io takes care of msgb_free */
+ if (res < 0) {
+ struct mgcp_conn_rtp *conn_rtp = (struct mgcp_conn_rtp *) osmo_iofd_get_data(iofd);
+ int priv_nr = osmo_iofd_get_priv_nr(iofd);
+ char errbuf[129];
+ strerror_r(-res, errbuf, sizeof(errbuf));
+ LOG_CONN_RTP(conn_rtp, LOGL_ERROR, "%s sendto(%s) failed: %s\n", priv_nr ? "RTCP" : "RTP",
+ osmo_sockaddr_to_str(daddr), errbuf);
+ }
+}
+
+static const struct osmo_io_ops rtp_ioops = {
+ .recvfrom_cb = rtp_recvfrom_cb,
+ .sendto_cb = rtp_sendto_cb,
+};
+
/*! bind RTP port to osmo_fd.
* \param[in] source_addr source (local) address to bind on.
* \param[in] port to bind on.
@@ -1617,7 +1633,7 @@ int mgcp_create_bind(const char *source_addr, int port, uint8_t dscp, uint8_t pr
static int bind_rtp(struct mgcp_config *cfg, const char *source_addr,
struct mgcp_rtp_end *rtp_end, struct mgcp_endpoint *endp)
{
- int rc;
+ int rc, rtp_fd, rtcp_fd;
/* NOTE: The port that is used for RTCP is the RTP port incremented by one
* (e.g. RTP-Port = 16000 ==> RTCP-Port = 16001) */
@@ -1629,7 +1645,7 @@ static int bind_rtp(struct mgcp_config *cfg, const char *source_addr,
source_addr, rtp_end->local_port);
goto cleanup0;
}
- rtp_end->rtp.fd = rc;
+ rtp_fd = rc;
rc = mgcp_create_bind(source_addr, rtp_end->local_port + 1, cfg->endp_dscp, cfg->endp_priority);
if (rc < 0) {
@@ -1638,16 +1654,16 @@ static int bind_rtp(struct mgcp_config *cfg, const char *source_addr,
source_addr, rtp_end->local_port + 1);
goto cleanup1;
}
- rtp_end->rtcp.fd = rc;
+ rtcp_fd = rc;
- if (osmo_fd_register(&rtp_end->rtp) != 0) {
+ if (osmo_iofd_register(rtp_end->rtp, rtp_fd) < 0) {
LOGPENDP(endp, DRTP, LOGL_ERROR,
"failed to register RTP port %d\n",
rtp_end->local_port);
goto cleanup2;
}
- if (osmo_fd_register(&rtp_end->rtcp) != 0) {
+ if (osmo_iofd_register(rtp_end->rtcp, rtcp_fd) != 0) {
LOGPENDP(endp, DRTP, LOGL_ERROR,
"failed to register RTCP port %d\n",
rtp_end->local_port + 1);
@@ -1657,13 +1673,11 @@ static int bind_rtp(struct mgcp_config *cfg, const char *source_addr,
return 0;
cleanup3:
- osmo_fd_unregister(&rtp_end->rtp);
+ osmo_iofd_unregister(rtp_end->rtp);
cleanup2:
- close(rtp_end->rtcp.fd);
- rtp_end->rtcp.fd = -1;
+ close(rtcp_fd);
cleanup1:
- close(rtp_end->rtp.fd);
- rtp_end->rtp.fd = -1;
+ close(rtp_fd);
cleanup0:
return -1;
}
@@ -1682,7 +1696,8 @@ int mgcp_bind_net_rtp_port(struct mgcp_endpoint *endp, int rtp_port,
snprintf(name, sizeof(name), "%s-%s", conn->conn->name, conn->conn->id);
end = &conn->end;
- if (end->rtp.fd != -1 || end->rtcp.fd != -1) {
+ if ((end->rtp && osmo_iofd_get_fd(end->rtp) != -1) ||
+ (end->rtcp && osmo_iofd_get_fd(end->rtcp) != -1)) {
LOGPENDP(endp, DRTP, LOGL_ERROR, "%u was already bound on conn:%s\n",
rtp_port, mgcp_conn_dump(conn->conn));
@@ -1695,8 +1710,18 @@ int mgcp_bind_net_rtp_port(struct mgcp_endpoint *endp, int rtp_port,
}
end->local_port = rtp_port;
- osmo_fd_setup(&end->rtp, -1, OSMO_FD_READ, rtp_data_net, conn, 0);
- osmo_fd_setup(&end->rtcp, -1, OSMO_FD_READ, rtp_data_net, conn, 0);
+ end->rtp = osmo_iofd_setup(conn->conn, -1, name, OSMO_IO_FD_MODE_RECVFROM_SENDTO, &rtp_ioops, conn);
+ if (!end->rtp)
+ return -EIO;
+ osmo_iofd_set_alloc_info(end->rtp, RTP_BUF_SIZE, 0);
+ end->rtcp = osmo_iofd_setup(conn->conn, -1, name, OSMO_IO_FD_MODE_RECVFROM_SENDTO, &rtp_ioops, conn);
+ if (!end->rtcp) {
+ osmo_iofd_free(end->rtp);
+ end->rtp = NULL;
+ return -EIO;
+ }
+ osmo_iofd_set_alloc_info(end->rtcp, RTP_BUF_SIZE, 0);
+ osmo_iofd_set_priv_nr(end->rtcp, 1); /* we use priv_nr as identifier for RTCP */
return bind_rtp(endp->trunk->cfg, conn->end.local_addr, end, endp);
}
@@ -1705,15 +1730,13 @@ int mgcp_bind_net_rtp_port(struct mgcp_endpoint *endp, int rtp_port,
* \param[in] end RTP end */
void mgcp_free_rtp_port(struct mgcp_rtp_end *end)
{
- if (end->rtp.fd != -1) {
- osmo_fd_unregister(&end->rtp);
- close(end->rtp.fd);
- end->rtp.fd = -1;
+ if (end->rtp) {
+ osmo_iofd_free(end->rtp);
+ end->rtp = NULL;
}
- if (end->rtcp.fd != -1) {
- osmo_fd_unregister(&end->rtcp);
- close(end->rtcp.fd);
- end->rtcp.fd = -1;
+ if (end->rtcp) {
+ osmo_iofd_free(end->rtcp);
+ end->rtcp = NULL;
}
}
diff --git a/src/libosmo-mgcp/mgcp_osmux.c b/src/libosmo-mgcp/mgcp_osmux.c
index 3df99724e..df91dbc0b 100644
--- a/src/libosmo-mgcp/mgcp_osmux.c
+++ b/src/libosmo-mgcp/mgcp_osmux.c
@@ -1,6 +1,7 @@
/*
* (C) 2012-2013 by Pablo Neira Ayuso <pablo@gnumonks.org>
* (C) 2012-2013 by On Waves ehf <http://www.on-waves.com>
+ * (C) 2013-2024 by sysmocom - s.f.m.c. GmbH
* All rights not specifically granted under this license are reserved.
*
* This program is free software; you can redistribute it and/or modify it
@@ -13,9 +14,11 @@
#include <string.h> /* for memcpy */
#include <stdlib.h> /* for abs */
#include <inttypes.h> /* for PRIu64 */
+#include <unistd.h> /* for PRIu64 */
#include <netinet/in.h>
#include <osmocom/core/msgb.h>
#include <osmocom/core/socket.h>
+#include <osmocom/core/osmo_io.h>
#include <osmocom/core/talloc.h>
#include <osmocom/netif/osmux.h>
@@ -30,8 +33,8 @@
#include <osmocom/mgcp/mgcp_endp.h>
#include <osmocom/mgcp/mgcp_trunk.h>
-static struct osmo_fd osmux_fd_v4;
-static struct osmo_fd osmux_fd_v6;
+static struct osmo_io_fd *osmux_fd_v4;
+static struct osmo_io_fd *osmux_fd_v6;
static LLIST_HEAD(osmux_handle_list);
@@ -76,34 +79,31 @@ static void rtpconn_osmux_rate_ctr_inc(struct mgcp_conn_rtp *conn_rtp, int id)
static void osmux_deliver_cb(struct msgb *batch_msg, void *data)
{
struct osmux_handle *handle = data;
- socklen_t dest_len;
- int rc, fd;
- struct mgcp_trunk *trunk = (struct mgcp_trunk *)osmux_fd_v4.data;
+ int rc;
+ struct osmo_io_fd *iofd;
+ struct mgcp_trunk *trunk = (struct mgcp_trunk *) osmo_iofd_get_data(osmux_fd_v4);
struct rate_ctr_group *all_osmux_stats = trunk->ratectr.all_osmux_conn_stats;
switch (handle->rem_addr.u.sa.sa_family) {
case AF_INET6:
- dest_len = sizeof(handle->rem_addr.u.sin6);
- fd = osmux_fd_v6.fd;
+ iofd = osmux_fd_v6;
break;
case AF_INET:
default:
- dest_len = sizeof(handle->rem_addr.u.sin);
- fd = osmux_fd_v4.fd;
+ iofd = osmux_fd_v4;
break;
}
- rc = sendto(fd, batch_msg->data, batch_msg->len, 0,
- (struct sockaddr *)&handle->rem_addr.u.sa, dest_len);
+ rc = osmo_iofd_sendto_msgb(iofd, batch_msg, 0, &handle->rem_addr);
if (rc < 0) {
char errbuf[129];
- strerror_r(errno, errbuf, sizeof(errbuf));
+ strerror_r(-rc, errbuf, sizeof(errbuf));
LOGP(DOSMUX, LOGL_NOTICE, "osmux sendto(%s) failed: %s\n",
osmo_sockaddr_to_str(&handle->rem_addr), errbuf);
rate_ctr_inc(rate_ctr_group_get_ctr(all_osmux_stats, OSMUX_DROPPED_PACKETS_CTR));
+ msgb_free(batch_msg);
} else {
rate_ctr_inc(rate_ctr_group_get_ctr(all_osmux_stats, OSMUX_PACKETS_TX_CTR));
}
- msgb_free(batch_msg);
}
/* Lookup existing OSMUX handle for specified destination address. */
@@ -325,28 +325,6 @@ static void scheduled_from_osmux_tx_rtp_cb(struct msgb *msg, void *data)
/* dispatch_rtp_cb() has taken ownership of the msgb */
}
-static struct msgb *osmux_recv(struct osmo_fd *ofd, struct osmo_sockaddr *addr)
-{
- struct msgb *msg;
- socklen_t slen = sizeof(addr->u.sas);
- int ret;
-
- msg = msgb_alloc(4096, "OSMUX");
- if (!msg) {
- LOGP(DOSMUX, LOGL_ERROR, "cannot allocate message\n");
- return NULL;
- }
- ret = recvfrom(ofd->fd, msg->data, msg->data_len, 0, &addr->u.sa, &slen);
- if (ret <= 0) {
- msgb_free(msg);
- LOGP(DOSMUX, LOGL_ERROR, "cannot receive message\n");
- return NULL;
- }
- msgb_put(msg, ret);
-
- return msg;
-}
-
/* To be called every time some AMR data is received on a connection
* returns: 0 if conn can process data, negative if an error ocurred and data should not be further processed */
static int conn_osmux_event_data_received(struct mgcp_conn_rtp *conn, const struct osmo_sockaddr *rem_addr)
@@ -442,22 +420,16 @@ out:
}
#define osmux_chunk_length(msg, rem) ((rem) - (msg)->len)
-static int osmux_read_fd_cb(struct osmo_fd *ofd, unsigned int what)
+static void osmux_recvfrom_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg, const struct osmo_sockaddr *rem_addr)
{
- struct msgb *msg;
struct osmux_hdr *osmuxh;
- struct osmo_sockaddr rem_addr;
- uint32_t rem;
- struct mgcp_trunk *trunk = ofd->data;
+ struct mgcp_trunk *trunk = osmo_iofd_get_data(iofd);
struct rate_ctr_group *all_rtp_stats = trunk->ratectr.all_osmux_conn_stats;
+ uint32_t rem;
char addr_str[64];
- msg = osmux_recv(ofd, &rem_addr);
- if (!msg)
- return -1;
-
rate_ctr_inc(rate_ctr_group_get_ctr(all_rtp_stats, OSMUX_PACKETS_RX_CTR));
- osmo_sockaddr_to_str_buf(addr_str, sizeof(addr_str), &rem_addr);
+ osmo_sockaddr_to_str_buf(addr_str, sizeof(addr_str), rem_addr);
if (trunk->cfg->osmux.usage == OSMUX_USAGE_OFF) {
LOGP(DOSMUX, LOGL_ERROR,
@@ -467,14 +439,16 @@ static int osmux_read_fd_cb(struct osmo_fd *ofd, unsigned int what)
}
/* Catch legacy dummy message and process them separately: */
- if (msg->len == 2 && msg->data[0] == MGCP_DUMMY_LOAD)
- return osmux_handle_legacy_dummy(trunk, &rem_addr, msg);
+ if (msg->len == 2 && msg->data[0] == MGCP_DUMMY_LOAD) {
+ osmux_handle_legacy_dummy(trunk, rem_addr, msg);
+ return;
+ }
rem = msg->len;
while((osmuxh = osmux_xfrm_output_pull(msg)) != NULL) {
struct mgcp_conn_rtp *conn_src;
conn_src = osmux_conn_lookup(trunk, osmuxh->circuit_id,
- &rem_addr);
+ rem_addr);
if (!conn_src) {
LOGP(DOSMUX, LOGL_DEBUG,
"Cannot find a src conn for %s CID=%d\n",
@@ -482,7 +456,7 @@ static int osmux_read_fd_cb(struct osmo_fd *ofd, unsigned int what)
goto next;
}
- if (conn_osmux_event_data_received(conn_src, &rem_addr) < 0)
+ if (conn_osmux_event_data_received(conn_src, rem_addr) < 0)
goto next;
mgcp_conn_watchdog_kick(conn_src->conn);
@@ -496,19 +470,38 @@ next:
}
out:
msgb_free(msg);
- return 0;
}
+static void osmux_sendto_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg, const struct osmo_sockaddr *rem_addr)
+{
+ /* nothing; osmo_io takes care of msgb_free */
+ if (res < 0) {
+ struct mgcp_trunk *trunk = (struct mgcp_trunk *) osmo_iofd_get_data(iofd);
+ struct rate_ctr_group *all_osmux_stats = trunk->ratectr.all_osmux_conn_stats;
+ char errbuf[129];
+ strerror_r(-res, errbuf, sizeof(errbuf));
+ LOGP(DOSMUX, LOGL_NOTICE, "osmux sendto(%s) failed: %s\n", osmo_sockaddr_to_str(rem_addr), errbuf);
+ rate_ctr_inc(rate_ctr_group_get_ctr(all_osmux_stats, OSMUX_DROPPED_PACKETS_CTR));
+ }
+}
+
+static const struct osmo_io_ops osmux_ioops = {
+ .recvfrom_cb = osmux_recvfrom_cb,
+ .sendto_cb = osmux_sendto_cb,
+};
+
int osmux_init(struct mgcp_trunk *trunk)
{
- int ret;
+ int ret, fd;
struct mgcp_config *cfg = trunk->cfg;
/* So far we only support running on one trunk: */
OSMO_ASSERT(trunk == mgcp_trunk_by_num(cfg, MGCP_TRUNK_VIRTUAL, MGCP_VIRT_TRUNK_ID));
- osmo_fd_setup(&osmux_fd_v4, -1, OSMO_FD_READ, osmux_read_fd_cb, trunk, 0);
- osmo_fd_setup(&osmux_fd_v6, -1, OSMO_FD_READ, osmux_read_fd_cb, trunk, 0);
+ osmux_fd_v4 = osmo_iofd_setup(trunk, -1, "osmux_fd_v4", OSMO_IO_FD_MODE_RECVFROM_SENDTO, &osmux_ioops, trunk);
+ if (!osmux_fd_v4)
+ goto out;
+ osmo_iofd_set_alloc_info(osmux_fd_v4, 4096, 0);
if (cfg->osmux.local_addr_v4) {
ret = mgcp_create_bind(cfg->osmux.local_addr_v4, cfg->osmux.local_port,
@@ -516,40 +509,55 @@ int osmux_init(struct mgcp_trunk *trunk)
if (ret < 0) {
LOGP(DOSMUX, LOGL_ERROR, "Cannot bind OSMUX IPv4 socket to %s:%u\n",
cfg->osmux.local_addr_v4, cfg->osmux.local_port);
- return ret;
+ goto out_free_v4;
}
- osmux_fd_v4.fd = ret;
+ fd = ret;
- ret = osmo_fd_register(&osmux_fd_v4);
+ ret = osmo_iofd_register(osmux_fd_v4, fd);
if (ret < 0) {
- LOGP(DOSMUX, LOGL_ERROR, "Cannot register OSMUX IPv4 socket %s\n",
- osmo_sock_get_name2(osmux_fd_v4.fd));
- return ret;
+ LOGP(DOSMUX, LOGL_ERROR, "Cannot register OSMUX IPv4 socket %s\n", osmo_sock_get_name2(fd));
+ close(fd);
+ goto out_free_v4;
}
- LOGP(DOSMUX, LOGL_INFO, "OSMUX IPv4 socket listening on %s\n",
- osmo_sock_get_name2(osmux_fd_v4.fd));
+ LOGP(DOSMUX, LOGL_INFO, "OSMUX IPv4 socket listening on %s\n", osmo_sock_get_name2(fd));
}
+
+ osmux_fd_v6 = osmo_iofd_setup(trunk, -1, "osmux_fd_v6", OSMO_IO_FD_MODE_RECVFROM_SENDTO, &osmux_ioops, trunk);
+ if (!osmux_fd_v6)
+ goto out_free_v4;
+ osmo_iofd_set_alloc_info(osmux_fd_v6, 4096, 0);
+
if (cfg->osmux.local_addr_v6) {
ret = mgcp_create_bind(cfg->osmux.local_addr_v6, cfg->osmux.local_port,
cfg->endp_dscp, cfg->endp_priority);
if (ret < 0) {
LOGP(DOSMUX, LOGL_ERROR, "Cannot bind OSMUX IPv6 socket to [%s]:%u\n",
cfg->osmux.local_addr_v6, cfg->osmux.local_port);
- return ret;
+ goto out_free_v6;
}
- osmux_fd_v6.fd = ret;
+ fd = ret;
- ret = osmo_fd_register(&osmux_fd_v6);
+ ret = osmo_iofd_register(osmux_fd_v6, fd);
if (ret < 0) {
- LOGP(DOSMUX, LOGL_ERROR, "Cannot register OSMUX IPv6 socket %s\n",
- osmo_sock_get_name2(osmux_fd_v6.fd));
- return ret;
+ LOGP(DOSMUX, LOGL_ERROR, "Cannot register OSMUX IPv6 socket %s\n", osmo_sock_get_name2(fd));
+ close(fd);
+ goto out_free_v6;
}
- LOGP(DOSMUX, LOGL_INFO, "OSMUX IPv6 socket listening on %s\n",
- osmo_sock_get_name2(osmux_fd_v6.fd));
+ LOGP(DOSMUX, LOGL_INFO, "OSMUX IPv6 socket listening on %s\n", osmo_sock_get_name2(fd));
}
cfg->osmux.initialized = true;
return 0;
+
+out_free_v6:
+ /* osmo_iofd_free performs unregister + close */
+ osmo_iofd_free(osmux_fd_v6);
+ osmux_fd_v6 = NULL;
+out_free_v4:
+ /* osmo_iofd_free performs unregister + close */
+ osmo_iofd_free(osmux_fd_v4);
+ osmux_fd_v4 = NULL;
+out:
+ return -1;
}
/*! relase OSXMUX cid, that had been allocated to this connection.
@@ -715,7 +723,7 @@ int osmux_send_dummy(struct mgcp_conn_rtp *conn)
osmo_sockaddr_ntop(&conn->end.addr.u.sa, ipbuf),
osmo_sockaddr_port(&conn->end.addr.u.sa), conn->osmux.remote_cid);
- return mgcp_udp_send(osmux_fd_v4.fd, &conn->end.addr, (char *)osmuxh, buf_len);
+ return mgcp_udp_send(osmux_fd_v4, &conn->end.addr, (char *)osmuxh, buf_len);
}
/* Keeps track of locally allocated Osmux circuit ID. +7 to round up to 8 bit boundary. */
diff --git a/tests/mgcp/mgcp_test.c b/tests/mgcp/mgcp_test.c
index c76bd9d83..ffc8a202e 100644
--- a/tests/mgcp/mgcp_test.c
+++ b/tests/mgcp/mgcp_test.c
@@ -653,12 +653,13 @@ static struct msgb *create_msg(const char *str, const char *conn_id)
static int dummy_packets = 0;
/* override and forward */
-ssize_t sendto(int sockfd, const void *buf, size_t len, int flags,
- const struct sockaddr *dest_addr, socklen_t addrlen)
+int osmo_iofd_sendto_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int flags, const struct osmo_sockaddr *addr)
{
uint32_t dest_host =
- htonl(((struct sockaddr_in *)dest_addr)->sin_addr.s_addr);
- int dest_port = htons(((struct sockaddr_in *)dest_addr)->sin_port);
+ htonl(((struct sockaddr_in *)addr)->sin_addr.s_addr);
+ int dest_port = htons(((struct sockaddr_in *)addr)->sin_port);
+ const uint8_t *buf = msgb_data(msg);
+ size_t len = msgb_length(msg);
if (len == sizeof(rtp_dummy_payload)
&& memcmp(buf, rtp_dummy_payload, sizeof(rtp_dummy_payload)) == 0) {
@@ -672,6 +673,8 @@ ssize_t sendto(int sockfd, const void *buf, size_t len, int flags,
OSMO_ASSERT(dest_host);
OSMO_ASSERT(dest_port);
+ msgb_free(msg);
+
return len;
}