diff options
author | Luis Ontanon <luis.ontanon@gmail.com> | 2013-06-27 03:41:48 +0000 |
---|---|---|
committer | Luis Ontanon <luis.ontanon@gmail.com> | 2013-06-27 03:41:48 +0000 |
commit | e48b0084e19eacc30a2d8a8e274cc284973e0a2c (patch) | |
tree | 4f1f9c467037e08567984dd003687057777ec431 /echld/parent.c | |
parent | ffe6d9c4d6fe36d7f2498154cdb12159b6b60aff (diff) |
MS: Pong from the dispatcher!
svn path=/trunk/; revision=50183
Diffstat (limited to 'echld/parent.c')
-rw-r--r-- | echld/parent.c | 181 |
1 files changed, 110 insertions, 71 deletions
diff --git a/echld/parent.c b/echld/parent.c index e74125abf3..53028121bb 100644 --- a/echld/parent.c +++ b/echld/parent.c @@ -67,11 +67,13 @@ struct _echld_parent { parent_decoder_t* dec; } parent = {NULL,{NULL,0,NULL,-1,NULL,0},-1,-1,1,NULL,0,NULL,NULL}; -#define PARENT_SEND(BYTEARR,CHILDNUM,TYPE) echld_write_frame(parent.dispatcher_fd, BYTEARR, CHILDNUM, TYPE, parent.reqh_id++, NULL) + +static int reqh_ids = 1; + #ifdef DEBUG_PARENT -static int dbg_level = 0; +static int dbg_level = DEBUG_PARENT; static void parent_dbg(int level, const char* fmt, ...) { va_list ap; @@ -88,13 +90,18 @@ static void parent_dbg(int level, const char* fmt, ...) { } #define PARENT_DBG(attrs) parent_dbg attrs +#define PARENT_SEND(BYTEARR,CHILDNUM,TYPE,R_ID) do { long st = echld_write_frame(parent.dispatcher_fd, BYTEARR, CHILDNUM, TYPE, R_ID, NULL); PARENT_DBG((1,"SEND type='%c' chld_id=%d reqh_id=%d msg='%s'",TYPE,CHILDNUM,R_ID, ( st >= 8 ? "ok" : ((st<0)?strerror(errno):"?") ) )); } while(0) #else #define PARENT_DBG(attrs) +#define PARENT_SEND(BYTEARR,CHILDNUM,TYPE,R_ID) echld_write_frame(parent.dispatcher_fd, BYTEARR, CHILDNUM, TYPE, R_ID, NULL) #endif extern void echld_set_parent_dbg_level(int lvl) { (dbg_level = lvl); + if (lvl > 6) { + echld_common_set_dbg(lvl,stderr); + } PARENT_DBG((0,"Debug Level Set: %d",lvl)); } @@ -115,31 +122,25 @@ static void parent_fatal(int exit_code, const char* fmt, ...) { fprintf(stderr,"Fatal error: exit_code=%d str=%s",exit_code,str); #endif + kill(parent.dispatcher_pid,SIGTERM); exit(exit_code); } static void echld_cleanup(void) { - int i; - char b[4]; - GByteArray ba; + // int i; - ba.data = b; - ba.len = 0; + PARENT_DBG((4,"echld_cleanup starting")); - PARENT_DBG((0,"echld_cleanup starting")); - PARENT_SEND(&ba,0,ECHLD_CLOSE_CHILD); + // for (i=0;i<ECHLD_MAX_CHILDREN;i++) { + // if ( parent.children[i].handlers ) g_array_free(parent.children[i].handlers,TRUE); + // if ( parent.children[i].reqs ) g_array_free(parent.children[i].reqs,TRUE); + // }; - do ; while(sleep(1)); /* wait a full sec without signals */ + // g_free(parent.children); - for (i=0;i<ECHLD_MAX_CHILDREN;i++) { - g_array_free(parent.children[i].handlers,TRUE); - g_array_free(parent.children[i].reqs,TRUE); - }; + // g_byte_array_free(parent.snd,TRUE); - free(parent.children); - g_byte_array_free(parent.snd,TRUE); - close(parent.dispatcher_fd); - PARENT_DBG((0,"echld_cleanup done")); + PARENT_DBG((3,"echld_cleanup done")); } @@ -169,7 +170,7 @@ void parent_reaper(int sig) { if (! parent.closing) { /* crashed */ - PARENT_FATAL((2222,"Dispatcher process dead")); + PARENT_FATAL((DISPATCHER_DEAD,"Dispatcher process dead")); } return; @@ -189,64 +190,64 @@ void echld_initialize(echld_encoding_t enc) { PARENT_DBG((1,"Echld Starting")); if (enc != ECHLD_ENCODING_JSON) { - PARENT_FATAL((1111,"Only JSON implemented")); + PARENT_FATAL((UNIMPLEMENTED,"Only JSON implemented")); } if ( pipe(to_disp) ) { - PARENT_FATAL((1112,"Failed to open dispatcher pipe")); + PARENT_FATAL((DISPATCHER_PIPE_FAILED,"Failed to open pipe to dispatcher")); } else if( pipe(from_disp) ) { - PARENT_FATAL((1113,"Failed to open dispatcher pipe")); + PARENT_FATAL((DISPATCHER_PIPE_FAILED,"Failed to open pipe from dispatcher")); } else { int pid; int i; + PARENT_DBG((3,"Pipes Opened fr[0]=%d fr[1]=%d to[0]=%d to[1]=%d",from_disp[0],from_disp[1],to_disp[0],to_disp[1])); + pid = fork(); if ( pid < 0 ) { - PARENT_FATAL((1114,"Failed to fork() reason='%s'",strerror(errno))); + PARENT_FATAL((CANNOT_FORK,"Failed to fork() reason='%s'",strerror(errno))); } else if ( pid == 0) { #ifdef PARENT_THREADS reader_realloc_buf = child_realloc_buff; #endif - - PARENT_DBG((1,"Dispatcher starting")); /* child code */ - //echld_cleanup(); - - PARENT_DBG((2,"Dispatcher starting..")); - - //echld_dispatcher_start(to_disp,from_disp); - PARENT_FATAL((1115,"This shoudln't happen")); + echld_cleanup(); + echld_dispatcher_start(to_disp,from_disp); + PARENT_FATAL((SHOULD_HAVE_EXITED_BEFORE,"This shoudln't happen")); } else { /* parent code */ #ifdef PARENT_THREADS reader_realloc_buf = parent_realloc_buff; #endif + echld_common_set_dbg(9,stderr); + PARENT_DBG((3,"Dispatcher forked")); echld_get_all_codecs(NULL, NULL, &parent.enc, &parent.dec); parent.children = g_new0(echld_t,ECHLD_MAX_CHILDREN); parent.snd = g_byte_array_new(); - parent.dispatcher_fd = to_disp[0]; + parent.dispatcher_fd = to_disp[1]; + parent.dispatcher_pid = pid; - echld_init_reader(&(parent.reader),from_disp[1],4096); + echld_init_reader(&(parent.reader),from_disp[0],4096); - parent.children[0].chld_id = 0; - parent.children[0].data = NULL; - parent.children[0].state = IDLE; - parent.children[0].handlers = g_array_new(TRUE,TRUE,sizeof(hdlr_t)); - for (i=1;i<ECHLD_MAX_CHILDREN;i++) { + for (i=0;i<ECHLD_MAX_CHILDREN;i++) { parent.children[i].chld_id = -1; parent.children[i].data = NULL; parent.children[i].state = FREE; parent.children[i].handlers = g_array_new(TRUE,TRUE,sizeof(hdlr_t)); + parent.children[i].reqs = g_array_new(TRUE,TRUE,sizeof(reqh_t)); } + parent.children[0].chld_id = 0; + parent.children[0].state = IDLE; + signal(SIGCHLD,parent_reaper); - close(to_disp[1]); - close(from_disp[0]); + //close(to_disp[0]); + //close(from_disp[1]); PARENT_DBG((3,"Ready")); } } @@ -254,20 +255,18 @@ void echld_initialize(echld_encoding_t enc) { extern echld_state_t echld_terminate(void) { + + parent.closing = TRUE; + PARENT_SEND(NULL,0,ECHLD_CLOSE_CHILD,++reqh_ids); + + do ; while(sleep(1)); /* wait a full sec without signals */ + echld_cleanup(); + close(parent.dispatcher_fd); + kill(parent.dispatcher_pid,SIGTERM); return TRUE; } -int reqh_id_idx(echld_t* c, int reqh_id) { - int i; - int imax = c->reqs->len; - - for(i=0; i < imax ; i++) { - if (((reqh_t*)&g_array_index (c->reqs, reqh_t, i))->reqh_id == reqh_id) return i; - } - - return -1; -} @@ -283,30 +282,54 @@ static echld_t* get_child(int id) { /* send a request */ -static int reqh_ids = 1; +int reqh_id_idx(echld_t* c, int reqh_id) { + int i; + int imax = c->reqs->len; + reqh_t* rr = (reqh_t*)c->reqs->data; + + for(i=0; i < imax ; i++) { + if (rr[i].reqh_id == reqh_id) + return i; + } + + return -1; +} + static echld_state_t reqh_snd(echld_t* c, echld_msg_type_t t, GByteArray* ba, echld_msg_cb_t resp_cb, void* cb_data) { - reqh_t req; + int idx; + reqh_t* r; + int reqh_id = reqh_ids++; if (!c) { PARENT_DBG((1,"REQH_SND: No such child")); return 1; } - req.reqh_id = reqh_ids++; - req.cb = resp_cb; - req.cb_data = cb_data; - gettimeofday(&(req.tv),NULL); + idx = reqh_id_idx(c,-1); + if (idx < 0) { + reqh_t req; + idx = c->reqs->len; + g_array_append_val(c->reqs,req); + } + + r = &(((reqh_t*)c->reqs->data)[idx]); + + r->reqh_id = reqh_id; + r->cb = resp_cb; + r->cb_data = cb_data; + + gettimeofday(&(r->tv),NULL); - g_array_append_val(c->reqs,req); + PARENT_DBG((4,"reqh_add: idx='%d'",idx)); - PARENT_DBG((1,"REQH_SND: type='%c' chld_id=%d reqh_id=%d",t,c->chld_id,req.reqh_id)); + PARENT_DBG((1,"REQH_SND: type='%c' chld_id=%d reqh_id=%d",t, c->chld_id,reqh_id)); - PARENT_SEND(ba,c->chld_id,t); + PARENT_SEND(ba,c->chld_id,t,reqh_id); - if (ba) g_byte_array_free(ba,TRUE); + if (ba) g_byte_array_free(ba,TRUE); /* do we? */ - return req.reqh_id; + return reqh_id; } @@ -699,6 +722,7 @@ static long parent_read_frame(guint8* b, size_t len, echld_chld_id_t chld_id, ec echld_t* c = get_child(chld_id); GByteArray* ba = g_byte_array_new(); + PARENT_DBG((3,"parent_read_frame ch=%d t='%c' rh=%d",chld_id,t,reqh_id)); g_byte_array_append(ba,b, (guint)len); if (c) { @@ -708,14 +732,27 @@ static long parent_read_frame(guint8* b, size_t len, echld_chld_id_t chld_id, ec gboolean go_ahead = TRUE; if (r) { /* got that reqh_id */ - go_ahead = r->cb ? r->cb(t,ba,r->cb_data) : TRUE; + if (r->cb) { + go_ahead = r->cb(t,ba,r->cb_data); + } + + r->reqh_id = -1; + r->cb = NULL; + r->cb_data = 0; + r->tv.tv_sec = 0; + r->tv.tv_usec = 0; + + PARENT_DBG((2,"hanlded by reqh_id=%d msg='%s'",reqh_id,go_ahead?"retrying":"done")); } while(go_ahead && ( h = get_next_hdlr_for_type(c,t,&i))) { - go_ahead = h->cb(t,ba,r->cb_data); - } + if (h->cb) + go_ahead = h->cb(t,ba,h->cb_data); + + PARENT_DBG((2,"hanlded by t='%c' msgh_id=%d msg='%s'",h->type, h->id,go_ahead?"retrying":"done")); + } } else { - /* no such child??? */ + PARENT_DBG((1,"parent_read_frame: No such child")); } g_byte_array_free(ba,TRUE); @@ -733,31 +770,33 @@ extern int echld_fd_read(fd_set* rfds, fd_set* efds) { int r_nfds=0; if (FD_ISSET(parent.reader.fd,efds) || FD_ISSET(parent.dispatcher_fd,efds) ) { /* Handle errored dispatcher */ - r_nfds--; + PARENT_DBG((1,"parent errored")); return -1; } if (FD_ISSET(parent.reader.fd,rfds)) { - r_nfds++; + PARENT_DBG((1,"reading from dispatcher")); echld_read_frame(&(parent.reader),parent_read_frame,&(parent)); } return r_nfds; } -extern int echld_select(int nfds, fd_set* rfds, fd_set* wfds, fd_set* efds, struct timeval* timeout) { +extern int echld_select(int nfds _U_, fd_set* rfds, fd_set* wfds, fd_set* efds, struct timeval* timeout) { fd_set my_rfds, my_wfds, my_efds; int r_nfds; + if (rfds == NULL) { rfds = &my_rfds; FD_ZERO(rfds); } if (wfds == NULL) { wfds = &my_wfds; FD_ZERO(wfds); } if (efds == NULL) { efds = &my_efds; FD_ZERO(efds); } - nfds += echld_fdset(rfds,efds); + echld_fdset(rfds,efds); - r_nfds = select(nfds, rfds, wfds, efds, timeout); + PARENT_DBG((2,"Select()")); + r_nfds = select(FD_SETSIZE, rfds, wfds, efds, timeout); - r_nfds += echld_fd_read(rfds,efds); + echld_fd_read(rfds,efds); return r_nfds ; } |