aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorfile <file@f38db490-d61c-443f-a65b-d21fe96a405b>2006-04-11 16:44:10 +0000
committerfile <file@f38db490-d61c-443f-a65b-d21fe96a405b>2006-04-11 16:44:10 +0000
commit0cb2bbad85f6a4cf7a167966f062082f7cf2b7f1 (patch)
treed7ae6e38aed5f1ffd6419f12725e0f351d2a5d18
parent65817479e81e0ce8098055b44c701911abf33663 (diff)
Convert chan_iax2 to use linked lists for multithreading, and add dynamic threads. These are used when all pool threads are in use, and will stick around until load dies down. The theory is that during high load you'll have more threads available, and during low load you'll only have the normal pool threads sticking around.
git-svn-id: http://svn.digium.com/svn/asterisk/trunk@19254 f38db490-d61c-443f-a65b-d21fe96a405b
-rw-r--r--channels/chan_iax2.c277
-rw-r--r--configs/iax.conf.sample2
2 files changed, 202 insertions, 77 deletions
diff --git a/channels/chan_iax2.c b/channels/chan_iax2.c
index 75b3f6693..33152b13e 100644
--- a/channels/chan_iax2.c
+++ b/channels/chan_iax2.c
@@ -92,6 +92,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/devicestate.h"
#include "asterisk/netsock.h"
#include "asterisk/stringfields.h"
+#include "asterisk/linkedlists.h"
#include "iax2.h"
#include "iax2-parser.h"
@@ -134,6 +135,7 @@ static int nochecksums = 0;
#define CALLNO_TO_PTR(a) ((void *)(unsigned long)(a))
#define DEFAULT_THREAD_COUNT 10
+#define DEFAULT_MAX_THREAD_COUNT 100
#define DEFAULT_RETRY_TIME 1000
#define MEMORY_SIZE 100
#define DEFAULT_DROP 3
@@ -444,6 +446,8 @@ static int max_jitter_buffer = MAX_JITTER_BUFFER;
static int min_jitter_buffer = MIN_JITTER_BUFFER;
static int iaxthreadcount = DEFAULT_THREAD_COUNT;
+static int iaxmaxthreadcount = DEFAULT_MAX_THREAD_COUNT;
+static int iaxdynamicthreadcount = 0;
struct iax_rr {
int jitter;
@@ -681,8 +685,12 @@ static int ast_cli_netstats(struct mansession *s, int fd, int limit_fmt);
#define IAX_IOSTATE_PROCESSING 2
#define IAX_IOSTATE_SCHEDREADY 3
+#define IAX_TYPE_POOL 1
+#define IAX_TYPE_DYNAMIC 2
+
struct iax2_thread {
- ASTOBJ_COMPONENTS(struct iax2_thread);
+ AST_LIST_ENTRY(iax2_thread) list;
+ int type;
int iostate;
#ifdef SCHED_MULTITHREADED
void (*schedfunc)(void *);
@@ -704,11 +712,12 @@ struct iax2_thread {
ast_cond_t cond;
};
-struct iax2_thread_list {
- ASTOBJ_CONTAINER_COMPONENTS(struct iax2_thread);
-};
+/* Thread lists */
+static AST_LIST_HEAD_STATIC(idle_list, iax2_thread);
+static AST_LIST_HEAD_STATIC(active_list, iax2_thread);
+static AST_LIST_HEAD_STATIC(dynamic_list, iax2_thread);
-static struct iax2_thread_list idlelist, activelist;
+static void *iax2_process_thread(void *data);
static void signal_condition(ast_mutex_t *lock, ast_cond_t *cond)
{
@@ -831,19 +840,59 @@ static const struct ast_channel_tech iax2_tech = {
static struct iax2_thread *find_idle_thread(void)
{
- struct iax2_thread *thread;
- thread = ASTOBJ_CONTAINER_UNLINK_START(&idlelist);
+ struct iax2_thread *thread = NULL;
+
+ /* Find free idle thread in the list, get a pointer to it, and remove it from the list */
+ AST_LIST_LOCK(&idle_list);
+ thread = AST_LIST_FIRST(&idle_list);
+ if (thread != NULL) {
+ AST_LIST_REMOVE(&idle_list, thread, list);
+ thread->list.next = NULL;
+ }
+ AST_LIST_UNLOCK(&idle_list);
+
+ /* If no idle thread is available from the regular list, try dynamic */
+ if (thread == NULL) {
+ AST_LIST_LOCK(&dynamic_list);
+ thread = AST_LIST_FIRST(&dynamic_list);
+ if (thread != NULL) {
+ AST_LIST_REMOVE(&dynamic_list, thread, list);
+ thread->list.next = NULL;
+ }
+ /* Make sure we absolutely have a thread... if not, try to make one if allowed */
+ if (thread == NULL && iaxmaxthreadcount > iaxdynamicthreadcount) {
+ /* We need to MAKE a thread! */
+ thread = ast_calloc(1, sizeof(*thread));
+ if (thread != NULL) {
+ thread->threadnum = iaxdynamicthreadcount;
+ thread->type = IAX_TYPE_DYNAMIC;
+ ast_mutex_init(&thread->lock);
+ ast_cond_init(&thread->cond, NULL);
+ if (ast_pthread_create(&thread->threadid, NULL, iax2_process_thread, thread)) {
+ free(thread);
+ thread = NULL;
+ } else {
+ /* All went well and the thread is up, so increment our count */
+ iaxdynamicthreadcount++;
+ }
+ }
+ }
+ AST_LIST_UNLOCK(&dynamic_list);
+ }
+
return thread;
}
#ifdef SCHED_MULTITHREADED
static int __schedule_action(void (*func)(void *data), void *data, const char *funcname)
{
- struct iax2_thread *thread;
+ struct iax2_thread *thread = NULL;
static time_t lasterror;
static time_t t;
+
thread = find_idle_thread();
- if (thread) {
+
+ if (thread != NULL) {
thread->schedfunc = func;
thread->scheddata = data;
thread->iostate = IAX_IOSTATE_SCHEDREADY;
@@ -857,6 +906,7 @@ static int __schedule_action(void (*func)(void *data), void *data, const char *f
if (t != lasterror)
ast_log(LOG_NOTICE, "Out of idle IAX2 threads for scheduling!\n");
lasterror = t;
+
return -1;
}
#define schedule_action(func, data) __schedule_action(func, data, __PRETTY_FUNCTION__)
@@ -4419,42 +4469,60 @@ static int __iax2_show_peers(int manager, int fd, struct mansession *s, int argc
static int iax2_show_threads(int fd, int argc, char *argv[])
{
+ struct iax2_thread *thread = NULL;
time_t t;
- int threadcount = 0;
+ int threadcount = 0, dynamiccount = 0;
+ char type;
+
if (argc != 3)
return RESULT_SHOWUSAGE;
ast_cli(fd, "IAX2 Thread Information\n");
time(&t);
ast_cli(fd, "Idle Threads:\n");
+ AST_LIST_LOCK(&idle_list);
+ AST_LIST_TRAVERSE(&idle_list, thread, list) {
#ifdef DEBUG_SCHED_MULTITHREAD
- ASTOBJ_CONTAINER_TRAVERSE(&idlelist, 1, {
- ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d, refcnt=%d, func ='%s'\n",
- iterator->threadnum, iterator->iostate, (int)(t - iterator->checktime), iterator->actions, iterator->refcount, iterator->curfunc);
- threadcount++;
- });
+ ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d, func ='%s'\n",
+ thread->threadnum, thread->iostate, (int)(t - thread->checktime), thread->actions, thread->curfunc);
#else
- ASTOBJ_CONTAINER_TRAVERSE(&idlelist, 1, {
- ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d, refcnt=%d\n",
- iterator->threadnum, iterator->iostate, (int)(t - iterator->checktime), iterator->actions, iterator->refcount);
- threadcount++;
- });
+ ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d\n",
+ thread->threadnum, thread->iostate, (int)(t - thread->checktime), thread->actions);
#endif
+ threadcount++;
+ }
+ AST_LIST_UNLOCK(&idle_list);
ast_cli(fd, "Active Threads:\n");
+ AST_LIST_LOCK(&active_list);
+ AST_LIST_TRAVERSE(&active_list, thread, list) {
+ if (thread->type == IAX_TYPE_DYNAMIC)
+ type = 'D';
+ else
+ type = 'P';
#ifdef DEBUG_SCHED_MULTITHREAD
- ASTOBJ_CONTAINER_TRAVERSE(&activelist, 1, {
- ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d, refcnt=%d, func ='%s'\n",
- iterator->threadnum, iterator->iostate, (int)(t - iterator->checktime), iterator->actions, iterator->refcount, iterator->curfunc);
- threadcount++;
- });
+ ast_cli(fd, "Thread %c%d: state=%d, update=%d, actions=%d, func ='%s'\n",
+ type, thread->threadnum, thread->iostate, (int)(t - thread->checktime), thread->actions, thread->curfunc);
#else
- ASTOBJ_CONTAINER_TRAVERSE(&activelist, 1, {
- ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d, refcnt=%d\n",
- iterator->threadnum, iterator->iostate, (int)(t - iterator->checktime), iterator->actions, iterator->refcount);
+ ast_cli(fd, "Thread %c%d: state=%d, update=%d, actions=%d\n",
+ type, thread->threadnum, thread->iostate, (int)(t - thread->checktime), thread->actions);
+#endif
threadcount++;
- });
+ }
+ AST_LIST_UNLOCK(&active_list);
+ ast_cli(fd, "Dynamic Threads:\n");
+ AST_LIST_LOCK(&dynamic_list);
+ AST_LIST_TRAVERSE(&dynamic_list, thread, list) {
+#ifdef DEBUG_SCHED_MULTITHREAD
+ ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d, func ='%s'\n",
+ thread->threadnum, thread->iostate, (int)(t - thread->checktime), thread->actions, thread->curfunc);
+#else
+ ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d\n",
+ thread->threadnum, thread->iostate, (int)(t - thread->checktime), thread->actions);
#endif
- ast_cli(fd, "%d of %d threads accounted for\n", threadcount, iaxthreadcount);
+ dynamiccount++;
+ }
+ AST_LIST_UNLOCK(&dynamic_list);
+ ast_cli(fd, "%d of %d threads accounted for with %d dynamic threads\n", threadcount, iaxthreadcount, dynamiccount);
return RESULT_SUCCESS;
}
@@ -6512,11 +6580,15 @@ static int socket_read(int *id, int fd, short events, void *cbdata)
if (errno != ECONNREFUSED)
ast_log(LOG_WARNING, "Error: %s\n", strerror(errno));
handle_error();
- ASTOBJ_CONTAINER_LINK_END(&idlelist, thread);
+ AST_LIST_LOCK(&idle_list);
+ AST_LIST_INSERT_TAIL(&idle_list, thread, list);
+ AST_LIST_UNLOCK(&idle_list);
return 1;
}
if (test_losspct && ((100.0 * ast_random() / (RAND_MAX + 1.0)) < test_losspct)) { /* simulate random loss condition */
- ASTOBJ_CONTAINER_LINK_END(&idlelist, thread);
+ AST_LIST_LOCK(&idle_list);
+ AST_LIST_INSERT_TAIL(&idle_list, thread, list);
+ AST_LIST_UNLOCK(&idle_list);
return 1;
}
/* Mark as ready and send on its way */
@@ -7891,34 +7963,44 @@ retryowner2:
return 1;
}
-static void destroy_helper(struct iax2_thread *thread)
-{
- ast_log(LOG_DEBUG, "Destroying helper %d!\n", thread->threadnum);
- ast_mutex_destroy(&thread->lock);
- ast_cond_destroy(&thread->cond);
- free(thread);
-}
-
static void *iax2_process_thread(void *data)
{
- struct iax2_thread *thread_copy, *thread = data;
+ struct iax2_thread *thread = data;
+ struct timeval tv;
+ struct timespec ts;
for(;;) {
/* Wait for something to signal us to be awake */
ast_mutex_lock(&thread->lock);
- ast_cond_wait(&thread->cond, &thread->lock);
+ if (thread->type == IAX_TYPE_DYNAMIC) {
+ /* Wait to be signalled or time out */
+ tv = ast_tvadd(ast_tvnow(), ast_samp2tv(30000, 1000));
+ ts.tv_sec = tv.tv_sec;
+ ts.tv_nsec = tv.tv_usec * 1000;
+ if (ast_cond_timedwait(&thread->cond, &thread->lock, &ts) == ETIMEDOUT) {
+ ast_mutex_unlock(&thread->lock);
+ AST_LIST_LOCK(&dynamic_list);
+ AST_LIST_REMOVE(&dynamic_list, thread, list);
+ iaxdynamicthreadcount--;
+ AST_LIST_UNLOCK(&dynamic_list);
+ break;
+ }
+ } else {
+ ast_cond_wait(&thread->cond, &thread->lock);
+ }
ast_mutex_unlock(&thread->lock);
- /* Unlink from idlelist / activelist if there*/
- ASTOBJ_CONTAINER_UNLINK(&idlelist, thread);
- ASTOBJ_CONTAINER_UNLINK(&activelist, thread);
- /* If instructed to halt, stop now */
+
+ /* If we were signalled, then we are already out of both lists or we are shutting down */
if (thread->halt) {
- ast_log(LOG_DEBUG, "Halting, refcount = %d\n", thread->refcount);
- ASTOBJ_UNREF(thread, destroy_helper);
break;
}
- /* Remove our reference */
- ASTOBJ_CONTAINER_LINK_END(&activelist, thread);
+
+ /* Add ourselves to the active list now */
+ AST_LIST_LOCK(&active_list);
+ AST_LIST_INSERT_HEAD(&active_list, thread, list);
+ AST_LIST_UNLOCK(&active_list);
+
+ /* See what we need to do */
switch(thread->iostate) {
case IAX_IOSTATE_READY:
thread->actions++;
@@ -7938,16 +8020,31 @@ static void *iax2_process_thread(void *data)
#ifdef DEBUG_SCHED_MULTITHREAD
thread->curfunc[0]='\0';
#endif
- ASTOBJ_CONTAINER_UNLINK(&activelist, thread);
- ASTOBJ_CONTAINER_LINK_END(&idlelist, thread);
- /* Make a copy so we don't lose thread, but if
- we become unreferenced here, our thread gets
- cancelled anyway, so it's okay */
- thread_copy = thread;
- ASTOBJ_UNREF(thread_copy, destroy_helper);
- thread_copy = thread;
- ASTOBJ_UNREF(thread_copy, destroy_helper);
+
+ /* Now... remove ourselves from the active list, and return to the idle list */
+ AST_LIST_LOCK(&active_list);
+ AST_LIST_REMOVE(&active_list, thread, list);
+ thread->list.next = NULL;
+ AST_LIST_UNLOCK(&active_list);
+
+ /* Go back into our respective list */
+ if (thread->type == IAX_TYPE_DYNAMIC) {
+ AST_LIST_LOCK(&dynamic_list);
+ AST_LIST_INSERT_TAIL(&dynamic_list, thread, list);
+ AST_LIST_UNLOCK(&dynamic_list);
+ } else {
+ AST_LIST_LOCK(&idle_list);
+ AST_LIST_INSERT_TAIL(&idle_list, thread, list);
+ AST_LIST_UNLOCK(&idle_list);
+ }
}
+
+ /* Free our own memory */
+ ast_mutex_destroy(&thread->lock);
+ ast_cond_destroy(&thread->cond);
+ free(thread);
+ thread = NULL;
+
return NULL;
}
@@ -8352,12 +8449,10 @@ static int start_network_thread(void)
{
int threadcount = 0;
int x;
- ASTOBJ_CONTAINER_INIT(&idlelist);
- ASTOBJ_CONTAINER_INIT(&activelist);
for (x = 0; x < iaxthreadcount; x++) {
struct iax2_thread *thread = ast_calloc(1, sizeof(struct iax2_thread));
if (thread) {
- ASTOBJ_INIT(thread);
+ thread->type = IAX_TYPE_POOL;
thread->threadnum = ++threadcount;
ast_mutex_init(&thread->lock);
ast_cond_init(&thread->cond, NULL);
@@ -8366,8 +8461,9 @@ static int start_network_thread(void)
free(thread);
thread = NULL;
}
- ASTOBJ_CONTAINER_LINK_END(&idlelist, thread);
- ASTOBJ_UNREF(thread, destroy_helper);
+ AST_LIST_LOCK(&idle_list);
+ AST_LIST_INSERT_TAIL(&idle_list, thread, list);
+ AST_LIST_UNLOCK(&idle_list);
}
}
ast_pthread_create(&schedthreadid, NULL, sched_thread, NULL);
@@ -9020,6 +9116,21 @@ static int set_config(char *config_file, int reload)
iaxthreadcount = 256;
}
}
+ } else if (!strcasecmp(v->name, "iaxmaxthreadcount")) {
+ if (reload) {
+ AST_LIST_LOCK(&dynamic_list);
+ iaxmaxthreadcount = atoi(v->value);
+ AST_LIST_UNLOCK(&dynamic_list);
+ } else {
+ iaxmaxthreadcount = atoi(v->value);
+ if (iaxmaxthreadcount < 0) {
+ ast_log(LOG_NOTICE, "iaxmaxthreadcount must be at least 0.\n");
+ iaxmaxthreadcount = 0;
+ } else if (iaxmaxthreadcount > 256) {
+ ast_log(LOG_NOTICE, "Limiting iaxmaxthreadcount to 256\n");
+ iaxmaxthreadcount = 256;
+ }
+ }
} else if (!strcasecmp(v->name, "nochecksums")) {
#ifdef SO_NO_CHECK
if (ast_true(v->value))
@@ -9891,7 +10002,9 @@ static struct ast_cli_entry iax2_cli[] = {
static int __unload_module(void)
{
+ struct iax2_thread *thread = NULL;
int x;
+
/* Cancel the network thread, close the net socket */
if (netthreadid != AST_PTHREADT_NULL) {
pthread_cancel(netthreadid);
@@ -9905,19 +10018,29 @@ static int __unload_module(void)
ast_mutex_unlock(&sched_lock);
pthread_join(schedthreadid, NULL);
}
- while (idlelist.head || activelist.head) {
- ASTOBJ_CONTAINER_TRAVERSE(&idlelist, 1, {
- iterator->halt = 1;
- signal_condition(&iterator->lock, &iterator->cond);
- });
- ASTOBJ_CONTAINER_TRAVERSE(&activelist, 1, {
- iterator->halt = 1;
- signal_condition(&iterator->lock, &iterator->cond);
- });
- usleep(100000);
- }
- ASTOBJ_CONTAINER_DESTROY(&idlelist);
- ASTOBJ_CONTAINER_DESTROY(&activelist);
+
+ /* Call for all threads to halt */
+ AST_LIST_LOCK(&idle_list);
+ AST_LIST_TRAVERSE(&idle_list, thread, list) {
+ thread->halt = 1;
+ signal_condition(&thread->lock, &thread->cond);
+ }
+ AST_LIST_UNLOCK(&idle_list);
+
+ AST_LIST_LOCK(&active_list);
+ AST_LIST_TRAVERSE(&active_list, thread, list) {
+ thread->halt = 1;
+ signal_condition(&thread->lock, &thread->cond);
+ }
+ AST_LIST_UNLOCK(&active_list);
+
+ AST_LIST_LOCK(&dynamic_list);
+ AST_LIST_TRAVERSE(&dynamic_list, thread, list) {
+ thread->halt = 1;
+ signal_condition(&thread->lock, &thread->cond);
+ }
+ AST_LIST_UNLOCK(&dynamic_list);
+
ast_netsock_release(netsock);
for (x=0;x<IAX_MAX_CALLS;x++)
if (iaxs[x])
diff --git a/configs/iax.conf.sample b/configs/iax.conf.sample
index 2c5e030a9..3eeb760e5 100644
--- a/configs/iax.conf.sample
+++ b/configs/iax.conf.sample
@@ -170,6 +170,8 @@ forcejitterbuffer=no
; IAX helper threads
; Establishes the number of iax helper threads to handle I/O.
; iaxthreadcount = 10
+; Establishes the number of extra dynamic threads that may be spawned to handle I/O
+; iaxmaxthreadcount = 100
;
; We can register with another IAX server to let him know where we are
; in case we have a dynamic IP address for example