summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHolger Hans Peter Freyther <zecke@selfish.org>2011-08-30 13:54:44 +0200
committerHolger Hans Peter Freyther <zecke@selfish.org>2011-08-30 15:04:16 +0200
commitde51b57c9111f195b342c63121c6471863f89772 (patch)
tree8eb455cbbdded1f8502d9cd4062f8451062cf8cf
parent7217ee6c71629632281293219d80f71f69bbc128 (diff)
mgcp: Work on compress/uncompress on a per endpoint usage...
-rw-r--r--openbsc/include/openbsc/mgcp.h2
-rw-r--r--openbsc/include/openbsc/mgcp_internal.h6
-rw-r--r--openbsc/src/libmgcp/mgcp_network.c221
-rw-r--r--openbsc/src/libmgcp/mgcp_protocol.c6
-rw-r--r--openbsc/src/libmgcp/rtp_helper.c12
5 files changed, 221 insertions, 26 deletions
diff --git a/openbsc/include/openbsc/mgcp.h b/openbsc/include/openbsc/mgcp.h
index 7c290c7..4bf9403 100644
--- a/openbsc/include/openbsc/mgcp.h
+++ b/openbsc/include/openbsc/mgcp.h
@@ -119,6 +119,8 @@ struct mgcp_trunk_config {
unsigned int number_endpoints;
struct mgcp_endpoint *endpoints;
+
+ enum { COMPR_NONE, COMPR_BTS, COMPR_NET } compress_dir;
};
struct mgcp_config {
diff --git a/openbsc/include/openbsc/mgcp_internal.h b/openbsc/include/openbsc/mgcp_internal.h
index 138cc09..edf08d3 100644
--- a/openbsc/include/openbsc/mgcp_internal.h
+++ b/openbsc/include/openbsc/mgcp_internal.h
@@ -145,6 +145,9 @@ struct mgcp_endpoint {
int compr_enabled;
struct mgcp_rtp_compr_state compr_loc_state;
struct mgcp_rtp_compr_state compr_rem_state;
+ /* right now one can only compress in one direction */
+ struct llist_head compr_queue;
+ int compr_queue_size;
};
#define ENDPOINT_NUMBER(endp) abs(endp - endp->tcfg->endpoints)
@@ -179,4 +182,7 @@ int rtp_decompress(struct mgcp_rtp_compr_state *state, struct llist_head *list,
struct msgb *msg);
+void mgcp_msgb_clear_queue(struct llist_head *list);
+struct msgb *mgcp_msgb_alloc(void);
+
#endif
diff --git a/openbsc/src/libmgcp/mgcp_network.c b/openbsc/src/libmgcp/mgcp_network.c
index 91a9704..da30294 100644
--- a/openbsc/src/libmgcp/mgcp_network.c
+++ b/openbsc/src/libmgcp/mgcp_network.c
@@ -25,6 +25,7 @@
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
+#include <assert.h>
#include <sys/socket.h>
#include <arpa/inet.h>
@@ -55,6 +56,8 @@ enum {
#define DUMMY_LOAD 0x23
+static int send_to(struct mgcp_endpoint *endp, int dest, int is_rtp,
+ struct sockaddr_in *addr, char *buf, int rc);
static int udp_send(int fd, struct in_addr *addr, int port, char *buf, int len)
{
@@ -74,6 +77,16 @@ int mgcp_send_dummy(struct mgcp_endpoint *endp)
endp->net_end.rtp_port, buf, 1);
}
+void mgcp_msgb_clear_queue(struct llist_head *head)
+{
+ struct msgb *msg;
+
+ while (!llist_empty(head)) {
+ msg = msgb_dequeue(head);
+ msgb_free(msg);
+ }
+}
+
static void patch_and_count(struct mgcp_endpoint *endp, struct mgcp_rtp_state *state,
int payload, struct sockaddr_in *addr, char *data, int len)
{
@@ -166,6 +179,184 @@ static int send_transcoder(struct mgcp_rtp_end *end, struct mgcp_config *cfg,
return rc;
}
+static struct msgb *from_data(char *buf, int rc)
+{
+ struct msgb *msg = mgcp_msgb_alloc();
+ if (!msg) {
+ LOGP(DMGCP, LOGL_ERROR, "Failed to allocate.\n");
+ return NULL;
+ }
+
+ msg->l2h = msgb_put(msg, rc);
+ memcpy(msg->l2h, buf, rc);
+ return msg;
+}
+
+static int maybe_send_queue(struct mgcp_endpoint *endp,
+ struct mgcp_rtp_end *rtp_end)
+{
+ struct msgb *out;
+ int rc;
+
+ /* Queue up to four messages per endpoint */
+ if (++endp->compr_queue_size != 4)
+ return 0;
+
+ /* Allocate the outgoing data */
+ out = mgcp_msgb_alloc();
+ if (!out) {
+ LOGP(DMGCP, LOGL_ERROR, "Failed to allocate.\n");
+ goto cleanup;
+ }
+
+ /* Attempt to compress the samples */
+ rc = rtp_compress(&endp->compr_loc_state, out,
+ ENDPOINT_NUMBER(endp), &endp->compr_queue);
+ if (rc == 0) {
+ LOGP(DMGCP, LOGL_ERROR, "Failed to compress. %d\n", rc);
+ goto cleanup;
+ }
+
+ /* send them out */
+ rc = udp_send(rtp_end->rtp.fd, &rtp_end->addr, rtp_end->rtp_port,
+ (char *) out->l2h, msgb_l2len(out));
+
+ /* cleanup */
+ msgb_free(out);
+ assert(llist_empty(&endp->compr_queue));
+ endp->compr_queue_size = 0;
+ return 1;
+
+cleanup:
+ mgcp_msgb_clear_queue(&endp->compr_queue);
+ endp->compr_queue_size = 0;
+ return 0;
+}
+
+static int queue_for_compr_net(struct mgcp_endpoint *endp, char *buf, int rc)
+{
+ struct msgb *msg = from_data(buf, rc);
+ if (!msg)
+ return -1;
+
+ msgb_enqueue(&endp->compr_queue, msg);
+ return maybe_send_queue(endp, &endp->net_end);
+}
+
+static int queue_for_compr_bts(struct mgcp_endpoint *endp, char *buf, int rc)
+{
+ struct msgb *msg = from_data(buf, rc);
+ if (!msg)
+ return -1;
+
+ msgb_enqueue(&endp->compr_queue, msg);
+ return maybe_send_queue(endp, &endp->net_end);
+}
+
+static int dispatch_from_net(struct osmo_fd *fd, int proto,
+ struct sockaddr_in *addr,
+ struct mgcp_endpoint *endp,
+ char *buf, int rc)
+{
+ endp->net_end.packets += 1;
+
+ forward_data(fd->fd, &endp->taps[MGCP_TAP_NET_IN], buf, rc);
+ if (endp->is_transcoded)
+ return send_transcoder(&endp->trans_net, endp->cfg, proto == PROTO_RTP, &buf[0], rc);
+ else {
+ return send_to(endp, DEST_BTS, proto == PROTO_RTP, addr, &buf[0], rc);
+ }
+}
+
+static int handle_compr_net(struct osmo_fd *fd, int proto,
+ struct sockaddr_in *addr,
+ struct mgcp_endpoint *endp,
+ char *buf, int rc)
+{
+ struct msgb *msg, *tmp;
+ struct llist_head out_list;
+ INIT_LLIST_HEAD(&out_list);
+
+ msg = from_data(buf, rc);
+ if (!msg) {
+ LOGP(DMGCP, LOGL_ERROR,
+ "Failed to allocate buffer for decode on %d/0x%x\n",
+ ENDPOINT_NUMBER(endp), ENDPOINT_NUMBER(endp));
+ return -1;
+ }
+
+ rc = rtp_decompress(&endp->compr_rem_state, &out_list, msg);
+ msgb_free(msg);
+
+ if (rc != 0) {
+ LOGP(DMGCP, LOGL_ERROR,
+ "Failed to decode RTP stream %d/0x%x\n",
+ ENDPOINT_NUMBER(endp), ENDPOINT_NUMBER(endp));
+ return -1;
+ }
+
+ llist_for_each_entry_safe(msg, tmp, &out_list, list) {
+ dispatch_from_net(fd, proto, addr, endp, (char *) msg->l2h, msgb_l2len(msg));
+ llist_del(&msg->list);
+ msgb_free(msg);
+ }
+
+ return 0;
+}
+
+static int dispatch_from_bts(struct osmo_fd *fd, int proto,
+ struct sockaddr_in *addr,
+ struct mgcp_endpoint *endp,
+ char *buf, int rc)
+{
+ /* do this before the loop handling */
+ endp->bts_end.packets += 1;
+
+ forward_data(fd->fd, &endp->taps[MGCP_TAP_BTS_IN], buf, rc);
+ if (endp->is_transcoded)
+ return send_transcoder(&endp->trans_bts, endp->cfg, proto == PROTO_RTP, &buf[0], rc);
+ else
+ return send_to(endp, DEST_NETWORK, proto == PROTO_RTP, addr, &buf[0], rc);
+}
+
+static int handle_compr_bts(struct osmo_fd *fd, int proto,
+ struct sockaddr_in *addr,
+ struct mgcp_endpoint *endp,
+ char *buf, int rc)
+{
+ struct msgb *msg, *tmp;
+ struct llist_head out_list;
+ INIT_LLIST_HEAD(&out_list);
+
+#warning "REMOVE code duplication here..."
+
+ msg = from_data(buf, rc);
+ if (!msg) {
+ LOGP(DMGCP, LOGL_ERROR,
+ "Failed to allocate buffer for decode on %d/0x%x\n",
+ ENDPOINT_NUMBER(endp), ENDPOINT_NUMBER(endp));
+ return -1;
+ }
+
+ rc = rtp_decompress(&endp->compr_rem_state, &out_list, msg);
+ msgb_free(msg);
+
+ if (rc != 0) {
+ LOGP(DMGCP, LOGL_ERROR,
+ "Failed to decode RTP stream %d/0x%x\n",
+ ENDPOINT_NUMBER(endp), ENDPOINT_NUMBER(endp));
+ return -1;
+ }
+
+ llist_for_each_entry_safe(msg, tmp, &out_list, list) {
+ dispatch_from_bts(fd, proto, addr, endp, (char *) msg->l2h, msgb_l2len(msg));
+ llist_del(&msg->list);
+ msgb_free(msg);
+ }
+
+ return 0;
+}
+
static int send_to(struct mgcp_endpoint *endp, int dest, int is_rtp,
struct sockaddr_in *addr, char *buf, int rc)
{
@@ -185,9 +376,11 @@ static int send_to(struct mgcp_endpoint *endp, int dest, int is_rtp,
addr, buf, rc);
forward_data(endp->net_end.rtp.fd,
&endp->taps[MGCP_TAP_NET_OUT], buf, rc);
+ if (tcfg->compress_dir == COMPR_NET && endp->compr_enabled)
+ return queue_for_compr_net(endp, buf, rc);
return udp_send(endp->net_end.rtp.fd, &endp->net_end.addr,
endp->net_end.rtp_port, buf, rc);
- } else {
+ } else if (!endp->compr_enabled) {
return udp_send(endp->net_end.rtcp.fd, &endp->net_end.addr,
endp->net_end.rtcp_port, buf, rc);
}
@@ -198,13 +391,17 @@ static int send_to(struct mgcp_endpoint *endp, int dest, int is_rtp,
addr, buf, rc);
forward_data(endp->bts_end.rtp.fd,
&endp->taps[MGCP_TAP_BTS_OUT], buf, rc);
+ if (tcfg->compress_dir == COMPR_BTS && endp->compr_enabled)
+ return queue_for_compr_bts(endp, buf, rc);
return udp_send(endp->bts_end.rtp.fd, &endp->bts_end.addr,
endp->bts_end.rtp_port, buf, rc);
- } else {
+ } else if (!endp->compr_enabled) {
return udp_send(endp->bts_end.rtcp.fd, &endp->bts_end.addr,
endp->bts_end.rtcp_port, buf, rc);
}
}
+
+ return -1;
}
static int receive_from(struct mgcp_endpoint *endp, int fd, struct sockaddr_in *addr,
@@ -266,13 +463,11 @@ static int rtp_data_net(struct osmo_fd *fd, unsigned int what)
}
proto = fd == &endp->net_end.rtp ? PROTO_RTP : PROTO_RTCP;
- endp->net_end.packets += 1;
- forward_data(fd->fd, &endp->taps[MGCP_TAP_NET_IN], buf, rc);
- if (endp->is_transcoded)
- return send_transcoder(&endp->trans_net, endp->cfg, proto == PROTO_RTP, &buf[0], rc);
- else
- return send_to(endp, DEST_BTS, proto == PROTO_RTP, &addr, &buf[0], rc);
+ if (endp->compr_enabled && endp->tcfg->compress_dir == COMPR_NET)
+ return handle_compr_net(fd, proto, &addr, endp, buf, rc);
+
+ return dispatch_from_net(fd, proto, &addr, endp, buf, rc);
}
static void discover_bts(struct mgcp_endpoint *endp, int proto, struct sockaddr_in *addr)
@@ -345,14 +540,10 @@ static int rtp_data_bts(struct osmo_fd *fd, unsigned int what)
return 0;
}
- /* do this before the loop handling */
- endp->bts_end.packets += 1;
+ if (endp->compr_enabled && endp->tcfg->compress_dir == COMPR_BTS)
+ return handle_compr_bts(fd, proto, &addr, endp, buf, rc);
- forward_data(fd->fd, &endp->taps[MGCP_TAP_BTS_IN], buf, rc);
- if (endp->is_transcoded)
- return send_transcoder(&endp->trans_bts, endp->cfg, proto == PROTO_RTP, &buf[0], rc);
- else
- return send_to(endp, DEST_NETWORK, proto == PROTO_RTP, &addr, &buf[0], rc);
+ return dispatch_from_bts(fd, proto, &addr, endp, buf, rc);
}
static int rtp_data_transcoder(struct mgcp_rtp_end *end, struct mgcp_endpoint *_endp,
diff --git a/openbsc/src/libmgcp/mgcp_protocol.c b/openbsc/src/libmgcp/mgcp_protocol.c
index b049b78..335d1dd 100644
--- a/openbsc/src/libmgcp/mgcp_protocol.c
+++ b/openbsc/src/libmgcp/mgcp_protocol.c
@@ -132,7 +132,7 @@ static const struct mgcp_request mgcp_requests [] = {
MGCP_REQUEST("RSIP", handle_rsip, "ReSetInProgress")
};
-static struct msgb *mgcp_msgb_alloc(void)
+struct msgb *mgcp_msgb_alloc(void)
{
struct msgb *msg;
msg = msgb_alloc_headroom(4096, 128, "MGCP msg");
@@ -1009,6 +1009,7 @@ int mgcp_endpoints_allocate(struct mgcp_trunk_config *tcfg)
tcfg->endpoints[i].compr_loc_state.last_ts = -1;
tcfg->endpoints[i].compr_rem_state.last_ts = -1;
+ INIT_LLIST_HEAD(&tcfg->endpoints[i].compr_queue);
}
return 0;
@@ -1045,6 +1046,9 @@ void mgcp_free_endp(struct mgcp_endpoint *endp)
memset(&endp->taps, 0, sizeof(endp->taps));
endp->compr_enabled = 0;
+ mgcp_msgb_clear_queue(&endp->compr_queue);
+ endp->compr_queue_size = 0;
+
memset(&endp->compr_loc_state, 0, sizeof(endp->compr_loc_state));
endp->compr_loc_state.last_ts = -1;
diff --git a/openbsc/src/libmgcp/rtp_helper.c b/openbsc/src/libmgcp/rtp_helper.c
index 0ad895d..d1c356c 100644
--- a/openbsc/src/libmgcp/rtp_helper.c
+++ b/openbsc/src/libmgcp/rtp_helper.c
@@ -221,11 +221,7 @@ static int read_compressed_big(struct msgb *msg,
return 0;
clean_all:
- while (llist_empty(list)) {
- struct msgb *msg = msgb_dequeue(list);
- talloc_free(msg);
- }
-
+ mgcp_msgb_clear_queue(list);
return -8;
}
@@ -276,11 +272,7 @@ static int read_compressed_slim(struct msgb *msg,
return 0;
clean_all:
- while (llist_empty(list)) {
- struct msgb *msg = msgb_dequeue(list);
- talloc_free(msg);
- }
-
+ mgcp_msgb_clear_queue(list);
return -8;
}