From 300bccd8d54604627607c0a2431694a21437cffb Mon Sep 17 00:00:00 2001 From: russell Date: Tue, 31 Mar 2009 19:07:58 +0000 Subject: Improve performance of the code handling the frame queue in chan_iax2. In my tests that exercised full frame handling in chan_iax2, the version with these changes took 30% to 40% of the CPU time compared to the same test of Asterisk trunk before these modifications. While doing some profiling for , one function that caught my eye was network_thread() in chan_iax2.c. After the things that I was working on there, it was the next target for analysis and optimization. I used oprofile's source annotation functionality and found that the loop traversing the frame queue in network_thread() was to blame for the excessive CPU cycle consumption. The frame_queue in chan_iax2 previously held all frames that either were pending transmission or had been transmitted and are still pending acknowledgment. In network_thread(), the previous code would go back through the main for loop after reading a single incoming frame or after being signaled because a frame had been queued up for initial transmission. In each iteration of the loop, it traverses the entire frame queue looking for frames that need to be transmitted. On a busy server, this could easily be quite a few entries. This patch is actually quite simple. The frame_queue has become only a list of frames pending acknowledgment. Frames that need to be transmitted are queued up to a dedicated transmit thread via the taskprocessor API. As a result, the code in network_thread() becomes much simpler, as its only job is to read incoming frames. In addition to the previously described changes, this patch includes some additional changes to the frame_queue. Instead of one big frame_queue, now there is a list per call number to further reduce wasted list traversals. The biggest impact of this change is in socket_process(). For additional details on testing and test results, see the review request. Review: http://reviewboard.digium.com/r/212/ git-svn-id: http://svn.digium.com/svn/asterisk/trunk@185432 f38db490-d61c-443f-a65b-d21fe96a405b --- channels/chan_iax2.c | 220 ++++++++++++++++++++++----------------------------- 1 file changed, 96 insertions(+), 124 deletions(-) (limited to 'channels/chan_iax2.c') diff --git a/channels/chan_iax2.c b/channels/chan_iax2.c index aa7d24e42..47f1ddc84 100644 --- a/channels/chan_iax2.c +++ b/channels/chan_iax2.c @@ -88,6 +88,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/event.h" #include "asterisk/astobj2.h" #include "asterisk/timing.h" +#include "asterisk/taskprocessor.h" #include "iax2.h" #include "iax2-parser.h" @@ -758,8 +759,12 @@ struct chan_iax2_pvt { * \note The contents of this list do not need to be explicitly destroyed * on module unload. This is because all active calls are destroyed, and * all frames in this queue will get destroyed as a part of that process. + * + * \note Contents protected by the iaxsl[] locks */ -static AST_LIST_HEAD_STATIC(frame_queue, iax_frame); +static AST_LIST_HEAD_NOLOCK(, iax_frame) frame_queue[IAX_MAX_CALLS]; + +static struct ast_taskprocessor *transmit_processor; /*! * This module will get much higher performance when doing a lot of @@ -1588,20 +1593,18 @@ static void pvt_destructor(void *obj) struct iax_frame *cur = NULL; ast_mutex_lock(&iaxsl[pvt->callno]); + iax2_destroy_helper(pvt); - ast_mutex_unlock(&iaxsl[pvt->callno]); /* Already gone */ - ast_set_flag(pvt, IAX_ALREADYGONE); + ast_set_flag(pvt, IAX_ALREADYGONE); - AST_LIST_LOCK(&frame_queue); - AST_LIST_TRAVERSE(&frame_queue, cur, list) { + AST_LIST_TRAVERSE(&frame_queue[pvt->callno], cur, list) { /* Cancel any pending transmissions */ - if (cur->callno == pvt->callno) { - cur->retries = -1; - } + cur->retries = -1; } - AST_LIST_UNLOCK(&frame_queue); + + ast_mutex_unlock(&iaxsl[pvt->callno]); if (pvt->reg) { pvt->reg->callno = 0; @@ -2623,17 +2626,16 @@ static void __attempt_transmit(const void *data) f->retries = -1; freeme = 1; } - if (callno) - ast_mutex_unlock(&iaxsl[callno]); - /* Do not try again */ + if (freeme) { /* Don't attempt delivery, just remove it from the queue */ - AST_LIST_LOCK(&frame_queue); - AST_LIST_REMOVE(&frame_queue, f, list); - AST_LIST_UNLOCK(&frame_queue); + AST_LIST_REMOVE(&frame_queue[callno], f, list); + ast_mutex_unlock(&iaxsl[callno]); f->retrans = -1; /* Free the IAX frame */ iax2_frame_free(f); + } else if (callno) { + ast_mutex_unlock(&iaxsl[callno]); } } @@ -2923,7 +2925,7 @@ static char *complete_iax2_peers(const char *line, const char *word, int pos, in static char *handle_cli_iax2_show_stats(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) { struct iax_frame *cur; - int cnt = 0, dead = 0, final = 0; + int cnt = 0, dead = 0, final = 0, i = 0; switch (cmd) { case CLI_INIT: @@ -2939,15 +2941,17 @@ static char *handle_cli_iax2_show_stats(struct ast_cli_entry *e, int cmd, struct if (a->argc != 3) return CLI_SHOWUSAGE; - AST_LIST_LOCK(&frame_queue); - AST_LIST_TRAVERSE(&frame_queue, cur, list) { - if (cur->retries < 0) - dead++; - if (cur->final) - final++; - cnt++; + for (i = 0; i < ARRAY_LEN(frame_queue); i++) { + ast_mutex_lock(&iaxsl[i]); + AST_LIST_TRAVERSE(&frame_queue[i], cur, list) { + if (cur->retries < 0) + dead++; + if (cur->final) + final++; + cnt++; + } + ast_mutex_unlock(&iaxsl[i]); } - AST_LIST_UNLOCK(&frame_queue); ast_cli(a->fd, " IAX Statistics\n"); ast_cli(a->fd, "---------------------\n"); @@ -3304,23 +3308,39 @@ static int schedule_delivery(struct iax_frame *fr, int updatehistory, int fromtr return 0; } -static int iax2_transmit(struct iax_frame *fr) +static int transmit_frame(void *data) { - /* Lock the queue and place this packet at the end */ - /* By setting this to 0, the network thread will send it for us, and - queue retransmission if necessary */ - fr->sentyet = 0; - AST_LIST_LOCK(&frame_queue); - AST_LIST_INSERT_TAIL(&frame_queue, fr, list); - AST_LIST_UNLOCK(&frame_queue); - /* Wake up the network and scheduler thread */ - if (netthreadid != AST_PTHREADT_NULL) - pthread_kill(netthreadid, SIGURG); - ast_sched_thread_poke(sched); + struct iax_frame *fr = data; + + ast_mutex_lock(&iaxsl[fr->callno]); + + fr->sentyet = 1; + + if (iaxs[fr->callno]) { + send_packet(fr); + } + + if (fr->retries < 0) { + ast_mutex_unlock(&iaxsl[fr->callno]); + /* No retransmit requested */ + iax_frame_free(fr); + } else { + /* We need reliable delivery. Schedule a retransmission */ + AST_LIST_INSERT_TAIL(&frame_queue[fr->callno], fr, list); + ast_mutex_unlock(&iaxsl[fr->callno]); + fr->retries++; + fr->retrans = iax2_sched_add(sched, fr->retrytime, attempt_transmit, fr); + } + return 0; } +static int iax2_transmit(struct iax_frame *fr) +{ + fr->sentyet = 0; + return ast_taskprocessor_push(transmit_processor, transmit_frame, fr); +} static int iax2_digit_begin(struct ast_channel *c, char digit) { @@ -7015,16 +7035,13 @@ static int complete_transfer(int callno, struct iax_ies *ies) pvt->lastsent = 0; pvt->nextpred = 0; pvt->pingtime = DEFAULT_RETRY_TIME; - AST_LIST_LOCK(&frame_queue); - AST_LIST_TRAVERSE(&frame_queue, cur, list) { + AST_LIST_TRAVERSE(&frame_queue[callno], cur, list) { /* We must cancel any packets that would have been transmitted because now we're talking to someone new. It's okay, they were transmitted to someone that didn't care anyway. */ - if (callno == cur->callno) - cur->retries = -1; + cur->retries = -1; } - AST_LIST_UNLOCK(&frame_queue); - return 0; + return 0; } /*! \brief Acknowledgment received for OUR registration */ @@ -7627,16 +7644,13 @@ static void vnak_retransmit(int callno, int last) { struct iax_frame *f; - AST_LIST_LOCK(&frame_queue); - AST_LIST_TRAVERSE(&frame_queue, f, list) { + AST_LIST_TRAVERSE(&frame_queue[callno], f, list) { /* Send a copy immediately */ - if ((f->callno == callno) && iaxs[f->callno] && - ((unsigned char ) (f->oseqno - last) < 128) && - (f->retries >= 0)) { + if (((unsigned char) (f->oseqno - last) < 128) && + (f->retries >= 0)) { send_packet(f); } } - AST_LIST_UNLOCK(&frame_queue); } static void __iax2_poke_peer_s(const void *data) @@ -8653,17 +8667,15 @@ static int socket_process(struct iax2_thread *thread) if (iaxdebug) ast_debug(1, "Cancelling transmission of packet %d\n", x); call_to_destroy = 0; - AST_LIST_LOCK(&frame_queue); - AST_LIST_TRAVERSE(&frame_queue, cur, list) { + AST_LIST_TRAVERSE(&frame_queue[fr->callno], cur, list) { /* If it's our call, and our timestamp, mark -1 retries */ - if ((fr->callno == cur->callno) && (x == cur->oseqno)) { + if (x == cur->oseqno) { cur->retries = -1; /* Destroy call if this is the end */ if (cur->final) call_to_destroy = fr->callno; } } - AST_LIST_UNLOCK(&frame_queue); if (call_to_destroy) { if (iaxdebug) ast_debug(1, "Really destroying %d, having been acked on final message\n", call_to_destroy); @@ -8905,13 +8917,12 @@ retryowner: case IAX_COMMAND_TXACC: if (iaxs[fr->callno]->transferring == TRANSFER_BEGIN) { /* Ack the packet with the given timestamp */ - AST_LIST_LOCK(&frame_queue); - AST_LIST_TRAVERSE(&frame_queue, cur, list) { + AST_LIST_TRAVERSE(&frame_queue[fr->callno], cur, list) { /* Cancel any outstanding txcnt's */ - if ((fr->callno == cur->callno) && (cur->transfer)) + if (cur->transfer) { cur->retries = -1; + } } - AST_LIST_UNLOCK(&frame_queue); memset(&ied1, 0, sizeof(ied1)); iax_ie_append_short(&ied1, IAX_IE_CALLNO, iaxs[fr->callno]->callno); send_command(iaxs[fr->callno], AST_FRAME_IAX, IAX_COMMAND_TXREADY, 0, ied1.buf, ied1.pos, -1); @@ -9810,13 +9821,12 @@ immediatedial: break; case IAX_COMMAND_TXMEDIA: if (iaxs[fr->callno]->transferring == TRANSFER_READY) { - AST_LIST_LOCK(&frame_queue); - AST_LIST_TRAVERSE(&frame_queue, cur, list) { + AST_LIST_TRAVERSE(&frame_queue[fr->callno], cur, list) { /* Cancel any outstanding frames and start anew */ - if ((fr->callno == cur->callno) && (cur->transfer)) + if (cur->transfer) { cur->retries = -1; + } } - AST_LIST_UNLOCK(&frame_queue); /* Start sending our media to the transfer address, but otherwise leave the call as-is */ iaxs[fr->callno]->transferring = TRANSFER_MEDIAPASS; } @@ -10495,66 +10505,18 @@ static struct ast_channel *iax2_request(const char *type, int format, void *data static void *network_thread(void *ignore) { - /* Our job is simple: Send queued messages, retrying if necessary. Read frames - from the network, and queue them for delivery to the channels */ - int res, count, wakeup; - struct iax_frame *f; - - if (timer) + if (timer) { ast_io_add(io, ast_timer_fd(timer), timing_read, AST_IO_IN | AST_IO_PRI, NULL); - - for(;;) { - pthread_testcancel(); - - /* Go through the queue, sending messages which have not yet been - sent, and scheduling retransmissions if appropriate */ - AST_LIST_LOCK(&frame_queue); - count = 0; - wakeup = -1; - AST_LIST_TRAVERSE_SAFE_BEGIN(&frame_queue, f, list) { - if (f->sentyet) - continue; - - /* Try to lock the pvt, if we can't... don't fret - defer it till later */ - if (ast_mutex_trylock(&iaxsl[f->callno])) { - wakeup = 1; - continue; - } - - f->sentyet = 1; - - if (iaxs[f->callno]) { - send_packet(f); - count++; - } - - ast_mutex_unlock(&iaxsl[f->callno]); - - if (f->retries < 0) { - /* This is not supposed to be retransmitted */ - AST_LIST_REMOVE_CURRENT(list); - /* Free the iax frame */ - iax_frame_free(f); - } else { - /* We need reliable delivery. Schedule a retransmission */ - f->retries++; - f->retrans = iax2_sched_add(sched, f->retrytime, attempt_transmit, f); - } - } - AST_LIST_TRAVERSE_SAFE_END; - AST_LIST_UNLOCK(&frame_queue); + } + for (;;) { pthread_testcancel(); - if (count >= 20) - ast_debug(1, "chan_iax2: Sent %d queued outbound frames all at once\n", count); - - /* Now do the IO, and run scheduled tasks */ - res = ast_io_wait(io, wakeup); - if (res >= 0) { - if (res >= 20) - ast_debug(1, "chan_iax2: ast_io_wait ran %d I/Os all at once\n", res); - } + /* Wake up once a second just in case SIGURG was sent while + * we weren't in poll(), to make sure we don't hang when trying + * to unload. */ + ast_io_wait(io, 1000); } + return NULL; } @@ -12426,19 +12388,14 @@ static int __unload_module(void) struct ast_context *con; int x; - /* Make sure threads do not hold shared resources when they are canceled */ - - /* Grab the sched lock resource to keep it away from threads about to die */ - /* Cancel the network thread, close the net socket */ if (netthreadid != AST_PTHREADT_NULL) { - AST_LIST_LOCK(&frame_queue); pthread_cancel(netthreadid); - AST_LIST_UNLOCK(&frame_queue); + pthread_kill(netthreadid, SIGURG); pthread_join(netthreadid, NULL); } sched = ast_sched_thread_destroy(sched); - + /* Call for all threads to halt */ AST_LIST_LOCK(&idle_list); while ((thread = AST_LIST_REMOVE_HEAD(&idle_list, list))) @@ -12489,6 +12446,7 @@ static int __unload_module(void) if (timer) { ast_timer_close(timer); } + transmit_processor = ast_taskprocessor_unreference(transmit_processor); con = ast_context_find(regcontext); if (con) @@ -12557,19 +12515,23 @@ static int load_module(void) struct iax2_registry *reg = NULL; peers = ao2_container_alloc(MAX_PEER_BUCKETS, peer_hash_cb, peer_cmp_cb); - if (!peers) + if (!peers) { return AST_MODULE_LOAD_FAILURE; + } + users = ao2_container_alloc(MAX_USER_BUCKETS, user_hash_cb, user_cmp_cb); if (!users) { ao2_ref(peers, -1); return AST_MODULE_LOAD_FAILURE; } + iax_peercallno_pvts = ao2_container_alloc(IAX_MAX_CALLS, pvt_hash_cb, pvt_cmp_cb); if (!iax_peercallno_pvts) { ao2_ref(peers, -1); ao2_ref(users, -1); return AST_MODULE_LOAD_FAILURE; } + iax_transfercallno_pvts = ao2_container_alloc(IAX_MAX_CALLS, transfercallno_pvt_hash_cb, transfercallno_pvt_cmp_cb); if (!iax_transfercallno_pvts) { ao2_ref(peers, -1); @@ -12577,6 +12539,16 @@ static int load_module(void) ao2_ref(iax_peercallno_pvts, -1); return AST_MODULE_LOAD_FAILURE; } + + transmit_processor = ast_taskprocessor_get("iax2_transmit", TPS_REF_DEFAULT); + if (!transmit_processor) { + ao2_ref(peers, -1); + ao2_ref(users, -1); + ao2_ref(iax_peercallno_pvts, -1); + ao2_ref(iax_transfercallno_pvts, -1); + return AST_MODULE_LOAD_FAILURE; + } + ast_custom_function_register(&iaxpeer_function); ast_custom_function_register(&iaxvar_function); -- cgit v1.2.3