aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorPablo Neira Ayuso <pablo@gnumonks.org>2012-10-15 20:33:32 +0200
committerPablo Neira Ayuso <pablo@netfilter.org>2012-10-15 23:09:36 +0200
commit8c9caa86075c1e63f5729fb7b649977d6b7ef686 (patch)
treed39b368900c0c2cfcff12229832a459a6d6c487b /src
parente472f44dd3ed4d8a371c00f1cb10cc3fea59ff08 (diff)
osmux: rewrite batching function
Rework batching routine to reduce its complexity, updates: * Now it uses a list of lists to store the messages that will be batched. batch list | `-> node SSRC=a ---> ... ---> node SSRC=b | | msg seq=x1 msg seq=y1 | | msg seq=x2 msg seq=y2 | | msg seq=x3 msg seq=y3 | | msg seq=x4 msg seq y4 This keeps easier the creation of the final batch that is sent from that data structure. * We also detect duplicate messages in the batch, ie. messages with the same sequence are skipped. Still pending to resolve reordering, corruption and omissions (reliability is desired).
Diffstat (limited to 'src')
-rw-r--r--src/osmux.c164
1 files changed, 97 insertions, 67 deletions
diff --git a/src/osmux.c b/src/osmux.c
index a428b64..bfac31c 100644
--- a/src/osmux.c
+++ b/src/osmux.c
@@ -110,16 +110,22 @@ int osmux_xfrm_output(struct osmux_hdr *osmuxh, struct osmux_out_handle *h,
INIT_LLIST_HEAD(list);
for (i=0; i<osmuxh->ctr+1; i++) {
+ struct rtp_hdr *rtph;
+ char buf[4096];
+
msg = osmux_rebuild_rtp(h, osmuxh,
osmux_get_payload(osmuxh) +
i * osmo_amr_bytes(osmuxh->amr_cmr),
osmo_amr_bytes(osmuxh->amr_cmr));
if (msg == NULL)
- break;
+ continue;
- LOGP(DLMIB, LOGL_DEBUG, "extracted RTP message from batch "
- "msg=%p\n", msg);
+ rtph = osmo_rtp_get_hdr(msg);
+ if (rtph == NULL)
+ continue;
+ osmo_rtp_snprintf(buf, sizeof(buf), msg);
+ LOGP(DLMIB, LOGL_DEBUG, "extracted: %s\n", buf);
llist_add_tail(&msg->list, list);
}
return i;
@@ -128,7 +134,7 @@ int osmux_xfrm_output(struct osmux_hdr *osmuxh, struct osmux_out_handle *h,
struct osmux_batch {
struct osmo_timer_list timer;
struct osmux_hdr *osmuxh;
- struct llist_head msgb_list;
+ struct llist_head node_list;
unsigned int remaining_bytes;
uint8_t seq;
int64_t ccid[OSMUX_MAX_CONCURRENT_CALLS];
@@ -196,46 +202,55 @@ osmux_xfrm_encode_amr(struct osmux_in_handle *h,
return 0;
}
+struct batch_list_node {
+ struct llist_head head;
+ uint32_t ssrc;
+ struct llist_head list;
+};
+
static struct msgb *osmux_build_batch(struct osmux_in_handle *h)
{
- struct msgb *cur, *tmp, *batch_msg;
- uint32_t last_rtp_ssrc;
- int last_rtp_ssrc_set = 0, add_osmux_hdr = 1;
+ struct msgb *batch_msg;
+ struct batch_list_node *node, *tnode;
struct osmux_batch *batch = (struct osmux_batch *)h->data;
+ LOGP(DLMIB, LOGL_DEBUG, "Now building batch\n");
+
batch_msg = msgb_alloc(OSMUX_BATCH_MAX, "OSMUX");
if (batch_msg == NULL) {
LOGP(DLMIB, LOGL_ERROR, "Not enough memory\n");
return NULL;
}
- LOGP(DLMIB, LOGL_DEBUG, "Now building batch\n");
+ llist_for_each_entry_safe(node, tnode, &batch->node_list, head) {
+ struct msgb *cur, *tmp;
+ int ctr = 0;
- llist_for_each_entry_safe(cur, tmp, &batch->msgb_list, list) {
- struct rtp_hdr *rtph;
- char buf[4096];
+ llist_for_each_entry_safe(cur, tmp, &node->list, list) {
+ struct rtp_hdr *rtph;
+ char buf[4096];
+ int add_osmux_hdr = 0;
- rtph = osmo_rtp_get_hdr(cur);
- if (rtph == NULL)
- return NULL;
+ osmo_rtp_snprintf(buf, sizeof(buf), cur);
+ LOGP(DLMIB, LOGL_DEBUG, "built: %s\n", buf);
- if (last_rtp_ssrc_set) {
- add_osmux_hdr = (last_rtp_ssrc != rtph->ssrc);
- if (add_osmux_hdr)
- LOGP(DLMIB, LOGL_DEBUG, "add osmux header\n");
- }
+ rtph = osmo_rtp_get_hdr(cur);
+ if (rtph == NULL)
+ return NULL;
- osmo_rtp_snprintf(buf, sizeof(buf), cur);
-
- LOGP(DLMIB, LOGL_DEBUG, "%s\n", buf);
-
- osmux_xfrm_encode_amr(h, batch_msg, rtph, cur, add_osmux_hdr);
-
- last_rtp_ssrc_set = 1;
- last_rtp_ssrc = rtph->ssrc;
+ if (ctr == 0) {
+ LOGP(DLMIB, LOGL_DEBUG, "add osmux header\n");
+ add_osmux_hdr = 1;
+ }
- llist_del(&cur->list);
- msgb_free(cur);
+ osmux_xfrm_encode_amr(h, batch_msg, rtph, cur,
+ add_osmux_hdr);
+ llist_del(&cur->list);
+ msgb_free(cur);
+ ctr++;
+ }
+ llist_del(&node->head);
+ talloc_free(node);
}
return batch_msg;
}
@@ -256,7 +271,7 @@ static void osmux_batch_timer_expired(void *data)
{
struct osmux_in_handle *h = data;
- LOGP(DLMIB, LOGL_DEBUG, "received message from stream\n");
+ LOGP(DLMIB, LOGL_DEBUG, "osmux_batch_timer_expired\n");
osmux_xfrm_input_deliver(h);
}
@@ -276,52 +291,67 @@ static int
osmux_msgb_batch_queue_add(struct osmux_batch *batch, struct msgb *msg)
{
struct rtp_hdr *rtph;
- struct msgb *cur;
- int found_matching = 0, found_room = 0, bytes = 0;
+ struct batch_list_node *node;
+ int found = 0, bytes = 0;
rtph = osmo_rtp_get_hdr(msg);
if (rtph == NULL)
- return -1;
+ return 0;
- llist_for_each_entry(cur, &batch->msgb_list, list) {
- struct rtp_hdr *rtph2;
-
- rtph2 = osmo_rtp_get_hdr(cur);
- if (rtph2 == NULL)
- return -1;
-
- if (rtph->ssrc == rtph2->ssrc) {
- found_matching = 1;
- continue;
- }
-
- /* We insert messages in order based on the RTP SSRC. This is
- * useful to build the batch.
- */
- if (rtph->ssrc > rtph2->ssrc) {
- found_room = 1;
+ /* Yes, there is room. Check if we have more message with same ssrc */
+ llist_for_each_entry(node, &batch->node_list, head) {
+ if (node->ssrc == rtph->ssrc) {
+ found = 1;
break;
}
}
+ /* First check if there is room for this message in the batch */
bytes += osmux_rtp_amr_payload_len(msg, rtph);
- if (!found_matching)
+ if (!found)
bytes += sizeof(struct osmux_hdr);
- /* Still room in this batch for this message? if there is not
- * then deliver current batch.
- */
+ /* No room, sorry. You'll have to retry */
if (bytes > batch->remaining_bytes)
return 1;
- batch->remaining_bytes -= bytes;
+ if (found) {
+ struct msgb *cur;
- if (found_room)
- llist_add_tail(&msg->list, &cur->list);
- else
- llist_add_tail(&msg->list, &batch->msgb_list);
+ /* Extra validation: check if this message already exists,
+ * should not happen but make sure we don't propagate
+ * duplicated messages.
+ */
+ llist_for_each_entry(cur, &node->list, list) {
+ struct rtp_hdr *rtph2 = osmo_rtp_get_hdr(cur);
+ if (rtph2 == NULL)
+ return 0;
+
+ /* Already exists message with this sequence, skip */
+ if (rtph2->sequence == rtph->sequence) {
+ LOGP(DLMIB, LOGL_DEBUG, "already exists "
+ "message with seq=%u, skip it\n",
+ rtph->sequence);
+ return 0;
+ }
+ }
+ } else {
+ /* This is the first message with that ssrc we've seen */
+ node = talloc_zero(NULL, struct batch_list_node);
+ if (node == NULL)
+ return 0;
- LOGP(DLMIB, LOGL_DEBUG, "adding to batch (%p)\n", msg);
+ node->ssrc = rtph->ssrc;
+ INIT_LLIST_HEAD(&node->list);
+ llist_add_tail(&node->head, &batch->node_list);
+ }
+
+ LOGP(DLMIB, LOGL_DEBUG, "adding msg with ssrc=%u to batch\n",
+ rtph->ssrc);
+ llist_add_tail(&msg->list, &node->list);
+
+ /* Update remaining room in this batch */
+ batch->remaining_bytes -= bytes;
return 0;
}
@@ -330,9 +360,9 @@ osmux_msgb_batch_queue_add(struct osmux_batch *batch, struct msgb *msg)
* osmux_xfrm_input - add RTP message to OSmux batch
* \param msg: RTP message that you want to batch into one OSmux message
*
- * This function returns -1 on error. If 0 is returned, this indicates
- * that the message has been batched. If 1 is returned, you have to
- * invoke osmux_xfrm_input_deliver and try again.
+ * If 0 is returned, this indicates that the message has been batched or that
+ * an error occured and we have skipped the message. If 1 is returned, you
+ * have to invoke osmux_xfrm_input_deliver and try again.
*/
int osmux_xfrm_input(struct osmux_in_handle *h, struct msgb *msg)
{
@@ -342,7 +372,7 @@ int osmux_xfrm_input(struct osmux_in_handle *h, struct msgb *msg)
rtph = osmo_rtp_get_hdr(msg);
if (rtph == NULL)
- return -1;
+ return 0;
switch(rtph->payload_type) {
case RTP_PT_RTCP:
@@ -351,7 +381,7 @@ int osmux_xfrm_input(struct osmux_in_handle *h, struct msgb *msg)
/* This is the first message in the batch, start the
* batch timer to deliver it.
*/
- if (llist_empty(&batch->msgb_list)) {
+ if (llist_empty(&batch->node_list)) {
LOGP(DLMIB, LOGL_DEBUG,
"osmux start timer batch\n");
@@ -362,7 +392,7 @@ int osmux_xfrm_input(struct osmux_in_handle *h, struct msgb *msg)
break;
default:
/* Only AMR supported so far, sorry. */
- ret = -1;
+ ret = 0;
break;
}
return ret;
@@ -379,7 +409,7 @@ void osmux_xfrm_input_init(struct osmux_in_handle *h)
if (batch == NULL)
return;
- INIT_LLIST_HEAD(&batch->msgb_list);
+ INIT_LLIST_HEAD(&batch->node_list);
batch->remaining_bytes = OSMUX_BATCH_MAX;
batch->timer.cb = osmux_batch_timer_expired;
batch->timer.data = h;