diff options
author | Holger Hans Peter Freyther <zecke@selfish.org> | 2011-08-30 13:54:44 +0200 |
---|---|---|
committer | Holger Hans Peter Freyther <zecke@selfish.org> | 2011-08-30 15:04:16 +0200 |
commit | de51b57c9111f195b342c63121c6471863f89772 (patch) | |
tree | 8eb455cbbdded1f8502d9cd4062f8451062cf8cf /openbsc | |
parent | 7217ee6c71629632281293219d80f71f69bbc128 (diff) |
mgcp: Work on compress/uncompress on a per endpoint usage...
Diffstat (limited to 'openbsc')
-rw-r--r-- | openbsc/include/openbsc/mgcp.h | 2 | ||||
-rw-r--r-- | openbsc/include/openbsc/mgcp_internal.h | 6 | ||||
-rw-r--r-- | openbsc/src/libmgcp/mgcp_network.c | 221 | ||||
-rw-r--r-- | openbsc/src/libmgcp/mgcp_protocol.c | 6 | ||||
-rw-r--r-- | openbsc/src/libmgcp/rtp_helper.c | 12 |
5 files changed, 221 insertions, 26 deletions
diff --git a/openbsc/include/openbsc/mgcp.h b/openbsc/include/openbsc/mgcp.h index 7c290c7f3..4bf9403cb 100644 --- a/openbsc/include/openbsc/mgcp.h +++ b/openbsc/include/openbsc/mgcp.h @@ -119,6 +119,8 @@ struct mgcp_trunk_config { unsigned int number_endpoints; struct mgcp_endpoint *endpoints; + + enum { COMPR_NONE, COMPR_BTS, COMPR_NET } compress_dir; }; struct mgcp_config { diff --git a/openbsc/include/openbsc/mgcp_internal.h b/openbsc/include/openbsc/mgcp_internal.h index 138cc091d..edf08d34e 100644 --- a/openbsc/include/openbsc/mgcp_internal.h +++ b/openbsc/include/openbsc/mgcp_internal.h @@ -145,6 +145,9 @@ struct mgcp_endpoint { int compr_enabled; struct mgcp_rtp_compr_state compr_loc_state; struct mgcp_rtp_compr_state compr_rem_state; + /* right now one can only compress in one direction */ + struct llist_head compr_queue; + int compr_queue_size; }; #define ENDPOINT_NUMBER(endp) abs(endp - endp->tcfg->endpoints) @@ -179,4 +182,7 @@ int rtp_decompress(struct mgcp_rtp_compr_state *state, struct llist_head *list, struct msgb *msg); +void mgcp_msgb_clear_queue(struct llist_head *list); +struct msgb *mgcp_msgb_alloc(void); + #endif diff --git a/openbsc/src/libmgcp/mgcp_network.c b/openbsc/src/libmgcp/mgcp_network.c index 91a97047e..da3029408 100644 --- a/openbsc/src/libmgcp/mgcp_network.c +++ b/openbsc/src/libmgcp/mgcp_network.c @@ -25,6 +25,7 @@ #include <stdlib.h> #include <unistd.h> #include <errno.h> +#include <assert.h> #include <sys/socket.h> #include <arpa/inet.h> @@ -55,6 +56,8 @@ enum { #define DUMMY_LOAD 0x23 +static int send_to(struct mgcp_endpoint *endp, int dest, int is_rtp, + struct sockaddr_in *addr, char *buf, int rc); static int udp_send(int fd, struct in_addr *addr, int port, char *buf, int len) { @@ -74,6 +77,16 @@ int mgcp_send_dummy(struct mgcp_endpoint *endp) endp->net_end.rtp_port, buf, 1); } +void mgcp_msgb_clear_queue(struct llist_head *head) +{ + struct msgb *msg; + + while (!llist_empty(head)) { + msg = msgb_dequeue(head); + msgb_free(msg); + } +} + static void patch_and_count(struct mgcp_endpoint *endp, struct mgcp_rtp_state *state, int payload, struct sockaddr_in *addr, char *data, int len) { @@ -166,6 +179,184 @@ static int send_transcoder(struct mgcp_rtp_end *end, struct mgcp_config *cfg, return rc; } +static struct msgb *from_data(char *buf, int rc) +{ + struct msgb *msg = mgcp_msgb_alloc(); + if (!msg) { + LOGP(DMGCP, LOGL_ERROR, "Failed to allocate.\n"); + return NULL; + } + + msg->l2h = msgb_put(msg, rc); + memcpy(msg->l2h, buf, rc); + return msg; +} + +static int maybe_send_queue(struct mgcp_endpoint *endp, + struct mgcp_rtp_end *rtp_end) +{ + struct msgb *out; + int rc; + + /* Queue up to four messages per endpoint */ + if (++endp->compr_queue_size != 4) + return 0; + + /* Allocate the outgoing data */ + out = mgcp_msgb_alloc(); + if (!out) { + LOGP(DMGCP, LOGL_ERROR, "Failed to allocate.\n"); + goto cleanup; + } + + /* Attempt to compress the samples */ + rc = rtp_compress(&endp->compr_loc_state, out, + ENDPOINT_NUMBER(endp), &endp->compr_queue); + if (rc == 0) { + LOGP(DMGCP, LOGL_ERROR, "Failed to compress. %d\n", rc); + goto cleanup; + } + + /* send them out */ + rc = udp_send(rtp_end->rtp.fd, &rtp_end->addr, rtp_end->rtp_port, + (char *) out->l2h, msgb_l2len(out)); + + /* cleanup */ + msgb_free(out); + assert(llist_empty(&endp->compr_queue)); + endp->compr_queue_size = 0; + return 1; + +cleanup: + mgcp_msgb_clear_queue(&endp->compr_queue); + endp->compr_queue_size = 0; + return 0; +} + +static int queue_for_compr_net(struct mgcp_endpoint *endp, char *buf, int rc) +{ + struct msgb *msg = from_data(buf, rc); + if (!msg) + return -1; + + msgb_enqueue(&endp->compr_queue, msg); + return maybe_send_queue(endp, &endp->net_end); +} + +static int queue_for_compr_bts(struct mgcp_endpoint *endp, char *buf, int rc) +{ + struct msgb *msg = from_data(buf, rc); + if (!msg) + return -1; + + msgb_enqueue(&endp->compr_queue, msg); + return maybe_send_queue(endp, &endp->net_end); +} + +static int dispatch_from_net(struct osmo_fd *fd, int proto, + struct sockaddr_in *addr, + struct mgcp_endpoint *endp, + char *buf, int rc) +{ + endp->net_end.packets += 1; + + forward_data(fd->fd, &endp->taps[MGCP_TAP_NET_IN], buf, rc); + if (endp->is_transcoded) + return send_transcoder(&endp->trans_net, endp->cfg, proto == PROTO_RTP, &buf[0], rc); + else { + return send_to(endp, DEST_BTS, proto == PROTO_RTP, addr, &buf[0], rc); + } +} + +static int handle_compr_net(struct osmo_fd *fd, int proto, + struct sockaddr_in *addr, + struct mgcp_endpoint *endp, + char *buf, int rc) +{ + struct msgb *msg, *tmp; + struct llist_head out_list; + INIT_LLIST_HEAD(&out_list); + + msg = from_data(buf, rc); + if (!msg) { + LOGP(DMGCP, LOGL_ERROR, + "Failed to allocate buffer for decode on %d/0x%x\n", + ENDPOINT_NUMBER(endp), ENDPOINT_NUMBER(endp)); + return -1; + } + + rc = rtp_decompress(&endp->compr_rem_state, &out_list, msg); + msgb_free(msg); + + if (rc != 0) { + LOGP(DMGCP, LOGL_ERROR, + "Failed to decode RTP stream %d/0x%x\n", + ENDPOINT_NUMBER(endp), ENDPOINT_NUMBER(endp)); + return -1; + } + + llist_for_each_entry_safe(msg, tmp, &out_list, list) { + dispatch_from_net(fd, proto, addr, endp, (char *) msg->l2h, msgb_l2len(msg)); + llist_del(&msg->list); + msgb_free(msg); + } + + return 0; +} + +static int dispatch_from_bts(struct osmo_fd *fd, int proto, + struct sockaddr_in *addr, + struct mgcp_endpoint *endp, + char *buf, int rc) +{ + /* do this before the loop handling */ + endp->bts_end.packets += 1; + + forward_data(fd->fd, &endp->taps[MGCP_TAP_BTS_IN], buf, rc); + if (endp->is_transcoded) + return send_transcoder(&endp->trans_bts, endp->cfg, proto == PROTO_RTP, &buf[0], rc); + else + return send_to(endp, DEST_NETWORK, proto == PROTO_RTP, addr, &buf[0], rc); +} + +static int handle_compr_bts(struct osmo_fd *fd, int proto, + struct sockaddr_in *addr, + struct mgcp_endpoint *endp, + char *buf, int rc) +{ + struct msgb *msg, *tmp; + struct llist_head out_list; + INIT_LLIST_HEAD(&out_list); + +#warning "REMOVE code duplication here..." + + msg = from_data(buf, rc); + if (!msg) { + LOGP(DMGCP, LOGL_ERROR, + "Failed to allocate buffer for decode on %d/0x%x\n", + ENDPOINT_NUMBER(endp), ENDPOINT_NUMBER(endp)); + return -1; + } + + rc = rtp_decompress(&endp->compr_rem_state, &out_list, msg); + msgb_free(msg); + + if (rc != 0) { + LOGP(DMGCP, LOGL_ERROR, + "Failed to decode RTP stream %d/0x%x\n", + ENDPOINT_NUMBER(endp), ENDPOINT_NUMBER(endp)); + return -1; + } + + llist_for_each_entry_safe(msg, tmp, &out_list, list) { + dispatch_from_bts(fd, proto, addr, endp, (char *) msg->l2h, msgb_l2len(msg)); + llist_del(&msg->list); + msgb_free(msg); + } + + return 0; +} + static int send_to(struct mgcp_endpoint *endp, int dest, int is_rtp, struct sockaddr_in *addr, char *buf, int rc) { @@ -185,9 +376,11 @@ static int send_to(struct mgcp_endpoint *endp, int dest, int is_rtp, addr, buf, rc); forward_data(endp->net_end.rtp.fd, &endp->taps[MGCP_TAP_NET_OUT], buf, rc); + if (tcfg->compress_dir == COMPR_NET && endp->compr_enabled) + return queue_for_compr_net(endp, buf, rc); return udp_send(endp->net_end.rtp.fd, &endp->net_end.addr, endp->net_end.rtp_port, buf, rc); - } else { + } else if (!endp->compr_enabled) { return udp_send(endp->net_end.rtcp.fd, &endp->net_end.addr, endp->net_end.rtcp_port, buf, rc); } @@ -198,13 +391,17 @@ static int send_to(struct mgcp_endpoint *endp, int dest, int is_rtp, addr, buf, rc); forward_data(endp->bts_end.rtp.fd, &endp->taps[MGCP_TAP_BTS_OUT], buf, rc); + if (tcfg->compress_dir == COMPR_BTS && endp->compr_enabled) + return queue_for_compr_bts(endp, buf, rc); return udp_send(endp->bts_end.rtp.fd, &endp->bts_end.addr, endp->bts_end.rtp_port, buf, rc); - } else { + } else if (!endp->compr_enabled) { return udp_send(endp->bts_end.rtcp.fd, &endp->bts_end.addr, endp->bts_end.rtcp_port, buf, rc); } } + + return -1; } static int receive_from(struct mgcp_endpoint *endp, int fd, struct sockaddr_in *addr, @@ -266,13 +463,11 @@ static int rtp_data_net(struct osmo_fd *fd, unsigned int what) } proto = fd == &endp->net_end.rtp ? PROTO_RTP : PROTO_RTCP; - endp->net_end.packets += 1; - forward_data(fd->fd, &endp->taps[MGCP_TAP_NET_IN], buf, rc); - if (endp->is_transcoded) - return send_transcoder(&endp->trans_net, endp->cfg, proto == PROTO_RTP, &buf[0], rc); - else - return send_to(endp, DEST_BTS, proto == PROTO_RTP, &addr, &buf[0], rc); + if (endp->compr_enabled && endp->tcfg->compress_dir == COMPR_NET) + return handle_compr_net(fd, proto, &addr, endp, buf, rc); + + return dispatch_from_net(fd, proto, &addr, endp, buf, rc); } static void discover_bts(struct mgcp_endpoint *endp, int proto, struct sockaddr_in *addr) @@ -345,14 +540,10 @@ static int rtp_data_bts(struct osmo_fd *fd, unsigned int what) return 0; } - /* do this before the loop handling */ - endp->bts_end.packets += 1; + if (endp->compr_enabled && endp->tcfg->compress_dir == COMPR_BTS) + return handle_compr_bts(fd, proto, &addr, endp, buf, rc); - forward_data(fd->fd, &endp->taps[MGCP_TAP_BTS_IN], buf, rc); - if (endp->is_transcoded) - return send_transcoder(&endp->trans_bts, endp->cfg, proto == PROTO_RTP, &buf[0], rc); - else - return send_to(endp, DEST_NETWORK, proto == PROTO_RTP, &addr, &buf[0], rc); + return dispatch_from_bts(fd, proto, &addr, endp, buf, rc); } static int rtp_data_transcoder(struct mgcp_rtp_end *end, struct mgcp_endpoint *_endp, diff --git a/openbsc/src/libmgcp/mgcp_protocol.c b/openbsc/src/libmgcp/mgcp_protocol.c index b049b7854..335d1dd93 100644 --- a/openbsc/src/libmgcp/mgcp_protocol.c +++ b/openbsc/src/libmgcp/mgcp_protocol.c @@ -132,7 +132,7 @@ static const struct mgcp_request mgcp_requests [] = { MGCP_REQUEST("RSIP", handle_rsip, "ReSetInProgress") }; -static struct msgb *mgcp_msgb_alloc(void) +struct msgb *mgcp_msgb_alloc(void) { struct msgb *msg; msg = msgb_alloc_headroom(4096, 128, "MGCP msg"); @@ -1009,6 +1009,7 @@ int mgcp_endpoints_allocate(struct mgcp_trunk_config *tcfg) tcfg->endpoints[i].compr_loc_state.last_ts = -1; tcfg->endpoints[i].compr_rem_state.last_ts = -1; + INIT_LLIST_HEAD(&tcfg->endpoints[i].compr_queue); } return 0; @@ -1045,6 +1046,9 @@ void mgcp_free_endp(struct mgcp_endpoint *endp) memset(&endp->taps, 0, sizeof(endp->taps)); endp->compr_enabled = 0; + mgcp_msgb_clear_queue(&endp->compr_queue); + endp->compr_queue_size = 0; + memset(&endp->compr_loc_state, 0, sizeof(endp->compr_loc_state)); endp->compr_loc_state.last_ts = -1; diff --git a/openbsc/src/libmgcp/rtp_helper.c b/openbsc/src/libmgcp/rtp_helper.c index 0ad895d72..d1c356cd0 100644 --- a/openbsc/src/libmgcp/rtp_helper.c +++ b/openbsc/src/libmgcp/rtp_helper.c @@ -221,11 +221,7 @@ static int read_compressed_big(struct msgb *msg, return 0; clean_all: - while (llist_empty(list)) { - struct msgb *msg = msgb_dequeue(list); - talloc_free(msg); - } - + mgcp_msgb_clear_queue(list); return -8; } @@ -276,11 +272,7 @@ static int read_compressed_slim(struct msgb *msg, return 0; clean_all: - while (llist_empty(list)) { - struct msgb *msg = msgb_dequeue(list); - talloc_free(msg); - } - + mgcp_msgb_clear_queue(list); return -8; } |