diff options
Diffstat (limited to 'src/osmux.c')
-rw-r--r-- | src/osmux.c | 164 |
1 files changed, 97 insertions, 67 deletions
diff --git a/src/osmux.c b/src/osmux.c index a428b64..bfac31c 100644 --- a/src/osmux.c +++ b/src/osmux.c @@ -110,16 +110,22 @@ int osmux_xfrm_output(struct osmux_hdr *osmuxh, struct osmux_out_handle *h, INIT_LLIST_HEAD(list); for (i=0; i<osmuxh->ctr+1; i++) { + struct rtp_hdr *rtph; + char buf[4096]; + msg = osmux_rebuild_rtp(h, osmuxh, osmux_get_payload(osmuxh) + i * osmo_amr_bytes(osmuxh->amr_cmr), osmo_amr_bytes(osmuxh->amr_cmr)); if (msg == NULL) - break; + continue; - LOGP(DLMIB, LOGL_DEBUG, "extracted RTP message from batch " - "msg=%p\n", msg); + rtph = osmo_rtp_get_hdr(msg); + if (rtph == NULL) + continue; + osmo_rtp_snprintf(buf, sizeof(buf), msg); + LOGP(DLMIB, LOGL_DEBUG, "extracted: %s\n", buf); llist_add_tail(&msg->list, list); } return i; @@ -128,7 +134,7 @@ int osmux_xfrm_output(struct osmux_hdr *osmuxh, struct osmux_out_handle *h, struct osmux_batch { struct osmo_timer_list timer; struct osmux_hdr *osmuxh; - struct llist_head msgb_list; + struct llist_head node_list; unsigned int remaining_bytes; uint8_t seq; int64_t ccid[OSMUX_MAX_CONCURRENT_CALLS]; @@ -196,46 +202,55 @@ osmux_xfrm_encode_amr(struct osmux_in_handle *h, return 0; } +struct batch_list_node { + struct llist_head head; + uint32_t ssrc; + struct llist_head list; +}; + static struct msgb *osmux_build_batch(struct osmux_in_handle *h) { - struct msgb *cur, *tmp, *batch_msg; - uint32_t last_rtp_ssrc; - int last_rtp_ssrc_set = 0, add_osmux_hdr = 1; + struct msgb *batch_msg; + struct batch_list_node *node, *tnode; struct osmux_batch *batch = (struct osmux_batch *)h->data; + LOGP(DLMIB, LOGL_DEBUG, "Now building batch\n"); + batch_msg = msgb_alloc(OSMUX_BATCH_MAX, "OSMUX"); if (batch_msg == NULL) { LOGP(DLMIB, LOGL_ERROR, "Not enough memory\n"); return NULL; } - LOGP(DLMIB, LOGL_DEBUG, "Now building batch\n"); + llist_for_each_entry_safe(node, tnode, &batch->node_list, head) { + struct msgb *cur, *tmp; + int ctr = 0; - llist_for_each_entry_safe(cur, tmp, &batch->msgb_list, list) { - struct rtp_hdr *rtph; - char buf[4096]; + llist_for_each_entry_safe(cur, tmp, &node->list, list) { + struct rtp_hdr *rtph; + char buf[4096]; + int add_osmux_hdr = 0; - rtph = osmo_rtp_get_hdr(cur); - if (rtph == NULL) - return NULL; + osmo_rtp_snprintf(buf, sizeof(buf), cur); + LOGP(DLMIB, LOGL_DEBUG, "built: %s\n", buf); - if (last_rtp_ssrc_set) { - add_osmux_hdr = (last_rtp_ssrc != rtph->ssrc); - if (add_osmux_hdr) - LOGP(DLMIB, LOGL_DEBUG, "add osmux header\n"); - } + rtph = osmo_rtp_get_hdr(cur); + if (rtph == NULL) + return NULL; - osmo_rtp_snprintf(buf, sizeof(buf), cur); - - LOGP(DLMIB, LOGL_DEBUG, "%s\n", buf); - - osmux_xfrm_encode_amr(h, batch_msg, rtph, cur, add_osmux_hdr); - - last_rtp_ssrc_set = 1; - last_rtp_ssrc = rtph->ssrc; + if (ctr == 0) { + LOGP(DLMIB, LOGL_DEBUG, "add osmux header\n"); + add_osmux_hdr = 1; + } - llist_del(&cur->list); - msgb_free(cur); + osmux_xfrm_encode_amr(h, batch_msg, rtph, cur, + add_osmux_hdr); + llist_del(&cur->list); + msgb_free(cur); + ctr++; + } + llist_del(&node->head); + talloc_free(node); } return batch_msg; } @@ -256,7 +271,7 @@ static void osmux_batch_timer_expired(void *data) { struct osmux_in_handle *h = data; - LOGP(DLMIB, LOGL_DEBUG, "received message from stream\n"); + LOGP(DLMIB, LOGL_DEBUG, "osmux_batch_timer_expired\n"); osmux_xfrm_input_deliver(h); } @@ -276,52 +291,67 @@ static int osmux_msgb_batch_queue_add(struct osmux_batch *batch, struct msgb *msg) { struct rtp_hdr *rtph; - struct msgb *cur; - int found_matching = 0, found_room = 0, bytes = 0; + struct batch_list_node *node; + int found = 0, bytes = 0; rtph = osmo_rtp_get_hdr(msg); if (rtph == NULL) - return -1; + return 0; - llist_for_each_entry(cur, &batch->msgb_list, list) { - struct rtp_hdr *rtph2; - - rtph2 = osmo_rtp_get_hdr(cur); - if (rtph2 == NULL) - return -1; - - if (rtph->ssrc == rtph2->ssrc) { - found_matching = 1; - continue; - } - - /* We insert messages in order based on the RTP SSRC. This is - * useful to build the batch. - */ - if (rtph->ssrc > rtph2->ssrc) { - found_room = 1; + /* Yes, there is room. Check if we have more message with same ssrc */ + llist_for_each_entry(node, &batch->node_list, head) { + if (node->ssrc == rtph->ssrc) { + found = 1; break; } } + /* First check if there is room for this message in the batch */ bytes += osmux_rtp_amr_payload_len(msg, rtph); - if (!found_matching) + if (!found) bytes += sizeof(struct osmux_hdr); - /* Still room in this batch for this message? if there is not - * then deliver current batch. - */ + /* No room, sorry. You'll have to retry */ if (bytes > batch->remaining_bytes) return 1; - batch->remaining_bytes -= bytes; + if (found) { + struct msgb *cur; - if (found_room) - llist_add_tail(&msg->list, &cur->list); - else - llist_add_tail(&msg->list, &batch->msgb_list); + /* Extra validation: check if this message already exists, + * should not happen but make sure we don't propagate + * duplicated messages. + */ + llist_for_each_entry(cur, &node->list, list) { + struct rtp_hdr *rtph2 = osmo_rtp_get_hdr(cur); + if (rtph2 == NULL) + return 0; + + /* Already exists message with this sequence, skip */ + if (rtph2->sequence == rtph->sequence) { + LOGP(DLMIB, LOGL_DEBUG, "already exists " + "message with seq=%u, skip it\n", + rtph->sequence); + return 0; + } + } + } else { + /* This is the first message with that ssrc we've seen */ + node = talloc_zero(NULL, struct batch_list_node); + if (node == NULL) + return 0; - LOGP(DLMIB, LOGL_DEBUG, "adding to batch (%p)\n", msg); + node->ssrc = rtph->ssrc; + INIT_LLIST_HEAD(&node->list); + llist_add_tail(&node->head, &batch->node_list); + } + + LOGP(DLMIB, LOGL_DEBUG, "adding msg with ssrc=%u to batch\n", + rtph->ssrc); + llist_add_tail(&msg->list, &node->list); + + /* Update remaining room in this batch */ + batch->remaining_bytes -= bytes; return 0; } @@ -330,9 +360,9 @@ osmux_msgb_batch_queue_add(struct osmux_batch *batch, struct msgb *msg) * osmux_xfrm_input - add RTP message to OSmux batch * \param msg: RTP message that you want to batch into one OSmux message * - * This function returns -1 on error. If 0 is returned, this indicates - * that the message has been batched. If 1 is returned, you have to - * invoke osmux_xfrm_input_deliver and try again. + * If 0 is returned, this indicates that the message has been batched or that + * an error occured and we have skipped the message. If 1 is returned, you + * have to invoke osmux_xfrm_input_deliver and try again. */ int osmux_xfrm_input(struct osmux_in_handle *h, struct msgb *msg) { @@ -342,7 +372,7 @@ int osmux_xfrm_input(struct osmux_in_handle *h, struct msgb *msg) rtph = osmo_rtp_get_hdr(msg); if (rtph == NULL) - return -1; + return 0; switch(rtph->payload_type) { case RTP_PT_RTCP: @@ -351,7 +381,7 @@ int osmux_xfrm_input(struct osmux_in_handle *h, struct msgb *msg) /* This is the first message in the batch, start the * batch timer to deliver it. */ - if (llist_empty(&batch->msgb_list)) { + if (llist_empty(&batch->node_list)) { LOGP(DLMIB, LOGL_DEBUG, "osmux start timer batch\n"); @@ -362,7 +392,7 @@ int osmux_xfrm_input(struct osmux_in_handle *h, struct msgb *msg) break; default: /* Only AMR supported so far, sorry. */ - ret = -1; + ret = 0; break; } return ret; @@ -379,7 +409,7 @@ void osmux_xfrm_input_init(struct osmux_in_handle *h) if (batch == NULL) return; - INIT_LLIST_HEAD(&batch->msgb_list); + INIT_LLIST_HEAD(&batch->node_list); batch->remaining_bytes = OSMUX_BATCH_MAX; batch->timer.cb = osmux_batch_timer_expired; batch->timer.data = h; |