diff options
author | tilghman <tilghman@f38db490-d61c-443f-a65b-d21fe96a405b> | 2010-03-27 06:09:26 +0000 |
---|---|---|
committer | tilghman <tilghman@f38db490-d61c-443f-a65b-d21fe96a405b> | 2010-03-27 06:09:26 +0000 |
commit | 29ba1f4953ea48dd9386cb5a3bfac0e64d778b65 (patch) | |
tree | 96931850e3087aa09fa1e02b02eca6d96a48664b /pbx | |
parent | 5aa915559be5636a24609015debc7b349b122414 (diff) |
inotify support for pbx_spool
This should give a good speed boost, in that one particular thread isn't waking
up once a second to read directory contents.
Reviewboard: https://reviewboard.asterisk.org/r/137/
git-svn-id: http://svn.digium.com/svn/asterisk/trunk@255117 f38db490-d61c-443f-a65b-d21fe96a405b
Diffstat (limited to 'pbx')
-rw-r--r-- | pbx/pbx_spool.c | 147 |
1 files changed, 143 insertions, 4 deletions
diff --git a/pbx/pbx_spool.c b/pbx/pbx_spool.c index fddba86f5..a6871ecd7 100644 --- a/pbx/pbx_spool.c +++ b/pbx/pbx_spool.c @@ -1,7 +1,7 @@ /* * Asterisk -- An open source telephony toolkit. * - * Copyright (C) 1999 - 2005, Digium, Inc. + * Copyright (C) 1999 - 2010, Digium, Inc. * * Mark Spencer <markster@digium.com> * @@ -30,6 +30,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include <time.h> #include <utime.h> #include <dirent.h> +#ifdef HAVE_INOTIFY +#include <sys/inotify.h> +#endif #include "asterisk/paths.h" /* use ast_config_AST_SPOOL_DIR */ #include "asterisk/lock.h" @@ -107,7 +110,7 @@ static void free_outgoing(struct outgoing *o) ast_free(o); } -static int apply_outgoing(struct outgoing *o, char *fn, FILE *f) +static int apply_outgoing(struct outgoing *o, const char *fn, FILE *f) { char buf[256]; char *c, *c2; @@ -364,7 +367,7 @@ static void launch_service(struct outgoing *o) } } -static int scan_service(char *fn, time_t now, time_t atime) +static int scan_service(const char *fn, time_t now) { struct outgoing *o = NULL; FILE *f; @@ -432,6 +435,141 @@ static int scan_service(char *fn, time_t now, time_t atime) return res; } +#ifdef HAVE_INOTIFY +struct direntry { + AST_LIST_ENTRY(direntry) list; + time_t mtime; + char name[0]; +}; + +/* Only one thread is accessing this list, so no lock is necessary */ +static AST_LIST_HEAD_NOLOCK_STATIC(dirlist, direntry); + +static void queue_file(const char *filename, time_t when) +{ + struct stat st; + struct direntry *cur, *new; + int res; + time_t now = time(NULL); + + if (filename[0] != '/') { + char *fn = alloca(strlen(qdir) + strlen(filename) + 2); + sprintf(fn, "%s/%s", qdir, filename); /* SAFE */ + filename = fn; + } + + if (when == 0) { + if (stat(filename, &st)) { + ast_log(LOG_WARNING, "Unable to stat %s: %s\n", filename, strerror(errno)); + return; + } + + if (!S_ISREG(st.st_mode)) { + return; + } + + when = st.st_mtime; + } + + if ((res = when) > now || (res = scan_service(filename, now)) > 0) { + if (!(new = ast_calloc(1, sizeof(*new) + strlen(filename) + 1))) { + return; + } + new->mtime = res; + strcpy(new->name, filename); + /* List is ordered by mtime */ + if (AST_LIST_EMPTY(&dirlist)) { + AST_LIST_INSERT_HEAD(&dirlist, new, list); + } else { + int found = 0; + AST_LIST_TRAVERSE_SAFE_BEGIN(&dirlist, cur, list) { + if (cur->mtime > new->mtime) { + AST_LIST_INSERT_BEFORE_CURRENT(new, list); + found = 1; + break; + } + } + AST_LIST_TRAVERSE_SAFE_END + if (!found) { + AST_LIST_INSERT_TAIL(&dirlist, new, list); + } + } + } +} + +static void *scan_thread(void *unused) +{ + DIR *dir; + struct dirent *de; + int res; + time_t now; + struct timespec ts = { .tv_sec = 1 }; + int inotify_fd = inotify_init(); + struct direntry *cur; + struct { + struct inotify_event iev; + /* It may not look like we're using this element, but when we read + * from inotify_fd, the event is typically larger than the first + * struct, and overflows into this second one. */ + char name[FILENAME_MAX + 1]; + } buf; + struct pollfd pfd = { .fd = inotify_fd, .events = POLLIN }; + + while (!ast_fully_booted) { + nanosleep(&ts, NULL); + } + + if (inotify_fd < 0) { + ast_log(LOG_ERROR, "Unable to initialize inotify(7)\n"); + return NULL; + } + + inotify_add_watch(inotify_fd, qdir, IN_CREATE | IN_ATTRIB | IN_MOVED_TO); + + /* First, run through the directory and clear existing entries */ + if (!(dir = opendir(qdir))) { + ast_log(LOG_ERROR, "Unable to open directory %s: %s\n", qdir, strerror(errno)); + return NULL; + } + + now = time(NULL); + while ((de = readdir(dir))) { + queue_file(de->d_name, 0); + } + closedir(dir); + + /* Wait for either a) next timestamp to occur, or b) a change to happen */ + for (;/* ever */;) { + time_t next = AST_LIST_EMPTY(&dirlist) ? INT_MAX : AST_LIST_FIRST(&dirlist)->mtime; + + time(&now); + if (next > now) { + int stage = 0; + /* Convert from seconds to milliseconds, unless there's nothing + * in the queue already, in which case, we wait forever. */ + int waittime = next == INT_MAX ? -1 : (next - now) * 1000; + /* When a file arrives, add it to the queue, in mtime order. */ + if ((res = poll(&pfd, 1, waittime)) > 0 && (stage = 1) && + (res = read(inotify_fd, &buf, sizeof(buf))) >= sizeof(buf.iev)) { + /* File added to directory, add it to my list */ + queue_file(buf.iev.name, 0); + } else if (res < 0 && errno != EINTR && errno != EAGAIN) { + ast_debug(1, "Got an error back from %s(2): %s\n", stage ? "read" : "poll", strerror(errno)); + } + time(&now); + } + + /* Empty the list of all entries ready to be processed */ + while (!AST_LIST_EMPTY(&dirlist) && AST_LIST_FIRST(&dirlist)->mtime <= now) { + cur = AST_LIST_REMOVE_HEAD(&dirlist, list); + queue_file(cur->name, cur->mtime); + ast_free(cur); + } + } + return NULL; +} + +#else static void *scan_thread(void *unused) { struct stat st; @@ -481,7 +619,7 @@ static void *scan_thread(void *unused) if (!S_ISREG(st.st_mode)) continue; if (st.st_mtime <= now) { - res = scan_service(fn, now, st.st_atime); + res = scan_service(fn, now); if (res > 0) { /* Update next service time */ if (!next || (res < next)) { @@ -503,6 +641,7 @@ static void *scan_thread(void *unused) } return NULL; } +#endif static int unload_module(void) { |