aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorPau Espin Pedrol <pespin@sysmocom.de>2017-04-12 19:36:47 +0200
committerPau Espin Pedrol <pespin@sysmocom.de>2018-04-13 15:51:43 +0200
commitce820763c1c527856e8f75719b958de77d8c74aa (patch)
tree3c4f88488e3c0da6a54119ba1d475bdd3c5d2a85 /src
parent5c962c2f3eb1c4e1497edf0f540006c76deb06fd (diff)
jibuf: Add initial implementation of Jitter Buffer
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am1
-rw-r--r--src/jibuf.c381
2 files changed, 382 insertions, 0 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 81a55b4..79e3685 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -18,6 +18,7 @@ libosmonetif_la_SOURCES = amr.c \
datagram.c \
ipa.c \
ipa_unit.c \
+ jibuf.c \
osmux.c \
rs232.c \
rtp.c \
diff --git a/src/jibuf.c b/src/jibuf.c
new file mode 100644
index 0000000..c3d6bad
--- /dev/null
+++ b/src/jibuf.c
@@ -0,0 +1,381 @@
+/* (C) 2017 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
+ *
+ * Author: Pau Espin Pedrol <pespin@sysmocom.de>
+ *
+ * SPDX-License-Identifier: GPL-2.0+
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <inttypes.h>
+
+#include <osmocom/core/msgb.h>
+#include <osmocom/core/timer.h>
+#include <osmocom/core/talloc.h>
+#include <osmocom/core/logging.h>
+#include <osmocom/core/utils.h>
+
+#include <osmocom/netif/amr.h>
+#include <osmocom/netif/rtp.h>
+#include <osmocom/netif/jibuf.h>
+
+#include <arpa/inet.h>
+
+/*! \addtogroup jibuf Osmocom Jitter Buffer
+ * @{
+ */
+
+/*! \file jibuf.c
+ * \brief Osmocom Jitter Buffer helpers
+ */
+
+/* Sampling rate (in Hz) */
+/* TODO: SAMPLE RATE can be guessed from rtp.p_type */
+#define SAMPLE_RATE 8000
+
+/* TUNABLE PARAMETERS: */
+
+/* default {min,max}_delay values if set_{min,max}_delay() is never called */
+#define JIBUF_DEFAULT_MIN_DELAY_MS 60
+#define JIBUF_DEFAULT_MAX_DELAY_MS 200
+
+/* How frequently (num of input packets) do we reselect a new reference? */
+#define JIBUF_REFERENCE_TS_FREQ 60
+
+/* How frequently (num of input packets) do we check if we should adapt the
+ * buffer size (threshold_delay) ? */
+#define JIBUF_BUFFER_RECALC_FREQ 40
+/* How many pkts should be dropped at max every JIBUF_BUFFER_RECALC_FREQ input
+ * pkts? */
+#define JIBUF_ALLOWED_PKT_DROP 3
+/* How many consecutive pkts can be dropped before triggering a buffer size incr ? */
+#define JIBUF_ALLOWED_PKT_CONSECUTIVE_DROP 1
+/* How much do we incr/decr the buffer size every time we recalculate it? */
+#define JIBUF_BUFFER_INC_STEP 20
+#define JIBUF_BUFFER_DEC_STEP 5
+
+struct osmo_jibuf_msgb_cb {
+ struct timeval ts;
+ unsigned long *old_cb;
+};
+
+#define JIBUF_MSGB_CB(__msgb) ((struct osmo_jibuf_msgb_cb *)&((__msgb)->cb[0]))
+
+static void ms2timeval(struct timeval *ts, uint32_t ms)
+{
+ ts->tv_sec = ms / 1000;
+ ts->tv_usec = (ms % 1000) * 1000;
+}
+
+static uint32_t timeval2ms(const struct timeval *ts)
+{
+ return ts->tv_sec * 1000 + ts->tv_usec / 1000;
+}
+
+static int clock_gettime_timeval(clockid_t clk_id, struct timeval *tp)
+{
+ struct timespec now;
+ int n;
+
+ n = osmo_clock_gettime(clk_id, &now);
+ tp->tv_sec = now.tv_sec;
+ tp->tv_usec = now.tv_nsec / 1000;
+
+ return n;
+}
+
+static struct timeval *msgb_scheduled_ts(const struct msgb *msg)
+{
+ struct osmo_jibuf_msgb_cb *jbcb = JIBUF_MSGB_CB(msg);
+ return &jbcb->ts;
+}
+
+/* Add msgb to the list sorted by its scheduled output ts */
+static void llist_add_sorted(struct msgb *msg, struct llist_head *msg_list)
+{
+ struct msgb *cur;
+ struct timeval *msg_ts = msgb_scheduled_ts(msg);
+
+ /* TODO: not sure if I need to use _safe here */
+ llist_for_each_entry(cur, msg_list, list) {
+ struct timeval *cur_ts = msgb_scheduled_ts(cur);
+ if (timercmp(msg_ts, cur_ts, <)) {
+ __llist_add(&msg->list, cur->list.prev, &cur->list);
+ return;
+ }
+ }
+
+ /* we reached the end, add to the tail: */
+ llist_add_tail(&msg->list, msg_list);
+
+}
+
+static uint16_t msg_get_sequence(struct msgb *msg)
+{
+ struct rtp_hdr *rtph = osmo_rtp_get_hdr(msg);
+ return ntohs(rtph->sequence);
+}
+
+static uint32_t msg_get_timestamp(struct msgb *msg)
+{
+ struct rtp_hdr *rtph = osmo_rtp_get_hdr(msg);
+ return ntohl(rtph->timestamp);
+}
+
+static int32_t samples2ms(int32_t samples)
+{
+ /* XXX: SAMPLE RATE can be guessed from rtp.p_type */
+ return samples * 1000 / SAMPLE_RATE;
+}
+
+/* Calculates pkt delay related to reference pkt. Similar concept to D(i,j) as
+ * defined in RFC3550 (RTP). */
+static int calc_pkt_rel_delay(struct osmo_jibuf *jb, struct msgb *msg)
+{
+ uint32_t current_rx_ts = timeval2ms(&jb->last_enqueue_time);
+ uint32_t current_tx_ts = msg_get_timestamp(msg);
+
+ return samples2ms((current_tx_ts - jb->ref_tx_ts)) - (current_rx_ts - jb->ref_rx_ts);
+}
+
+static void msg_set_as_reference(struct osmo_jibuf *jb, struct msgb *msg)
+{
+ jb->ref_rx_ts = timeval2ms(&jb->last_enqueue_time);
+ jb->ref_tx_ts = msg_get_timestamp(msg);
+ jb->ref_tx_seq = msg_get_sequence(msg);
+
+ LOGP(DLJIBUF, LOGL_DEBUG, "New reference (seq=%"PRIu16" rx=%"PRIu32 \
+ " tx=%"PRIu32")\n", jb->ref_tx_seq, jb->ref_rx_ts, jb->ref_tx_ts);
+}
+
+static void dequeue_msg(struct osmo_jibuf *jb, struct msgb *msg)
+{
+ unsigned long *old_cb = JIBUF_MSGB_CB(msg)->old_cb;
+ memcpy(msg->cb, old_cb, sizeof(msg->cb));
+ talloc_free(old_cb);
+ llist_del(&msg->list);
+
+ jb->dequeue_cb(msg, jb->dequeue_cb_data);
+}
+
+static void timer_expired(void *data)
+{
+ struct osmo_jibuf *jb = (struct osmo_jibuf*) data;
+ struct timeval delay_ts, now;
+ struct msgb *msg, *next;
+
+ llist_for_each_entry_safe(msg, next, &jb->msg_list, list) {
+ struct timeval *msg_ts = msgb_scheduled_ts(msg);
+ clock_gettime_timeval(CLOCK_MONOTONIC, &now);
+ if (timercmp(msg_ts, &now, >)) {
+ jb->next_dequeue_time = *msg_ts;
+ timersub(msg_ts, &now, &delay_ts);
+ osmo_timer_schedule(&jb->timer,
+ delay_ts.tv_sec, delay_ts.tv_usec);
+ return;
+ }
+
+ dequeue_msg(jb, msg);
+ }
+
+ /* XXX: maybe try to tune the threshold based on the calculated output jitter? */
+ /* XXX: try to find holes in the list and create fake pkts to improve the
+ jitter when packets do not arrive on time */
+}
+
+static void recalc_threshold_delay(struct osmo_jibuf *jb)
+{
+
+ /* Recalculate every JIBUF_RECALC_FREQ_PKTS handled packets, or if we have too
+ many consecutive drops */
+ uint32_t sum_pkts = jb->stats.total_enqueued + jb->stats.total_dropped +
+ jb->last_dropped;
+
+ if (jb->consecutive_drops <= JIBUF_ALLOWED_PKT_CONSECUTIVE_DROP &&
+ sum_pkts % JIBUF_BUFFER_RECALC_FREQ != 0)
+ return;
+
+ if (jb->consecutive_drops > JIBUF_ALLOWED_PKT_CONSECUTIVE_DROP ||
+ jb->last_dropped > JIBUF_ALLOWED_PKT_DROP)
+ jb->threshold_delay = OSMO_MIN(
+ jb->threshold_delay + JIBUF_BUFFER_INC_STEP,
+ jb->max_delay);
+ else
+ jb->threshold_delay = OSMO_MAX(
+ jb->threshold_delay - JIBUF_BUFFER_DEC_STEP,
+ jb->min_delay);
+ LOGP(DLJIBUF, LOGL_DEBUG, "New threshold: %u ms (freq=%d dropped=%d/%d consecutive=%d/%d)\n",
+ jb->threshold_delay, JIBUF_BUFFER_RECALC_FREQ,
+ jb->last_dropped, JIBUF_ALLOWED_PKT_DROP,
+ jb->consecutive_drops, JIBUF_ALLOWED_PKT_CONSECUTIVE_DROP);
+
+ jb->stats.total_dropped += jb->last_dropped;
+ jb->last_dropped = 0;
+
+}
+
+//----------------------------------
+
+/*! \brief Allocate a new jitter buffer instance
+ * \return the new allocated instance
+ */
+struct osmo_jibuf *osmo_jibuf_alloc(void *talloc_ctx)
+{
+ struct osmo_jibuf *jb;
+ jb = talloc_zero(talloc_ctx, struct osmo_jibuf);
+
+ jb->min_delay = JIBUF_DEFAULT_MIN_DELAY_MS;
+ jb->max_delay = JIBUF_DEFAULT_MAX_DELAY_MS;
+ jb->threshold_delay = jb->min_delay;
+
+ INIT_LLIST_HEAD(&jb->msg_list);
+
+ jb->timer.cb = timer_expired;
+ jb->timer.data = jb;
+
+ return jb;
+}
+
+/*! \brief Destroy a previously allocated jitter buffer instance
+ * \param[in] jb Previously allocated (non-null) jitter buffer instance
+ *
+ * All the queued packets are dequeued before deleting the instance.
+ */
+void osmo_jibuf_delete(struct osmo_jibuf *jb)
+{
+ struct msgb *msg, *tmp;
+ osmo_timer_del(&jb->timer);
+ llist_for_each_entry_safe(msg, tmp, &jb->msg_list, list)
+ dequeue_msg(jb, msg);
+
+ talloc_free(jb);
+}
+
+/*! \brief Try to enqueue a packet into the jitter buffer
+ * \param[in] jb jitter buffer instance
+ * \param[in] msg msgb to enqueue, containing an RTP packet
+ * \return <0 if the packet was dropped, 0 otherwise
+ *
+ * This function calculates the delay for the enqueued packet. If the delay is
+ * bigger than the current buffer size, the function returns -1 and the caller
+ * owns the packet again and can free it if required. If the packet is enqueued,
+ * 0 is returned and the exact same packet (ownership transfer, no copy is made)
+ * will be available again through the dequeue_cb() when the queue timer for
+ * this packet expires.
+ */
+int osmo_jibuf_enqueue(struct osmo_jibuf *jb, struct msgb *msg)
+{
+ int rel_delay, delay;
+ struct timeval delay_ts, sched_ts;
+
+ clock_gettime_timeval(CLOCK_MONOTONIC, &jb->last_enqueue_time);
+
+ if (!jb->started) {
+ jb->started = true;
+ msg_set_as_reference(jb, msg);
+ rel_delay = 0;
+ } else {
+ rel_delay = calc_pkt_rel_delay(jb, msg);
+ }
+
+ /* Avoid time skew with sender (or drop-everything state),
+ reselect a new reference from time to time */
+ //if ((int)(msg_get_sequence(msg) - jb->ref_tx_seq) > JIBUF_REFERENCE_TS_FREQ)
+ // msg_set_as_reference(jb, msg);
+
+ delay = jb->threshold_delay + rel_delay;
+
+ /* packet too late, let's drop it and incr buffer size if encouraged */
+ if (delay < 0) {
+ jb->last_dropped++;
+ jb->consecutive_drops++;
+
+ LOGP(DLJIBUF, LOGL_DEBUG, "dropped %u > %u (seq=%"PRIu16" ts=%"PRIu32")\n",
+ rel_delay, jb->threshold_delay, msg_get_sequence(msg),
+ msg_get_timestamp(msg));
+
+ recalc_threshold_delay(jb);
+ return -1;
+ } else {
+ jb->consecutive_drops = 0;
+ jb->stats.total_enqueued++;
+ }
+
+ ms2timeval(&delay_ts, (uint32_t) delay);
+ timeradd(&jb->last_enqueue_time, &delay_ts, &sched_ts);
+
+ LOGP(DLJIBUF, LOGL_DEBUG, "enqueuing packet seq=%"PRIu16" rel=%d delay=%d" \
+ " thres=%d {%lu.%06lu -> %lu.%06lu}\n",
+ msg_get_sequence(msg), rel_delay, delay, jb->threshold_delay,
+ jb->last_enqueue_time.tv_sec, jb->last_enqueue_time.tv_usec,
+ sched_ts.tv_sec, sched_ts.tv_usec);
+
+ /* Add scheduled dequeue time in msg->cb so we can check it later */
+ unsigned long *old_cb = talloc_memdup(jb->talloc_ctx, msg->cb, sizeof(msg->cb));
+ struct osmo_jibuf_msgb_cb *jbcb = JIBUF_MSGB_CB(msg);
+ jbcb->ts = sched_ts;
+ jbcb->old_cb = old_cb;
+
+ llist_add_sorted(msg, &jb->msg_list);
+
+
+ /* See if updating the timer is needed: */
+ if (!osmo_timer_pending(&jb->timer) ||
+ timercmp(&sched_ts, &jb->next_dequeue_time, <)) {
+ jb->next_dequeue_time = sched_ts;
+ osmo_timer_schedule(&jb->timer, 0, delay * 1000);
+ }
+
+ /* Let's check packet loss stats to see if buffer_size must be changed */
+ recalc_threshold_delay(jb);
+
+ return 0;
+}
+
+/*! \brief Check whether the jitter buffer instance has packets queued or not.
+ * \param[in] jb jitter buffer instance
+ * \return true if the queue is empty, false otherwise.
+ */
+bool osmo_jibuf_empty(struct osmo_jibuf *jb)
+{
+ return llist_empty(&jb->msg_list);
+}
+
+/*! \brief Set minimum buffer size for the jitter buffer
+ * \param[in] jb jitter buffer instance
+ * \param[in] min_delay Minimum buffer size, as in minimum delay in milliseconds
+ */
+void osmo_jibuf_set_min_delay(struct osmo_jibuf *jb, uint32_t min_delay)
+{
+ jb->min_delay = min_delay ? min_delay : JIBUF_DEFAULT_MIN_DELAY_MS;
+ jb->threshold_delay = OSMO_MAX(jb->min_delay, jb->threshold_delay);
+}
+
+/*! \brief Set maximum buffer size for the jitter buffer
+ * \param[in] jb jitter buffer instance
+ * \param[in] max_delay Maximum buffer size, as in maximum delay in milliseconds
+ */
+void osmo_jibuf_set_max_delay(struct osmo_jibuf *jb, uint32_t max_delay)
+{
+ jb->max_delay = max_delay ? max_delay : JIBUF_DEFAULT_MAX_DELAY_MS;
+ jb->threshold_delay = OSMO_MIN(jb->max_delay, jb->threshold_delay);
+}
+
+/*! \brief Set dequeue callback for the jitter buffer
+ * \param[in] jb jitter buffer instance
+ * \param[in] dequeue_cb function pointer to call back when the dequeue timer for a given packet expires
+ * \param[in] cb_data data pointer to be passed to dequeue_cb together with the msgb.
+ */
+void osmo_jibuf_set_dequeue_cb(struct osmo_jibuf *jb, osmo_jibuf_dequeue_cb
+ dequeue_cb, void* cb_data)
+{
+ jb->dequeue_cb = dequeue_cb;
+ jb->dequeue_cb_data = cb_data;
+}
+
+/*! @} */