aboutsummaryrefslogtreecommitdiffstats
path: root/main
diff options
context:
space:
mode:
authorrussell <russell@f38db490-d61c-443f-a65b-d21fe96a405b>2008-06-10 15:12:17 +0000
committerrussell <russell@f38db490-d61c-443f-a65b-d21fe96a405b>2008-06-10 15:12:17 +0000
commit6195ff1afd2d86d16ce6179327b6ccd8862c898e (patch)
treed87634ac06f4e43877c9790c9f70d61d70b246f1 /main
parent639a4bf7e4bf9917deb652e7b0469e33095a0596 (diff)
Merge another big set of changes from team/russell/events
This commit merges in the rest of the code needed to support distributed device state. There are two main parts to this commit. Core changes: - The device state handling in the core has been updated to understand device state across a cluster of Asterisk servers. Every time the state of a device changes, it looks at all of the device states on each node, and determines the aggregate device state. That resulting device state is what is provided to modules in Asterisk that take actions based on the state of a device. New module, res_ais: - A module has been written to facilitate the communication of events between nodes in a cluster of Asterisk servers. This module uses the SAForum AIS (Service Availability Forum Application Interface Specification) CLM and EVT services (Cluster Management and Event) to handle this task. This module currently supports sharing Voicemail MWI (Message Waiting Indication) and device state events between servers. It has been tested with openais, though other implementations of the spec do exist. For more information on testing distributed device state, see the following doc: - doc/distributed_devstate.txt git-svn-id: http://svn.digium.com/svn/asterisk/trunk@121559 f38db490-d61c-443f-a65b-d21fe96a405b
Diffstat (limited to 'main')
-rw-r--r--main/devicestate.c215
-rw-r--r--main/pbx.c2
2 files changed, 211 insertions, 6 deletions
diff --git a/main/devicestate.c b/main/devicestate.c
index 85ac6492e..8c20d1703 100644
--- a/main/devicestate.c
+++ b/main/devicestate.c
@@ -1,9 +1,10 @@
/*
* Asterisk -- An open source telephony toolkit.
*
- * Copyright (C) 1999 - 2007, Digium, Inc.
+ * Copyright (C) 1999 - 2008, Digium, Inc.
*
* Mark Spencer <markster@digium.com>
+ * Russell Bryant <russell@digium.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
@@ -20,14 +21,18 @@
*
* \brief Device state management
*
- *
* \author Mark Spencer <markster@digium.com>
+ * \author Russell Bryant <russell@digium.com>
*
* \arg \ref AstExtState
*/
/*! \page AstExtState Extension and device states in Asterisk
*
+ * (Note that these descriptions of device states and extension
+ * states have not been updated to the way things work
+ * in Asterisk 1.6.)
+ *
* Asterisk has an internal system that reports states
* for an extension. By using the dialplan priority -1,
* also called a \b hint, a connection can be made from an
@@ -171,6 +176,23 @@ enum devstate_cache {
CACHE_OFF,
};
+struct devstate_change {
+ AST_LIST_ENTRY(devstate_change) entry;
+ uint32_t state;
+ struct ast_eid eid;
+ char device[1];
+};
+
+struct {
+ pthread_t thread;
+ struct ast_event_sub *event_sub;
+ ast_cond_t cond;
+ ast_mutex_t lock;
+ AST_LIST_HEAD_NOLOCK(, devstate_change) devstate_change_q;
+} devstate_collector = {
+ .thread = AST_PTHREADT_NULL,
+};
+
/* Forward declarations */
static int getproviderstate(const char *provider, const char *address);
@@ -271,7 +293,7 @@ static enum ast_device_state devstate_cached(const char *device)
enum ast_device_state res = AST_DEVICE_UNKNOWN;
struct ast_event *event;
- event = ast_event_get_cached(AST_EVENT_DEVICE_STATE,
+ event = ast_event_get_cached(AST_EVENT_DEVICE_STATE_CHANGE,
AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device,
AST_EVENT_IE_END);
@@ -383,7 +405,6 @@ static int getproviderstate(const char *provider, const char *address)
struct devstate_prov *devprov;
int res = AST_DEVICE_INVALID;
-
AST_RWLIST_RDLOCK(&devstate_provs);
AST_RWLIST_TRAVERSE(&devstate_provs, devprov, list) {
ast_debug(5, "Checking provider %s with %s\n", devprov->label, provider);
@@ -394,6 +415,7 @@ static int getproviderstate(const char *provider, const char *address)
}
}
AST_RWLIST_UNLOCK(&devstate_provs);
+
return res;
}
@@ -401,7 +423,9 @@ static void devstate_event(const char *device, enum ast_device_state state, enum
{
struct ast_event *event;
- if (!(event = ast_event_new(AST_EVENT_DEVICE_STATE,
+ ast_debug(1, "device '%s' state '%d'\n", device, state);
+
+ if (!(event = ast_event_new(AST_EVENT_DEVICE_STATE_CHANGE,
AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device,
AST_EVENT_IE_STATE, AST_EVENT_IE_PLTYPE_UINT, state,
AST_EVENT_IE_END))) {
@@ -413,6 +437,7 @@ static void devstate_event(const char *device, enum ast_device_state state, enum
* device name if it exists. */
ast_event_queue_and_cache(event,
AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR,
+ AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, sizeof(struct ast_eid),
AST_EVENT_IE_END);
} else {
ast_event_queue(event);
@@ -540,9 +565,189 @@ static void *do_devstate_changes(void *data)
return NULL;
}
+static void destroy_devstate_change(struct devstate_change *sc)
+{
+ ast_free(sc);
+}
+
+#define MAX_SERVERS 64
+struct change_collection {
+ struct devstate_change states[MAX_SERVERS];
+ size_t num_states;
+};
+
+static void devstate_cache_cb(const struct ast_event *event, void *data)
+{
+ struct change_collection *collection = data;
+ int i;
+ const struct ast_eid *eid;
+
+ if (collection->num_states == ARRAY_LEN(collection->states)) {
+ ast_log(LOG_ERROR, "More per-server state values than we have room for (MAX_SERVERS is %d)\n",
+ MAX_SERVERS);
+ return;
+ }
+
+ if (!(eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
+ ast_log(LOG_ERROR, "Device state change event with no EID\n");
+ return;
+ }
+
+ i = collection->num_states;
+
+ collection->states[i].state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
+ collection->states[i].eid = *eid;
+
+ collection->num_states++;
+}
+
+static void process_collection(const char *device, struct change_collection *collection)
+{
+ int i;
+ struct ast_devstate_aggregate agg;
+ enum ast_device_state state;
+ struct ast_event *event;
+
+ ast_devstate_aggregate_init(&agg);
+
+ for (i = 0; i < collection->num_states; i++) {
+ ast_debug(1, "Adding per-server state of '%s' for '%s'\n",
+ devstate2str(collection->states[i].state), device);
+ ast_devstate_aggregate_add(&agg, collection->states[i].state);
+ }
+
+ state = ast_devstate_aggregate_result(&agg);
+
+ ast_debug(1, "Aggregate devstate result is %d\n", state);
+
+ event = ast_event_get_cached(AST_EVENT_DEVICE_STATE,
+ AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device,
+ AST_EVENT_IE_END);
+
+ if (event) {
+ enum ast_device_state old_state;
+
+ old_state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
+
+ ast_event_destroy(event);
+
+ if (state == old_state) {
+ /* No change since last reported device state */
+ ast_debug(1, "Aggregate state for device '%s' has not changed from '%s'\n",
+ device, devstate2str(state));
+ return;
+ }
+ }
+
+ ast_debug(1, "Aggregate state for device '%s' has changed to '%s'\n",
+ device, devstate2str(state));
+
+ event = ast_event_new(AST_EVENT_DEVICE_STATE,
+ AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device,
+ AST_EVENT_IE_STATE, AST_EVENT_IE_PLTYPE_UINT, state,
+ AST_EVENT_IE_END);
+
+ if (!event)
+ return;
+
+ ast_event_queue_and_cache(event,
+ AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR,
+ AST_EVENT_IE_END);
+}
+
+static void handle_devstate_change(struct devstate_change *sc)
+{
+ struct ast_event_sub *tmp_sub;
+ struct change_collection collection = {
+ .num_states = 0,
+ };
+
+ ast_debug(1, "Processing device state change for '%s'\n", sc->device);
+
+ if (!(tmp_sub = ast_event_subscribe_new(AST_EVENT_DEVICE_STATE_CHANGE, devstate_cache_cb, &collection))) {
+ ast_log(LOG_ERROR, "Failed to create subscription\n");
+ return;
+ }
+
+ if (ast_event_sub_append_ie_str(tmp_sub, AST_EVENT_IE_DEVICE, sc->device)) {
+ ast_log(LOG_ERROR, "Failed to append device IE\n");
+ ast_event_sub_destroy(tmp_sub);
+ return;
+ }
+
+ /* Populate the collection of device states from the cache */
+ ast_event_dump_cache(tmp_sub);
+
+ process_collection(sc->device, &collection);
+
+ ast_event_sub_destroy(tmp_sub);
+}
+
+static void *run_devstate_collector(void *data)
+{
+ for (;;) {
+ struct devstate_change *sc;
+
+ ast_mutex_lock(&devstate_collector.lock);
+ while (!(sc = AST_LIST_REMOVE_HEAD(&devstate_collector.devstate_change_q, entry)))
+ ast_cond_wait(&devstate_collector.cond, &devstate_collector.lock);
+ ast_mutex_unlock(&devstate_collector.lock);
+
+ handle_devstate_change(sc);
+
+ destroy_devstate_change(sc);
+ }
+
+ return NULL;
+}
+
+static void devstate_change_collector_cb(const struct ast_event *event, void *data)
+{
+ struct devstate_change *sc;
+ const char *device;
+ const struct ast_eid *eid;
+ uint32_t state;
+
+ device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
+ eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
+ state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
+
+ if (ast_strlen_zero(device) || !eid) {
+ ast_log(LOG_ERROR, "Invalid device state change event received\n");
+ return;
+ }
+
+ if (!(sc = ast_calloc(1, sizeof(*sc) + strlen(device))))
+ return;
+
+ strcpy(sc->device, device);
+ sc->eid = *eid;
+ sc->state = state;
+
+ ast_mutex_lock(&devstate_collector.lock);
+ AST_LIST_INSERT_TAIL(&devstate_collector.devstate_change_q, sc, entry);
+ ast_cond_signal(&devstate_collector.cond);
+ ast_mutex_unlock(&devstate_collector.lock);
+}
+
/*! \brief Initialize the device state engine in separate thread */
int ast_device_state_engine_init(void)
{
+ devstate_collector.event_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE_CHANGE,
+ devstate_change_collector_cb, NULL, AST_EVENT_IE_END);
+
+ if (!devstate_collector.event_sub) {
+ ast_log(LOG_ERROR, "Failed to create subscription for the device state change collector\n");
+ return -1;
+ }
+
+ ast_mutex_init(&devstate_collector.lock);
+ ast_cond_init(&devstate_collector.cond, NULL);
+ if (ast_pthread_create_background(&devstate_collector.thread, NULL, run_devstate_collector, NULL) < 0) {
+ ast_log(LOG_ERROR, "Unable to start device state collector thread.\n");
+ return -1;
+ }
+
ast_cond_init(&change_pending, NULL);
if (ast_pthread_create_background(&change_thread, NULL, do_devstate_changes, NULL) < 0) {
ast_log(LOG_ERROR, "Unable to start device state change thread.\n");
diff --git a/main/pbx.c b/main/pbx.c
index fcc3fb3f0..6bb8a14f6 100644
--- a/main/pbx.c
+++ b/main/pbx.c
@@ -8070,7 +8070,7 @@ int load_pbx(void)
/* Register manager application */
ast_manager_register2("ShowDialPlan", EVENT_FLAG_CONFIG | EVENT_FLAG_REPORTING, manager_show_dialplan, "List dialplan", mandescr_show_dialplan);
- if (!(device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, device_state_cb, NULL,
+ if (!(device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE_CHANGE, device_state_cb, NULL,
AST_EVENT_IE_END))) {
return -1;
}