aboutsummaryrefslogtreecommitdiffstats
path: root/echld/parent.c
diff options
context:
space:
mode:
authorLuis Ontanon <luis.ontanon@gmail.com>2013-06-27 03:41:48 +0000
committerLuis Ontanon <luis.ontanon@gmail.com>2013-06-27 03:41:48 +0000
commite48b0084e19eacc30a2d8a8e274cc284973e0a2c (patch)
tree4f1f9c467037e08567984dd003687057777ec431 /echld/parent.c
parentffe6d9c4d6fe36d7f2498154cdb12159b6b60aff (diff)
MS: Pong from the dispatcher!
svn path=/trunk/; revision=50183
Diffstat (limited to 'echld/parent.c')
-rw-r--r--echld/parent.c181
1 files changed, 110 insertions, 71 deletions
diff --git a/echld/parent.c b/echld/parent.c
index e74125abf3..53028121bb 100644
--- a/echld/parent.c
+++ b/echld/parent.c
@@ -67,11 +67,13 @@ struct _echld_parent {
parent_decoder_t* dec;
} parent = {NULL,{NULL,0,NULL,-1,NULL,0},-1,-1,1,NULL,0,NULL,NULL};
-#define PARENT_SEND(BYTEARR,CHILDNUM,TYPE) echld_write_frame(parent.dispatcher_fd, BYTEARR, CHILDNUM, TYPE, parent.reqh_id++, NULL)
+
+static int reqh_ids = 1;
+
#ifdef DEBUG_PARENT
-static int dbg_level = 0;
+static int dbg_level = DEBUG_PARENT;
static void parent_dbg(int level, const char* fmt, ...) {
va_list ap;
@@ -88,13 +90,18 @@ static void parent_dbg(int level, const char* fmt, ...) {
}
#define PARENT_DBG(attrs) parent_dbg attrs
+#define PARENT_SEND(BYTEARR,CHILDNUM,TYPE,R_ID) do { long st = echld_write_frame(parent.dispatcher_fd, BYTEARR, CHILDNUM, TYPE, R_ID, NULL); PARENT_DBG((1,"SEND type='%c' chld_id=%d reqh_id=%d msg='%s'",TYPE,CHILDNUM,R_ID, ( st >= 8 ? "ok" : ((st<0)?strerror(errno):"?") ) )); } while(0)
#else
#define PARENT_DBG(attrs)
+#define PARENT_SEND(BYTEARR,CHILDNUM,TYPE,R_ID) echld_write_frame(parent.dispatcher_fd, BYTEARR, CHILDNUM, TYPE, R_ID, NULL)
#endif
extern void echld_set_parent_dbg_level(int lvl) {
(dbg_level = lvl);
+ if (lvl > 6) {
+ echld_common_set_dbg(lvl,stderr);
+ }
PARENT_DBG((0,"Debug Level Set: %d",lvl));
}
@@ -115,31 +122,25 @@ static void parent_fatal(int exit_code, const char* fmt, ...) {
fprintf(stderr,"Fatal error: exit_code=%d str=%s",exit_code,str);
#endif
+ kill(parent.dispatcher_pid,SIGTERM);
exit(exit_code);
}
static void echld_cleanup(void) {
- int i;
- char b[4];
- GByteArray ba;
+ // int i;
- ba.data = b;
- ba.len = 0;
+ PARENT_DBG((4,"echld_cleanup starting"));
- PARENT_DBG((0,"echld_cleanup starting"));
- PARENT_SEND(&ba,0,ECHLD_CLOSE_CHILD);
+ // for (i=0;i<ECHLD_MAX_CHILDREN;i++) {
+ // if ( parent.children[i].handlers ) g_array_free(parent.children[i].handlers,TRUE);
+ // if ( parent.children[i].reqs ) g_array_free(parent.children[i].reqs,TRUE);
+ // };
- do ; while(sleep(1)); /* wait a full sec without signals */
+ // g_free(parent.children);
- for (i=0;i<ECHLD_MAX_CHILDREN;i++) {
- g_array_free(parent.children[i].handlers,TRUE);
- g_array_free(parent.children[i].reqs,TRUE);
- };
+ // g_byte_array_free(parent.snd,TRUE);
- free(parent.children);
- g_byte_array_free(parent.snd,TRUE);
- close(parent.dispatcher_fd);
- PARENT_DBG((0,"echld_cleanup done"));
+ PARENT_DBG((3,"echld_cleanup done"));
}
@@ -169,7 +170,7 @@ void parent_reaper(int sig) {
if (! parent.closing) {
/* crashed */
- PARENT_FATAL((2222,"Dispatcher process dead"));
+ PARENT_FATAL((DISPATCHER_DEAD,"Dispatcher process dead"));
}
return;
@@ -189,64 +190,64 @@ void echld_initialize(echld_encoding_t enc) {
PARENT_DBG((1,"Echld Starting"));
if (enc != ECHLD_ENCODING_JSON) {
- PARENT_FATAL((1111,"Only JSON implemented"));
+ PARENT_FATAL((UNIMPLEMENTED,"Only JSON implemented"));
}
if ( pipe(to_disp) ) {
- PARENT_FATAL((1112,"Failed to open dispatcher pipe"));
+ PARENT_FATAL((DISPATCHER_PIPE_FAILED,"Failed to open pipe to dispatcher"));
} else if( pipe(from_disp) ) {
- PARENT_FATAL((1113,"Failed to open dispatcher pipe"));
+ PARENT_FATAL((DISPATCHER_PIPE_FAILED,"Failed to open pipe from dispatcher"));
} else {
int pid;
int i;
+ PARENT_DBG((3,"Pipes Opened fr[0]=%d fr[1]=%d to[0]=%d to[1]=%d",from_disp[0],from_disp[1],to_disp[0],to_disp[1]));
+
pid = fork();
if ( pid < 0 ) {
- PARENT_FATAL((1114,"Failed to fork() reason='%s'",strerror(errno)));
+ PARENT_FATAL((CANNOT_FORK,"Failed to fork() reason='%s'",strerror(errno)));
} else if ( pid == 0) {
#ifdef PARENT_THREADS
reader_realloc_buf = child_realloc_buff;
#endif
-
- PARENT_DBG((1,"Dispatcher starting"));
/* child code */
- //echld_cleanup();
-
- PARENT_DBG((2,"Dispatcher starting.."));
-
- //echld_dispatcher_start(to_disp,from_disp);
- PARENT_FATAL((1115,"This shoudln't happen"));
+ echld_cleanup();
+ echld_dispatcher_start(to_disp,from_disp);
+ PARENT_FATAL((SHOULD_HAVE_EXITED_BEFORE,"This shoudln't happen"));
} else {
/* parent code */
#ifdef PARENT_THREADS
reader_realloc_buf = parent_realloc_buff;
#endif
+ echld_common_set_dbg(9,stderr);
+
PARENT_DBG((3,"Dispatcher forked"));
echld_get_all_codecs(NULL, NULL, &parent.enc, &parent.dec);
parent.children = g_new0(echld_t,ECHLD_MAX_CHILDREN);
parent.snd = g_byte_array_new();
- parent.dispatcher_fd = to_disp[0];
+ parent.dispatcher_fd = to_disp[1];
+ parent.dispatcher_pid = pid;
- echld_init_reader(&(parent.reader),from_disp[1],4096);
+ echld_init_reader(&(parent.reader),from_disp[0],4096);
- parent.children[0].chld_id = 0;
- parent.children[0].data = NULL;
- parent.children[0].state = IDLE;
- parent.children[0].handlers = g_array_new(TRUE,TRUE,sizeof(hdlr_t));
- for (i=1;i<ECHLD_MAX_CHILDREN;i++) {
+ for (i=0;i<ECHLD_MAX_CHILDREN;i++) {
parent.children[i].chld_id = -1;
parent.children[i].data = NULL;
parent.children[i].state = FREE;
parent.children[i].handlers = g_array_new(TRUE,TRUE,sizeof(hdlr_t));
+ parent.children[i].reqs = g_array_new(TRUE,TRUE,sizeof(reqh_t));
}
+ parent.children[0].chld_id = 0;
+ parent.children[0].state = IDLE;
+
signal(SIGCHLD,parent_reaper);
- close(to_disp[1]);
- close(from_disp[0]);
+ //close(to_disp[0]);
+ //close(from_disp[1]);
PARENT_DBG((3,"Ready"));
}
}
@@ -254,20 +255,18 @@ void echld_initialize(echld_encoding_t enc) {
extern echld_state_t echld_terminate(void) {
+
+ parent.closing = TRUE;
+ PARENT_SEND(NULL,0,ECHLD_CLOSE_CHILD,++reqh_ids);
+
+ do ; while(sleep(1)); /* wait a full sec without signals */
+
echld_cleanup();
+ close(parent.dispatcher_fd);
+ kill(parent.dispatcher_pid,SIGTERM);
return TRUE;
}
-int reqh_id_idx(echld_t* c, int reqh_id) {
- int i;
- int imax = c->reqs->len;
-
- for(i=0; i < imax ; i++) {
- if (((reqh_t*)&g_array_index (c->reqs, reqh_t, i))->reqh_id == reqh_id) return i;
- }
-
- return -1;
-}
@@ -283,30 +282,54 @@ static echld_t* get_child(int id) {
/* send a request */
-static int reqh_ids = 1;
+int reqh_id_idx(echld_t* c, int reqh_id) {
+ int i;
+ int imax = c->reqs->len;
+ reqh_t* rr = (reqh_t*)c->reqs->data;
+
+ for(i=0; i < imax ; i++) {
+ if (rr[i].reqh_id == reqh_id)
+ return i;
+ }
+
+ return -1;
+}
+
static echld_state_t reqh_snd(echld_t* c, echld_msg_type_t t, GByteArray* ba, echld_msg_cb_t resp_cb, void* cb_data) {
- reqh_t req;
+ int idx;
+ reqh_t* r;
+ int reqh_id = reqh_ids++;
if (!c) {
PARENT_DBG((1,"REQH_SND: No such child"));
return 1;
}
- req.reqh_id = reqh_ids++;
- req.cb = resp_cb;
- req.cb_data = cb_data;
- gettimeofday(&(req.tv),NULL);
+ idx = reqh_id_idx(c,-1);
+ if (idx < 0) {
+ reqh_t req;
+ idx = c->reqs->len;
+ g_array_append_val(c->reqs,req);
+ }
+
+ r = &(((reqh_t*)c->reqs->data)[idx]);
+
+ r->reqh_id = reqh_id;
+ r->cb = resp_cb;
+ r->cb_data = cb_data;
+
+ gettimeofday(&(r->tv),NULL);
- g_array_append_val(c->reqs,req);
+ PARENT_DBG((4,"reqh_add: idx='%d'",idx));
- PARENT_DBG((1,"REQH_SND: type='%c' chld_id=%d reqh_id=%d",t,c->chld_id,req.reqh_id));
+ PARENT_DBG((1,"REQH_SND: type='%c' chld_id=%d reqh_id=%d",t, c->chld_id,reqh_id));
- PARENT_SEND(ba,c->chld_id,t);
+ PARENT_SEND(ba,c->chld_id,t,reqh_id);
- if (ba) g_byte_array_free(ba,TRUE);
+ if (ba) g_byte_array_free(ba,TRUE); /* do we? */
- return req.reqh_id;
+ return reqh_id;
}
@@ -699,6 +722,7 @@ static long parent_read_frame(guint8* b, size_t len, echld_chld_id_t chld_id, ec
echld_t* c = get_child(chld_id);
GByteArray* ba = g_byte_array_new();
+ PARENT_DBG((3,"parent_read_frame ch=%d t='%c' rh=%d",chld_id,t,reqh_id));
g_byte_array_append(ba,b, (guint)len);
if (c) {
@@ -708,14 +732,27 @@ static long parent_read_frame(guint8* b, size_t len, echld_chld_id_t chld_id, ec
gboolean go_ahead = TRUE;
if (r) { /* got that reqh_id */
- go_ahead = r->cb ? r->cb(t,ba,r->cb_data) : TRUE;
+ if (r->cb) {
+ go_ahead = r->cb(t,ba,r->cb_data);
+ }
+
+ r->reqh_id = -1;
+ r->cb = NULL;
+ r->cb_data = 0;
+ r->tv.tv_sec = 0;
+ r->tv.tv_usec = 0;
+
+ PARENT_DBG((2,"hanlded by reqh_id=%d msg='%s'",reqh_id,go_ahead?"retrying":"done"));
}
while(go_ahead && ( h = get_next_hdlr_for_type(c,t,&i))) {
- go_ahead = h->cb(t,ba,r->cb_data);
- }
+ if (h->cb)
+ go_ahead = h->cb(t,ba,h->cb_data);
+
+ PARENT_DBG((2,"hanlded by t='%c' msgh_id=%d msg='%s'",h->type, h->id,go_ahead?"retrying":"done"));
+ }
} else {
- /* no such child??? */
+ PARENT_DBG((1,"parent_read_frame: No such child"));
}
g_byte_array_free(ba,TRUE);
@@ -733,31 +770,33 @@ extern int echld_fd_read(fd_set* rfds, fd_set* efds) {
int r_nfds=0;
if (FD_ISSET(parent.reader.fd,efds) || FD_ISSET(parent.dispatcher_fd,efds) ) {
/* Handle errored dispatcher */
- r_nfds--;
+ PARENT_DBG((1,"parent errored"));
return -1;
}
if (FD_ISSET(parent.reader.fd,rfds)) {
- r_nfds++;
+ PARENT_DBG((1,"reading from dispatcher"));
echld_read_frame(&(parent.reader),parent_read_frame,&(parent));
}
return r_nfds;
}
-extern int echld_select(int nfds, fd_set* rfds, fd_set* wfds, fd_set* efds, struct timeval* timeout) {
+extern int echld_select(int nfds _U_, fd_set* rfds, fd_set* wfds, fd_set* efds, struct timeval* timeout) {
fd_set my_rfds, my_wfds, my_efds;
int r_nfds;
+
if (rfds == NULL) { rfds = &my_rfds; FD_ZERO(rfds); }
if (wfds == NULL) { wfds = &my_wfds; FD_ZERO(wfds); }
if (efds == NULL) { efds = &my_efds; FD_ZERO(efds); }
- nfds += echld_fdset(rfds,efds);
+ echld_fdset(rfds,efds);
- r_nfds = select(nfds, rfds, wfds, efds, timeout);
+ PARENT_DBG((2,"Select()"));
+ r_nfds = select(FD_SETSIZE, rfds, wfds, efds, timeout);
- r_nfds += echld_fd_read(rfds,efds);
+ echld_fd_read(rfds,efds);
return r_nfds ;
}