diff options
author | Harald Welte <laforge@osmocom.org> | 2020-10-19 13:14:41 +0200 |
---|---|---|
committer | Harald Welte <laforge@osmocom.org> | 2021-11-12 21:58:14 +0100 |
commit | b7a0e043b5e50b651ed78d5ab1ae70b8856ad1a4 (patch) | |
tree | e9b4140bd0be462dd6890bdecc4aca54f59d5142 | |
parent | 004ea1398a1bfe3b847bcfb40eb8177641630027 (diff) |
rtp-load-gen: Add CTRL interfacelaforge/rtp-load-gen
Change-Id: I1122e5c125f03b615d587f734fc1b7eadf827a27
-rw-r--r-- | contrib/rtp-load-gen/Makefile | 9 | ||||
-rw-r--r-- | contrib/rtp-load-gen/ctrl_if.c | 214 | ||||
-rw-r--r-- | contrib/rtp-load-gen/internal.h | 25 | ||||
-rw-r--r-- | contrib/rtp-load-gen/rtp-load-gen.c | 82 |
4 files changed, 302 insertions, 28 deletions
diff --git a/contrib/rtp-load-gen/Makefile b/contrib/rtp-load-gen/Makefile index 4bec99f11..e65d112c4 100644 --- a/contrib/rtp-load-gen/Makefile +++ b/contrib/rtp-load-gen/Makefile @@ -1,8 +1,11 @@ -CFLAGS=-g -Wall $(shell pkg-config --cflags libosmocore libosmo-netif liburing) -LIBS=-lpthread $(shell pkg-config --libs libosmocore libosmo-netif liburing) +CFLAGS=-g -Wall $(shell pkg-config --cflags libosmocore libosmoctrl libosmo-netif liburing) +LIBS=-lpthread $(shell pkg-config --libs libosmocore libosmoctrl libosmo-netif liburing) -rtp-load-gen: rtp-load-gen.o rtp_provider.o rtp_provider_static.o +rtp-load-gen: rtp-load-gen.o rtp_provider.o rtp_provider_static.o ctrl_if.o $(CC) -o $@ $^ $(LIBS) %.o: %.c $(CC) $(CFLAGS) -o $@ -c $^ + +clean: + @rm rtp-load-gen *.o diff --git a/contrib/rtp-load-gen/ctrl_if.c b/contrib/rtp-load-gen/ctrl_if.c new file mode 100644 index 000000000..93356eed7 --- /dev/null +++ b/contrib/rtp-load-gen/ctrl_if.c @@ -0,0 +1,214 @@ +/* CTRL interface of rtpsource program + * + * (C) 2020 by Harald Welte <laforge@gnumonks.org> + * + * All Rights Reserved + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include <osmocom/ctrl/control_cmd.h> + +#include "internal.h" +#include "rtp_provider.h" + +static struct rtpsim_connection *find_connection_by_cname(const char *cname) +{ + struct rtpsim_connection *rtpc; + struct rtpsim_instance *ri; + + pthread_rwlock_rdlock(&g_rtpsim->rwlock); + llist_for_each_entry(ri, &g_rtpsim->instances, list) { + rtpc = rtpsim_conn_find(ri, cname); + if (rtpc) { + pthread_rwlock_unlock(&g_rtpsim->rwlock); + return rtpc; + } + } + pthread_rwlock_unlock(&g_rtpsim->rwlock); + return NULL; +} + +static struct rtpsim_connection *create_connection(const char *cname, enum codec_type codec) +{ + struct rtpsim_connection *rtpc; + struct rtpsim_instance *ri; + + pthread_rwlock_rdlock(&g_rtpsim->rwlock); + llist_for_each_entry(ri, &g_rtpsim->instances, list) { + rtpc = rtpsim_conn_reserve(ri, cname, codec); + if (rtpc) { + pthread_rwlock_unlock(&g_rtpsim->rwlock); + return rtpc; + } + } + pthread_rwlock_unlock(&g_rtpsim->rwlock); + return NULL; +} + +static int connect_connection(struct rtpsim_connection *rtpc, const char *remote_host, + uint16_t remote_port, uint8_t pt) +{ + int rc; + + osmo_sockaddr_str_from_str(&rtpc->cfg.remote, remote_host, remote_port); + rtpc->cfg.pt = pt; + + rc = rtpsim_conn_connect(rtpc); + if (rc < 0) + return rc; + + rc = rtpsim_conn_start(rtpc); + + return rc; +} + +static int delete_connection(struct rtpsim_connection *rtpc) +{ + rtpsim_conn_stop(rtpc); + rtpsim_conn_unreserve(rtpc); + return 0; +} + +CTRL_CMD_DEFINE_WO_NOVRF(rtp_create, "rtp_create"); +static int set_rtp_create(struct ctrl_cmd *cmd, void *data) +{ + struct rtpsim_connection *conn; + const char *cname, *codec_str; + char *tmp, *saveptr; + enum codec_type codec; + + tmp = talloc_strdup(cmd, cmd->value); + OSMO_ASSERT(tmp); + + cname = strtok_r(tmp, ",", &saveptr); + codec_str = strtok_r(NULL, ",", &saveptr); + + if (!cname || !codec_str) { + cmd->reply = "Format is cname,codec"; + goto error; + } + + if (find_connection_by_cname(cname)) { + cmd->reply = "Connection already exists for cname"; + goto error; + } + + codec = get_string_value(codec_type_names, codec_str); + if (codec < 0) { + cmd->reply = "Invalid codec name (try GSM_FR, GSM_EFR etc.)"; + goto error; + } + + conn = create_connection(cname, codec); + if (!conn) { + cmd->reply = "Error creating RTP connection"; + goto error; + } + + /* Respond */ + cmd->reply = talloc_asprintf(cmd, "%s,%s,%d", conn->cname, conn->cfg.local.ip, conn->cfg.local.port); + talloc_free(tmp); + return CTRL_CMD_REPLY; + +error: + talloc_free(tmp); + return CTRL_CMD_ERROR; +} + +CTRL_CMD_DEFINE_WO_NOVRF(rtp_connect, "rtp_connect"); +static int set_rtp_connect(struct ctrl_cmd *cmd, void *data) +{ + struct rtpsim_connection *conn; + const char *cname, *remote_host, *remote_port, *pt; + char *tmp, *saveptr; + int rc; + + tmp = talloc_strdup(cmd, cmd->value); + OSMO_ASSERT(tmp); + + /* FIXME: parse command */ + cname = strtok_r(tmp, ",", &saveptr); + remote_host = strtok_r(NULL, ",", &saveptr); + remote_port = strtok_r(NULL, ",", &saveptr); + pt = strtok_r(NULL, ",", &saveptr); + + if (!cname || !remote_host || !remote_port || !pt) { + cmd->reply = "Format is cname,remote_host,remote_port,pt"; + talloc_free(tmp); + return CTRL_CMD_ERROR; + } + + conn = find_connection_by_cname(cname); + if (!conn) { + cmd->reply = "Error finding RTP connection for connect"; + talloc_free(tmp); + return CTRL_CMD_ERROR; + } + + rc = connect_connection(conn, remote_host, atoi(remote_port), atoi(pt)); + if (rc < 0) { + cmd->reply = "Error binding RTP connection"; + talloc_free(tmp); + return CTRL_CMD_ERROR; + } + + /* Respond */ + talloc_free(tmp); + cmd->reply = talloc_asprintf(cmd, "%s,%s,%d,%d", conn->cname, conn->cfg.remote.ip, + conn->cfg.remote.port, conn->cfg.pt); + return CTRL_CMD_REPLY; +} + +CTRL_CMD_DEFINE_WO_NOVRF(rtp_delete, "rtp_delete"); +static int set_rtp_delete(struct ctrl_cmd *cmd, void *data) +{ + struct rtpsim_connection *conn; + const char *cname = cmd->value; + + conn = find_connection_by_cname(cname); + if (!conn) { + cmd->reply = "Error finding RTP connection for delete"; + return CTRL_CMD_ERROR; + } + cmd->reply = talloc_asprintf(cmd, "%s", conn->cname); + + delete_connection(conn); + + /* Respond */ + return CTRL_CMD_REPLY; +} + + + + + +int rtpsource_ctrl_cmds_install(void) +{ + int rc; + + rc = ctrl_cmd_install(CTRL_NODE_ROOT, &cmd_rtp_create); + if (rc) + goto end; + + rc = ctrl_cmd_install(CTRL_NODE_ROOT, &cmd_rtp_connect); + if (rc) + goto end; + + rc = ctrl_cmd_install(CTRL_NODE_ROOT, &cmd_rtp_delete); + if (rc) + goto end; +end: + return rc; +} diff --git a/contrib/rtp-load-gen/internal.h b/contrib/rtp-load-gen/internal.h index fb76645ea..f859e5729 100644 --- a/contrib/rtp-load-gen/internal.h +++ b/contrib/rtp-load-gen/internal.h @@ -1,10 +1,13 @@ #pragma once #include <stdint.h> +#include <pthread.h> #include <liburing.h> #include <osmocom/core/linuxlist.h> #include <osmocom/core/sockaddr_str.h> #include <osmocom/core/rate_ctr.h> +#include <osmocom/core/select.h> +#include <osmocom/ctrl/control_if.h> struct rtp_provider_instance; @@ -61,7 +64,6 @@ struct rtpsim_connection { struct rtpsim_instance_cfg { int num; - void *ctx; uint16_t base_port; unsigned int num_flows; }; @@ -83,7 +85,28 @@ struct rtpsim_instance { unsigned int connections_size; }; +struct rtpsim_global { + /* global list of instances */ + struct llist_head instances; + pthread_rwlock_t rwlock; + + struct ctrl_handle *ctrl; +}; + enum { DMAIN, }; + +enum codec_type; + +extern struct rtpsim_global *g_rtpsim; + +struct rtpsim_connection *rtpsim_conn_find(struct rtpsim_instance *ri, const char *cname); +struct rtpsim_connection *rtpsim_conn_reserve(struct rtpsim_instance *ri, const char *cname, enum codec_type codec); +int rtpsim_conn_start(struct rtpsim_connection *rtpc); +void rtpsim_conn_stop(struct rtpsim_connection *rtpc); +void rtpsim_conn_unreserve(struct rtpsim_connection *rtpc); +int rtpsim_conn_connect(struct rtpsim_connection *rtpc); + +int rtpsource_ctrl_cmds_install(void); diff --git a/contrib/rtp-load-gen/rtp-load-gen.c b/contrib/rtp-load-gen/rtp-load-gen.c index 5fd7c2c01..762d8580a 100644 --- a/contrib/rtp-load-gen/rtp-load-gen.c +++ b/contrib/rtp-load-gen/rtp-load-gen.c @@ -11,6 +11,7 @@ #include <osmocom/core/socket.h> #include <osmocom/core/rate_ctr.h> #include <osmocom/core/application.h> +#include <osmocom/core/stats.h> #include <osmocom/netif/rtp.h> #include "internal.h" @@ -40,6 +41,8 @@ #define TX_BUF_IDX 0 #define RX_BUF_IDX 1 +struct rtpsim_global *g_rtpsim; + enum rtpsim_conn_ctr { RTP_CONN_CTR_TX_PKTS, RTP_CONN_CTR_TX_BYTES, @@ -79,9 +82,9 @@ static const struct rate_ctr_group_desc rtpsim_ctrg_desc = { .ctr_desc = rtpsim_ctrs, }; -struct rtpsim_instance *rtpsim_instance_init(const struct rtpsim_instance_cfg *rmp) +struct rtpsim_instance *rtpsim_instance_init(void *ctx, const struct rtpsim_instance_cfg *rmp) { - struct rtpsim_instance *ri = talloc_zero(rmp->ctx, struct rtpsim_instance); + struct rtpsim_instance *ri = talloc_zero(ctx, struct rtpsim_instance); int rc; if (!ri) @@ -173,9 +176,14 @@ struct rtpsim_connection *rtpsim_conn_find(struct rtpsim_instance *ri, const cha } /* reserve a connection; associates cname with it */ -struct rtpsim_connection *rtpsim_conn_reserve(struct rtpsim_instance *ri, const char *cname) +struct rtpsim_connection *rtpsim_conn_reserve(struct rtpsim_instance *ri, const char *cname, + enum codec_type codec) { struct rtpsim_connection *rtpc; + const struct rtp_provider *rtp_prov; + + rtp_prov = rtp_provider_find("static"); // TODO: configurable */ + OSMO_ASSERT(rtp_prov); rtpc = rtpsim_conn_find(ri, NULL); if (!rtpc) @@ -183,37 +191,34 @@ struct rtpsim_connection *rtpsim_conn_reserve(struct rtpsim_instance *ri, const /* this is called from main thread, we cannot use per-thread talloc contexts * such as ri or rtpc */ - rtpc->cname = talloc_strdup(NULL, cname); + rtpc->cname = talloc_strdup(g_rtpsim, cname); + rtpc->tx.rtp_prov_inst = rtp_provider_instance_alloc(g_rtpsim, rtp_prov, codec); + OSMO_ASSERT(rtpc->tx.rtp_prov_inst); + + /* re-start from zero transmit sequence number */ + rtpc->tx.seq = 0; return rtpc; } -int rtpsim_conn_start(struct rtpsim_connection *rtpc, enum codec_type codec) +int rtpsim_conn_start(struct rtpsim_connection *rtpc) { - const struct rtp_provider *rtp_prov; - rtp_prov = rtp_provider_find("static"); // TODO: configurable */ - OSMO_ASSERT(rtp_prov); - - /* this is called from main thread, we cannot use per-thread talloc contexts - * such as ri or rtpc */ - rtpc->tx.rtp_prov_inst = rtp_provider_instance_alloc(NULL, rtp_prov, codec); - OSMO_ASSERT(rtpc->tx.rtp_prov_inst); - rtpc->tx.enabled = true; rtpc->rx.enabled = true; return 0; } -/* unreserve a connection; stops all rx/tx and removes cname */ -void rtpsim_conn_unreserve(struct rtpsim_connection *rtpc) +void rtpsim_conn_stop(struct rtpsim_connection *rtpc) { /* disable Rx and Tx */ rtpc->tx.enabled = false; rtpc->rx.enabled = false; - /* re-start from zero transmit sequence number */ - rtpc->tx.seq = 0; +} +/* unreserve a connection; stops all rx/tx and removes cname */ +void rtpsim_conn_unreserve(struct rtpsim_connection *rtpc) +{ rtp_provider_instance_free(rtpc->tx.rtp_prov_inst); rtpc->tx.rtp_prov_inst = NULL; @@ -335,15 +340,25 @@ static void *reap_completion(void *_ri) } #endif +extern int osmo_ctx_init(const char *id); + static void rtpsim_main(const struct rtpsim_instance_cfg *rmp) { struct rtpsim_instance *ri; struct rtpsim_connection *rtpc; int i, rc; - ri = rtpsim_instance_init(rmp); + char namebuf[32]; + snprintf(namebuf, sizeof(namebuf), "rtpsim_worker%d", rmp->num); + osmo_ctx_init(namebuf); + + ri = rtpsim_instance_init(OTC_GLOBAL, rmp); OSMO_ASSERT(ri); + pthread_rwlock_wrlock(&g_rtpsim->rwlock); + llist_add_tail(&ri->list, &g_rtpsim->instances); + pthread_rwlock_unlock(&g_rtpsim->rwlock); + /* create desired number of sockets */ printf("binding sockets\n"); for (i = 0; i < rmp->num_flows; i++) { @@ -367,16 +382,18 @@ static void rtpsim_main(const struct rtpsim_instance_cfg *rmp) OSMO_ASSERT(rtpc); } +#if 1 /* HACK */ printf("connecting sockets\n"); for (i = 0; i < rmp->num_flows; i++) { char namebuf[32]; snprintf(namebuf, sizeof(namebuf), "conn%d", i); - struct rtpsim_connection *rtpc = rtpsim_conn_reserve(ri, namebuf); + struct rtpsim_connection *rtpc = rtpsim_conn_reserve(ri, namebuf, CODEC_GSM_FR); OSMO_ASSERT(rtpc); OSMO_ASSERT(rtpsim_conn_connect(rtpc) == 0); - OSMO_ASSERT(rtpsim_conn_start(rtpc, CODEC_GSM_FR) == 0); + OSMO_ASSERT(rtpsim_conn_start(rtpc) == 0); } +#endif #ifdef USE_REGISTERED_FILES /* register all our file descriptors; seems to fail on 5.8.x ? */ @@ -506,13 +523,19 @@ int main(int argc, char **argv) { pthread_t worker[NR_WORKERS]; struct rtpsim_instance_cfg rmp[NR_WORKERS]; - int i; + int i, rc; - osmo_init_logging2(NULL, NULL); + g_rtpsim = talloc_zero(NULL, struct rtpsim_global); + OSMO_ASSERT(g_rtpsim); + INIT_LLIST_HEAD(&g_rtpsim->instances); + pthread_rwlock_init(&g_rtpsim->rwlock, NULL); + osmo_init_logging2(g_rtpsim, NULL); +// osmo_stats_init(g_rtpsim); + + /* Create worker threads */ for (i = 0; i < NR_WORKERS; i++) { int rc; - rmp[i].ctx = talloc_named(NULL, 0, "rtpsim%d", i); rmp[i].num = i; rmp[i].num_flows = NUM_FLOWS_PER_WORKER; rmp[i].base_port = 10000 + i * (2 * rmp[i].num_flows); @@ -520,6 +543,17 @@ int main(int argc, char **argv) OSMO_ASSERT(rc >= 0); } + /* CTRL interface */ + //g_rtpsim->ctrl = ctrl_interface_setup_dynip(g_rss, ctrl_vty_get_bind_addr(), 11111, NULL); + g_rtpsim->ctrl = ctrl_interface_setup_dynip(g_rtpsim, "127.0.0.1", 11111, NULL); + OSMO_ASSERT(g_rtpsim->ctrl); + rc = rtpsource_ctrl_cmds_install(); + OSMO_ASSERT(rc == 0); + + while (1) { + osmo_select_main(0); + } + for (i = 0; i < NR_WORKERS; i++) { pthread_join(worker[i], NULL); } |