aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHarald Welte <laforge@osmocom.org>2020-10-19 13:14:41 +0200
committerHarald Welte <laforge@osmocom.org>2021-11-12 21:58:14 +0100
commitb7a0e043b5e50b651ed78d5ab1ae70b8856ad1a4 (patch)
treee9b4140bd0be462dd6890bdecc4aca54f59d5142
parent004ea1398a1bfe3b847bcfb40eb8177641630027 (diff)
rtp-load-gen: Add CTRL interfacelaforge/rtp-load-gen
-rw-r--r--contrib/rtp-load-gen/Makefile9
-rw-r--r--contrib/rtp-load-gen/ctrl_if.c214
-rw-r--r--contrib/rtp-load-gen/internal.h25
-rw-r--r--contrib/rtp-load-gen/rtp-load-gen.c82
4 files changed, 302 insertions, 28 deletions
diff --git a/contrib/rtp-load-gen/Makefile b/contrib/rtp-load-gen/Makefile
index 4bec99f11..e65d112c4 100644
--- a/contrib/rtp-load-gen/Makefile
+++ b/contrib/rtp-load-gen/Makefile
@@ -1,8 +1,11 @@
-CFLAGS=-g -Wall $(shell pkg-config --cflags libosmocore libosmo-netif liburing)
-LIBS=-lpthread $(shell pkg-config --libs libosmocore libosmo-netif liburing)
+CFLAGS=-g -Wall $(shell pkg-config --cflags libosmocore libosmoctrl libosmo-netif liburing)
+LIBS=-lpthread $(shell pkg-config --libs libosmocore libosmoctrl libosmo-netif liburing)
-rtp-load-gen: rtp-load-gen.o rtp_provider.o rtp_provider_static.o
+rtp-load-gen: rtp-load-gen.o rtp_provider.o rtp_provider_static.o ctrl_if.o
$(CC) -o $@ $^ $(LIBS)
%.o: %.c
$(CC) $(CFLAGS) -o $@ -c $^
+
+clean:
+ @rm rtp-load-gen *.o
diff --git a/contrib/rtp-load-gen/ctrl_if.c b/contrib/rtp-load-gen/ctrl_if.c
new file mode 100644
index 000000000..93356eed7
--- /dev/null
+++ b/contrib/rtp-load-gen/ctrl_if.c
@@ -0,0 +1,214 @@
+/* CTRL interface of rtpsource program
+ *
+ * (C) 2020 by Harald Welte <laforge@gnumonks.org>
+ *
+ * All Rights Reserved
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <osmocom/ctrl/control_cmd.h>
+
+#include "internal.h"
+#include "rtp_provider.h"
+
+static struct rtpsim_connection *find_connection_by_cname(const char *cname)
+{
+ struct rtpsim_connection *rtpc;
+ struct rtpsim_instance *ri;
+
+ pthread_rwlock_rdlock(&g_rtpsim->rwlock);
+ llist_for_each_entry(ri, &g_rtpsim->instances, list) {
+ rtpc = rtpsim_conn_find(ri, cname);
+ if (rtpc) {
+ pthread_rwlock_unlock(&g_rtpsim->rwlock);
+ return rtpc;
+ }
+ }
+ pthread_rwlock_unlock(&g_rtpsim->rwlock);
+ return NULL;
+}
+
+static struct rtpsim_connection *create_connection(const char *cname, enum codec_type codec)
+{
+ struct rtpsim_connection *rtpc;
+ struct rtpsim_instance *ri;
+
+ pthread_rwlock_rdlock(&g_rtpsim->rwlock);
+ llist_for_each_entry(ri, &g_rtpsim->instances, list) {
+ rtpc = rtpsim_conn_reserve(ri, cname, codec);
+ if (rtpc) {
+ pthread_rwlock_unlock(&g_rtpsim->rwlock);
+ return rtpc;
+ }
+ }
+ pthread_rwlock_unlock(&g_rtpsim->rwlock);
+ return NULL;
+}
+
+static int connect_connection(struct rtpsim_connection *rtpc, const char *remote_host,
+ uint16_t remote_port, uint8_t pt)
+{
+ int rc;
+
+ osmo_sockaddr_str_from_str(&rtpc->cfg.remote, remote_host, remote_port);
+ rtpc->cfg.pt = pt;
+
+ rc = rtpsim_conn_connect(rtpc);
+ if (rc < 0)
+ return rc;
+
+ rc = rtpsim_conn_start(rtpc);
+
+ return rc;
+}
+
+static int delete_connection(struct rtpsim_connection *rtpc)
+{
+ rtpsim_conn_stop(rtpc);
+ rtpsim_conn_unreserve(rtpc);
+ return 0;
+}
+
+CTRL_CMD_DEFINE_WO_NOVRF(rtp_create, "rtp_create");
+static int set_rtp_create(struct ctrl_cmd *cmd, void *data)
+{
+ struct rtpsim_connection *conn;
+ const char *cname, *codec_str;
+ char *tmp, *saveptr;
+ enum codec_type codec;
+
+ tmp = talloc_strdup(cmd, cmd->value);
+ OSMO_ASSERT(tmp);
+
+ cname = strtok_r(tmp, ",", &saveptr);
+ codec_str = strtok_r(NULL, ",", &saveptr);
+
+ if (!cname || !codec_str) {
+ cmd->reply = "Format is cname,codec";
+ goto error;
+ }
+
+ if (find_connection_by_cname(cname)) {
+ cmd->reply = "Connection already exists for cname";
+ goto error;
+ }
+
+ codec = get_string_value(codec_type_names, codec_str);
+ if (codec < 0) {
+ cmd->reply = "Invalid codec name (try GSM_FR, GSM_EFR etc.)";
+ goto error;
+ }
+
+ conn = create_connection(cname, codec);
+ if (!conn) {
+ cmd->reply = "Error creating RTP connection";
+ goto error;
+ }
+
+ /* Respond */
+ cmd->reply = talloc_asprintf(cmd, "%s,%s,%d", conn->cname, conn->cfg.local.ip, conn->cfg.local.port);
+ talloc_free(tmp);
+ return CTRL_CMD_REPLY;
+
+error:
+ talloc_free(tmp);
+ return CTRL_CMD_ERROR;
+}
+
+CTRL_CMD_DEFINE_WO_NOVRF(rtp_connect, "rtp_connect");
+static int set_rtp_connect(struct ctrl_cmd *cmd, void *data)
+{
+ struct rtpsim_connection *conn;
+ const char *cname, *remote_host, *remote_port, *pt;
+ char *tmp, *saveptr;
+ int rc;
+
+ tmp = talloc_strdup(cmd, cmd->value);
+ OSMO_ASSERT(tmp);
+
+ /* FIXME: parse command */
+ cname = strtok_r(tmp, ",", &saveptr);
+ remote_host = strtok_r(NULL, ",", &saveptr);
+ remote_port = strtok_r(NULL, ",", &saveptr);
+ pt = strtok_r(NULL, ",", &saveptr);
+
+ if (!cname || !remote_host || !remote_port || !pt) {
+ cmd->reply = "Format is cname,remote_host,remote_port,pt";
+ talloc_free(tmp);
+ return CTRL_CMD_ERROR;
+ }
+
+ conn = find_connection_by_cname(cname);
+ if (!conn) {
+ cmd->reply = "Error finding RTP connection for connect";
+ talloc_free(tmp);
+ return CTRL_CMD_ERROR;
+ }
+
+ rc = connect_connection(conn, remote_host, atoi(remote_port), atoi(pt));
+ if (rc < 0) {
+ cmd->reply = "Error binding RTP connection";
+ talloc_free(tmp);
+ return CTRL_CMD_ERROR;
+ }
+
+ /* Respond */
+ talloc_free(tmp);
+ cmd->reply = talloc_asprintf(cmd, "%s,%s,%d,%d", conn->cname, conn->cfg.remote.ip,
+ conn->cfg.remote.port, conn->cfg.pt);
+ return CTRL_CMD_REPLY;
+}
+
+CTRL_CMD_DEFINE_WO_NOVRF(rtp_delete, "rtp_delete");
+static int set_rtp_delete(struct ctrl_cmd *cmd, void *data)
+{
+ struct rtpsim_connection *conn;
+ const char *cname = cmd->value;
+
+ conn = find_connection_by_cname(cname);
+ if (!conn) {
+ cmd->reply = "Error finding RTP connection for delete";
+ return CTRL_CMD_ERROR;
+ }
+ cmd->reply = talloc_asprintf(cmd, "%s", conn->cname);
+
+ delete_connection(conn);
+
+ /* Respond */
+ return CTRL_CMD_REPLY;
+}
+
+
+
+
+
+int rtpsource_ctrl_cmds_install(void)
+{
+ int rc;
+
+ rc = ctrl_cmd_install(CTRL_NODE_ROOT, &cmd_rtp_create);
+ if (rc)
+ goto end;
+
+ rc = ctrl_cmd_install(CTRL_NODE_ROOT, &cmd_rtp_connect);
+ if (rc)
+ goto end;
+
+ rc = ctrl_cmd_install(CTRL_NODE_ROOT, &cmd_rtp_delete);
+ if (rc)
+ goto end;
+end:
+ return rc;
+}
diff --git a/contrib/rtp-load-gen/internal.h b/contrib/rtp-load-gen/internal.h
index fb76645ea..f859e5729 100644
--- a/contrib/rtp-load-gen/internal.h
+++ b/contrib/rtp-load-gen/internal.h
@@ -1,10 +1,13 @@
#pragma once
#include <stdint.h>
+#include <pthread.h>
#include <liburing.h>
#include <osmocom/core/linuxlist.h>
#include <osmocom/core/sockaddr_str.h>
#include <osmocom/core/rate_ctr.h>
+#include <osmocom/core/select.h>
+#include <osmocom/ctrl/control_if.h>
struct rtp_provider_instance;
@@ -61,7 +64,6 @@ struct rtpsim_connection {
struct rtpsim_instance_cfg {
int num;
- void *ctx;
uint16_t base_port;
unsigned int num_flows;
};
@@ -83,7 +85,28 @@ struct rtpsim_instance {
unsigned int connections_size;
};
+struct rtpsim_global {
+ /* global list of instances */
+ struct llist_head instances;
+ pthread_rwlock_t rwlock;
+
+ struct ctrl_handle *ctrl;
+};
+
enum {
DMAIN,
};
+
+enum codec_type;
+
+extern struct rtpsim_global *g_rtpsim;
+
+struct rtpsim_connection *rtpsim_conn_find(struct rtpsim_instance *ri, const char *cname);
+struct rtpsim_connection *rtpsim_conn_reserve(struct rtpsim_instance *ri, const char *cname, enum codec_type codec);
+int rtpsim_conn_start(struct rtpsim_connection *rtpc);
+void rtpsim_conn_stop(struct rtpsim_connection *rtpc);
+void rtpsim_conn_unreserve(struct rtpsim_connection *rtpc);
+int rtpsim_conn_connect(struct rtpsim_connection *rtpc);
+
+int rtpsource_ctrl_cmds_install(void);
diff --git a/contrib/rtp-load-gen/rtp-load-gen.c b/contrib/rtp-load-gen/rtp-load-gen.c
index 5fd7c2c01..762d8580a 100644
--- a/contrib/rtp-load-gen/rtp-load-gen.c
+++ b/contrib/rtp-load-gen/rtp-load-gen.c
@@ -11,6 +11,7 @@
#include <osmocom/core/socket.h>
#include <osmocom/core/rate_ctr.h>
#include <osmocom/core/application.h>
+#include <osmocom/core/stats.h>
#include <osmocom/netif/rtp.h>
#include "internal.h"
@@ -40,6 +41,8 @@
#define TX_BUF_IDX 0
#define RX_BUF_IDX 1
+struct rtpsim_global *g_rtpsim;
+
enum rtpsim_conn_ctr {
RTP_CONN_CTR_TX_PKTS,
RTP_CONN_CTR_TX_BYTES,
@@ -79,9 +82,9 @@ static const struct rate_ctr_group_desc rtpsim_ctrg_desc = {
.ctr_desc = rtpsim_ctrs,
};
-struct rtpsim_instance *rtpsim_instance_init(const struct rtpsim_instance_cfg *rmp)
+struct rtpsim_instance *rtpsim_instance_init(void *ctx, const struct rtpsim_instance_cfg *rmp)
{
- struct rtpsim_instance *ri = talloc_zero(rmp->ctx, struct rtpsim_instance);
+ struct rtpsim_instance *ri = talloc_zero(ctx, struct rtpsim_instance);
int rc;
if (!ri)
@@ -173,9 +176,14 @@ struct rtpsim_connection *rtpsim_conn_find(struct rtpsim_instance *ri, const cha
}
/* reserve a connection; associates cname with it */
-struct rtpsim_connection *rtpsim_conn_reserve(struct rtpsim_instance *ri, const char *cname)
+struct rtpsim_connection *rtpsim_conn_reserve(struct rtpsim_instance *ri, const char *cname,
+ enum codec_type codec)
{
struct rtpsim_connection *rtpc;
+ const struct rtp_provider *rtp_prov;
+
+ rtp_prov = rtp_provider_find("static"); // TODO: configurable */
+ OSMO_ASSERT(rtp_prov);
rtpc = rtpsim_conn_find(ri, NULL);
if (!rtpc)
@@ -183,37 +191,34 @@ struct rtpsim_connection *rtpsim_conn_reserve(struct rtpsim_instance *ri, const
/* this is called from main thread, we cannot use per-thread talloc contexts
* such as ri or rtpc */
- rtpc->cname = talloc_strdup(NULL, cname);
+ rtpc->cname = talloc_strdup(g_rtpsim, cname);
+ rtpc->tx.rtp_prov_inst = rtp_provider_instance_alloc(g_rtpsim, rtp_prov, codec);
+ OSMO_ASSERT(rtpc->tx.rtp_prov_inst);
+
+ /* re-start from zero transmit sequence number */
+ rtpc->tx.seq = 0;
return rtpc;
}
-int rtpsim_conn_start(struct rtpsim_connection *rtpc, enum codec_type codec)
+int rtpsim_conn_start(struct rtpsim_connection *rtpc)
{
- const struct rtp_provider *rtp_prov;
- rtp_prov = rtp_provider_find("static"); // TODO: configurable */
- OSMO_ASSERT(rtp_prov);
-
- /* this is called from main thread, we cannot use per-thread talloc contexts
- * such as ri or rtpc */
- rtpc->tx.rtp_prov_inst = rtp_provider_instance_alloc(NULL, rtp_prov, codec);
- OSMO_ASSERT(rtpc->tx.rtp_prov_inst);
-
rtpc->tx.enabled = true;
rtpc->rx.enabled = true;
return 0;
}
-/* unreserve a connection; stops all rx/tx and removes cname */
-void rtpsim_conn_unreserve(struct rtpsim_connection *rtpc)
+void rtpsim_conn_stop(struct rtpsim_connection *rtpc)
{
/* disable Rx and Tx */
rtpc->tx.enabled = false;
rtpc->rx.enabled = false;
- /* re-start from zero transmit sequence number */
- rtpc->tx.seq = 0;
+}
+/* unreserve a connection; stops all rx/tx and removes cname */
+void rtpsim_conn_unreserve(struct rtpsim_connection *rtpc)
+{
rtp_provider_instance_free(rtpc->tx.rtp_prov_inst);
rtpc->tx.rtp_prov_inst = NULL;
@@ -335,15 +340,25 @@ static void *reap_completion(void *_ri)
}
#endif
+extern int osmo_ctx_init(const char *id);
+
static void rtpsim_main(const struct rtpsim_instance_cfg *rmp)
{
struct rtpsim_instance *ri;
struct rtpsim_connection *rtpc;
int i, rc;
- ri = rtpsim_instance_init(rmp);
+ char namebuf[32];
+ snprintf(namebuf, sizeof(namebuf), "rtpsim_worker%d", rmp->num);
+ osmo_ctx_init(namebuf);
+
+ ri = rtpsim_instance_init(OTC_GLOBAL, rmp);
OSMO_ASSERT(ri);
+ pthread_rwlock_wrlock(&g_rtpsim->rwlock);
+ llist_add_tail(&ri->list, &g_rtpsim->instances);
+ pthread_rwlock_unlock(&g_rtpsim->rwlock);
+
/* create desired number of sockets */
printf("binding sockets\n");
for (i = 0; i < rmp->num_flows; i++) {
@@ -367,16 +382,18 @@ static void rtpsim_main(const struct rtpsim_instance_cfg *rmp)
OSMO_ASSERT(rtpc);
}
+#if 1
/* HACK */
printf("connecting sockets\n");
for (i = 0; i < rmp->num_flows; i++) {
char namebuf[32];
snprintf(namebuf, sizeof(namebuf), "conn%d", i);
- struct rtpsim_connection *rtpc = rtpsim_conn_reserve(ri, namebuf);
+ struct rtpsim_connection *rtpc = rtpsim_conn_reserve(ri, namebuf, CODEC_GSM_FR);
OSMO_ASSERT(rtpc);
OSMO_ASSERT(rtpsim_conn_connect(rtpc) == 0);
- OSMO_ASSERT(rtpsim_conn_start(rtpc, CODEC_GSM_FR) == 0);
+ OSMO_ASSERT(rtpsim_conn_start(rtpc) == 0);
}
+#endif
#ifdef USE_REGISTERED_FILES
/* register all our file descriptors; seems to fail on 5.8.x ? */
@@ -506,13 +523,19 @@ int main(int argc, char **argv)
{
pthread_t worker[NR_WORKERS];
struct rtpsim_instance_cfg rmp[NR_WORKERS];
- int i;
+ int i, rc;
- osmo_init_logging2(NULL, NULL);
+ g_rtpsim = talloc_zero(NULL, struct rtpsim_global);
+ OSMO_ASSERT(g_rtpsim);
+ INIT_LLIST_HEAD(&g_rtpsim->instances);
+ pthread_rwlock_init(&g_rtpsim->rwlock, NULL);
+ osmo_init_logging2(g_rtpsim, NULL);
+// osmo_stats_init(g_rtpsim);
+
+ /* Create worker threads */
for (i = 0; i < NR_WORKERS; i++) {
int rc;
- rmp[i].ctx = talloc_named(NULL, 0, "rtpsim%d", i);
rmp[i].num = i;
rmp[i].num_flows = NUM_FLOWS_PER_WORKER;
rmp[i].base_port = 10000 + i * (2 * rmp[i].num_flows);
@@ -520,6 +543,17 @@ int main(int argc, char **argv)
OSMO_ASSERT(rc >= 0);
}
+ /* CTRL interface */
+ //g_rtpsim->ctrl = ctrl_interface_setup_dynip(g_rss, ctrl_vty_get_bind_addr(), 11111, NULL);
+ g_rtpsim->ctrl = ctrl_interface_setup_dynip(g_rtpsim, "127.0.0.1", 11111, NULL);
+ OSMO_ASSERT(g_rtpsim->ctrl);
+ rc = rtpsource_ctrl_cmds_install();
+ OSMO_ASSERT(rc == 0);
+
+ while (1) {
+ osmo_select_main(0);
+ }
+
for (i = 0; i < NR_WORKERS; i++) {
pthread_join(worker[i], NULL);
}