aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortilghman <tilghman@f38db490-d61c-443f-a65b-d21fe96a405b>2010-03-27 06:09:26 +0000
committertilghman <tilghman@f38db490-d61c-443f-a65b-d21fe96a405b>2010-03-27 06:09:26 +0000
commit29ba1f4953ea48dd9386cb5a3bfac0e64d778b65 (patch)
tree96931850e3087aa09fa1e02b02eca6d96a48664b
parent5aa915559be5636a24609015debc7b349b122414 (diff)
inotify support for pbx_spool
This should give a good speed boost, in that one particular thread isn't waking up once a second to read directory contents. Reviewboard: https://reviewboard.asterisk.org/r/137/ git-svn-id: http://svn.digium.com/svn/asterisk/trunk@255117 f38db490-d61c-443f-a65b-d21fe96a405b
-rw-r--r--pbx/pbx_spool.c147
1 files changed, 143 insertions, 4 deletions
diff --git a/pbx/pbx_spool.c b/pbx/pbx_spool.c
index fddba86f5..a6871ecd7 100644
--- a/pbx/pbx_spool.c
+++ b/pbx/pbx_spool.c
@@ -1,7 +1,7 @@
/*
* Asterisk -- An open source telephony toolkit.
*
- * Copyright (C) 1999 - 2005, Digium, Inc.
+ * Copyright (C) 1999 - 2010, Digium, Inc.
*
* Mark Spencer <markster@digium.com>
*
@@ -30,6 +30,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include <time.h>
#include <utime.h>
#include <dirent.h>
+#ifdef HAVE_INOTIFY
+#include <sys/inotify.h>
+#endif
#include "asterisk/paths.h" /* use ast_config_AST_SPOOL_DIR */
#include "asterisk/lock.h"
@@ -107,7 +110,7 @@ static void free_outgoing(struct outgoing *o)
ast_free(o);
}
-static int apply_outgoing(struct outgoing *o, char *fn, FILE *f)
+static int apply_outgoing(struct outgoing *o, const char *fn, FILE *f)
{
char buf[256];
char *c, *c2;
@@ -364,7 +367,7 @@ static void launch_service(struct outgoing *o)
}
}
-static int scan_service(char *fn, time_t now, time_t atime)
+static int scan_service(const char *fn, time_t now)
{
struct outgoing *o = NULL;
FILE *f;
@@ -432,6 +435,141 @@ static int scan_service(char *fn, time_t now, time_t atime)
return res;
}
+#ifdef HAVE_INOTIFY
+struct direntry {
+ AST_LIST_ENTRY(direntry) list;
+ time_t mtime;
+ char name[0];
+};
+
+/* Only one thread is accessing this list, so no lock is necessary */
+static AST_LIST_HEAD_NOLOCK_STATIC(dirlist, direntry);
+
+static void queue_file(const char *filename, time_t when)
+{
+ struct stat st;
+ struct direntry *cur, *new;
+ int res;
+ time_t now = time(NULL);
+
+ if (filename[0] != '/') {
+ char *fn = alloca(strlen(qdir) + strlen(filename) + 2);
+ sprintf(fn, "%s/%s", qdir, filename); /* SAFE */
+ filename = fn;
+ }
+
+ if (when == 0) {
+ if (stat(filename, &st)) {
+ ast_log(LOG_WARNING, "Unable to stat %s: %s\n", filename, strerror(errno));
+ return;
+ }
+
+ if (!S_ISREG(st.st_mode)) {
+ return;
+ }
+
+ when = st.st_mtime;
+ }
+
+ if ((res = when) > now || (res = scan_service(filename, now)) > 0) {
+ if (!(new = ast_calloc(1, sizeof(*new) + strlen(filename) + 1))) {
+ return;
+ }
+ new->mtime = res;
+ strcpy(new->name, filename);
+ /* List is ordered by mtime */
+ if (AST_LIST_EMPTY(&dirlist)) {
+ AST_LIST_INSERT_HEAD(&dirlist, new, list);
+ } else {
+ int found = 0;
+ AST_LIST_TRAVERSE_SAFE_BEGIN(&dirlist, cur, list) {
+ if (cur->mtime > new->mtime) {
+ AST_LIST_INSERT_BEFORE_CURRENT(new, list);
+ found = 1;
+ break;
+ }
+ }
+ AST_LIST_TRAVERSE_SAFE_END
+ if (!found) {
+ AST_LIST_INSERT_TAIL(&dirlist, new, list);
+ }
+ }
+ }
+}
+
+static void *scan_thread(void *unused)
+{
+ DIR *dir;
+ struct dirent *de;
+ int res;
+ time_t now;
+ struct timespec ts = { .tv_sec = 1 };
+ int inotify_fd = inotify_init();
+ struct direntry *cur;
+ struct {
+ struct inotify_event iev;
+ /* It may not look like we're using this element, but when we read
+ * from inotify_fd, the event is typically larger than the first
+ * struct, and overflows into this second one. */
+ char name[FILENAME_MAX + 1];
+ } buf;
+ struct pollfd pfd = { .fd = inotify_fd, .events = POLLIN };
+
+ while (!ast_fully_booted) {
+ nanosleep(&ts, NULL);
+ }
+
+ if (inotify_fd < 0) {
+ ast_log(LOG_ERROR, "Unable to initialize inotify(7)\n");
+ return NULL;
+ }
+
+ inotify_add_watch(inotify_fd, qdir, IN_CREATE | IN_ATTRIB | IN_MOVED_TO);
+
+ /* First, run through the directory and clear existing entries */
+ if (!(dir = opendir(qdir))) {
+ ast_log(LOG_ERROR, "Unable to open directory %s: %s\n", qdir, strerror(errno));
+ return NULL;
+ }
+
+ now = time(NULL);
+ while ((de = readdir(dir))) {
+ queue_file(de->d_name, 0);
+ }
+ closedir(dir);
+
+ /* Wait for either a) next timestamp to occur, or b) a change to happen */
+ for (;/* ever */;) {
+ time_t next = AST_LIST_EMPTY(&dirlist) ? INT_MAX : AST_LIST_FIRST(&dirlist)->mtime;
+
+ time(&now);
+ if (next > now) {
+ int stage = 0;
+ /* Convert from seconds to milliseconds, unless there's nothing
+ * in the queue already, in which case, we wait forever. */
+ int waittime = next == INT_MAX ? -1 : (next - now) * 1000;
+ /* When a file arrives, add it to the queue, in mtime order. */
+ if ((res = poll(&pfd, 1, waittime)) > 0 && (stage = 1) &&
+ (res = read(inotify_fd, &buf, sizeof(buf))) >= sizeof(buf.iev)) {
+ /* File added to directory, add it to my list */
+ queue_file(buf.iev.name, 0);
+ } else if (res < 0 && errno != EINTR && errno != EAGAIN) {
+ ast_debug(1, "Got an error back from %s(2): %s\n", stage ? "read" : "poll", strerror(errno));
+ }
+ time(&now);
+ }
+
+ /* Empty the list of all entries ready to be processed */
+ while (!AST_LIST_EMPTY(&dirlist) && AST_LIST_FIRST(&dirlist)->mtime <= now) {
+ cur = AST_LIST_REMOVE_HEAD(&dirlist, list);
+ queue_file(cur->name, cur->mtime);
+ ast_free(cur);
+ }
+ }
+ return NULL;
+}
+
+#else
static void *scan_thread(void *unused)
{
struct stat st;
@@ -481,7 +619,7 @@ static void *scan_thread(void *unused)
if (!S_ISREG(st.st_mode))
continue;
if (st.st_mtime <= now) {
- res = scan_service(fn, now, st.st_atime);
+ res = scan_service(fn, now);
if (res > 0) {
/* Update next service time */
if (!next || (res < next)) {
@@ -503,6 +641,7 @@ static void *scan_thread(void *unused)
}
return NULL;
}
+#endif
static int unload_module(void)
{