aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormarkster <markster@f38db490-d61c-443f-a65b-d21fe96a405b>2006-02-26 20:27:14 +0000
committermarkster <markster@f38db490-d61c-443f-a65b-d21fe96a405b>2006-02-26 20:27:14 +0000
commit181407f0739de8cfd78f71645cbc49f5063996c2 (patch)
treec89e937d59c16bf3eb703206be390859ef617e51
parent89a70084d9d7f3ddbaa762ed2af208accfe7d804 (diff)
Make IAX2 multithreaded
git-svn-id: http://svn.digium.com/svn/asterisk/trunk@11192 f38db490-d61c-443f-a65b-d21fe96a405b
-rw-r--r--channels/chan_iax2.c453
-rw-r--r--configs/iax.conf.sample4
2 files changed, 405 insertions, 52 deletions
diff --git a/channels/chan_iax2.c b/channels/chan_iax2.c
index f6a197783..e5f52ca1a 100644
--- a/channels/chan_iax2.c
+++ b/channels/chan_iax2.c
@@ -100,6 +100,10 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
* otherwise, use the old jitterbuffer */
#define NEWJB
+/* Define SCHED_MULTITHREADED to run the scheduler in a special
+ multithreaded mode. */
+#define SCHED_MULTITHREADED
+
#ifdef NEWJB
#include "../jitterbuf.h"
#endif
@@ -124,6 +128,7 @@ static int nochecksums = 0;
#define PTR_TO_CALLNO(a) ((unsigned short)(unsigned long)(a))
#define CALLNO_TO_PTR(a) ((void *)(unsigned long)(a))
+#define DEFAULT_THREAD_COUNT 10
#define DEFAULT_RETRY_TIME 1000
#define MEMORY_SIZE 100
#define DEFAULT_DROP 3
@@ -230,6 +235,7 @@ static int iax2_encryption = 0;
static struct ast_flags globalflags = { 0 };
static pthread_t netthreadid = AST_PTHREADT_NULL;
+static pthread_t schedthreadid = AST_PTHREADT_NULL;
enum {
IAX_STATE_STARTED = (1 << 0),
@@ -429,6 +435,8 @@ static int max_jitter_buffer = MAX_JITTER_BUFFER;
/* If we have less than this much excess real jitter buffer, enlarge it. */
static int min_jitter_buffer = MIN_JITTER_BUFFER;
+static int iaxthreadcount = DEFAULT_THREAD_COUNT;
+
struct iax_rr {
int jitter;
int losspct;
@@ -660,6 +668,35 @@ static struct iax2_peer *realtime_peer(const char *peername, struct sockaddr_in
static void destroy_peer(struct iax2_peer *peer);
static int ast_cli_netstats(int fd, int limit_fmt);
+#define IAX_IOSTATE_IDLE 0
+#define IAX_IOSTATE_READY 1
+#define IAX_IOSTATE_PROCESSING 2
+#define IAX_IOSTATE_SCHEDREADY 3
+
+struct iax2_thread {
+ ASTOBJ_COMPONENTS(struct iax2_thread);
+ int iostate;
+#ifdef SCHED_MULTITHREADED
+ void (*schedfunc)(void *);
+ void *scheddata;
+#endif
+ int actions;
+ int halt;
+ pthread_t threadid;
+ int threadnum;
+ struct sockaddr_in iosin;
+ unsigned char buf[4096];
+ int iores;
+ int iofd;
+ time_t checktime;
+};
+
+struct iax2_thread_list {
+ ASTOBJ_CONTAINER_COMPONENTS(struct iax2_thread);
+};
+
+static struct iax2_thread_list idlelist, activelist;
+
static void iax_debug_output(const char *data)
{
if (iaxdebug)
@@ -771,18 +808,58 @@ static const struct ast_channel_tech iax2_tech = {
.fixup = iax2_fixup,
};
+static struct iax2_thread *find_idle_thread(void)
+{
+ struct iax2_thread *thread;
+ thread = ASTOBJ_CONTAINER_UNLINK_START(&idlelist);
+ return thread;
+}
+
+#ifdef SCHED_MULTITHREADED
+static int schedule_action(void (*func)(void *data), void *data)
+{
+ struct iax2_thread *thread;
+ static time_t lasterror;
+ static time_t t;
+ thread = find_idle_thread();
+ if (thread) {
+ thread->schedfunc = func;
+ thread->scheddata = data;
+ thread->iostate = IAX_IOSTATE_SCHEDREADY;
+ pthread_kill(thread->threadid, SIGURG);
+ return 0;
+ }
+ time(&t);
+ if (t != lasterror)
+ ast_log(LOG_NOTICE, "Out of idle IAX2 threads for scheduling!\n");
+ lasterror = t;
+ return -1;
+}
+#endif
+
+static void __send_ping(void *data)
+{
+ int callno = (long)data;
+ send_command(iaxs[callno], AST_FRAME_IAX, IAX_COMMAND_PING, 0, NULL, 0, -1);
+}
+
static int send_ping(void *data)
{
int callno = (long)data;
- /* Ping only if it's real, not if it's bridged */
if (iaxs[callno]) {
#ifdef BRIDGE_OPTIMIZATION
- if (!iaxs[callno]->bridgecallno)
+ if (!iaxs[callno]->bridgecallno)
#endif
- send_command(iaxs[callno], AST_FRAME_IAX, IAX_COMMAND_PING, 0, NULL, 0, -1);
+ {
+#ifdef SCHED_MULTITHREADED
+ if (schedule_action(__send_ping, data))
+#endif
+ __send_ping(data);
+ }
return 1;
} else
return 0;
+ return 0;
}
static int get_encrypt_methods(const char *s)
@@ -797,18 +874,30 @@ static int get_encrypt_methods(const char *s)
return e;
}
-static int send_lagrq(void *data)
+static void __send_lagrq(void *data)
{
int callno = (long)data;
/* Ping only if it's real not if it's bridged */
+ send_command(iaxs[callno], AST_FRAME_IAX, IAX_COMMAND_LAGRQ, 0, NULL, 0, -1);
+}
+
+static int send_lagrq(void *data)
+{
+ int callno = (long)data;
if (iaxs[callno]) {
#ifdef BRIDGE_OPTIMIZATION
- if (!iaxs[callno]->bridgecallno)
+ if (!iaxs[callno]->bridgecallno)
+#endif
+ {
+#ifdef SCHED_MULTITHREADED
+ if (schedule_action(__send_lagrq, data))
#endif
- send_command(iaxs[callno], AST_FRAME_IAX, IAX_COMMAND_LAGRQ, 0, NULL, 0, -1);
+ __send_lagrq(data);
+ }
return 1;
} else
return 0;
+ return 0;
}
static unsigned char compress_subclass(int subclass)
@@ -1422,7 +1511,7 @@ static int __do_deliver(void *data)
}
#ifndef NEWJB
-static int do_deliver(void *data)
+static int __real_do_deliver(void *data)
{
/* Locking version of __do_deliver */
struct iax_frame *fr = data;
@@ -1433,6 +1522,14 @@ static int do_deliver(void *data)
ast_mutex_unlock(&iaxsl[callno]);
return res;
}
+static int do_deliver(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+ if (schedule_action(__do_deliver, data))
+#endif
+ __real_do_deliver(data);
+ return 0;
+}
#endif /* NEWJB */
static int handle_error(void)
@@ -1693,7 +1790,8 @@ static int update_packet(struct iax_frame *f)
return 0;
}
-static int attempt_transmit(void *data)
+static int attempt_transmit(void *data);
+static void __attempt_transmit(void *data)
{
/* Attempt to transmit the frame to the remote peer...
Called without iaxsl held. */
@@ -1780,6 +1878,14 @@ static int attempt_transmit(void *data)
/* Free the IAX frame */
iax2_frame_free(f);
}
+}
+
+static int attempt_transmit(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+ if (schedule_action(__attempt_transmit, data))
+#endif
+ __attempt_transmit(data);
return 0;
}
@@ -2166,9 +2272,10 @@ static void update_jbsched(struct chan_iax2_pvt *pvt) {
}
pvt->jbid = ast_sched_add(sched, when, get_from_jb, (void *)pvt);
+ pthread_kill(schedthreadid, SIGURG);
}
-static int get_from_jb(void *p)
+static void __get_from_jb(void *p)
{
/* make sure pvt is valid! */
struct chan_iax2_pvt *pvt = p;
@@ -2238,7 +2345,15 @@ static int get_from_jb(void *p)
}
update_jbsched(pvt);
ast_mutex_unlock(&iaxsl[pvt->callno]);
- return 0;
+}
+
+static int get_from_jb(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+ if (schedule_action(__get_from_jb, data))
+#endif
+ __get_from_jb(data);
+ return 0;
}
#endif
@@ -2491,6 +2606,7 @@ static int schedule_delivery(struct iax_frame *fr, int updatehistory, int fromtr
if (option_debug && iaxdebug)
ast_log(LOG_DEBUG, "schedule_delivery: Scheduling delivery in %d ms\n", delay);
fr->retrans = ast_sched_add(sched, delay, do_deliver, fr);
+ pthread_kill(schedthreadid, SIGURG);
}
#endif
if (tsout)
@@ -2524,8 +2640,9 @@ static int iax2_transmit(struct iax_frame *fr)
}
iaxq.count++;
ast_mutex_unlock(&iaxq.lock);
- /* Wake up the network thread */
+ /* Wake up the network and scheduler thread */
pthread_kill(netthreadid, SIGURG);
+ pthread_kill(schedthreadid, SIGURG);
return 0;
}
@@ -2827,7 +2944,7 @@ static int create_addr(const char *peername, struct sockaddr_in *sin, struct cre
return 0;
}
-static int auto_congest(void *nothing)
+static void __auto_congest(void *nothing)
{
int callno = PTR_TO_CALLNO(nothing);
struct ast_frame f = { AST_FRAME_CONTROL, AST_CONTROL_CONGESTION };
@@ -2838,6 +2955,14 @@ static int auto_congest(void *nothing)
ast_log(LOG_NOTICE, "Auto-congesting call due to slow response\n");
}
ast_mutex_unlock(&iaxsl[callno]);
+}
+
+static int auto_congest(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+ if (schedule_action(__auto_congest, data))
+#endif
+ __auto_congest(data);
return 0;
}
@@ -4280,6 +4405,31 @@ static int __iax2_show_peers(int manager, int fd, int argc, char *argv[])
#undef FORMAT2
}
+static int iax2_show_threads(int fd, int argc, char *argv[])
+{
+ time_t t;
+ int threadcount = 0;
+ if (argc != 3)
+ return RESULT_SHOWUSAGE;
+
+ ast_cli(fd, "IAX2 Thread Information\n");
+ time(&t);
+ ast_cli(fd, "Idle Threads:\n");
+ ASTOBJ_CONTAINER_TRAVERSE(&idlelist, 1, {
+ ast_cli(fd, "Thread %d: state %d, last update: %d seconds ago, %d actions handled, refcnt = %d\n",
+ iterator->threadnum, iterator->iostate, (int)(t - iterator->checktime), iterator->actions, iterator->refcount);
+ threadcount++;
+ });
+ ast_cli(fd, "Active Threads:\n");
+ ASTOBJ_CONTAINER_TRAVERSE(&activelist, 1, {
+ ast_cli(fd, "Thread %d: state %d, last update: %d seconds ago, %d actions handled, refcnt = %d\n",
+ iterator->threadnum, iterator->iostate, (int)(t - iterator->checktime), iterator->actions, iterator->refcount);
+ threadcount++;
+ });
+ ast_cli(fd, "%d of %d threads accounted for\n", threadcount, iaxthreadcount);
+ return RESULT_SUCCESS;
+}
+
static int iax2_show_peers(int fd, int argc, char *argv[])
{
return __iax2_show_peers(0, fd, argc, argv);
@@ -5277,11 +5427,19 @@ static int authenticate_reply(struct chan_iax2_pvt *p, struct sockaddr_in *sin,
static int iax2_do_register(struct iax2_registry *reg);
-static int iax2_do_register_s(void *data)
+static void __iax2_do_register_s(void *data)
{
struct iax2_registry *reg = data;
reg->expire = -1;
iax2_do_register(reg);
+}
+
+static int iax2_do_register_s(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+ if (schedule_action(__iax2_do_register_s, data))
+#endif
+ __iax2_do_register_s(data);
return 0;
}
@@ -5562,7 +5720,7 @@ static void register_peer_exten(struct iax2_peer *peer, int onoff)
}
static void prune_peers(void);
-static int expire_registry(void *data)
+static void __expire_registry(void *data)
{
struct iax2_peer *p = data;
@@ -5584,11 +5742,17 @@ static int expire_registry(void *data)
ast_set_flag(p, IAX_DELME);
prune_peers();
}
+}
+static int expire_registry(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+ if (schedule_action(__expire_registry, data))
+#endif
+ __expire_registry(data);
return 0;
}
-
static int iax2_poke_peer(struct iax2_peer *peer, int heldcall);
static void reg_source_db(struct iax2_peer *p)
@@ -5828,7 +5992,7 @@ static int stop_stuff(int callno)
return 0;
}
-static int auth_reject(void *nothing)
+static void __auth_reject(void *nothing)
{
/* Called from IAX thread only, without iaxs lock */
int callno = (int)(long)(nothing);
@@ -5847,6 +6011,14 @@ static int auth_reject(void *nothing)
send_command_final(iaxs[callno], AST_FRAME_IAX, iaxs[callno]->authfail, 0, ied.buf, ied.pos, -1);
}
ast_mutex_unlock(&iaxsl[callno]);
+}
+
+static int auth_reject(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+ if (schedule_action(__auth_reject, data))
+#endif
+ __auth_reject(data);
return 0;
}
@@ -5868,7 +6040,7 @@ static int auth_fail(int callno, int failcode)
return 0;
}
-static int auto_hangup(void *nothing)
+static void __auto_hangup(void *nothing)
{
/* Called from IAX thread only, without iaxs lock */
int callno = (int)(long)(nothing);
@@ -5882,6 +6054,14 @@ static int auto_hangup(void *nothing)
send_command_final(iaxs[callno], AST_FRAME_IAX, IAX_COMMAND_HANGUP, 0, ied.buf, ied.pos, -1);
}
ast_mutex_unlock(&iaxsl[callno]);
+}
+
+static int auto_hangup(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+ if (schedule_action(__auto_hangup, data))
+#endif
+ __auto_hangup(data);
return 0;
}
@@ -5919,11 +6099,19 @@ static void vnak_retransmit(int callno, int last)
ast_mutex_unlock(&iaxq.lock);
}
-static int iax2_poke_peer_s(void *data)
+static void __iax2_poke_peer_s(void *data)
{
struct iax2_peer *peer = data;
peer->pokeexpire = -1;
iax2_poke_peer(peer, 0);
+}
+
+static int iax2_poke_peer_s(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+ if (schedule_action(__iax2_poke_peer_s, data))
+#endif
+ __iax2_poke_peer_s(data);
return 0;
}
@@ -6258,18 +6446,50 @@ static void save_rr(struct iax_frame *fr, struct iax_ies *ies)
static int socket_read(int *id, int fd, short events, void *cbdata)
{
+ struct iax2_thread *thread;
+ socklen_t len;
+ thread = find_idle_thread();
+ time_t t;
+ static time_t last_errtime=0;
+ if (thread) {
+ len = sizeof(thread->iosin);
+ thread->iofd = fd;
+ thread->iores = recvfrom(fd, thread->buf, sizeof(thread->buf), 0,(struct sockaddr *) &thread->iosin, &len);
+ if (thread->iores < 0) {
+ if (errno != ECONNREFUSED)
+ ast_log(LOG_WARNING, "Error: %s\n", strerror(errno));
+ handle_error();
+ return 1;
+ }
+ if(test_losspct) { /* simulate random loss condition */
+ if( (100.0*rand()/(RAND_MAX+1.0)) < test_losspct)
+ return 1;
+ }
+ /* Mark as ready and send on its way */
+ thread->iostate = IAX_IOSTATE_READY;
+ pthread_kill(thread->threadid, SIGURG);
+ } else {
+ time(&t);
+ if (t != last_errtime)
+ ast_log(LOG_NOTICE, "Out of idle IAX2 threads for I/O, pausing!\n");
+ last_errtime = t;
+ usleep(1);
+ }
+ return 1;
+}
+
+static int socket_process(struct iax2_thread *thread)
+{
struct sockaddr_in sin;
int res;
int updatehistory=1;
int new = NEW_PREVENT;
- unsigned char buf[4096];
void *ptr;
- socklen_t len = sizeof(sin);
int dcallno = 0;
- struct ast_iax2_full_hdr *fh = (struct ast_iax2_full_hdr *)buf;
- struct ast_iax2_mini_hdr *mh = (struct ast_iax2_mini_hdr *)buf;
- struct ast_iax2_meta_hdr *meta = (struct ast_iax2_meta_hdr *)buf;
- struct ast_iax2_video_hdr *vh = (struct ast_iax2_video_hdr *)buf;
+ struct ast_iax2_full_hdr *fh = (struct ast_iax2_full_hdr *)thread->buf;
+ struct ast_iax2_mini_hdr *mh = (struct ast_iax2_mini_hdr *)thread->buf;
+ struct ast_iax2_meta_hdr *meta = (struct ast_iax2_meta_hdr *)thread->buf;
+ struct ast_iax2_video_hdr *vh = (struct ast_iax2_video_hdr *)thread->buf;
struct ast_iax2_meta_trunk_hdr *mth;
struct ast_iax2_meta_trunk_entry *mte;
struct ast_iax2_meta_trunk_mini *mtm;
@@ -6286,6 +6506,7 @@ static int socket_read(int *id, int fd, short events, void *cbdata)
struct iax_ies ies;
struct iax_ie_data ied0, ied1;
int format;
+ int fd;
int exists;
int minivid = 0;
unsigned int ts;
@@ -6298,19 +6519,12 @@ static int socket_read(int *id, int fd, short events, void *cbdata)
dblbuf[0] = 0; /* Keep GCC from whining */
fr.callno = 0;
-
- res = recvfrom(fd, buf, sizeof(buf), 0,(struct sockaddr *) &sin, &len);
- if (res < 0) {
- if (errno != ECONNREFUSED)
- ast_log(LOG_WARNING, "Error: %s\n", strerror(errno));
- handle_error();
- return 1;
- }
- if(test_losspct) { /* simulate random loss condition */
- if( (100.0*rand()/(RAND_MAX+1.0)) < test_losspct)
- return 1;
-
- }
+
+ /* Copy frequently used parameters to the stack */
+ res = thread->iores;
+ fd = thread->iofd;
+ memcpy(&sin, &thread->iosin, sizeof(sin));
+
if (res < sizeof(struct ast_iax2_mini_hdr)) {
ast_log(LOG_WARNING, "midget packet received (%d of %d min)\n", res, (int)sizeof(struct ast_iax2_mini_hdr));
return 1;
@@ -6643,14 +6857,14 @@ static int socket_read(int *id, int fd, short events, void *cbdata)
if (f.datalen) {
if (f.frametype == AST_FRAME_IAX) {
- if (iax_parse_ies(&ies, buf + sizeof(struct ast_iax2_full_hdr), f.datalen)) {
+ if (iax_parse_ies(&ies, thread->buf + sizeof(struct ast_iax2_full_hdr), f.datalen)) {
ast_log(LOG_WARNING, "Undecodable frame received from '%s'\n", ast_inet_ntoa(iabuf, sizeof(iabuf), sin.sin_addr));
ast_mutex_unlock(&iaxsl[fr.callno]);
return 1;
}
f.data = NULL;
} else
- f.data = buf + sizeof(struct ast_iax2_full_hdr);
+ f.data = thread->buf + sizeof(struct ast_iax2_full_hdr);
} else {
if (f.frametype == AST_FRAME_IAX)
f.data = NULL;
@@ -7526,7 +7740,7 @@ retryowner2:
}
f.datalen = res - sizeof(struct ast_iax2_video_hdr);
if (f.datalen)
- f.data = buf + sizeof(struct ast_iax2_video_hdr);
+ f.data = thread->buf + sizeof(struct ast_iax2_video_hdr);
else
f.data = NULL;
#ifdef IAXTESTS
@@ -7553,7 +7767,7 @@ retryowner2:
return 1;
}
if (f.datalen)
- f.data = buf + sizeof(struct ast_iax2_mini_hdr);
+ f.data = thread->buf + sizeof(struct ast_iax2_mini_hdr);
else
f.data = NULL;
#ifdef IAXTESTS
@@ -7620,6 +7834,59 @@ retryowner2:
return 1;
}
+static void destroy_helper(struct iax2_thread *thread)
+{
+ ast_log(LOG_DEBUG, "Destroying helper %d!\n", thread->threadnum);
+ free(thread);
+}
+
+static void *iax2_process_thread(void *data)
+{
+ struct iax2_thread *thread_copy, *thread = data;
+ struct timeval tv;
+ for(;;) {
+ /* Sleep for up to 1 second */
+ tv.tv_sec = 1;
+ tv.tv_usec = 0;
+ select(0, NULL, NULL, NULL, &tv);
+ /* Unlink from idlelist / activelist if there*/
+ ASTOBJ_CONTAINER_UNLINK(&idlelist, thread);
+ ASTOBJ_CONTAINER_UNLINK(&activelist, thread);
+ /* If instructed to halt, stop now */
+ if (thread->halt) {
+ ast_log(LOG_DEBUG, "Halting, refcount = %d\n", thread->refcount);
+ ASTOBJ_UNREF(thread, destroy_helper);
+ break;
+ }
+ /* Remove our reference */
+ ASTOBJ_CONTAINER_LINK_END(&activelist, thread);
+ switch(thread->iostate) {
+ case IAX_IOSTATE_READY:
+ thread->actions++;
+ thread->iostate = IAX_IOSTATE_PROCESSING;
+ socket_process(thread);
+ break;
+ case IAX_IOSTATE_SCHEDREADY:
+ thread->actions++;
+ thread->iostate = IAX_IOSTATE_PROCESSING;
+ thread->schedfunc(thread->scheddata);
+ break;
+ }
+ time(&thread->checktime);
+ thread->iostate = IAX_IOSTATE_IDLE;
+ ASTOBJ_CONTAINER_UNLINK(&activelist, thread);
+ ASTOBJ_CONTAINER_LINK_END(&idlelist, thread);
+ /* Make a copy so we don't lose thread, but if
+ we become unreferenced here, our thread gets
+ cancelled anyway, so it's okay */
+ thread_copy = thread;
+ ASTOBJ_UNREF(thread_copy, destroy_helper);
+ thread_copy = thread;
+ ASTOBJ_UNREF(thread_copy, destroy_helper);
+ }
+ return NULL;
+}
+
static int iax2_do_register(struct iax2_registry *reg)
{
struct iax_ie_data ied;
@@ -7771,7 +8038,7 @@ static int iax2_prov_cmd(int fd, int argc, char *argv[])
return RESULT_SUCCESS;
}
-static int iax2_poke_noanswer(void *data)
+static void __iax2_poke_noanswer(void *data)
{
struct iax2_peer *peer = data;
peer->pokeexpire = -1;
@@ -7786,6 +8053,14 @@ static int iax2_poke_noanswer(void *data)
peer->lastms = -1;
/* Try again quickly */
peer->pokeexpire = ast_sched_add(sched, peer->pokefreqnotok, iax2_poke_peer_s, peer);
+}
+
+static int iax2_poke_noanswer(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+ if (schedule_action(__iax2_poke_noanswer, data))
+#endif
+ __iax2_poke_noanswer(data);
return 0;
}
@@ -7918,6 +8193,26 @@ static struct ast_channel *iax2_request(const char *type, int format, void *data
return c;
}
+static void *sched_thread(void *ignore)
+{
+ int count;
+ int res;
+ for (;;) {
+ res = ast_sched_wait(sched);
+ if ((res > 1000) || (res < 0))
+ res = 1000;
+ res = poll(NULL, 0, res);
+ if (res < 0) {
+ if ((errno != EAGAIN) && (errno != EINTR))
+ ast_log(LOG_WARNING, "poll failed: %s\n", strerror(errno));
+ }
+ count = ast_sched_runq(sched);
+ if (count >= 20)
+ ast_log(LOG_DEBUG, "chan_iax2: ast_sched_runq ran %d scheduled tasks all at once\n", count);
+ }
+ return NULL;
+}
+
static void *network_thread(void *ignore)
{
/* Our job is simple: Send queued messages, retrying if necessary. Read frames
@@ -7958,6 +8253,7 @@ static void *network_thread(void *ignore)
/* We need reliable delivery. Schedule a retransmission */
f->retries++;
f->retrans = ast_sched_add(sched, f->retrytime, attempt_transmit, f);
+ pthread_kill(schedthreadid, SIGURG);
}
}
f = f->next;
@@ -7969,16 +8265,10 @@ static void *network_thread(void *ignore)
ast_log(LOG_DEBUG, "chan_iax2: Sent %d queued outbound frames all at once\n", count);
/* Now do the IO, and run scheduled tasks */
- res = ast_sched_wait(sched);
- if ((res > 1000) || (res < 0))
- res = 1000;
- res = ast_io_wait(io, res);
+ res = ast_io_wait(io, -1);
if (res >= 0) {
if (res >= 20)
ast_log(LOG_DEBUG, "chan_iax2: ast_io_wait ran %d I/Os all at once\n", res);
- count = ast_sched_runq(sched);
- if (count >= 20)
- ast_log(LOG_DEBUG, "chan_iax2: ast_sched_runq ran %d scheduled tasks all at once\n", count);
}
}
return NULL;
@@ -7986,7 +8276,29 @@ static void *network_thread(void *ignore)
static int start_network_thread(void)
{
- return ast_pthread_create(&netthreadid, NULL, network_thread, NULL);
+ int threadcount = 0;
+ int x;
+ ASTOBJ_CONTAINER_INIT(&idlelist);
+ ASTOBJ_CONTAINER_INIT(&activelist);
+ for (x = 0; x < iaxthreadcount; x++) {
+ struct iax2_thread *thread = ast_calloc(1, sizeof(struct iax2_thread));
+ if (thread) {
+ ASTOBJ_INIT(thread);
+ thread->threadnum = ++threadcount;
+ if (ast_pthread_create(&thread->threadid, NULL, iax2_process_thread, thread)) {
+ ast_log(LOG_WARNING, "Failed to create new thread!\n");
+ free(thread);
+ thread = NULL;
+ }
+ ASTOBJ_CONTAINER_LINK_END(&idlelist, thread);
+ ASTOBJ_UNREF(thread, destroy_helper);
+ }
+ }
+ ast_pthread_create(&schedthreadid, NULL, sched_thread, NULL);
+ ast_pthread_create(&netthreadid, NULL, network_thread, NULL);
+ if (option_verbose > 1)
+ ast_verbose(VERBOSE_PREFIX_2 "%d helper threaads started\n", threadcount);
+ return 0;
}
static struct iax2_context *build_context(char *context)
@@ -8620,7 +8932,21 @@ static int set_config(char *config_file, int reload)
portno = atoi(v->value);
} else if (!strcasecmp(v->name, "pingtime"))
ping_time = atoi(v->value);
- else if (!strcasecmp(v->name, "nochecksums")) {
+ else if (!strcasecmp(v->name, "iaxthreadcount")) {
+ if (reload) {
+ if (atoi(v->value) != iaxthreadcount)
+ ast_log(LOG_NOTICE, "Ignoring any changes to iaxthreadcount during reload\n");
+ } else {
+ iaxthreadcount = atoi(v->value);
+ if (iaxthreadcount < 1) {
+ ast_log(LOG_NOTICE, "iaxthreadcount must be at least 1.\n");
+ iaxthreadcount = 1;
+ } else if (iaxthreadcount > 256) {
+ ast_log(LOG_NOTICE, "limiting iaxthreadcount to 256\n");
+ iaxthreadcount = 256;
+ }
+ }
+ } else if (!strcasecmp(v->name, "nochecksums")) {
#ifdef SO_NO_CHECK
if (ast_true(v->value))
nochecksums = 1;
@@ -8759,7 +9085,7 @@ static int set_config(char *config_file, int reload)
amaflags = format;
}
} else if (!strcasecmp(v->name, "language")) {
- ast_copy_string(language, v->value, sizeof(language));
+ ast_copy_string(language, v->value, sizeof(language));
} /*else if (strcasecmp(v->name,"type")) */
/* ast_log(LOG_WARNING, "Ignoring %s\n", v->name); */
v = v->next;
@@ -9366,6 +9692,10 @@ static char show_netstats_usage[] =
"Usage: iax2 show netstats\n"
" Lists network status for all currently active IAX channels.\n";
+static char show_threads_usage[] =
+"Usage: iax2 show threads\n"
+" Lists status of IAX helper threads\n";
+
static char show_peers_usage[] =
"Usage: iax2 show peers [registered] [like <pattern>]\n"
" Lists all known IAX2 peers.\n"
@@ -9445,6 +9775,8 @@ static struct ast_cli_entry iax2_cli[] = {
"Show active IAX channel netstats", show_netstats_usage },
{ { "iax2", "show", "peers", NULL }, iax2_show_peers,
"Show defined IAX peers", show_peers_usage },
+ { { "iax2", "show", "threads", NULL }, iax2_show_threads,
+ "Show IAX helper thread info", show_threads_usage },
{ { "iax2", "show", "registry", NULL }, iax2_show_registry,
"Show IAX registration status", show_reg_usage },
{ { "iax2", "debug", NULL }, iax2_do_debug,
@@ -9481,6 +9813,23 @@ static int __unload_module(void)
pthread_cancel(netthreadid);
pthread_join(netthreadid, NULL);
}
+ if (schedthreadid != AST_PTHREADT_NULL) {
+ pthread_cancel(schedthreadid);
+ pthread_join(schedthreadid, NULL);
+ }
+ while (idlelist.head || activelist.head) {
+ ASTOBJ_CONTAINER_TRAVERSE(&idlelist, 1, {
+ iterator->halt = 1;
+ pthread_kill(iterator->threadid, SIGURG);
+ });
+ ASTOBJ_CONTAINER_TRAVERSE(&activelist, 1, {
+ iterator->halt = 1;
+ pthread_kill(iterator->threadid, SIGURG);
+ });
+ usleep(100000);
+ }
+ ASTOBJ_CONTAINER_DESTROY(&idlelist);
+ ASTOBJ_CONTAINER_DESTROY(&activelist);
ast_netsock_release(netsock);
for (x=0;x<IAX_MAX_CALLS;x++)
if (iaxs[x])
diff --git a/configs/iax.conf.sample b/configs/iax.conf.sample
index 26d637d8d..5c4b226b2 100644
--- a/configs/iax.conf.sample
+++ b/configs/iax.conf.sample
@@ -163,6 +163,10 @@ forcejitterbuffer=no
; minregexpire = 60
; maxregexpire = 60
;
+; IAX helper threads
+; Establishes the number of iax helper threads to handle I/O.
+; iaxthreadcount = 10
+;
; We can register with another IAX server to let him know where we are
; in case we have a dynamic IP address for example
;