aboutsummaryrefslogtreecommitdiffstats
path: root/echld
diff options
context:
space:
mode:
authorLuis Ontanon <luis.ontanon@gmail.com>2013-06-27 03:41:48 +0000
committerLuis Ontanon <luis.ontanon@gmail.com>2013-06-27 03:41:48 +0000
commite48b0084e19eacc30a2d8a8e274cc284973e0a2c (patch)
tree4f1f9c467037e08567984dd003687057777ec431 /echld
parentffe6d9c4d6fe36d7f2498154cdb12159b6b60aff (diff)
MS: Pong from the dispatcher!
svn path=/trunk/; revision=50183
Diffstat (limited to 'echld')
-rw-r--r--echld/child.c2
-rw-r--r--echld/common.c153
-rw-r--r--echld/dispatcher.c263
-rw-r--r--echld/echld-int.h22
-rw-r--r--echld/echld-util.c7
-rw-r--r--echld/parent.c181
6 files changed, 430 insertions, 198 deletions
diff --git a/echld/child.c b/echld/child.c
index b603d3464a..5b8e89021f 100644
--- a/echld/child.c
+++ b/echld/child.c
@@ -60,7 +60,7 @@ static echld_child_t child;
#define CHILD_RESP(BYTEARR,TYPE) echld_write_frame(child.fds.pipe_to_parent, BYTEARR, child.chld_id, TYPE, child.reqh_id, NULL)
#ifdef DEBUG_CHILD
-static int dbg_level = 0;
+static int dbg_level = DEBUG_CHILD;
void child_debug(int level, const char* fmt, ...) {
va_list ap;
diff --git a/echld/common.c b/echld/common.c
index c078af5fd7..2071fc6bd0 100644
--- a/echld/common.c
+++ b/echld/common.c
@@ -26,6 +26,37 @@
#include "echld-int.h"
+#ifdef DEBUG_BASE
+
+static int dbg_level = DEBUG_BASE;
+static FILE* dbg_fp;
+
+static void common_dbg(int level, const char* fmt, ...) {
+ va_list ap;
+ char str[1024];
+
+ if (level > dbg_level) return;
+
+ va_start(ap,fmt);
+ g_vsnprintf(str,1024,fmt,ap);
+ va_end(ap);
+
+ if (dbg_fp) {
+ fprintf(dbg_fp,"Common: level=%d msg='%s'\n",level,str);
+ fflush(dbg_fp);
+ }
+}
+
+#define DBG(attrs) ( common_dbg attrs )
+#else
+#define DBG(attrs)
+#endif
+
+
+extern void echld_common_set_dbg(int level, FILE* fp) {
+ dbg_level = level;
+ dbg_fp = fp;
+}
/**
@@ -39,6 +70,8 @@ static void child_realloc_buff(echld_reader_t* r, size_t needed) {
size_t s = r->len;
long rp_off = r->rp - r->data;
+ DBG((2,"REALLOC BUFF needed=%d",needed));
+
if ( a < (s + needed) ) {
guint8* data = r->data;
@@ -68,19 +101,6 @@ static void parent_realloc_buff(echld_reader_t* b, size_t needed) {
-void echld_init_reader(echld_reader_t* r, int fd, size_t initial) {
- r->fd = fd;
- if (fd >= 0) fcntl(fd, F_SETFL, O_NONBLOCK);
-
- if (r->data == NULL) {
- r->actual_len = initial;
- r->data = (guint8*)g_malloc0(initial);
- r->wp = r->data;
- r->rp = NULL;
- r->len = 0;
- }
-}
-
void echld_reset_reader(echld_reader_t* r, int fd, size_t initial) {
r->fd = fd;
fcntl(fd, F_SETFL, O_NONBLOCK);
@@ -89,15 +109,21 @@ void echld_reset_reader(echld_reader_t* r, int fd, size_t initial) {
r->actual_len = initial;
r->data =(guint8*) g_malloc0(initial);
r->wp = r->data;
- r->rp = NULL;
+ r->rp = r->data;
r->len = 0;
} else {
r->wp = r->data;
- r->rp = NULL;
+ r->rp = r->data;
r->len = 0;
}
}
+void echld_init_reader(echld_reader_t* r, int fd, size_t initial) {
+ echld_reset_reader(r,fd,initial);
+}
+
+
+
void free_reader(echld_reader_t* r) {
free(r->data);
}
@@ -106,26 +132,31 @@ static long reader_readv(echld_reader_t* r, size_t len) {
struct iovec iov;
long nread;
+ DBG((2,"READV needed=%d",len));
+
if ( (r->actual_len - r->len) < len )
reader_realloc_buff(r, len);
iov.iov_base = r->wp;
iov.iov_len = len;
- nread = readv(0,
- &iov,
- (guint)len);
+ nread = readv(r->fd, &iov, 1);
+
+ DBG((2,"READV nread=%d msg='%s'",nread, (nread<0) ? strerror(errno) : "-" ));
if (nread >= 0) {
r->wp += nread;
r->len += nread;
}
+ if (errno == EAGAIN) return 0;
+
return nread;
};
long echld_read_frame(echld_reader_t* r, read_cb_t cb, void* cb_data) {
+ DBG((4,"READ = echld_read_frame fd=%d",r->fd));
// it will use shared memory instead of inband communication
do {
@@ -135,6 +166,8 @@ long echld_read_frame(echld_reader_t* r, read_cb_t cb, void* cb_data) {
size_t missing;
long off;
+ DBG((5,"READ reader_len=%d",r->len));
+
if ( r->len < ECHLD_HDR_LEN) {
/* read the header */
goto incomplete_header;
@@ -143,29 +176,28 @@ long echld_read_frame(echld_reader_t* r, read_cb_t cb, void* cb_data) {
goto incomplete_frame;
}
- /* we've got a frame! */
-
off = (fr_len = HDR_LEN(h)) + ECHLD_HDR_LEN;
-
+ DBG((5,"READ we've got a frame! fr_len=%d ch=%d t='%c' rh=%d",fr_len, h->h.chld_id, HDR_TYPE(h), h->h.reqh_id));
+
+
cb( &(r->rp[sizeof(hdr_t)]), HDR_LEN(h), h->h.chld_id, HDR_TYPE(h), h->h.reqh_id, cb_data);
- if ( ((long)r->len) >= off ) {
- /* shift the consumed frame */
- r->len -= off;
- memcpy(r->rp ,r->rp + off ,r->len);
- r->wp -= off;
- r->rp -= off;
- }
+ r->len = 0;
+ r->wp = r->data;
+ r->rp = r->data;
- continue;
+ DBG((5,"READ consumed frame!"));
+
+ goto again;
incomplete_header:
missing = ECHLD_HDR_LEN - (r->len);
+ DBG((5,"READ incomplete_header missing=%d",missing));
nread = reader_readv(r,missing);
- if (nread < 0) {
+ if (nread < 0 && errno != EAGAIN) {
goto kaput; /*XXX*/
} else if (nread < (long)missing) {
goto again;
@@ -177,16 +209,21 @@ long echld_read_frame(echld_reader_t* r, read_cb_t cb, void* cb_data) {
fr_len = HDR_LEN(h) + ECHLD_HDR_LEN;
missing = fr_len - r->len;
- nread = reader_readv(r,missing);
+ DBG((5,"READ incomplete_frame fr_len=%d missing=%d",fr_len ,missing));
+ if (missing) {
+ nread = reader_readv(r,missing);
- if (nread < 0) {
- goto kaput; /*XXX*/
- } else if (nread <= (long)missing) {
- goto again;
+ if (nread < 0 && errno != EAGAIN) {
+ goto kaput; /*XXX*/
+ } else if (nread < (long)missing) {
+ goto again;
+ }
}
} while(1);
+
+ DBG((1,"READ incomplete_frame Cannot happen"));
return 0;
again: return 1;
@@ -196,44 +233,26 @@ long echld_read_frame(echld_reader_t* r, read_cb_t cb, void* cb_data) {
-long echld_write_frame(int fd, GByteArray* ba, guint16 chld_id, echld_msg_type_t type, guint16 reqh_id, void* data) {
- static guint8* write_buf = NULL;
- static long wb_len = 4096;
- hdr_t* h;
- struct iovec iov;
- long fr_len = ba->len+ECHLD_HDR_LEN;
-
- data = data; //
+long echld_write_frame(int fd, GByteArray* ba, guint16 chld_id, echld_msg_type_t type, guint16 reqh_id, void* data _U_) {
+ hdr_t h;
+ struct iovec iov[2];
+ int iov_cnt = 1;
- // it will use shared memory instead of inband communication
- if (! write_buf) {
- // lock if needed
- write_buf = (guint8*)g_malloc0(wb_len);
- // unlock if needed
- }
+ h.h.type_len = (type<<24) | ((ba?ba->len:0) & 0x00ffffff) ;
+ h.h.chld_id = chld_id;
+ h.h.reqh_id = reqh_id;
- if (fr_len > wb_len) {
- do {
- wb_len *= 2;
- } while (fr_len > wb_len);
+ iov[0].iov_base = &h;
+ iov[0].iov_len = 8;
- // lock if needed
- write_buf = (guint8*)g_realloc(write_buf,wb_len);
- // unlock if needed
+ if ( ba && ba->len > 0 ) {
+ iov[1].iov_base = ba->data;
+ iov[1].iov_len = ba->len;
+ iov_cnt++;
}
- h = (hdr_t*)write_buf;
- h->h.type_len = (type<<24) | (((guint32)ba->len) & 0x00ffffff) ;
- h->h.chld_id = chld_id;
- h->h.reqh_id = reqh_id;
-
- memcpy(write_buf+ECHLD_HDR_LEN,ba->data,ba->len);
-
- iov.iov_base = write_buf;
- iov.iov_len = fr_len;
-
- return (long) writev(fd, &iov, (unsigned)fr_len);
+ return (long) writev(fd, iov, iov_cnt);
}
diff --git a/echld/dispatcher.c b/echld/dispatcher.c
index 53dbbda9d7..6edff8ae5e 100644
--- a/echld/dispatcher.c
+++ b/echld/dispatcher.c
@@ -62,27 +62,40 @@ struct dispatcher {
struct dispatcher* dispatcher;
-#define DISP_RESP(B,T) (echld_write_frame( dispatcher->parent_out, (B), 0, (T), dispatcher->reqh_id, NULL))
#ifdef DEBUG_DISPATCHER
-static int dbg_level = 0;
+static int debug_lvl = DEBUG_DISPATCHER;
+static FILE* debug_fp = NULL;
-void dispatcher_debug(int level, const char* fmt, ...) {
+#define DCOM echld_common_set_dbg(debug_lvl,debug_fp)
+#define DFL fflush(debug_fp)
+
+int dispatcher_debug(int level, const char* fmt, ...) {
va_list ap;
char* str;
- if (dbg_level<level) return;
+ if (debug_lvl<level) return 1;
va_start(ap, fmt);
str = g_strdup_vprintf(fmt,ap);
va_end(ap);
- fprintf(stderr, "dispatcher[%d]: reqh_id=%d dbg_level=%d message='%s'", dispatcher->pid, dispatcher->reqh_id, level, str);
+ if (dispatcher) {
+ fprintf(debug_fp, "dispatcher[%d]: reqh_id=%d dbg_level=%d message='%s'\n", dispatcher->pid, dispatcher->reqh_id, level, str);
+ } else {
+ fprintf(debug_fp, "dispatcher: dbg_level=%d message='%s'\n", level, str);
+ }
+
+ fflush(debug_fp);
+
g_free(str);
+
+ return 1;
}
+
static char* param_get_dbg_level(char** err _U_) {
- return g_strdup_printf("%d",dbg_level);
+ return g_strdup_printf("%d",debug_lvl);
}
static echld_bool_t param_set_dbg_level(char* val , char** err ) {
@@ -97,15 +110,51 @@ static echld_bool_t param_set_dbg_level(char* val , char** err ) {
return FALSE;
}
- dbg_level = lvl;
+ debug_lvl = lvl;
+ DCOM;
return TRUE;
}
+static long dbg_r = 0;
+
#define DISP_DBG(attrs) ( dispatcher_debug attrs )
+#define DISP_DBG_INIT() do { debug_fp = stderr; DCOM; } while(0)
+#define DISP_DBG_START(fname) do { debug_fp = fopen(fname,"a"); DCOM; DISP_DBG((0,"Log Started")); } while(0)
+#define DISP_WRITE(FD,BA,CH,T,RH) ( dbg_r = echld_write_frame(FD,BA,CH,T,RH,NULL), DISP_DBG((1,"SND fd=%d ch=%d ty='%c' rh=%d msg='%s'",FD,CH,T,RH, (dbg_r>0?"ok":strerror(errno)))), dbg_r )
#else
#define DISP_DBG(attrs)
+#define DISP_DBG_INIT()
+#define DISP_DBG_START(fname)
+#define DISP_WRITE(FD,BA,CH,T,RH) echld_write_frame(FD,BA,CH,T,RH,NULL)
#endif
+#define DISP_RESP(B,T) (DISP_WRITE( dispatcher->parent_out, (B), 0, (T), dispatcher->reqh_id))
+
+static void dispatcher_fatal(int cause, const char* fmt, ...) {
+ size_t len= 1024;
+ gchar err_str[len];
+ va_list ap;
+ int i;
+ struct dispatcher_child* cc = dispatcher->children;
+ int max_children = dispatcher->max_children;
+
+ va_start(ap, fmt);
+ g_vsnprintf(err_str,len,fmt,ap);
+ va_end(ap);
+
+ DISP_DBG((0,"fatal cause=%d msg=\"%s\"",cause ,err_str));
+
+ /* the massacre */
+ for(i = 0; i < max_children; i++) {
+ struct dispatcher_child* c = &(cc[i]);
+ if (c->chld_id > 0) kill(c->pid,SIGTERM);
+ }
+
+ exit(cause);
+}
+
+#define DISP_FATAL(attrs) dispatcher_fatal attrs
+
static void dispatcher_err(int errnum, const char* fmt, ...) {
size_t len= 1024;
gchar err_str[len];
@@ -222,12 +271,33 @@ static char* param_get_interfaces(char** err) {
return s;
}
+static struct timeval disp_loop_timeout;
+
+static char* param_get_loop_to(char** err _U_) {
+ return g_strdup_printf("%d.%6ds",(int)disp_loop_timeout.tv_sec, (int)disp_loop_timeout.tv_usec );
+}
+
+static echld_bool_t param_set_loop_to(char* val , char** err ) {
+ char* p;
+ int usec = (int)strtol(val, &p, 10); /*XXX: "10ms" or "500us" or "1s" */
+
+ if (p<=val) {
+ *err = g_strdup("not an integer");
+ return FALSE;
+ }
+
+ disp_loop_timeout.tv_sec = usec / 1000000;
+ disp_loop_timeout.tv_usec = usec % 1000000;
+
+ return TRUE;
+}
static param_t disp_params[] = {
#ifdef DEBUG_DISPATCHER
{"dbg_level", param_get_dbg_level, param_set_dbg_level},
# endif
+ {"loop_timeout",param_get_loop_to,param_set_loop_to},
{"interfaces",param_get_interfaces,NULL},
{NULL,NULL,NULL} };
@@ -237,7 +307,7 @@ static param_t* get_paramset(char* name) {
if (strcmp(name,disp_params[i].name) == 0 ) return &(disp_params[i]);
}
return NULL;
-}
+}
static struct dispatcher_child* dispatcher_get_child(struct dispatcher* d, guint16 chld_id) {
@@ -255,6 +325,7 @@ static struct dispatcher_child* dispatcher_get_child(struct dispatcher* d, guint
static void dispatcher_clear_child(struct dispatcher_child* c) {
+ DISP_DBG((5,"dispatcher_clear_child chld_id=%d",c->chld_id));
echld_reset_reader(&(c->reader), -1, 4096);
c->chld_id = 0;
c->write_fd = 0;
@@ -263,14 +334,22 @@ static void dispatcher_clear_child(struct dispatcher_child* c) {
}
static void preinit_epan(void) {
+ DISP_DBG((2,"preinit_epan"));
/* Here we do initialization of parts of epan that will be the same for every child we fork */
}
static void dispatcher_clear(void) {
+ DISP_DBG((2,"Child chld_id=%d ->CAPTURING"));
/* remove unnecessary stuff for the working child */
}
+void dispatcher_sig(int sig) {
+ DISP_FATAL((TERMINATED,"SIG sig=%d",sig));
+ exit(1);
+
+}
+
void dispatcher_reaper(int sig) {
int status;
int i;
@@ -316,7 +395,7 @@ void dispatcher_reaper(int sig) {
if (s) g_free(s);
}
- echld_write_frame(dispatcher->parent_out, em, c->chld_id, ECHLD_CHILD_DEAD, 0, NULL);
+ DISP_WRITE(dispatcher->parent_out, em, c->chld_id, ECHLD_CHILD_DEAD, 0);
dispatcher_clear_child(c);
g_byte_array_free(em,TRUE);
return;
@@ -369,19 +448,38 @@ static long dispatch_to_parent(guint8* b, size_t len, echld_chld_id_t chld_id, e
switch(type) {
case ECHLD_ERROR: break;
case ECHLD_TIMED_OUT: break;
- case ECHLD_HELLO: c->state = IDLE; break;
- case ECHLD_CLOSING: c->closing = TRUE; c->state = CLOSED; break;
+ case ECHLD_HELLO:
+ c->state = IDLE;
+ DISP_DBG((2,"Child chld_id=%d ->IDLE",c->chld_id));
+ break;
+ case ECHLD_CLOSING:
+ c->closing = TRUE;
+ c->state = CLOSED;
+ DISP_DBG((2,"Child chld_id=%d ->CLOSED",c->chld_id));
+ break;
case ECHLD_PARAM: break;
case ECHLD_PONG: break;
- case ECHLD_FILE_OPENED: c->state = READING; break;
- case ECHLD_INTERFACE_OPENED: c->state = READY; break;
- case ECHLD_CAPTURE_STARTED: c->state = CAPTURING; break;
+ case ECHLD_FILE_OPENED:
+ c->state = READING;
+ DISP_DBG((2,"Child chld_id=%d ->READING",c->chld_id));
+ break;
+ case ECHLD_INTERFACE_OPENED:
+ c->state = READY;
+ DISP_DBG((2,"Child chld_id=%d ->READY",c->chld_id));
+ break;
+ case ECHLD_CAPTURE_STARTED:
+ c->state = CAPTURING;
+ DISP_DBG((2,"Child chld_id=%d ->CAPTURING",c->chld_id));
+ break;
case ECHLD_NOTIFY: break; // notify(pre-encoded)
case ECHLD_PACKET_SUM: break; // packet_sum(pre-encoded)
case ECHLD_TREE: break; //tree(framenum, tree(pre-encoded) )
case ECHLD_BUFFER: break; // buffer (name,range,totlen,data)
- case ECHLD_EOF: c->state = DONE; break;
- case ECHLD_CAPTURE_STOPPED: c->state = DONE; break;
+ case ECHLD_EOF:
+ case ECHLD_CAPTURE_STOPPED:
+ c->state = DONE;
+ DISP_DBG((2,"Child chld_id=%d ->DONE",c->chld_id));
+ break;
case ECHLD_NOTE_ADDED: break;
case ECHLD_PACKET_LIST: break; // packet_list(name,filter,range);
case ECHLD_FILE_SAVED: break;
@@ -389,8 +487,9 @@ static long dispatch_to_parent(guint8* b, size_t len, echld_chld_id_t chld_id, e
default:
goto misbehabing;
}
-
- return echld_write_frame(dispatcher->parent_out, &in_ba, chld_id, type, reqh_id, NULL);
+
+ DISP_DBG((4,"Dispatching to child reqh_id=%d chld_id=%d type='%c'",reqh_id,c->chld_id,type));
+ return DISP_WRITE(dispatcher->parent_out, &in_ba, chld_id, type, reqh_id);
misbehabing:
c->state = ERRORED;
@@ -415,6 +514,7 @@ void dispatch_new_child(struct dispatcher* dd) {
int pipe_to_child;
int pipe_from_child;
+ DISP_DBG((5,"new_child pipe(parent)"));
if( pipe(parent_pipe_fds) < 0) {
dispatcher_err(ECHLD_ERR_CANNOT_FORK,"CANNOT OPEN PARENT PIPE: %s",strerror(errno));
return;
@@ -423,6 +523,7 @@ void dispatch_new_child(struct dispatcher* dd) {
pipe_from_parent = parent_pipe_fds[0];
pipe_to_child = parent_pipe_fds[1];
+ DISP_DBG((5,"new_child pipe(child)"));
if( pipe(child_pipe_fds) < 0) {
close(pipe_from_parent);
close(pipe_to_child);
@@ -433,6 +534,7 @@ void dispatch_new_child(struct dispatcher* dd) {
pipe_from_child = child_pipe_fds[0];
pipe_to_parent = child_pipe_fds[1];
+ DISP_DBG((4,"New Child Forking()"));
switch (( pid = fork() )) {
case -1: {
close(pipe_to_child);
@@ -477,10 +579,12 @@ void dispatch_new_child(struct dispatcher* dd) {
echld_reset_reader(&(c->reader), pipe_from_child,4096);
c->write_fd = pipe_to_child;
c->pid = pid;
- dispatcher->nchildren++;
+ c->chld_id = dispatcher->nchildren++;
+
+ DISP_DBG((4,"Child Forked pid=%d chld_id=%d",pid,c->chld_id));
/* configure child */
- echld_write_frame(pipe_to_child, &out_ba, c->chld_id, ECHLD_NEW_CHILD, dispatcher->reqh_id, NULL);
+ DISP_WRITE(pipe_to_child, &out_ba, c->chld_id, ECHLD_NEW_CHILD, dispatcher->reqh_id);
return;
}
}
@@ -494,21 +598,19 @@ void dispatch_new_child(struct dispatcher* dd) {
/* process signals sent from parent */
static long dispatch_to_child(guint8* b, size_t len, echld_chld_id_t chld_id, echld_msg_type_t type, echld_reqh_id_t reqh_id, void* data) {
struct dispatcher* disp = (struct dispatcher*)data;
- GByteArray in_ba;
disp->reqh_id = reqh_id;
- in_ba.data = b;
- in_ba.len = (guint)len;
if (chld_id == 0) { /* these are messages to the dispatcher itself */
+ DISP_DBG((2,"Message to Dispatcher"));
switch(type) {
case ECHLD_CLOSE_CHILD:
dispatcher_destroy();
return 0;
case ECHLD_PING:
- echld_write_frame(disp->parent_out, &in_ba, chld_id, ECHLD_PONG, reqh_id, NULL);
-
+ DISP_DBG((2,"PONG reqh_id=%d",reqh_id));
+ DISP_WRITE(disp->parent_out, NULL, chld_id, ECHLD_PONG, reqh_id);
return 0;
case ECHLD_NEW_CHILD:
dispatch_new_child(disp);
@@ -589,6 +691,8 @@ static long dispatch_to_child(guint8* b, size_t len, echld_chld_id_t chld_id, ec
} else {
struct dispatcher_child* c;
+ DISP_DBG((2,"Message to Child"));
+
if (! (c = dispatcher_get_child(dispatcher, chld_id)) ) {
dispatcher_err(ECHLD_ERR_NO_SUCH_CHILD, "wrong chld_id %d", chld_id);
return 0;
@@ -598,21 +702,26 @@ static long dispatch_to_child(guint8* b, size_t len, echld_chld_id_t chld_id, ec
case ECHLD_CLOSE_CHILD:
c->closing = TRUE;
c->state = CLOSED;
+ DISP_DBG((2,"Child chld_id=%d ->CLOSED",chld_id));
goto relay_frame;
case ECHLD_OPEN_FILE:
c->state = READING;
+ DISP_DBG((2,"Child chld_id=%d ->READING",chld_id));
goto relay_frame;
case ECHLD_OPEN_INTERFACE:
c->state = READY;
+ DISP_DBG((2,"Child chld_id=%d ->READY",chld_id));
goto relay_frame;
case ECHLD_START_CAPTURE:
+ DISP_DBG((2,"Child chld_id=%d ->CAPTURING",chld_id));
c->state = CAPTURING;
goto relay_frame;
case ECHLD_STOP_CAPTURE:
+ DISP_DBG((2,"Child chld_id=%d ->DONE",chld_id));
c->state = DONE;
goto relay_frame;
@@ -625,9 +734,15 @@ static long dispatch_to_child(guint8* b, size_t len, echld_chld_id_t chld_id, ec
case ECHLD_GET_TREE:
case ECHLD_GET_BUFFER:
case ECHLD_ADD_NOTE:
- relay_frame:
- return echld_write_frame(c->write_fd, &in_ba, chld_id, type, reqh_id, NULL);
+ relay_frame: {
+ GByteArray in_ba;
+
+ in_ba.data = b;
+ in_ba.len = (guint)len;
+ DISP_DBG((3,"Relay to Child chld_id=%d type='%c' req_id=%d",chld_id, type, reqh_id));
+ return DISP_WRITE(c->write_fd, &in_ba, chld_id, type, reqh_id);
+ }
default:
dispatcher_err(ECHLD_ERR_WRONG_MSG, "wrong message %d %c", reqh_id, type);
return 0;
@@ -636,18 +751,23 @@ static long dispatch_to_child(guint8* b, size_t len, echld_chld_id_t chld_id, ec
}
+
+
int dispatcher_loop(void) {
int parent_out = dispatcher->parent_out;
int parent_in = dispatcher->parent_in.fd;
-
struct dispatcher_child* children = dispatcher->children;
+ volatile int pforce = 0;
+
+ DISP_DBG((5,"LOOP in_fd=%d out_fd=%d",parent_in, parent_out));
do {
fd_set rfds;
fd_set efds;
- struct timeval timeout;
struct dispatcher_child* c;
int nfds;
+ int nchld = 0;
+
FD_ZERO(&rfds);
FD_ZERO(&efds);
@@ -656,42 +776,58 @@ int dispatcher_loop(void) {
FD_SET(parent_in,&efds);
FD_SET(parent_out,&efds);
- for (c = children, nfds = 0; c->pid; c++) {
- if (c->chld_id) {
+ for (c = children; c->pid; c++) {
+ if (c->chld_id > 0) {
+ nchld++;
FD_SET(c->reader.fd, &rfds);
FD_SET(c->reader.fd, &efds);
}
- nfds++;
}
- nfds = select(nfds, &rfds, NULL, &efds, &timeout);
+ DISP_DBG((5,"Select()ing nchld=%d usecs=%d",nchld,disp_loop_timeout.tv_usec));
- if ( FD_ISSET(parent_in, &efds) || FD_ISSET(parent_out, &efds) ) {
- /* XXX deep shit */
- break;
+ nfds = select(FD_SETSIZE, &rfds, NULL, &efds, &disp_loop_timeout);
+
+ if (nfds < 0) {
+ DISP_DBG((1,"select error='%s'",strerror(errno) ));
+ continue;
}
- if (FD_ISSET(parent_in, &rfds)) {
+ if ( pforce || FD_ISSET(parent_in, &rfds)) {
long st = echld_read_frame(&(dispatcher->parent_in), dispatch_to_child, dispatcher);
if (st < 0) {
- /* XXX */
+ DISP_DBG((1,"read frame returning < 0 for parent"));
+ /* XXX: ??? */
continue;
}
}
+ if ( FD_ISSET(parent_in, &efds) ) {
+ DISP_DBG((1,"Parent In Pipe Errored!"));
+ continue;
+ }
+
+ if ( FD_ISSET(parent_out, &efds) ) {
+ DISP_DBG((1,"Parent Out Pipe Errored!"));
+ continue;
+ }
+
+
for (c=children; c->pid; c++) {
if (c->chld_id) {
- if ( FD_ISSET(c->reader.fd,&efds) ) {
- /* XXX cleanup child and report */
- continue;
- }
+ // if ( FD_ISSET(c->reader.fd,&efds) ) {
+ // DISP_DBG((1,"errored child pipe chld_id=%d",c->chld_id));
+ // dispatcher_clear_child(c);
+ // continue;
+ // }
if (FD_ISSET(c->reader.fd,&rfds)) {
long st = echld_read_frame(&(c->reader), dispatch_to_parent, c);
if (st < 0) {
- /* XXX cleanup child and report */
+ DISP_DBG((1,"read_frame returned < 0 for chld_id=%d",c->chld_id));
+ /* XXX */
continue;
}
continue;
@@ -704,17 +840,34 @@ int dispatcher_loop(void) {
return 1;
}
+
void echld_dispatcher_start(int* in_pipe_fds, int* out_pipe_fds) {
static struct dispatcher d;
- int fdt_len = getdtablesize();
- int i;
+#ifdef DEBUG_DISPATCHER
+ int dbg_fd;
+#endif
+
+ disp_loop_timeout.tv_sec = 0;
+ disp_loop_timeout.tv_usec = DISPATCHER_WAIT_INITIAL;
+
+ DISP_DBG_INIT();
+ DISP_DBG_START("dispatcher.debug");
+#ifdef DEBUG_DISPATCHER
+ dbg_fd = fileno(debug_fp);
+#endif
+ DISP_DBG((2,"Dispatcher Starting"));
- preinit_epan();
signal(SIGCHLD,dispatcher_reaper);
+ signal(SIGTERM,dispatcher_sig);
+ signal(SIGPIPE,dispatcher_sig);
+ signal(SIGINT,SIG_IGN);
+ signal(SIGCONT,SIG_IGN);
+
dispatcher = &d;
+
echld_init_reader(&(d.parent_in),in_pipe_fds[0],4096);
d.parent_out = out_pipe_fds[1];
d.children = g_new0(struct dispatcher_child,ECHLD_MAX_CHILDREN);
@@ -723,19 +876,19 @@ void echld_dispatcher_start(int* in_pipe_fds, int* out_pipe_fds) {
d.reqh_id = -1;
d.pid = getpid();
+ close(out_pipe_fds[0]);
+ close(in_pipe_fds[1]);
+
echld_get_all_codecs(&(d.enc.to_parent), &(d.dec.from_parent), &(d.enc.to_child), &(d.dec.from_child));
- dispatcher_clear();
+ DISP_DBG((2,"Dispatcher Configured pid=%d parent_in=%d parent_out=%d",d.pid,in_pipe_fds[0],d.parent_out));
- /* close all fds but those used */
- for(i=0;i<fdt_len;i++) {
- if ( i != d.parent_in.fd
- && i != d.parent_out
- && i != STDERR_FILENO ) {
- close(i);
- }
- }
+ preinit_epan();
exit(dispatcher_loop());
}
+extern void echld_dispatcher_unused(void) {
+ DISP_FATAL((1,"UNUSED"));
+}
+
diff --git a/echld/echld-int.h b/echld/echld-int.h
index e21b69c154..1085301afc 100644
--- a/echld/echld-int.h
+++ b/echld/echld-int.h
@@ -122,6 +122,8 @@ typedef struct _echld_reader {
#define READER_FD_ISSET(R,fdset_p) READER_FD_ISSET(R.fd,&(fdset_p))
#define READER_FD_CLEAR(R,fdset_p) READER_FD_CLEAR(R.fd,&(fdset_p))
+extern void echld_common_set_dbg(int level, FILE* fp);
+
extern void echld_init_reader(echld_reader_t* r, int fd, size_t initial);
extern void echld_reset_reader(echld_reader_t* r, int fd, size_t initial);
@@ -193,13 +195,27 @@ extern void echld_dispatcher_start(int* in_pipe_fds, int* out_pipe_fds);
extern void dummy_switch(echld_msg_type_t type);
extern void echld_unused(void);
+/* initial debug levels */
+#define DEBUG_BASE 5
#define DEBUG_CHILD 5
#define DEBUG_DISPATCHER 5
#define DEBUG_PARENT 5
-#define BROKEN_PARENT_PIPE 3333
-#define BROKEN_DUMPCAP_PIPE 4444
-#define BROKEN_READFILE 5555
+/* config stuff */
+#define DISPATCHER_WAIT_INITIAL 999999 /* almost 1s */
+
+
+/* fatalities */
+#define BROKEN_PARENT_PIPE 123
+#define BROKEN_DUMPCAP_PIPE 124
+#define BROKEN_READFILE 125
+#define DISPATCHER_DEAD 126
+#define UNIMPLEMENTED 127
+#define CANNOT_FORK 128
+#define SHOULD_HAVE_EXITED_BEFORE 129
+#define DISPATCHER_PIPE_FAILED 130
+#define TERMINATED 140
+
#endif
diff --git a/echld/echld-util.c b/echld/echld-util.c
index 6519e81c90..02d4e3c2f5 100644
--- a/echld/echld-util.c
+++ b/echld/echld-util.c
@@ -47,16 +47,21 @@ static gboolean pong(echld_msg_type_t type, GByteArray* ba _U_, void* data) {
struct timeval t;
long ret = -1;
gettimeofday(&t,NULL);
+
switch (type) {
case ECHLD_PONG:
ret = timevaldiff(&(p->tv),&t);
+ break;
default:
ret = -1;
+ break;
}
if (p->cb) p->cb(ret, p->cb_data);
+ g_free(p);
+
return TRUE;
}
@@ -68,7 +73,7 @@ extern echld_state_t echld_ping(int chld_id, echld_ping_cb_t pcb, void* cb_data)
p->cb_data = cb_data;
gettimeofday(&(p->tv),NULL);
- return echld_reqh(chld_id, 0, ECHLD_PING, NULL, pong, p);
+ return echld_reqh(chld_id, ECHLD_PING, 0, NULL, pong, p);
}
diff --git a/echld/parent.c b/echld/parent.c
index e74125abf3..53028121bb 100644
--- a/echld/parent.c
+++ b/echld/parent.c
@@ -67,11 +67,13 @@ struct _echld_parent {
parent_decoder_t* dec;
} parent = {NULL,{NULL,0,NULL,-1,NULL,0},-1,-1,1,NULL,0,NULL,NULL};
-#define PARENT_SEND(BYTEARR,CHILDNUM,TYPE) echld_write_frame(parent.dispatcher_fd, BYTEARR, CHILDNUM, TYPE, parent.reqh_id++, NULL)
+
+static int reqh_ids = 1;
+
#ifdef DEBUG_PARENT
-static int dbg_level = 0;
+static int dbg_level = DEBUG_PARENT;
static void parent_dbg(int level, const char* fmt, ...) {
va_list ap;
@@ -88,13 +90,18 @@ static void parent_dbg(int level, const char* fmt, ...) {
}
#define PARENT_DBG(attrs) parent_dbg attrs
+#define PARENT_SEND(BYTEARR,CHILDNUM,TYPE,R_ID) do { long st = echld_write_frame(parent.dispatcher_fd, BYTEARR, CHILDNUM, TYPE, R_ID, NULL); PARENT_DBG((1,"SEND type='%c' chld_id=%d reqh_id=%d msg='%s'",TYPE,CHILDNUM,R_ID, ( st >= 8 ? "ok" : ((st<0)?strerror(errno):"?") ) )); } while(0)
#else
#define PARENT_DBG(attrs)
+#define PARENT_SEND(BYTEARR,CHILDNUM,TYPE,R_ID) echld_write_frame(parent.dispatcher_fd, BYTEARR, CHILDNUM, TYPE, R_ID, NULL)
#endif
extern void echld_set_parent_dbg_level(int lvl) {
(dbg_level = lvl);
+ if (lvl > 6) {
+ echld_common_set_dbg(lvl,stderr);
+ }
PARENT_DBG((0,"Debug Level Set: %d",lvl));
}
@@ -115,31 +122,25 @@ static void parent_fatal(int exit_code, const char* fmt, ...) {
fprintf(stderr,"Fatal error: exit_code=%d str=%s",exit_code,str);
#endif
+ kill(parent.dispatcher_pid,SIGTERM);
exit(exit_code);
}
static void echld_cleanup(void) {
- int i;
- char b[4];
- GByteArray ba;
+ // int i;
- ba.data = b;
- ba.len = 0;
+ PARENT_DBG((4,"echld_cleanup starting"));
- PARENT_DBG((0,"echld_cleanup starting"));
- PARENT_SEND(&ba,0,ECHLD_CLOSE_CHILD);
+ // for (i=0;i<ECHLD_MAX_CHILDREN;i++) {
+ // if ( parent.children[i].handlers ) g_array_free(parent.children[i].handlers,TRUE);
+ // if ( parent.children[i].reqs ) g_array_free(parent.children[i].reqs,TRUE);
+ // };
- do ; while(sleep(1)); /* wait a full sec without signals */
+ // g_free(parent.children);
- for (i=0;i<ECHLD_MAX_CHILDREN;i++) {
- g_array_free(parent.children[i].handlers,TRUE);
- g_array_free(parent.children[i].reqs,TRUE);
- };
+ // g_byte_array_free(parent.snd,TRUE);
- free(parent.children);
- g_byte_array_free(parent.snd,TRUE);
- close(parent.dispatcher_fd);
- PARENT_DBG((0,"echld_cleanup done"));
+ PARENT_DBG((3,"echld_cleanup done"));
}
@@ -169,7 +170,7 @@ void parent_reaper(int sig) {
if (! parent.closing) {
/* crashed */
- PARENT_FATAL((2222,"Dispatcher process dead"));
+ PARENT_FATAL((DISPATCHER_DEAD,"Dispatcher process dead"));
}
return;
@@ -189,64 +190,64 @@ void echld_initialize(echld_encoding_t enc) {
PARENT_DBG((1,"Echld Starting"));
if (enc != ECHLD_ENCODING_JSON) {
- PARENT_FATAL((1111,"Only JSON implemented"));
+ PARENT_FATAL((UNIMPLEMENTED,"Only JSON implemented"));
}
if ( pipe(to_disp) ) {
- PARENT_FATAL((1112,"Failed to open dispatcher pipe"));
+ PARENT_FATAL((DISPATCHER_PIPE_FAILED,"Failed to open pipe to dispatcher"));
} else if( pipe(from_disp) ) {
- PARENT_FATAL((1113,"Failed to open dispatcher pipe"));
+ PARENT_FATAL((DISPATCHER_PIPE_FAILED,"Failed to open pipe from dispatcher"));
} else {
int pid;
int i;
+ PARENT_DBG((3,"Pipes Opened fr[0]=%d fr[1]=%d to[0]=%d to[1]=%d",from_disp[0],from_disp[1],to_disp[0],to_disp[1]));
+
pid = fork();
if ( pid < 0 ) {
- PARENT_FATAL((1114,"Failed to fork() reason='%s'",strerror(errno)));
+ PARENT_FATAL((CANNOT_FORK,"Failed to fork() reason='%s'",strerror(errno)));
} else if ( pid == 0) {
#ifdef PARENT_THREADS
reader_realloc_buf = child_realloc_buff;
#endif
-
- PARENT_DBG((1,"Dispatcher starting"));
/* child code */
- //echld_cleanup();
-
- PARENT_DBG((2,"Dispatcher starting.."));
-
- //echld_dispatcher_start(to_disp,from_disp);
- PARENT_FATAL((1115,"This shoudln't happen"));
+ echld_cleanup();
+ echld_dispatcher_start(to_disp,from_disp);
+ PARENT_FATAL((SHOULD_HAVE_EXITED_BEFORE,"This shoudln't happen"));
} else {
/* parent code */
#ifdef PARENT_THREADS
reader_realloc_buf = parent_realloc_buff;
#endif
+ echld_common_set_dbg(9,stderr);
+
PARENT_DBG((3,"Dispatcher forked"));
echld_get_all_codecs(NULL, NULL, &parent.enc, &parent.dec);
parent.children = g_new0(echld_t,ECHLD_MAX_CHILDREN);
parent.snd = g_byte_array_new();
- parent.dispatcher_fd = to_disp[0];
+ parent.dispatcher_fd = to_disp[1];
+ parent.dispatcher_pid = pid;
- echld_init_reader(&(parent.reader),from_disp[1],4096);
+ echld_init_reader(&(parent.reader),from_disp[0],4096);
- parent.children[0].chld_id = 0;
- parent.children[0].data = NULL;
- parent.children[0].state = IDLE;
- parent.children[0].handlers = g_array_new(TRUE,TRUE,sizeof(hdlr_t));
- for (i=1;i<ECHLD_MAX_CHILDREN;i++) {
+ for (i=0;i<ECHLD_MAX_CHILDREN;i++) {
parent.children[i].chld_id = -1;
parent.children[i].data = NULL;
parent.children[i].state = FREE;
parent.children[i].handlers = g_array_new(TRUE,TRUE,sizeof(hdlr_t));
+ parent.children[i].reqs = g_array_new(TRUE,TRUE,sizeof(reqh_t));
}
+ parent.children[0].chld_id = 0;
+ parent.children[0].state = IDLE;
+
signal(SIGCHLD,parent_reaper);
- close(to_disp[1]);
- close(from_disp[0]);
+ //close(to_disp[0]);
+ //close(from_disp[1]);
PARENT_DBG((3,"Ready"));
}
}
@@ -254,20 +255,18 @@ void echld_initialize(echld_encoding_t enc) {
extern echld_state_t echld_terminate(void) {
+
+ parent.closing = TRUE;
+ PARENT_SEND(NULL,0,ECHLD_CLOSE_CHILD,++reqh_ids);
+
+ do ; while(sleep(1)); /* wait a full sec without signals */
+
echld_cleanup();
+ close(parent.dispatcher_fd);
+ kill(parent.dispatcher_pid,SIGTERM);
return TRUE;
}
-int reqh_id_idx(echld_t* c, int reqh_id) {
- int i;
- int imax = c->reqs->len;
-
- for(i=0; i < imax ; i++) {
- if (((reqh_t*)&g_array_index (c->reqs, reqh_t, i))->reqh_id == reqh_id) return i;
- }
-
- return -1;
-}
@@ -283,30 +282,54 @@ static echld_t* get_child(int id) {
/* send a request */
-static int reqh_ids = 1;
+int reqh_id_idx(echld_t* c, int reqh_id) {
+ int i;
+ int imax = c->reqs->len;
+ reqh_t* rr = (reqh_t*)c->reqs->data;
+
+ for(i=0; i < imax ; i++) {
+ if (rr[i].reqh_id == reqh_id)
+ return i;
+ }
+
+ return -1;
+}
+
static echld_state_t reqh_snd(echld_t* c, echld_msg_type_t t, GByteArray* ba, echld_msg_cb_t resp_cb, void* cb_data) {
- reqh_t req;
+ int idx;
+ reqh_t* r;
+ int reqh_id = reqh_ids++;
if (!c) {
PARENT_DBG((1,"REQH_SND: No such child"));
return 1;
}
- req.reqh_id = reqh_ids++;
- req.cb = resp_cb;
- req.cb_data = cb_data;
- gettimeofday(&(req.tv),NULL);
+ idx = reqh_id_idx(c,-1);
+ if (idx < 0) {
+ reqh_t req;
+ idx = c->reqs->len;
+ g_array_append_val(c->reqs,req);
+ }
+
+ r = &(((reqh_t*)c->reqs->data)[idx]);
+
+ r->reqh_id = reqh_id;
+ r->cb = resp_cb;
+ r->cb_data = cb_data;
+
+ gettimeofday(&(r->tv),NULL);
- g_array_append_val(c->reqs,req);
+ PARENT_DBG((4,"reqh_add: idx='%d'",idx));
- PARENT_DBG((1,"REQH_SND: type='%c' chld_id=%d reqh_id=%d",t,c->chld_id,req.reqh_id));
+ PARENT_DBG((1,"REQH_SND: type='%c' chld_id=%d reqh_id=%d",t, c->chld_id,reqh_id));
- PARENT_SEND(ba,c->chld_id,t);
+ PARENT_SEND(ba,c->chld_id,t,reqh_id);
- if (ba) g_byte_array_free(ba,TRUE);
+ if (ba) g_byte_array_free(ba,TRUE); /* do we? */
- return req.reqh_id;
+ return reqh_id;
}
@@ -699,6 +722,7 @@ static long parent_read_frame(guint8* b, size_t len, echld_chld_id_t chld_id, ec
echld_t* c = get_child(chld_id);
GByteArray* ba = g_byte_array_new();
+ PARENT_DBG((3,"parent_read_frame ch=%d t='%c' rh=%d",chld_id,t,reqh_id));
g_byte_array_append(ba,b, (guint)len);
if (c) {
@@ -708,14 +732,27 @@ static long parent_read_frame(guint8* b, size_t len, echld_chld_id_t chld_id, ec
gboolean go_ahead = TRUE;
if (r) { /* got that reqh_id */
- go_ahead = r->cb ? r->cb(t,ba,r->cb_data) : TRUE;
+ if (r->cb) {
+ go_ahead = r->cb(t,ba,r->cb_data);
+ }
+
+ r->reqh_id = -1;
+ r->cb = NULL;
+ r->cb_data = 0;
+ r->tv.tv_sec = 0;
+ r->tv.tv_usec = 0;
+
+ PARENT_DBG((2,"hanlded by reqh_id=%d msg='%s'",reqh_id,go_ahead?"retrying":"done"));
}
while(go_ahead && ( h = get_next_hdlr_for_type(c,t,&i))) {
- go_ahead = h->cb(t,ba,r->cb_data);
- }
+ if (h->cb)
+ go_ahead = h->cb(t,ba,h->cb_data);
+
+ PARENT_DBG((2,"hanlded by t='%c' msgh_id=%d msg='%s'",h->type, h->id,go_ahead?"retrying":"done"));
+ }
} else {
- /* no such child??? */
+ PARENT_DBG((1,"parent_read_frame: No such child"));
}
g_byte_array_free(ba,TRUE);
@@ -733,31 +770,33 @@ extern int echld_fd_read(fd_set* rfds, fd_set* efds) {
int r_nfds=0;
if (FD_ISSET(parent.reader.fd,efds) || FD_ISSET(parent.dispatcher_fd,efds) ) {
/* Handle errored dispatcher */
- r_nfds--;
+ PARENT_DBG((1,"parent errored"));
return -1;
}
if (FD_ISSET(parent.reader.fd,rfds)) {
- r_nfds++;
+ PARENT_DBG((1,"reading from dispatcher"));
echld_read_frame(&(parent.reader),parent_read_frame,&(parent));
}
return r_nfds;
}
-extern int echld_select(int nfds, fd_set* rfds, fd_set* wfds, fd_set* efds, struct timeval* timeout) {
+extern int echld_select(int nfds _U_, fd_set* rfds, fd_set* wfds, fd_set* efds, struct timeval* timeout) {
fd_set my_rfds, my_wfds, my_efds;
int r_nfds;
+
if (rfds == NULL) { rfds = &my_rfds; FD_ZERO(rfds); }
if (wfds == NULL) { wfds = &my_wfds; FD_ZERO(wfds); }
if (efds == NULL) { efds = &my_efds; FD_ZERO(efds); }
- nfds += echld_fdset(rfds,efds);
+ echld_fdset(rfds,efds);
- r_nfds = select(nfds, rfds, wfds, efds, timeout);
+ PARENT_DBG((2,"Select()"));
+ r_nfds = select(FD_SETSIZE, rfds, wfds, efds, timeout);
- r_nfds += echld_fd_read(rfds,efds);
+ echld_fd_read(rfds,efds);
return r_nfds ;
}