aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--CHANGES26
-rw-r--r--cdr/cdr_pgsql.c269
2 files changed, 244 insertions, 51 deletions
diff --git a/CHANGES b/CHANGES
index 21ea3c2f1..c5480dfef 100644
--- a/CHANGES
+++ b/CHANGES
@@ -450,6 +450,29 @@ Logger changes
and to ensure that the oldest log file gets deleted.
* Added realtime support for the queue log
+Call Detail Records
+-------------------
+ * The cdr_manager module has a [mappings] feature, like cdr_custom,
+ to add fields to the manager event from the CDR variables.
+ * Added cdr_adaptive_odbc, a new module that adapts to the structure of your
+ backend database CDR table. Specifically, additional, non-standard
+ columns are supported, merely by setting the corresponding CDR variable in
+ your dialplan. In addition, you may alias any column to another name (for
+ example, if you want the 'src' CDR variable to be column 'ANI' in the DB,
+ simply "alias src => ANI" in the configuration file). Records may be
+ posted to more than one backend, simply by specifying multiple categories
+ in the configuration file. And finally, you may filter which CDRs get
+ posted to each backend, by specifying a filter (which the record must
+ match) for the particular category. Filters are additive (meaning all
+ rules must match to post that CDR).
+ * The Postgres CDR module now supports some features of the cdr_adaptive_odbc
+ module. Specifically, you may add additional columns into the table and
+ they will be set, if you set the corresponding CDR variable name. Also,
+ if you omit columns in your database table, they will be silently skipped
+ (but a record will still be inserted, based on what columns remain). Note
+ that the other two features from cdr_adaptive_odbc (alias and filter) are
+ not currently supported.
+
Miscellaneous New Modules
-------------------------
* Added a new CDR module, cdr_sqlite3_custom.
@@ -494,8 +517,6 @@ Miscellaneous
* Added maxfiles option to options section of asterisk.conf which allows you to specify
what Asterisk should set as the maximum number of open files when it loads.
* Added the jittertargetextra configuration option.
- * The cdr_manager module has a [mappings] feature, like cdr_custom,
- to add fields to the manager event from the CDR variables.
* Added support for setting the CoS for VLAN traffic (802.1p). See the sample
configuration files for the IP channel drivers. The new option is "cos".
This information is also documented in doc/qos.tex, or the IP Quality of Service
@@ -523,3 +544,4 @@ Miscellaneous
* Added a compiler flag, CHANNEL_TRACE, which permits channel tracing to be
turned on, via the CHANNEL(trace) dialplan function. Could be useful for
dialplan debugging.
+
diff --git a/cdr/cdr_pgsql.c b/cdr/cdr_pgsql.c
index eea4416a5..8ec1105ae 100644
--- a/cdr/cdr_pgsql.c
+++ b/cdr/cdr_pgsql.c
@@ -50,29 +50,68 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/cdr.h"
#include "asterisk/module.h"
-#define DATE_FORMAT "%Y-%m-%d %T"
+#define DATE_FORMAT "'%Y-%m-%d %T'"
static char *name = "pgsql";
static char *config = "cdr_pgsql.conf";
static char *pghostname = NULL, *pgdbname = NULL, *pgdbuser = NULL, *pgpassword = NULL, *pgdbport = NULL, *table = NULL;
static int connected = 0;
+static int maxsize = 512, maxsize2 = 512;
AST_MUTEX_DEFINE_STATIC(pgsql_lock);
static PGconn *conn = NULL;
+struct columns {
+ char *name;
+ char *type;
+ int len;
+ AST_RWLIST_ENTRY(columns) list;
+};
+
+static AST_RWLIST_HEAD_STATIC(psql_columns, columns);
+
+#define LENGTHEN_BUF1(size) \
+ do { \
+ /* Lengthen buffer, if necessary */ \
+ if ((newsize = lensql + (size) + 3) > sizesql) { \
+ if ((tmp = ast_realloc(sql, (newsize / 512 + 1) * 512))) { \
+ sql = tmp; \
+ sizesql = (newsize / 512 + 1) * 512; \
+ } else { \
+ ast_log(LOG_ERROR, "Unable to allocate sufficient memory. Insert CDR failed.\n"); \
+ ast_free(sql); \
+ ast_free(sql2); \
+ AST_RWLIST_UNLOCK(&psql_columns); \
+ return -1; \
+ } \
+ } \
+ } while (0)
+
+#define LENGTHEN_BUF2(size) \
+ do { \
+ if ((newsize = lensql2 + (size) + 3) > sizesql2) { \
+ if ((tmp = ast_realloc(sql2, (newsize / 512 + 1) * 512))) { \
+ sql2 = tmp; \
+ sizesql2 = (newsize / 512 + 1) * 512; \
+ } else { \
+ ast_log(LOG_ERROR, "Unable to allocate sufficient memory. Insert CDR failed.\n"); \
+ ast_free(sql); \
+ ast_free(sql2); \
+ AST_RWLIST_UNLOCK(&psql_columns); \
+ return -1; \
+ } \
+ } \
+ } while (0)
+
static int pgsql_log(struct ast_cdr *cdr)
{
struct ast_tm tm;
- char sqlcmd[2048] = "", timestr[128];
char *pgerror;
PGresult *result;
ast_mutex_lock(&pgsql_lock);
- ast_localtime(&cdr->start, &tm, NULL);
- ast_strftime(timestr, sizeof(timestr), DATE_FORMAT, &tm);
-
if ((!connected) && pghostname && pgdbuser && pgpassword && pgdbname) {
conn = PQsetdbLogin(pghostname, pgdbport, NULL, NULL, pgdbname, pgdbuser, pgpassword);
if (PQstatus(conn) != CONNECTION_BAD) {
@@ -87,49 +126,135 @@ static int pgsql_log(struct ast_cdr *cdr)
}
if (connected) {
- char *clid=NULL, *dcontext=NULL, *channel=NULL, *dstchannel=NULL, *lastapp=NULL, *lastdata=NULL;
- char *src=NULL, *dst=NULL, *uniqueid=NULL, *userfield=NULL;
- int pgerr;
-
- /* Maximum space needed would be if all characters needed to be escaped, plus a trailing NULL */
- if ((clid = alloca(strlen(cdr->clid) * 2 + 1)) != NULL)
- PQescapeStringConn(conn, clid, cdr->clid, strlen(cdr->clid), &pgerr);
- if ((dcontext = alloca(strlen(cdr->dcontext) * 2 + 1)) != NULL)
- PQescapeStringConn(conn, dcontext, cdr->dcontext, strlen(cdr->dcontext), &pgerr);
- if ((channel = alloca(strlen(cdr->channel) * 2 + 1)) != NULL)
- PQescapeStringConn(conn, channel, cdr->channel, strlen(cdr->channel), &pgerr);
- if ((dstchannel = alloca(strlen(cdr->dstchannel) * 2 + 1)) != NULL)
- PQescapeStringConn(conn, dstchannel, cdr->dstchannel, strlen(cdr->dstchannel), &pgerr);
- if ((lastapp = alloca(strlen(cdr->lastapp) * 2 + 1)) != NULL)
- PQescapeStringConn(conn, lastapp, cdr->lastapp, strlen(cdr->lastapp), &pgerr);
- if ((lastdata = alloca(strlen(cdr->lastdata) * 2 + 1)) != NULL)
- PQescapeStringConn(conn, lastdata, cdr->lastdata, strlen(cdr->lastdata), &pgerr);
- if ((uniqueid = alloca(strlen(cdr->uniqueid) * 2 + 1)) != NULL)
- PQescapeStringConn(conn, uniqueid, cdr->uniqueid, strlen(cdr->uniqueid), &pgerr);
- if ((userfield = alloca(strlen(cdr->userfield) * 2 + 1)) != NULL)
- PQescapeStringConn(conn, userfield, cdr->userfield, strlen(cdr->userfield), &pgerr);
- if ((src = alloca(strlen(cdr->src) * 2 + 1)) != NULL)
- PQescapeStringConn(conn, src, cdr->src, strlen(cdr->src), &pgerr);
- if ((dst = alloca(strlen(cdr->dst) * 2 + 1)) != NULL)
- PQescapeStringConn(conn, dst, cdr->dst, strlen(cdr->dst), &pgerr);
-
- /* Check for all alloca failures above at once */
- if ((!clid) || (!dcontext) || (!channel) || (!dstchannel) || (!lastapp) || (!lastdata) || (!uniqueid) || (!userfield) || (!src) || (!dst)) {
- ast_log(LOG_ERROR, "cdr_pgsql: Out of memory error (insert fails)\n");
- ast_mutex_unlock(&pgsql_lock);
- return -1;
- }
+ struct columns *cur;
+ int lensql, lensql2, sizesql = maxsize, sizesql2 = maxsize2, newsize;
+ char *sql = ast_calloc(sizeof(char), sizesql), *sql2 = ast_calloc(sizeof(char), sizesql2), *tmp, *value;
+ char buf[257], escapebuf[513];
+
+ lensql = snprintf(sql, sizesql, "INSERT INTO %s (", table);
+ lensql2 = snprintf(sql2, sizesql2, " VALUES (");
+
+ AST_RWLIST_RDLOCK(&psql_columns);
+ AST_RWLIST_TRAVERSE(&psql_columns, cur, list) {
+ /* For fields not set, simply skip them */
+ ast_cdr_getvar(cdr, cur->name, &value, buf, sizeof(buf), 0, 0);
+ if (!value)
+ continue;
+
+ LENGTHEN_BUF1(strlen(cur->name));
+ lensql += snprintf(sql + lensql, sizesql - lensql, "%s,", cur->name);
+
+ if (strcmp(cur->name, "start") == 0 || strcmp(cur->name, "calldate") == 0) {
+ if (strncmp(cur->type, "int", 3) == 0) {
+ LENGTHEN_BUF2(12);
+ lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%ld", cdr->start.tv_sec);
+ } else if (strncmp(cur->type, "float", 5) == 0) {
+ LENGTHEN_BUF2(30);
+ lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%f", (double)cdr->start.tv_sec + (double)cdr->start.tv_usec / 1000000.0);
+ } else {
+ /* char, hopefully */
+ LENGTHEN_BUF2(30);
+ ast_localtime(&cdr->start, &tm, NULL);
+ lensql2 += ast_strftime(sql2 + lensql2, sizesql2 - lensql2, DATE_FORMAT, &tm);
+ }
+ } else if (strcmp(cur->name, "answer") == 0) {
+ if (strncmp(cur->type, "int", 3) == 0) {
+ LENGTHEN_BUF2(12);
+ lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%ld", cdr->answer.tv_sec);
+ } else if (strncmp(cur->type, "float", 5) == 0) {
+ LENGTHEN_BUF2(30);
+ lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%f", (double)cdr->answer.tv_sec + (double)cdr->answer.tv_usec / 1000000.0);
+ } else {
+ /* char, hopefully */
+ LENGTHEN_BUF2(30);
+ ast_localtime(&cdr->start, &tm, NULL);
+ lensql2 += ast_strftime(sql2 + lensql2, sizesql2 - lensql2, DATE_FORMAT, &tm);
+ }
+ } else if (strcmp(cur->name, "end") == 0) {
+ if (strncmp(cur->type, "int", 3) == 0) {
+ LENGTHEN_BUF2(12);
+ lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%ld", cdr->end.tv_sec);
+ } else if (strncmp(cur->type, "float", 5) == 0) {
+ LENGTHEN_BUF2(30);
+ lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%f", (double)cdr->end.tv_sec + (double)cdr->end.tv_usec / 1000000.0);
+ } else {
+ /* char, hopefully */
+ LENGTHEN_BUF2(30);
+ ast_localtime(&cdr->end, &tm, NULL);
+ lensql2 += ast_strftime(sql2 + lensql2, sizesql2 - lensql2, DATE_FORMAT, &tm);
+ }
+ } else if (strcmp(cur->name, "duration") == 0 || strcmp(cur->name, "billsec") == 0) {
+ if (cur->type[0] == 'i') {
+ /* Get integer, no need to escape anything */
+ ast_cdr_getvar(cdr, cur->name, &value, buf, sizeof(buf), 0, 0);
+ LENGTHEN_BUF2(12);
+ lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%s", value);
+ } else if (strncmp(cur->type, "float", 5) == 0) {
+ struct timeval *tv = cur->name[0] == 'd' ? &cdr->start : &cdr->answer;
+ LENGTHEN_BUF2(30);
+ lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%f", (double)cdr->end.tv_sec - tv->tv_sec + cdr->end.tv_usec / 1000000.0 - tv->tv_usec / 1000000.0);
+ } else {
+ /* Char field, probably */
+ struct timeval *tv = cur->name[0] == 'd' ? &cdr->start : &cdr->answer;
+ LENGTHEN_BUF2(30);
+ lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "'%f'", (double)cdr->end.tv_sec - tv->tv_sec + cdr->end.tv_usec / 1000000.0 - tv->tv_usec / 1000000.0);
+ }
+ } else if (strcmp(cur->name, "disposition") == 0 || strcmp(cur->name, "amaflags") == 0) {
+ if (strncmp(cur->type, "int", 3) == 0) {
+ /* Integer, no need to escape anything */
+ ast_cdr_getvar(cdr, cur->name, &value, buf, sizeof(buf), 0, 1);
+ LENGTHEN_BUF2(12);
+ lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%s", value);
+ } else {
+ /* Although this is a char field, there are no special characters in the values for these fields */
+ ast_cdr_getvar(cdr, cur->name, &value, buf, sizeof(buf), 0, 0);
+ LENGTHEN_BUF2(30);
+ lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "'%s'", value);
+ }
+ } else {
+ /* Arbitrary field, could be anything */
+ ast_cdr_getvar(cdr, cur->name, &value, buf, sizeof(buf), 0, 0);
+ if (strncmp(cur->type, "int", 3) == 0) {
+ long long whatever;
+ if (value && sscanf(value, "%lld", &whatever) == 1) {
+ LENGTHEN_BUF2(25);
+ lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%lld", whatever);
+ } else {
+ LENGTHEN_BUF2(1);
+ lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "0");
+ }
+ } else if (strncmp(cur->type, "float", 5) == 0) {
+ long double whatever;
+ if (value && sscanf(value, "%Lf", &whatever) == 1) {
+ LENGTHEN_BUF2(50);
+ lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%30Lf", whatever);
+ } else {
+ LENGTHEN_BUF2(1);
+ lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "0");
+ }
+ /* XXX Might want to handle dates, times, and other misc fields here XXX */
+ } else {
+ if (value)
+ PQescapeStringConn(conn, escapebuf, value, strlen(value), NULL);
+ else
+ escapebuf[0] = '\0';
+ LENGTHEN_BUF2(strlen(escapebuf) + 2);
+ lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "'%s'", escapebuf);
+ }
+ }
+ LENGTHEN_BUF2(1);
+ strcat(sql2 + lensql2, ",");
+ lensql2++;
+ }
+ AST_RWLIST_UNLOCK(&psql_columns);
+ LENGTHEN_BUF1(lensql2);
+ sql[lensql - 1] = ')';
+ sql2[lensql2 - 1] = ')';
+ strcat(sql + lensql, sql2);
+ ast_verb(11, "[%s]\n", sql);
ast_debug(2, "cdr_pgsql: inserting a CDR record.\n");
- snprintf(sqlcmd,sizeof(sqlcmd),"INSERT INTO %s (calldate,clid,src,dst,dcontext,channel,dstchannel,"
- "lastapp,lastdata,duration,billsec,disposition,amaflags,accountcode,uniqueid,userfield) VALUES"
- " ('%s','%s','%s','%s','%s', '%s','%s','%s','%s',%ld,%ld,'%s',%ld,'%s','%s','%s')",
- table, timestr, clid, src, dst, dcontext, channel, dstchannel, lastapp, lastdata,
- cdr->duration,cdr->billsec,ast_cdr_disp2str(cdr->disposition),cdr->amaflags, cdr->accountcode, uniqueid, userfield);
-
- ast_debug(3, "cdr_pgsql: SQL command executed: %s\n",sqlcmd);
-
/* Test to be sure we're still connected... */
/* If we're connected, and connection is working, good. */
/* Otherwise, attempt reconnect. If it fails... sorry... */
@@ -152,7 +277,7 @@ static int pgsql_log(struct ast_cdr *cdr)
return -1;
}
}
- result = PQexec(conn, sqlcmd);
+ result = PQexec(conn, sql);
if (PQresultStatus(result) != PGRES_COMMAND_OK) {
pgerror = PQresultErrorMessage(result);
ast_log(LOG_ERROR,"cdr_pgsql: Failed to insert call detail record into database!\n");
@@ -163,7 +288,7 @@ static int pgsql_log(struct ast_cdr *cdr)
ast_log(LOG_ERROR, "cdr_pgsql: Connection reestablished.\n");
connected = 1;
PQclear(result);
- result = PQexec(conn, sqlcmd);
+ result = PQexec(conn, sql);
if (PQresultStatus(result) != PGRES_COMMAND_OK) {
pgerror = PQresultErrorMessage(result);
ast_log(LOG_ERROR,"cdr_pgsql: HARD ERROR! Attempted reconnection failed. DROPPING CALL RECORD!\n");
@@ -181,8 +306,14 @@ static int pgsql_log(struct ast_cdr *cdr)
}
static int unload_module(void)
-{
+{
+ struct columns *cur;
+ ast_cdr_unregister(name);
+
+ /* Give all threads time to finish */
+ usleep(1);
PQfinish(conn);
+
if (pghostname)
ast_free(pghostname);
if (pgdbname)
@@ -195,7 +326,13 @@ static int unload_module(void)
ast_free(pgdbport);
if (table)
ast_free(table);
- ast_cdr_unregister(name);
+
+ AST_RWLIST_WRLOCK(&psql_columns);
+ while ((cur = AST_RWLIST_REMOVE_HEAD(&psql_columns, list))) {
+ ast_free(cur);
+ }
+ AST_RWLIST_UNLOCK(&psql_columns);
+
return 0;
}
@@ -203,6 +340,8 @@ static int config_module(int reload)
{
struct ast_variable *var;
char *pgerror;
+ struct columns *cur;
+ PGresult *result;
const char *tmp;
struct ast_config *cfg;
struct ast_flags config_flags = { reload ? CONFIG_FLAG_FILEUNCHANGED : 0 };
@@ -304,8 +443,40 @@ static int config_module(int reload)
conn = PQsetdbLogin(pghostname, pgdbport, NULL, NULL, pgdbname, pgdbuser, pgpassword);
if (PQstatus(conn) != CONNECTION_BAD) {
+ char sqlcmd[256];
+ char *fname, *ftype, *flen;
+ int i, rows;
ast_debug(1, "Successfully connected to PostgreSQL database.\n");
connected = 1;
+
+ /* Query the columns */
+ snprintf(sqlcmd, sizeof(sqlcmd), "select a.attname, t.typname, a.attlen from pg_class c, pg_attribute a, pg_type t where c.oid = a.attrelid and a.atttypid = t.oid and (a.attnum > 0) and c.relname = '%s' order by c.relname, attnum", table);
+ result = PQexec(conn, sqlcmd);
+ if (PQresultStatus(result) != PGRES_TUPLES_OK) {
+ pgerror = PQresultErrorMessage(result);
+ ast_log(LOG_ERROR, "cdr_pgsql: Failed to query database columns: %s\n", pgerror);
+ PQclear(result);
+ unload_module();
+ return AST_MODULE_LOAD_DECLINE;
+ }
+
+ rows = PQntuples(result);
+ for (i = 0; i < rows; i++) {
+ fname = PQgetvalue(result, i, 0);
+ ftype = PQgetvalue(result, i, 1);
+ flen = PQgetvalue(result, i, 2);
+ ast_verb(4, "Found column '%s' of type '%s'\n", fname, ftype);
+ cur = ast_calloc(1, sizeof(*cur) + strlen(fname) + strlen(ftype) + 2);
+ if (cur) {
+ sscanf(flen, "%d", &cur->len);
+ cur->name = (char *)cur + sizeof(*cur);
+ cur->type = (char *)cur + sizeof(*cur) + strlen(fname) + 1;
+ strcpy(cur->name, fname);
+ strcpy(cur->type, ftype);
+ AST_RWLIST_INSERT_TAIL(&psql_columns, cur, list);
+ }
+ }
+ PQclear(result);
} else {
pgerror = PQerrorMessage(conn);
ast_log(LOG_ERROR, "cdr_pgsql: Unable to connect to database server %s. CALLS WILL NOT BE LOGGED!!\n", pghostname);