diff --git a/include/sntl/connection.h b/include/sntl/connection.h index 1f87dd9..79b6e8c 100644 --- a/include/sntl/connection.h +++ b/include/sntl/connection.h @@ -26,6 +26,9 @@ #define SXOREPLYREQ 44 /* protocol require reply with expression, * or expression return for the request */ #define SXOTIMEDOUT 45 /* timedout */ +#define ESXRCBADPROT 46 /* invalid protocol */ +#define ESXNOCONNECT 47 /* connection is lost */ +#define ESXNOCHANSUP 48 /* sexp helpers */ #define SEXP_IS_LIST(sx) \ @@ -85,6 +88,7 @@ typedef struct __connection_t { pthread_t msgthread; /** < thread for message queue (2) */ pth_queue_t *mqueue; /** < message queue (2) */ pth_queue_t *rqueue; /** < message queue (1) */ + pth_dqtpoll_t *tpoll; /** < thread poll for rpc requests */ pthread_mutex_t oplock; /** < mutex used to sync operations on connection */ pthread_rwlock_t chnl_lock; /** < rwlock used to sync ops with channels */ int flags; /** < flags of the connection */ diff --git a/lib/connection.c b/lib/connection.c index 4c729d5..50db287 100644 --- a/lib/connection.c +++ b/lib/connection.c @@ -27,6 +27,13 @@ #include +struct __rpc_job +{ + sxmsg_t *msg; + sexp_t *sx; + int (*rpcf) (void *, sexp_t *); +}; + conn_sys_t *conn_sys = NULL; static long __cmp_ulong(const void *a, const void *b); @@ -35,13 +42,20 @@ static long __cmp_ulong(const void *a, const void *b); static sxmsg_t *__allocate_msg(int *res); static void __destroy_msg(sxmsg_t *msg); +static int __rpc_callback(void *data) +{ + struct __rpc_job *job = (struct __rpc_job *)data; + + return job->rpcf((void *)(job->msg), job->sx); +} + int __alloc_channel(ulong_t cid, conn_t *co, rpc_typed_list_t *rlist, chnl_t **channel) { int r = 0; chnl_t *ch = malloc(sizeof(chnl_t)); usrtc_t *msg_tree = malloc(sizeof(usrtc_t)); idx_allocator_t *idx_msg = malloc(sizeof(idx_allocator_t)); - + if(!idx_msg) goto __fin_enomem; else if(idx_allocator_init(idx_msg, MAX_MSGINDEX, 0)) goto __fin_enomem; @@ -76,7 +90,7 @@ int __alloc_channel(ulong_t cid, conn_t *co, rpc_typed_list_t *rlist, chnl_t **c if(idx_msg) free(idx_msg); if(ch) free(ch); if(msg_tree) free(msg_tree); - return ENOMEM; + return r; } else { *channel = ch; return 0; @@ -87,6 +101,7 @@ static int __conn_read(conn_t *co, void *buf, size_t buf_len) { int rfd = SSL_get_fd(co->ssl), r; fd_set readset; + fprintf(stderr, "\tListening ... on %s\n", co->uuid); /* get prepare to select */ FD_ZERO(&readset); @@ -224,6 +239,37 @@ static int __insert_rpc_function(usrtc_t *tree, const char *name, int (*rpcf)(vo return 0; } +static void __wake_up_waiters(conn_t *co, int opcode) +{ + usrtc_node_t *node = NULL, *last_node = NULL; + usrtc_node_t *msg_node = NULL, *last_msg_node = NULL; + chnl_t *ch; + sxmsg_t *smsg = NULL; + + pthread_rwlock_wrlock(&(co->chnl_lock)); + + node = usrtc_first(co->chnl_tree); + last_node = usrtc_last(co->chnl_tree); + while(!usrtc_isempty(co->chnl_tree)) { + ch = (chnl_t *)usrtc_node_getdata(node); + pthread_rwlock_rdlock(&(ch->msglock)); + msg_node = usrtc_first(ch->msgs_tree); + last_msg_node = usrtc_last(ch->msgs_tree); + while(!usrtc_isempty(ch->msgs_tree)) { + smsg = (sxmsg_t *)usrtc_node_getdata(msg_node); + smsg->opcode = opcode; + pthread_mutex_unlock(&(smsg->wait)); + if(msg_node == last_msg_node) break; + msg_node = usrtc_next(ch->msgs_tree, msg_node); + } + pthread_rwlock_unlock(&(ch->msglock)); + if(node == last_node) break; + node = usrtc_next(co->chnl_tree, node); + } + + pthread_rwlock_unlock(&(co->chnl_lock)); +} + static int __default_auth_set_context(void *cctx, sexp_t *sx) { conn_t *co = (conn_t *)cctx; @@ -237,8 +283,8 @@ static int __default_auth_set_context(void *cctx, sexp_t *sx) lsx = sx->list->next; /* now we expect a list of lists */ if(lsx->ty != SEXP_LIST) { - /* TODO: return error code ! via rpc */ - return EINVAL; + err = ESXRCBADPROT; + goto __reply; } /* take length of the list */ llen = sexp_list_length(lsx); @@ -248,21 +294,23 @@ static int __default_auth_set_context(void *cctx, sexp_t *sx) sexp_list_car(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { - return EINVAL; /* TODO: return correct error code, clean up*/ + err = ESXRCBADPROT; + goto __reply; } else val = sx_in->val; if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */ sexp_list_cdr(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_DQUOTE)) { - return EINVAL; /* TODO: return correct error code, clean up*/ + err = ESXRCBADPROT; + goto __reply; } else var = sx_in->val; /* ok, now we need to analyze parameters */ - if(!strcmp(val, ":user")) { - co->pctx->login = strdup(var); /* FIXME: check */ - } else if(!strcmp(val, ":passwd")) { - co->pctx->passwd = strdup(var); /* FIXME: check */ + if(!strcmp(val, ":user") && var) { + co->pctx->login = strdup(var); + } else if(!strcmp(val, ":passwd") && var) { + co->pctx->passwd = strdup(var); } else { /* just ignore in default implementation */ } @@ -273,12 +321,8 @@ static int __default_auth_set_context(void *cctx, sexp_t *sx) tbuf = malloc(2048); if(conn_sys->secure_check) err = conn_sys->secure_check(co); - else { /* FIXME: remove this ! */ - /* just for tests */ - err = 0; - co->pctx->p_attr = 256; co->pctx->uid = 12; co->pctx->gid = 34; - /* end tests */ - } + +__reply: if(err) { snprintf(tbuf, 2048, "(auth-set-error (%d))", err); } else { @@ -286,10 +330,15 @@ static int __default_auth_set_context(void *cctx, sexp_t *sx) co->pctx->p_attr, co->pctx->uid, co->pctx->gid); } /* we will send it */ - SSL_write(co->ssl, tbuf, strlen(tbuf)+1); /* FIXME: check it */ + if(__conn_write(co, tbuf, strlen(tbuf)) < 0) { + co->flags &= ~CXCONN_ESTABL; + __wake_up_waiters(co, ESXNOCONNECT); + } + destroy_sexp(sx); free(tbuf); return err; + } static int __default_auth_set_attr(void *cctx, sexp_t *sx) @@ -297,14 +346,15 @@ static int __default_auth_set_attr(void *cctx, sexp_t *sx) conn_t *co = (conn_t *)cctx; char *val, *var; sexp_t *lsx, *sx_iter, *sx_in; - int llen, idx; + int llen, idx, r = 0; /* skip keyword itself */ lsx = sx->list->next; /* now we expect a list of lists */ if(lsx->ty != SEXP_LIST) { // printf("%s:%d\n", __FUNCTION__, __LINE__); - return EINVAL; + r = ESXRCBADPROT; + goto __finish; } /* take length of the list */ llen = sexp_list_length(lsx); @@ -314,7 +364,8 @@ static int __default_auth_set_attr(void *cctx, sexp_t *sx) sexp_list_car(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { - return EINVAL; /* TODO: return correct error code, clean up*/ + r = ESXRCBADPROT; + goto __finish; } else val = sx_in->val; if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */ @@ -322,7 +373,8 @@ static int __default_auth_set_attr(void *cctx, sexp_t *sx) sexp_list_cdr(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { - return EINVAL; /* TODO: return correct error code, clean up*/ + r = ESXRCBADPROT; + goto __finish; } else var = sx_in->val; /* ok, now we need to analyze parameters */ @@ -338,7 +390,9 @@ static int __default_auth_set_attr(void *cctx, sexp_t *sx) } else continue; /* ignore */ } - return 0; +__finish: + destroy_sexp(sx); + return r; } static int __default_auth_set_error(void *cctx, sexp_t *sx) @@ -349,11 +403,12 @@ static int __default_auth_set_error(void *cctx, sexp_t *sx) /* skip keyword itself */ sx->list = sx->list->next; /* be sure - this is a list */ - if(sx->ty != SEXP_LIST) return EINVAL; + if(sx->ty != SEXP_LIST) return ESXRCBADPROT; else sx = sx->list; /* get it */ errstr = sx->list->val; r = atoi(errstr); + destroy_sexp(sx); return r; } @@ -386,9 +441,12 @@ static int __default_ch_get_types(void *cctx, sexp_t *sx) } /* reply to this rpc */ - SSL_write(co->ssl, tbuf, strlen(tbuf)+sizeof(char)); /* FIXME: do checks */ - + if(__conn_write(co, tbuf, strlen(tbuf)) < 0) { + co->flags &= ~CXCONN_ESTABL; + __wake_up_waiters(co, ESXNOCONNECT); + } free(tbuf); + destroy_sexp(sx); return err; } @@ -404,8 +462,8 @@ static int __default_ch_set_types(void *cctx, sexp_t *sx) lsx = sx->list->next; /* now we expect a list of lists */ if(lsx->ty != SEXP_LIST) { - printf("%s:%d\n", __FUNCTION__, __LINE__); - return EINVAL; + r = ESXRCBADPROT; + goto __send_reply; } /* take length of the list */ llen = sexp_list_length(lsx); @@ -415,7 +473,8 @@ static int __default_ch_set_types(void *cctx, sexp_t *sx) sexp_list_car(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { - return EINVAL; /* TODO: return correct error code, clean up*/ + r = ESXRCBADPROT; + goto __send_reply; } else val = sx_in->val; if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */ @@ -423,27 +482,35 @@ static int __default_ch_set_types(void *cctx, sexp_t *sx) sexp_list_cdr(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_DQUOTE)) { - return EINVAL; /* TODO: return correct error code, clean up*/ + r = ESXRCBADPROT; + goto __send_reply; } else var = sx_in->val; /* ok, now we need to analyze parameters */ if(*val != ':') { - return EINVAL; /* TODO: clean up all the shit */ + r = ESXRCBADPROT; + goto __send_reply; } else { if(conn_sys->set_typed_list_callback) { typeid = atoi((char *)(val + sizeof(char))); if(conn_sys->set_typed_list_callback(co, typeid, var)) { - return ENXIO; /* TODO: clean up all the stuff */ + destroy_sexp(sx); + return ENXIO; } } /* FIXME: if no function, accept or decline ? */ } } else continue; /* ignore */ } +__send_reply: snprintf(buf, 1024, "(ch-gl-error (%d))", r); - SSL_write(co->ssl, buf, strlen(buf) + 1); - + if(__conn_write(co, buf, strlen(buf)) < 0) { + co->flags &= ~CXCONN_ESTABL; + __wake_up_waiters(co, ESXNOCONNECT); + } + destroy_sexp(sx); return r; + } static int __default_ch_gl_error(void *cctx, sexp_t *sx) @@ -457,7 +524,7 @@ static int __default_ch_gl_error(void *cctx, sexp_t *sx) /* skip keyword itself */ sx->list = sx->list->next; /* be sure - this is a list */ - if(sx->ty != SEXP_LIST) return EINVAL; + if(sx->ty != SEXP_LIST) return ESXRCBADPROT; else sx = sx->list; /* get it */ errstr = sx->list->val; r = atoi(errstr); @@ -483,7 +550,7 @@ static int __default_ch_open(void *cctx, sexp_t *sx) /* now we expect a list of lists */ if(lsx->ty != SEXP_LIST) { printf("%s:%d\n", __FUNCTION__, __LINE__); - r = EINVAL; + r = ESXRCBADPROT; goto __send_repl; } @@ -496,7 +563,7 @@ static int __default_ch_open(void *cctx, sexp_t *sx) if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { printf("%s:%d\n", __FUNCTION__, __LINE__); - r = EINVAL; /* TODO: return correct error code, clean up*/ + r = ESXRCBADPROT; goto __send_repl; } else val = sx_in->val; @@ -505,14 +572,14 @@ static int __default_ch_open(void *cctx, sexp_t *sx) sexp_list_cdr(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { - r = EINVAL; /* TODO: return correct error code, clean up*/ + r = ESXRCBADPROT; printf("%s:%d\n", __FUNCTION__, __LINE__); goto __send_repl; } else var = sx_in->val; /* ok, now we need to analyze parameters */ if(*val != ':') { - r = EINVAL; /* TODO: clean up all the shit */ + r = ESXRCBADPROT; goto __send_repl; } else { if(!strcmp((char *)(val + sizeof(char)), "type")) @@ -528,7 +595,7 @@ static int __default_ch_open(void *cctx, sexp_t *sx) /* additional check for type of the channel */ node = usrtc_lookup(co->rpc_list, &typ); if(!node) { - r = EINVAL; /* FIXME: should be ESXNOCHANSUP */ + r = ESXNOCHANSUP; /* printf("%s:%d (usrtc count: %d) (typ %d)\n", __FUNCTION__, __LINE__, usrtc_count(co->rpc_list), typ);*/ node = usrtc_first(co->rpc_list); @@ -565,7 +632,10 @@ static int __default_ch_open(void *cctx, sexp_t *sx) buf = malloc(2048); snprintf(buf, 2048, "(ch-open-ret ((:error %d)(:uuid %s)(:id %ld)))", r, uuid, cid); - __conn_write(co, buf, strlen(buf)); + if(__conn_write(co, buf, strlen(buf)) < 0) { + co->flags &= ~CXCONN_ESTABL; + __wake_up_waiters(co, ESXNOCONNECT); + } destroy_sexp(sx); free(buf); @@ -579,7 +649,7 @@ static int __default_ch_open_ret(void *cctx, sexp_t *sx) usrtc_node_t *node; int err = 0, r, llen, idx; ulong_t id; - char *uuid = NULL, *val, *var; + char *val, *var; sexp_t *lsx, *sx_iter, *sx_in; sxmsg_t *sms = NULL; @@ -588,18 +658,18 @@ static int __default_ch_open_ret(void *cctx, sexp_t *sx) /* now we expect a list of lists */ if(lsx->ty != SEXP_LIST) { //printf("%s:%d\n", __FUNCTION__, __LINE__); - r = EINVAL; /* TODO: right opcode */ + r = ESXRCBADPROT; goto __mark_msg; } /* take length of the list */ llen = sexp_list_length(lsx); - if(!llen) return EINVAL; /* TODO: !! other side will not set any security attributes */ + if(!llen) return EINVAL; /* !! other side will not set any security attributes */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { sexp_list_car(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { - r = EINVAL; /* TODO: return correct error code, clean up*/ + r = ESXRCBADPROT; goto __mark_msg; } else val = sx_in->val; @@ -608,21 +678,19 @@ static int __default_ch_open_ret(void *cctx, sexp_t *sx) sexp_list_cdr(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { - r = EINVAL; /* TODO: return correct error code, clean up*/ + r = ESXRCBADPROT; goto __mark_msg; } else var = sx_in->val; /* ok, now we need to analyze parameters */ if(*val != ':') { - r = EINVAL; /* TODO: clean up all the shit */ + r = ESXRCBADPROT; goto __mark_msg; } else { if(!strcmp((char *)(val + sizeof(char)), "error")) err = atoi(var); else if(!strcmp((char *)(val + sizeof(char)), "id")) id = atoll(var); - else if(!strcmp((char *)(val + sizeof(char)), "uuid")) - uuid = var; } } else continue; /* ignore */ } @@ -658,7 +726,7 @@ static int __default_ch_close(void *cctx, sexp_t *sx) int idx, llen, r; ulong_t cid = -1; sexp_t *lsx, *sx_iter, *sx_in; - chnl_t *channel; + chnl_t *channel = NULL; r = 0; /* skip keyword itself */ @@ -666,7 +734,7 @@ static int __default_ch_close(void *cctx, sexp_t *sx) /* now we expect a list of lists */ if(lsx->ty != SEXP_LIST) { printf("%s:%d\n", __FUNCTION__, __LINE__); - r = EINVAL; + r = ESXRCBADPROT; goto __send_repl; } @@ -679,21 +747,21 @@ static int __default_ch_close(void *cctx, sexp_t *sx) } if(!SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { printf("%s:%d\n", __FUNCTION__, __LINE__); - r = EINVAL; /* TODO: return correct error code, clean up*/ + r = ESXRCBADPROT; goto __send_repl; } else val = sx_iter->val; sx_in = sx_iter->next; if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { - r = EINVAL; /* TODO: return correct error code, clean up*/ + r = ESXRCBADPROT; printf("%s:%d\n", __FUNCTION__, __LINE__); goto __send_repl; } else var = sx_in->val; /* ok, now we need to analyze parameters */ if(*val != ':') { - r = EINVAL; /* TODO: clean up all the shit */ + r = ESXRCBADPROT; goto __send_repl; } else { if(!strcmp((char *)(val + sizeof(char)), "id")) { @@ -724,8 +792,11 @@ __send_repl: /* check up the message queue */ pthread_rwlock_rdlock(&(channel->msglock)); if(usrtc_count(channel->msgs_tree)) { - /* destroy all */ - /* TODO: make a verbose check, because the client cannot be honest :) (medium) */ + fprintf(stderr, "Operation is not permitted. There are some " + "undelivered messages in the message tree"); + free(buf); + destroy_sexp(sx); + return EPERM; } pthread_rwlock_unlock(&(channel->msglock)); @@ -744,7 +815,10 @@ __send_repl: free(channel); destroy_sexp(sx); - __conn_write(co, buf, strlen(buf)); + if(__conn_write(co, buf, strlen(buf)) < 0) { + co->flags &= ~CXCONN_ESTABL; + __wake_up_waiters(co, ESXNOCONNECT); + } free(buf); return 0; @@ -767,19 +841,19 @@ static int __default_ch_close_ret(void *cctx, sexp_t *sx) /* now we expect a list of lists */ if(lsx->ty != SEXP_LIST) { //printf("%s:%d\n", __FUNCTION__, __LINE__); - r = EINVAL; /* TODO: right opcode */ + r = ESXRCBADPROT; goto __mark_msg; } /* take length of the list */ llen = sexp_list_length(lsx); - if(!llen) return EINVAL; /* TODO: !! other side will not set any security attributes */ + if(!llen) return EINVAL; /* !! other side will not set any security attributes */ ; SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { sexp_list_car(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { - r = EINVAL; /* TODO: return correct error code, clean up*/ + r = ESXRCBADPROT; goto __mark_msg; } else val = sx_in->val; @@ -788,13 +862,13 @@ static int __default_ch_close_ret(void *cctx, sexp_t *sx) sexp_list_cdr(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { - r = EINVAL; /* TODO: return correct error code, clean up*/ + r = ESXRCBADPROT; goto __mark_msg; } else var = sx_in->val; /* ok, now we need to analyze parameters */ if(*val != ':') { - r = EINVAL; /* TODO: clean up all the shit */ + r = ESXRCBADPROT; goto __mark_msg; } else { if(!strcmp((char *)(val + sizeof(char)), "error")) @@ -852,9 +926,110 @@ static int __create_reg_msg_mould(sxmsg_t **msg, chnl_t *ch, ulong_t mid) return 0; } +// TODO: check and continue static int __default_msg_pulse(void *cctx, sexp_t *sx) { - return 0; + conn_t *co = (conn_t *)cctx; + usrtc_node_t *node = NULL; + chnl_t *chan = NULL; + int r = 0; + sexp_t *lsx = NULL, *sx_iter = NULL; + sexp_t *sx_sublist = NULL, *sx_value = NULL; + ulong_t chnl_id = -1; + ulong_t msg_id = -1; + sexp_t *msg = NULL; + sxmsg_t *smsg = NULL; + int idx; + + /* get parameters from the message */ + if(sexp_list_cdr(sx, &lsx)) return ESXRCBADPROT; + if(!SEXP_IS_LIST(lsx)) return ESXRCBADPROT; + + /* find channel id */ + SEXP_ITERATE_LIST(lsx, sx_iter, idx) { + if(SEXP_IS_LIST(sx_iter)) { + sx_sublist = sx_iter; + continue; + } else { + if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { + if(strcmp(sx_iter->val, ":chid")) { + continue; // ignore it + } + sx_value = sx_iter->next; + if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { + continue; + } + chnl_id = atol(sx_value->val); + } else continue; // ignore it + } + } + lsx = sx_sublist; + /* find message id */ + SEXP_ITERATE_LIST(lsx, sx_iter, idx) { + if(SEXP_IS_LIST(sx_iter)) { + msg = sx_iter; + continue; + } else { + if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { + if(strcmp(sx_iter->val, ":msgid")) { + continue; // ignore + } + sx_value = sx_iter->next; + if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { + continue; + } + msg_id = atol(sx_value->val); + } else continue; // ignore it + } + } + + if(msg_id < 0 || chnl_id < 0) { + return ESXRCBADPROT; + } + /* find channel */ + if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT; + else chan = (chnl_t *)usrtc_node_getdata(node); + /* lookup for the message */ + pthread_rwlock_rdlock(&(chan->msglock)); + if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) { + pthread_rwlock_unlock(&(chan->msglock)); + /* here, rpc lookup has no sense, just put this job to the queue */ + /* btw, we're need to create a message first */ + r = __create_reg_msg_mould(&smsg, chan, msg_id); + if(r) return r; + + /* assign the message */ + smsg->opcode = 0; + smsg->payload = (void *)msg; + + /* assign initial S-expression structure */ + smsg->initial_sx = sx; + + /* put the message to the search tree */ + pthread_rwlock_wrlock(&(chan->msglock)); + usrtc_insert(chan->msgs_tree, &(smsg->pendingq_node), &(smsg->mid)); + pthread_rwlock_unlock(&(chan->msglock)); + } else { + //printf(">>>>>>>>>>>>>>>>>>>msg_id = %lu\n", msg_id); + pthread_rwlock_unlock(&(chan->msglock)); + smsg = (sxmsg_t *)usrtc_node_getdata(node); + msg_return(smsg, EEXIST); + return EEXIST; + } + /* put job to the queue and give up */ + r = pth_queue_add(co->rqueue, (void *)smsg, USR_MSG); + if(r) { /* cannot put job to the queue */ + pthread_rwlock_wrlock(&(chan->msglock)); + usrtc_delete(chan->msgs_tree, &(smsg->pendingq_node)); + pthread_rwlock_unlock(&(chan->msglock)); + __destroy_msg(smsg); + return r; + } + + //msg_return(smsg, r); + + /* put to the IN queue */ + return r; } static int __default_msg_pulse_ret(void *cctx, sexp_t *sx) @@ -877,8 +1052,8 @@ static int __default_msg(void *cctx, sexp_t *sx) int idx; /* get parameters from the message */ - if(sexp_list_cdr(sx, &lsx)) return EINVAL; - if(!SEXP_IS_LIST(lsx)) return EINVAL; + if(sexp_list_cdr(sx, &lsx)) return ESXRCBADPROT; + if(!SEXP_IS_LIST(lsx)) return ESXRCBADPROT; /* find channel id */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { @@ -919,7 +1094,7 @@ static int __default_msg(void *cctx, sexp_t *sx) } if(msg_id < 0 || chnl_id < 0) { - return EINVAL; + return ESXRCBADPROT; } /* find channel */ if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT; @@ -931,7 +1106,7 @@ static int __default_msg(void *cctx, sexp_t *sx) /* here, rpc lookup has no sense, just put this job to the queue */ /* btw, we're need to create a message first */ r = __create_reg_msg_mould(&smsg, chan, msg_id); - if(r) return r; /* TODO: reply with right error code */ + if(r) return r; /* assign the message */ smsg->opcode = 0; @@ -951,11 +1126,14 @@ static int __default_msg(void *cctx, sexp_t *sx) msg_return(smsg, EEXIST); return EEXIST; } - /* put job to the queue and give up */ r = pth_queue_add(co->rqueue, (void *)smsg, USR_MSG); if(r) { /* cannot put job to the queue */ - /* TODO: remove message and reply with error code */ + msg_return(smsg, r); + pthread_rwlock_wrlock(&(chan->msglock)); + usrtc_delete(chan->msgs_tree, &(smsg->pendingq_node)); + pthread_rwlock_unlock(&(chan->msglock)); + __destroy_msg(smsg); return r; } @@ -984,8 +1162,14 @@ static int __default_msg_return(void *cctx, sexp_t *sx) int idx, opcode; /* get parameters from the message */ - if(sexp_list_cdr(sx, &lsx)) return EINVAL; - if(!SEXP_IS_LIST(lsx)) return EINVAL; + if(sexp_list_cdr(sx, &lsx)) { + r = ESXRCBADPROT; + goto __finish; + } + if(!SEXP_IS_LIST(lsx)) { + r = ESXRCBADPROT; + goto __finish; + } /* get parameters */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { @@ -1029,7 +1213,8 @@ static int __default_msg_return(void *cctx, sexp_t *sx) opcode = atoi(lsx->val); if(msg_id < 0 || chnl_id < 0) { - return EINVAL; + r = ESXRCBADPROT; + goto __finish; } //printf("chnl_id = %ld\n", chnl_id); if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT; @@ -1038,17 +1223,24 @@ static int __default_msg_return(void *cctx, sexp_t *sx) pthread_rwlock_rdlock(&(chan->msglock)); if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) { pthread_rwlock_unlock(&(chan->msglock)); - /* TODO: return gently an opcode about absent message */ + r = ENOENT; + goto __finish; } else { pthread_rwlock_unlock(&(chan->msglock)); smsg = (sxmsg_t *)usrtc_node_getdata(node); + pthread_rwlock_wrlock(&(chan->msglock)); + idx_free(chan->idx_msg, msg_id); + usrtc_delete(chan->msgs_tree, node); + pthread_rwlock_unlock(&(chan->msglock)); + //smsg = (sxmsg_t *)usrtc_node_getdata(node); smsg->opcode = opcode; - destroy_sexp(sx); smsg->payload = NULL; pthread_mutex_unlock(&(smsg->wait)); } - - return 0; + +__finish: + destroy_sexp(sx); + return r; } static int __default_msg_reply(void *cctx, sexp_t *sx) @@ -1066,8 +1258,14 @@ static int __default_msg_reply(void *cctx, sexp_t *sx) int idx; /* get parameters from the message */ - if(sexp_list_cdr(sx, &lsx)) return EINVAL; - if(!SEXP_IS_LIST(lsx)) return EINVAL; + if(sexp_list_cdr(sx, &lsx)) { + r = ESXRCBADPROT; + goto __finish; + } + if(!SEXP_IS_LIST(lsx)) { + r = ESXRCBADPROT; + goto __finish; + } /* get parameters */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { @@ -1108,8 +1306,8 @@ static int __default_msg_reply(void *cctx, sexp_t *sx) } if(msg_id < 0 || chnl_id < 0) { - /* FIXME: ?? */ - return EINVAL; + r = ESXRCBADPROT; + goto __finish; } if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT; @@ -1118,17 +1316,20 @@ static int __default_msg_reply(void *cctx, sexp_t *sx) pthread_rwlock_rdlock(&(chan->msglock)); if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) { pthread_rwlock_unlock(&(chan->msglock)); - /* TODO: return gently an opcode about absent message */ + r = ENOENT; + goto __finish; } else { pthread_rwlock_unlock(&(chan->msglock)); smsg = (sxmsg_t *)usrtc_node_getdata(node); smsg->opcode = SXOREPLYREQ; - //destroy_sexp((sexp_t *)smsg->payload); - smsg->payload = msg; + smsg->payload = copy_sexp(msg); + smsg->flags |= ESXMSG_ISREPLY; pthread_mutex_unlock(&(smsg->wait)); } - return 0; +__finish: + destroy_sexp(sx); + return r; } static int __init_systemrpc_tree(usrtc_t *rtree) @@ -1161,7 +1362,6 @@ static int __init_systemrpc_tree(usrtc_t *rtree) static int __eval_cstr(char *cstr, cx_rpc_list_t *rpc_list, void *ctx) { - ; int r = ENOENT; sexp_t *sx; usrtc_node_t *node; @@ -1196,6 +1396,8 @@ static void *__cxslave_thread_listener(void *wctx) if(r) printf("Got the message %s (%d bytes)\n", buf, r); r = __eval_cstr(buf, conn_sys->system_rpc, co); } + co->flags &= ~CXCONN_ESTABL; + __wake_up_waiters(co, ESXNOCONNECT); free(buf); @@ -1213,6 +1415,8 @@ static void *__cxmaster_thread_listener(void *wctx) if(r) printf("Got the message %s (%d bytes)\n", buf, r); r = __eval_cstr(buf, conn_sys->system_rpc, co); } + co->flags &= ~CXCONN_ESTABL; + __wake_up_waiters(co, ESXNOCONNECT); free(buf); @@ -1230,6 +1434,7 @@ static void *__rmsg_queue_thread(void *ctx) sexp_t *sx; usrtc_node_t *node = NULL; cx_rpc_t *rpccall; + struct __rpc_job *rjob = NULL; if(!tmp) return NULL; @@ -1239,7 +1444,6 @@ static void *__rmsg_queue_thread(void *ctx) free(tmp); return NULL; } - msg = tmp->data; if(!msg) continue; /* spurious !! */ @@ -1262,11 +1466,18 @@ static void *__rmsg_queue_thread(void *ctx) node = usrtc_lookup(ch->rpc_list->rpc_tree, rpcf); if(!node) { printf("RPC call illegal!\n"); - /* TODO: correct reply with an error code */ + r = ENOENT; + msg_return(msg, r); } else { rpccall = (cx_rpc_t *)usrtc_node_getdata(node); - /* call this ! */ /* TODO: move this job to the queue under dynamic thread poll */ - rpccall->rpcf((void *)msg, sx); + /* call this ! */ + rjob = malloc(sizeof(struct __rpc_job)); // TODO: check it + rjob->msg = msg; + rjob->sx = sx; + rjob->rpcf = rpccall->rpcf; + pth_dqtpoll_add(co->tpoll, (void *)rjob, USR_MSG); // TODO: check it + //rpccall->rpcf((void *)msg, sx); + } } } @@ -1340,21 +1551,15 @@ static void *__msg_queue_thread(void *ctx) * 1. remove from message tree * 2. destroy message itself */ - /* TODO: destroy the message */ -// destroy_sexp(msg->initial_sx); -// msg->initial_sx = NULL; -// msg->payload = NULL; -// destroy_sexp(msg->payload); + destroy_sexp(msg->initial_sx); + msg->initial_sx = NULL; msg->payload = NULL; + destroy_sexp(msg->payload); } else { ; pthread_mutex_unlock(&(msg->wait)); } } - len = strlen(tb); - tb += len*sizeof(char); - strcat(tb, ")))"); - ; } else { /* pulse messages */ /* here we're shouldn't process reply procedure */ snprintf(buf, 4096, "(ch-msg-pulse (:chid %lu (:msgid %lu ", ch->cid, @@ -1369,21 +1574,26 @@ static void *__msg_queue_thread(void *ctx) * 1. remove from message tree * 2. destroy message itself */ - /* TODO: destroy the message */ -// destroy_sexp(msg->initial_sx); -// msg->initial_sx = NULL; -// msg->payload = NULL; -// destroy_sexp(msg->payload); + destroy_sexp(msg->initial_sx); + msg->initial_sx = NULL; msg->payload = NULL; + __destroy_msg(msg); } else pthread_mutex_unlock(&(msg->wait)); } - len = strlen(tb); - tb += len*sizeof(char); - strcat(tb, ")))"); } + + len = strlen(tb); + tb += len*sizeof(char); + strcat(tb, ")))"); __ssl_write: + /* write it */ + if(__conn_write(co, (void *)buf, strlen(buf) + sizeof(char)) < 0) { + co->flags &= ~CXCONN_ESTABL; + __wake_up_waiters(co, ESXNOCONNECT); + } + if(msg->flags & ESXMSG_CLOSURE) { /* first remove the message from tree */ pthread_rwlock_wrlock(&(ch->msglock)); @@ -1393,9 +1603,7 @@ static void *__msg_queue_thread(void *ctx) destroy_sexp(msg->initial_sx); __destroy_msg(msg); } - - /* write it */ - __conn_write(co, (void *)buf, strlen(buf) + sizeof(char)); /* TODO: SSL*/ + fprintf(stderr, "\t-->%s wrote %s\n", __FUNCTION__, buf); } @@ -1468,7 +1676,7 @@ static int __verify_certcall(int preverify_ok, X509_STORE_CTX *ctx) if(conn_sys->validate_sslpem) { if(conn_sys->validate_sslpem(co)) return 0; else return 1; - } else return 1; /* FIXME: return 0 instead of 1 in production */ + } else return 0; } return preverify_ok; @@ -1515,7 +1723,10 @@ void *__system_queue_listener(void *data) co = chan->connection; payload = (sxpayload_t *)sysmsg->payload; /* write the buf */ - __conn_write(co, (void *)payload->cstr, strlen(payload->cstr) + 1); /* TODO: SSL*/ + if(__conn_write(co, (void *)payload->cstr, strlen(payload->cstr) + 1) < 0) { + co->flags &= ~CXCONN_ESTABL; + __wake_up_waiters(co, ESXNOCONNECT); + } } } @@ -1648,15 +1859,20 @@ int connection_initiate(conn_t *co, const char *host, int port, struct sockaddr_in addr; usrtc_t *ch_tree, *rpc_tree; pth_queue_t *mqueue = malloc(sizeof(pth_queue_t)); + if(!mqueue) return ENOMEM; + pth_dqtpoll_t *tpoll = malloc(sizeof(pth_dqtpoll_t)); + if(!tpoll) return ENOMEM; // TODO: fallback idx_allocator_t *idx_ch = malloc(sizeof(idx_allocator_t)); + if(!idx_ch) return ENOMEM; - /* TODO: check for mqueue allocation */ if(!co) return EINVAL; if(!host) return EINVAL; if(!SSL_cert) return EINVAL; if(!pctx) return EINVAL; - memset(co, 0, sizeof(co)); + memset(co, 0, sizeof(conn_t)); + + pth_dqtpoll_init(tpoll, __rpc_callback); if(!idx_ch) return ENOMEM; else r = idx_allocator_init(idx_ch, MAX_CHANNELS*MAX_MULTI, 0); @@ -1680,7 +1896,7 @@ int connection_initiate(conn_t *co, const char *host, int port, co->idx_ch = idx_ch; /* assign message queue */ - pth_queue_init(mqueue); /* TODO: check for initialization */ + if((r = pth_queue_init(mqueue))) goto __fail_3; co->mqueue = mqueue; /* init SSL certificates and context */ @@ -1758,6 +1974,15 @@ int connection_initiate(conn_t *co, const char *host, int port, SSL_write(co->ssl, buf, strlen(buf) + sizeof(char)); /* read the message reply */ bytes = __conn_read(co, buf, __TMPBUFLEN); + if(bytes == -1) { + // we've lost the connection + co->flags &= ~CXCONN_ESTABL; + r = ESXNOCONNECT; + __wake_up_waiters(co, ESXNOCONNECT); + free(buf); + /* shutdown connection */ + goto __fail_3; + } buf[bytes] = 0; /* perform an rpc call */ r = __eval_cstr(buf, conn_sys->system_rpc, (void *)co); @@ -1766,6 +1991,15 @@ int connection_initiate(conn_t *co, const char *host, int port, SSL_write(co->ssl, buf, strlen(buf) + sizeof(char)); /* read the message reply */ bytes = __conn_read(co, buf, __TMPBUFLEN); + if(bytes == -1) { + // we've lost the connection + co->flags &= ~CXCONN_ESTABL; + r = ESXNOCONNECT; + __wake_up_waiters(co, ESXNOCONNECT); + free(buf); + /* shutdown connection */ + goto __fail_3; + } buf[bytes] = 0; /* perform an rpc call */ r = __eval_cstr(buf, conn_sys->system_rpc, (void *)co); @@ -1786,7 +2020,11 @@ int connection_initiate(conn_t *co, const char *host, int port, //return 0; /* FIXME: */ } r = pthread_create(&co->msgthread, NULL, __msg_queue_thread, (void *)co); - /* TODO: check for thread creation */ + if(r) goto __fail_3; + + pth_dqtpoll_run(tpoll); + co->tpoll = tpoll; + return 0; } @@ -1810,10 +2048,14 @@ int connection_create(conn_t *co, int sck) usrtc_t *ch_tree, *rpc_tree; pth_queue_t *rqueue = malloc(sizeof(pth_queue_t)); pth_queue_t *mqueue = malloc(sizeof(pth_queue_t)); + pth_dqtpoll_t *tpoll = malloc(sizeof(pth_dqtpoll_t)); + if(!tpoll) return ENOMEM; // TODO: fallback idx_allocator_t *idx_ch = malloc(sizeof(idx_allocator_t)); if(!co) return EINVAL; else memset(co, 0, sizeof(co)); + + pth_dqtpoll_init(tpoll, __rpc_callback); // TODO: check it if(!idx_ch) return ENOMEM; else r = idx_allocator_init(idx_ch, MAX_CHANNELS*MAX_MULTI, 0); @@ -1837,10 +2079,12 @@ int connection_create(conn_t *co, int sck) co->idx_ch = idx_ch; /* assign message queue */ - pth_queue_init(rqueue); /* TODO: check for initialization */ + r = pth_queue_init(rqueue); + if(r) goto __fail_3; co->rqueue = rqueue; /* assigned outbone message queue master also has this one */ - pth_queue_init(mqueue); /* TODO: check for initialization */ + r = pth_queue_init(mqueue); + if(r) goto __fail_3; co->mqueue = mqueue; /* init SSL certificates and context */ @@ -1913,6 +2157,9 @@ int connection_create(conn_t *co, int sck) printf("bytes = %d\n", bytes); if(bytes < 0) { printf("Terminate SSL connection, the other end is lost.\n"); + co->flags &= ~CXCONN_ESTABL; + __wake_up_waiters(co, ESXNOCONNECT); + r = ESXNOCONNECT; goto __fail_3; } } @@ -1932,8 +2179,13 @@ int connection_create(conn_t *co, int sck) usrtc_insert(conn_sys->connections, &co->csnode, (void *)co->uuid); pthread_rwlock_unlock(&conn_sys->rwlock); /* threads poll --- */ - r = pthread_create(&co->msgthread, NULL, __msg_queue_thread, (void *)co); /* TODO: check for thread creation */ - r = pthread_create(&co->rmsgthread, NULL, __rmsg_queue_thread, (void *)co); /* TODO: check for thread creation */ + r = pthread_create(&co->msgthread, NULL, __msg_queue_thread, (void *)co); + if(r) goto __fail_3; + r = pthread_create(&co->rmsgthread, NULL, __rmsg_queue_thread, (void *)co); + if(r) goto __fail_3; + + pth_dqtpoll_run(tpoll); + co->tpoll = tpoll; } return r; @@ -2052,12 +2304,20 @@ int channel_open(conn_t *co, chnl_t **ch, int type) { chnl_t *nch = NULL; int r = 0; - char *uuid_ = __generate_uuid(); - sxpayload_t *pl = malloc(sizeof(sxpayload_t)); + char *uuid_; + sxpayload_t *pl; ulong_t cid; rpc_typed_list_t *rpclist; - usrtc_node_t *node = usrtc_lookup(co->rpc_list, &type); + usrtc_node_t *node; sxmsg_t *sms; + + if(!(co->flags & CXCONN_ESTABL)) { + return ESXNOCONNECT; + } + + uuid_ = __generate_uuid(); + pl = malloc(sizeof(sxpayload_t)); + node = usrtc_lookup(co->rpc_list, &type); /* if(!node) { r = EINVAL; printf("fuck\n"); @@ -2097,7 +2357,7 @@ int channel_open(conn_t *co, chnl_t **ch, int type) /* ok now we're ready to create a message and push channel to the list */ if((r = __create_sys_msg(&sms, uuid_, nch, pl))) { __fail_chan: - /* TODO: destroy the channel*/ + /* destroy the channel*/ goto __fini_op; } else { /* put the channel to the channels search tree */ @@ -2136,7 +2396,7 @@ int channel_open(conn_t *co, chnl_t **ch, int type) } __fini_op: - if(r) { /* TODO: destroy */ + if(r) { if(uuid_) free(uuid_); if(pl) { if(pl->cstr) free(pl->cstr); @@ -2159,12 +2419,18 @@ int channel_open(conn_t *co, chnl_t **ch, int type) int channel_close(chnl_t *chnl) { - char *uuid_ = __generate_uuid(); + char *uuid_; usrtc_node_t *node = NULL; int r; conn_t *co = chnl->connection; sxmsg_t *sms; sxpayload_t *pl; + + if(!(co->flags & CXCONN_ESTABL)) { + return ESXNOCONNECT; + } + + uuid_ = __generate_uuid(); pthread_rwlock_rdlock(&(co->chnl_lock)); node = usrtc_lookup(co->chnl_tree, &chnl->cid); @@ -2185,7 +2451,9 @@ int channel_close(chnl_t *chnl) pl = malloc(sizeof(sxpayload_t)); if(!pl) return ENOMEM; if(__create_sys_msg(&sms, uuid_, chnl, pl)) { - /* TODO: destroy the channel*/ + if(chnl->idx_msg) free(chnl->idx_msg); + if(chnl->msgs_tree) free(chnl->msgs_tree); + free(chnl); return ENOMEM; } @@ -2218,8 +2486,6 @@ __process_smsg: return r; } else r = 0; chnl->flags &= ~ESXCHAN_PENDING; /* mark it as established */ - /* TODO: destroy system message in the channel */ - ; /* remove channel from the search tree */ pthread_rwlock_wrlock(&(chnl->connection->chnl_lock)); usrtc_delete(chnl->connection->chnl_tree, &chnl->node); @@ -2260,6 +2526,11 @@ static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec int r = 0; sxmsg_t *m = NULL; conn_t *co = ch->connection; + + if(!(co->flags & CXCONN_ESTABL)) { + destroy_sexp(sx); + return ESXNOCONNECT; + } *msg = NULL; @@ -2321,6 +2592,11 @@ static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcod int r = 0; chnl_t *ch = msg->pch; conn_t *co = ch->connection; + + if(!(co->flags & CXCONN_ESTABL)) { + destroy_sexp(sx); + return ESXNOCONNECT; + } msg->payload = sx; msg->opcode = opcode; @@ -2374,9 +2650,71 @@ int msg_reply_timed(sxmsg_t *msg, sexp_t *sx, struct timespec *tio) return __msg_reply(msg, sx, tio, 0); } +//TODO: continue. Implement wait for delivery and queue addition +/* + * How message sending works: + * 1. Create a message structure assigned to the channel, + * 2. Put S-expression context to it + * 3. Put the message to the queue + * 4. Wait for job execution + */ +static int __message_send_pulse(chnl_t *ch, sexp_t *sx, struct timespec *tio) +{ + int r = 0; + sxmsg_t *m = NULL; + conn_t *co = ch->connection; + + if(!(co->flags & CXCONN_ESTABL)) { + destroy_sexp(sx); + return ESXNOCONNECT; + } + + r = __create_reg_msg(&m, ch); + if(r) return r; + else { + /* put the message to the search tree */ + pthread_rwlock_wrlock(&(ch->msglock)); + usrtc_insert(ch->msgs_tree, &(m->pendingq_node), &(m->mid)); + pthread_rwlock_unlock(&(ch->msglock)); + + /* message assign */ + m->opcode = 0; + m->payload = (void *)sx; + /* assign initial sx */ + m->initial_sx = sx; + m->flags |= ESXMSG_PULSE; + + /* put the message to the run queue */ + r = pth_queue_add(co->mqueue, (void *)m, USR_MSG); + if(r) return r; /* FIXME: better give up */ + + if(m->flags & ESXMSG_PENDING) { + if(!tio) pthread_mutex_lock(&(m->wait)); + else pthread_mutex_timedlock(&(m->wait), tio); + } + if(tio && (m->flags & ESXMSG_PENDING)) + return SXOTIMEDOUT; + if(!m->payload) { + r = m->opcode; + /* first remove the message from tree */ + pthread_rwlock_wrlock(&(ch->msglock)); + usrtc_delete(ch->msgs_tree, &(m->pendingq_node)); + pthread_rwlock_unlock(&(ch->msglock)); + /* destroy s expression */ + destroy_sexp(m->initial_sx); + /* destroy */ + __destroy_msg(m); + } else { + r = SXOREPLYREQ; + } + } + + return r; +} + int msg_send_pulse(chnl_t *ch, sexp_t *sx) { - return 0; + return __message_send_pulse(ch, sx, NULL); } int msg_send_pulse_timed(chnl_t *ch, sexp_t *sx, struct timespec *tio) diff --git a/lib/queue.c b/lib/queue.c index efa93e7..f59a3f6 100644 --- a/lib/queue.c +++ b/lib/queue.c @@ -214,20 +214,81 @@ unsigned int pth_queue_length(pth_queue_t *queue) /* dynamic queue thread poll */ +struct __pthrd_data { + pth_dqtpoll_t *pthrd; + int myid; +}; + static void *__poll_thread(void *poll) { int r = 0; - pth_msg_t msgbuf; - pth_dqtpoll_t *p = (pth_dqtpoll_t *)poll; + struct __pthrd_data *thrdata = (struct __pthrd_data *)poll; + struct __pthrd_data *npoll = NULL; + pth_msg_t msgbuf, sysbuf; + pth_dqtpoll_t *p = thrdata->pthrd; pth_queue_t *q = p->queue; + ulong_t myid = thrdata->myid; while(1) { r = pth_queue_get(q, NULL, &msgbuf); - if(r) continue; + pthread_rwlock_wrlock(&(p->stats_lock)); + if(r) { + p->sleep_value++; + pthread_rwlock_unlock(&(p->stats_lock)); + continue; + } else p->sleep_value--; + pthread_rwlock_unlock(&(p->stats_lock)); + switch(msgbuf.msgtype) { case USR_MSG: + /* schedule new task for poll - increase or decrease */ + pthread_rwlock_rdlock(&(p->stats_lock)); + if(pth_queue_length(p->queue) > p->sleep_value) { + memset(&sysbuf, 0, sizeof(pth_msg_t)); + pth_queue_add(p->queue, &sysbuf, POLL_INCREASE); + } else if((pth_queue_length(p->queue) > p->sleep_value) && (p->sleep_value > 1)) { + memset(&sysbuf, 0, sizeof(pth_msg_t)); + pth_queue_add(p->queue, &sysbuf, POLL_DECREASE); + } + pthread_rwlock_unlock(&(p->stats_lock)); + /* do the job */ p->jobdata_callback(msgbuf.data); break; + case POLL_DECREASE: + pthread_rwlock_rdlock(&(p->stats_lock)); + if(p->poll_value > 1) r = 1; /* exit now */ + pthread_rwlock_unlock(&(p->stats_lock)); + if(r) { + pthread_rwlock_wrlock(&(p->stats_lock)); + idx_free(p->idx, myid); + p->poll_value--; + pthread_rwlock_unlock(&(p->stats_lock)); + free(poll); + return NULL; + } + break; + case POLL_INCREASE: + if((npoll = malloc(sizeof(struct __pthrd_data)))) { + pthread_rwlock_rdlock(&(p->stats_lock)); + if(p->poll_value < MAX_POLL_VALUE) r = 1; + pthread_rwlock_unlock(&(p->stats_lock)); + if(r) { + pthread_rwlock_wrlock(&(p->stats_lock)); + if(p->poll_value < MAX_POLL_VALUE) { /* check it again */ + npoll->myid = idx_allocate(p->idx); + npoll->pthrd = p; + p->poll_value++; + /* create thread here */ + if(pthread_create(&(p->poll[npoll->myid]), NULL, __poll_thread, npoll)) { + idx_free(p->idx, npoll->myid); + p->poll_value--; + free(npoll); + } + } + pthread_rwlock_unlock(&(p->stats_lock)); + } + } + break; default: /* TODO: do something ... */ break; @@ -244,8 +305,10 @@ int pth_dqtpoll_init(pth_dqtpoll_t *tpoll, int (*jobdata_callback)(void *)) pth_queue_t *queue = malloc(sizeof(pth_queue_t)); pthread_t *poll = malloc(sizeof(pthread_t)*MAX_POLL_VALUE); idx_allocator_t *idx = malloc(sizeof(idx_allocator_t)); + struct __pthrd_data *ndata = malloc(sizeof(struct __pthrd_data)); /* check it for allocation */ + if(!ndata) goto __enomem; if(!idx) goto __enomem; if(!queue) goto __enomem; if(!poll) goto __enomem; @@ -272,13 +335,16 @@ int pth_dqtpoll_init(pth_dqtpoll_t *tpoll, int (*jobdata_callback)(void *)) /* first thread initiation */ idx_reserve(idx, 0); - if(pthread_create(&(poll[0]), NULL, __poll_thread, tpoll)) { + ndata->myid = 0; + ndata->pthrd = tpoll; + if(pthread_create(&(poll[0]), NULL, __poll_thread, ndata)) { pthread_rwlock_destroy(&(tpoll->stats_lock)); goto __enomem; } __finish: if(r) { + if(ndata) free(ndata); if(idx) free(idx); if(queue) { pth_queue_destroy(queue, 0, NULL);