diff options
author | markster <markster@f38db490-d61c-443f-a65b-d21fe96a405b> | 2006-04-02 19:59:55 +0000 |
---|---|---|
committer | markster <markster@f38db490-d61c-443f-a65b-d21fe96a405b> | 2006-04-02 19:59:55 +0000 |
commit | 7302a69879d92e522760a0c9fad7663f97ff413e (patch) | |
tree | 0852e09dfe1f0b29ab38afbab427318340a7c1e6 /manager.c | |
parent | 18d0848ac2d2eaa5d6b685cd59be77793ebf5f0c (diff) |
Unify manager behind a single event queue
git-svn-id: http://svn.digium.com/svn/asterisk/trunk@16957 f38db490-d61c-443f-a65b-d21fe96a405b
Diffstat (limited to 'manager.c')
-rw-r--r-- | manager.c | 242 |
1 files changed, 171 insertions, 71 deletions
@@ -83,6 +83,14 @@ struct fast_originate_helper { struct ast_variable *vars; }; +struct eventqent { + int usecount; + int category; + ast_mutex_t lock; + struct eventqent *next; + char eventdata[1]; +}; + static int enabled = 0; static int portno = DEFAULT_MANAGER_PORT; static int asock = -1; @@ -93,6 +101,8 @@ static int httptimeout = 60; static pthread_t t; AST_MUTEX_DEFINE_STATIC(sessionlock); static int block_sockets = 0; +static int num_sessions = 0; +struct eventqent *master_eventq = NULL; static struct permalias { int num; @@ -472,6 +482,23 @@ static int handle_showmanconn(int fd, int argc, char *argv[]) return RESULT_SUCCESS; } +/*! \brief handle_showmanconn: CLI command show manager connected */ +/* Should change to "manager show connected" */ +static int handle_showmaneventq(int fd, int argc, char *argv[]) +{ + struct eventqent *s; + ast_mutex_lock(&sessionlock); + s = master_eventq; + while (s) { + ast_cli(fd, "Usecount: %d\n",s->usecount); + ast_cli(fd, "Category: %d\n", s->category); + ast_cli(fd, "Event:\n%s", s->eventdata); + s = s->next; + } + ast_mutex_unlock(&sessionlock); + return RESULT_SUCCESS; +} + static char showmancmd_help[] = "Usage: show manager command <actionname>\n" " Shows the detailed description for a specific Asterisk manager interface command.\n"; @@ -485,6 +512,11 @@ static char showmanconn_help[] = " Prints a listing of the users that are currently connected to the\n" "Asterisk manager interface.\n"; +static char showmaneventq_help[] = +"Usage: show manager eventq\n" +" Prints a listing of all events pending in the Asterisk manger\n" +"event queue.\n"; + static struct ast_cli_entry show_mancmd_cli = { { "show", "manager", "command", NULL }, handle_showmancmd, "Show a manager interface command", showmancmd_help, complete_show_mancmd }; @@ -497,6 +529,24 @@ static struct ast_cli_entry show_manconn_cli = { { "show", "manager", "connected", NULL }, handle_showmanconn, "Show connected manager interface users", showmanconn_help }; +static struct ast_cli_entry show_maneventq_cli = + { { "show", "manager", "eventq", NULL }, + handle_showmaneventq, "Show manager interface queued events", showmaneventq_help }; + +static void unuse_eventqent(struct eventqent *e) +{ + /* XXX Need to atomically decrement the users. Change this to atomic_dec + one day when we have such a beast XXX */ + int val; + ast_mutex_lock(&e->lock); + e->usecount--; + val = e->usecount && e->next; + ast_mutex_unlock(&e->lock); + /* Wake up sleeping beauty */ + if (val) + pthread_kill(t, SIGURG); +} + static void free_session(struct mansession *s) { struct eventqent *eqe; @@ -508,7 +558,7 @@ static void free_session(struct mansession *s) while(s->eventq) { eqe = s->eventq; s->eventq = s->eventq->next; - free(eqe); + unuse_eventqent(eqe); } free(s); } @@ -530,6 +580,7 @@ static void destroy_session(struct mansession *s) else sessions = cur->next; free_session(s); + num_sessions--; } else ast_log(LOG_WARNING, "Trying to delete nonexistent session %p?\n", s); ast_mutex_unlock(&sessionlock); @@ -876,7 +927,7 @@ static int action_waitevent(struct mansession *s, struct message *m) ast_log(LOG_DEBUG, "Starting waiting for an event!\n"); for (x=0;((x<timeout) || (timeout < 0)); x++) { ast_mutex_lock(&s->__lock); - if (s->eventq) + if (s->eventq && s->eventq->next) needexit = 1; if (s->waiting_thread != pthread_self()) needexit = 1; @@ -898,11 +949,14 @@ static int action_waitevent(struct mansession *s, struct message *m) if (s->waiting_thread == pthread_self()) { astman_send_response(s, m, "Success", "Waiting for Event..."); /* Only show events if we're the most recent waiter */ - while(s->eventq) { - astman_append(s, "%s", s->eventq->eventdata); - eqe = s->eventq; - s->eventq = s->eventq->next; - free(eqe); + while(s->eventq->next) { + eqe = s->eventq->next; + if (((s->readperm & eqe->category) == eqe->category) && + ((s->send_events & eqe->category) == eqe->category)) { + astman_append(s, "%s", eqe->eventdata); + } + unuse_eventqent(s->eventq); + s->eventq = eqe; } astman_append(s, "Event: WaitEventComplete\r\n" @@ -1566,6 +1620,30 @@ static int action_timeout(struct mansession *s, struct message *m) return 0; } +static int process_events(struct mansession *s) +{ + struct eventqent *eqe; + int ret = 0; + ast_mutex_lock(&s->__lock); + if (s->fd > -1) { + s->busy--; + if (!s->eventq) + s->eventq = master_eventq; + while(s->eventq->next) { + eqe = s->eventq->next; + if ((s->authenticated && (s->readperm & eqe->category) == eqe->category) && + ((s->send_events & eqe->category) == eqe->category)) { + if (!ret && ast_carefulwrite(s->fd, eqe->eventdata, strlen(eqe->eventdata), s->writetimeout) < 0) + ret = -1; + } + unuse_eventqent(s->eventq); + s->eventq = eqe; + } + } + ast_mutex_unlock(&s->__lock); + return ret; +} + static int process_message(struct mansession *s, struct message *m) { char action[80] = ""; @@ -1573,6 +1651,7 @@ static int process_message(struct mansession *s, struct message *m) char *id = astman_get_header(m,"ActionID"); char idText[256] = ""; char iabuf[INET_ADDRSTRLEN]; + int ret = 0; ast_copy_string(action, astman_get_header(m, "Action"), sizeof(action)); ast_log( LOG_DEBUG, "Manager received command '%s'\n", action ); @@ -1581,9 +1660,9 @@ static int process_message(struct mansession *s, struct message *m) astman_send_error(s, m, "Missing action in request"); return 0; } - if (!ast_strlen_zero(id)) { - snprintf(idText, sizeof(idText), "ActionID: %s\r\n",id); - } + if (!ast_strlen_zero(id)) { + snprintf(idText, sizeof(idText), "ActionID: %s\r\n",id); + } if (!s->authenticated) { if (!strcasecmp(action, "Challenge")) { char *authtype; @@ -1623,8 +1702,6 @@ static int process_message(struct mansession *s, struct message *m) } else astman_send_error(s, m, "Authentication Required"); } else { - int ret=0; - struct eventqent *eqe; ast_mutex_lock(&s->__lock); s->busy++; ast_mutex_unlock(&s->__lock); @@ -1642,23 +1719,10 @@ static int process_message(struct mansession *s, struct message *m) } if (!tmp) astman_send_error(s, m, "Invalid/unknown command"); - ast_mutex_lock(&s->__lock); - if (s->fd > -1) { - s->busy--; - while(s->eventq) { - if (ast_carefulwrite(s->fd, s->eventq->eventdata, strlen(s->eventq->eventdata), s->writetimeout) < 0) { - ret = -1; - break; - } - eqe = s->eventq; - s->eventq = s->eventq->next; - free(eqe); - } - } - ast_mutex_unlock(&s->__lock); - return ret; } - return 0; + if (ret) + return ret; + return process_events(s); } static int get_input(struct mansession *s, char *output) @@ -1687,12 +1751,20 @@ static int get_input(struct mansession *s, char *output) fds[0].fd = s->fd; fds[0].events = POLLIN; do { + ast_mutex_lock(&s->__lock); + s->waiting_thread = pthread_self(); + ast_mutex_unlock(&s->__lock); + res = poll(fds, 1, -1); + + ast_mutex_lock(&s->__lock); + s->waiting_thread = AST_PTHREADT_NULL; + ast_mutex_unlock(&s->__lock); if (res < 0) { if (errno == EINTR) { if (s->dead) return -1; - continue; + return 0; } ast_log(LOG_WARNING, "Select returned error: %s\n", strerror(errno)); return -1; @@ -1734,8 +1806,12 @@ static void *session_do(void *data) memset(&m, 0, sizeof(m)); } else if (m.hdrcount < AST_MAX_MANHEADERS - 1) m.hdrcount++; - } else if (res < 0) + } else if (res < 0) { break; + } else if (s->eventq->next) { + if (process_events(s)) + break; + } } if (s->authenticated) { if (option_verbose > 1) { @@ -1759,6 +1835,7 @@ static void *accept_thread(void *ignore) int as; struct sockaddr_in sin; socklen_t sinlen; + struct eventqent *eqe; struct mansession *s, *prev=NULL, *next; struct protoent *p; int arg = 1; @@ -1779,6 +1856,7 @@ static void *accept_thread(void *ignore) while(s) { next = s->next; if (s->sessiontimeout && (now > s->sessiontimeout) && !s->inuse) { + num_sessions--; if (prev) prev->next = next; else @@ -1792,6 +1870,14 @@ static void *accept_thread(void *ignore) prev = s; s = next; } + /* Purge master event queue of old, unused events, but make sure we + always keep at least one in the queue */ + eqe = master_eventq; + while (master_eventq->next && !master_eventq->usecount) { + eqe = master_eventq; + master_eventq = master_eventq->next; + free(eqe); + } ast_mutex_unlock(&sessionlock); sinlen = sizeof(sin); @@ -1831,8 +1917,17 @@ static void *accept_thread(void *ignore) s->fd = as; s->send_events = -1; ast_mutex_lock(&sessionlock); + num_sessions++; s->next = sessions; sessions = s; + /* Find the last place in the master event queue and hook ourselves + in there */ + s->eventq = master_eventq; + while(s->eventq->next) + s->eventq = s->eventq->next; + ast_mutex_lock(&s->eventq->lock); + s->eventq->usecount++; + ast_mutex_unlock(&s->eventq->lock); ast_mutex_unlock(&sessionlock); if (ast_pthread_create(&s->t, &attr, session_do, s)) destroy_session(s); @@ -1841,21 +1936,24 @@ static void *accept_thread(void *ignore) return NULL; } -static int append_event(struct mansession *s, const char *str) +static int append_event(const char *str, int category) { struct eventqent *tmp, *prev=NULL; tmp = malloc(sizeof(struct eventqent) + strlen(str)); if (tmp) { + ast_mutex_init(&tmp->lock); tmp->next = NULL; + tmp->category = category; strcpy(tmp->eventdata, str); - if (s->eventq) { - prev = s->eventq; + if (master_eventq) { + prev = master_eventq; while(prev->next) prev = prev->next; prev->next = tmp; } else { - s->eventq = tmp; + master_eventq = tmp; } + tmp->usecount = num_sessions; return 0; } return -1; @@ -1870,45 +1968,33 @@ int manager_event(int category, const char *event, const char *fmt, ...) char *tmp_next = tmp; size_t tmp_left = sizeof(tmp) - 2; va_list ap; + struct timeval now; + /* Abort if there aren't any manager sessions */ + if (!num_sessions) + return 0; + + ast_build_string(&tmp_next, &tmp_left, "Event: %s\r\nPrivilege: %s\r\n", + event, authority_to_str(category, auth, sizeof(auth))); + if (timestampevents) { + now = ast_tvnow(); + ast_build_string(&tmp_next, &tmp_left, "Timestamp: %ld.%06lu\r\n", + now.tv_sec, (unsigned long) now.tv_usec); + } + va_start(ap, fmt); + ast_build_string_va(&tmp_next, &tmp_left, fmt, ap); + va_end(ap); + *tmp_next++ = '\r'; + *tmp_next++ = '\n'; + *tmp_next = '\0'; + ast_mutex_lock(&sessionlock); + /* Append even to master list and wake up any sleeping sessions */ + append_event(tmp, category); for (s = sessions; s; s = s->next) { - if ((s->readperm & category) != category) - continue; - - if ((s->send_events & category) != category) - continue; - - if (ast_strlen_zero(tmp)) { - struct timeval now; - - ast_build_string(&tmp_next, &tmp_left, "Event: %s\r\nPrivilege: %s\r\n", - event, authority_to_str(category, auth, sizeof(auth))); - if (timestampevents) { - now = ast_tvnow(); - ast_build_string(&tmp_next, &tmp_left, "Timestamp: %ld.%06lu\r\n", - now.tv_sec, (unsigned long) now.tv_usec); - } - va_start(ap, fmt); - ast_build_string_va(&tmp_next, &tmp_left, fmt, ap); - va_end(ap); - *tmp_next++ = '\r'; - *tmp_next++ = '\n'; - *tmp_next = '\0'; - } - ast_mutex_lock(&s->__lock); - if (s->busy) { - append_event(s, tmp); - if (s->waiting_thread != AST_PTHREADT_NULL) - pthread_kill(s->waiting_thread, SIGURG); - } else if (!s->dead && !s->sessiontimeout) { - if (ast_carefulwrite(s->fd, tmp, tmp_next - tmp, s->writetimeout) < 0) { - ast_log(LOG_WARNING, "Disconnecting slow (or gone) manager session!\n"); - s->dead = 1; - pthread_kill(s->t, SIGURG); - } - } + if (s->waiting_thread != AST_PTHREADT_NULL) + pthread_kill(s->waiting_thread, SIGURG); ast_mutex_unlock(&s->__lock); } ast_mutex_unlock(&sessionlock); @@ -2084,12 +2170,23 @@ static char *generic_http_callback(int format, struct sockaddr_in *requestor, co s->managerid = rand() | (unsigned long)s; s->next = sessions; sessions = s; + num_sessions++; + /* Hook into the last spot in the event queue */ + s->eventq = master_eventq; + while(s->eventq->next) + s->eventq = s->eventq->next; + ast_mutex_lock(&s->eventq->lock); + s->eventq->usecount++; + ast_mutex_unlock(&s->eventq->lock); ast_mutex_unlock(&sessionlock); } - /* Reset HTTP timeout */ + /* Reset HTTP timeout. If we're not yet authenticated, keep it extremely short */ time(&s->sessiontimeout); - s->sessiontimeout += httptimeout; + if (!s->authenticated && (httptimeout > 5)) + s->sessiontimeout += 5; + else + s->sessiontimeout += httptimeout; ast_mutex_unlock(&s->__lock); memset(&m, 0, sizeof(m)); @@ -2248,8 +2345,11 @@ int init_manager(void) ast_cli_register(&show_mancmd_cli); ast_cli_register(&show_mancmds_cli); ast_cli_register(&show_manconn_cli); + ast_cli_register(&show_maneventq_cli); ast_extension_state_add(NULL, NULL, manager_state_cb, NULL); registered = 1; + /* Append placeholder event so master_eventq never runs dry */ + append_event("Event: Placeholder\r\n\r\n", 0); } portno = DEFAULT_MANAGER_PORT; displayconnects = 1; |