From 36bd01716e13f01fc409944236cfec45cbe970c9 Mon Sep 17 00:00:00 2001 From: Alexander Vdolainen Date: Sat, 20 Dec 2014 02:40:57 +0200 Subject: [PATCH] added numerious fixes, TODOs, and some implementation; --- include/sntl/connection.h | 1 + lib/connection.c | 184 +++++++++++++++++++++++++++++++++++--- 2 files changed, 173 insertions(+), 12 deletions(-) diff --git a/include/sntl/connection.h b/include/sntl/connection.h index e7a4f34..1f87dd9 100644 --- a/include/sntl/connection.h +++ b/include/sntl/connection.h @@ -139,6 +139,7 @@ typedef struct __message_t { usrtc_node_t pendingq_node; /** < node for the pending queue */ pthread_mutex_t wait; /** < special wait mutex, used for sync */ void *payload; /** < payload */ + sexp_t *initial_sx; int opcode; /** < opcode for system and pulse messages */ int flags; /** < flags of the message (type, state etc ...)*/ int use_count; /** < use count */ diff --git a/lib/connection.c b/lib/connection.c index c871030..b47daf4 100644 --- a/lib/connection.c +++ b/lib/connection.c @@ -847,7 +847,8 @@ static int __default_msg(void *cctx, sexp_t *sx) /* get parameters from the message */ if(sexp_list_cdr(sx, &lsx)) return EINVAL; if(!SEXP_IS_LIST(lsx)) return EINVAL; - // find channel id + /* TODO: optimize it, i.e. do it with one pass iteraction !! */ + /* find channel id */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { sx_sublist = sx_iter; @@ -863,10 +864,10 @@ static int __default_msg(void *cctx, sexp_t *sx) } chnl_id = atol(sx_value->val); } else continue; // ignore it - } + } } lsx = sx_sublist; - // find message id + /* find message id */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { msg = sx_iter; @@ -882,7 +883,7 @@ static int __default_msg(void *cctx, sexp_t *sx) } msg_id = atol(sx_value->val); } else continue; // ignore it - } + } } if(msg_id < 0 || chnl_id < 0) { @@ -903,6 +904,9 @@ static int __default_msg(void *cctx, sexp_t *sx) smsg->opcode = 0; smsg->payload = (void *)msg; + /* assign initial S-expression structure */ + smsg->initial_sx = sx; + /* put the message to the search tree */ pthread_rwlock_wrlock(&(chan->msglock)); usrtc_insert(chan->msgs_tree, &(smsg->pendingq_node), &(smsg->mid)); @@ -915,7 +919,7 @@ static int __default_msg(void *cctx, sexp_t *sx) } /* put job to the queue and give up */ - r = pth_queue_add(co->rqueue, (void *)msg, USR_MSG); + r = pth_queue_add(co->rqueue, (void *)smsg, USR_MSG); if(r) { /* cannot put job to the queue */ /* TODO: remove message and reply with error code */ return r; @@ -925,15 +929,165 @@ static int __default_msg(void *cctx, sexp_t *sx) return r; } +/* TODO: optimize amount of code: (must be first) + * __default_msg_return + * __default_msg_reply + * there are many copy-n-paste code!!!!! + */ + static int __default_msg_return(void *cctx, sexp_t *sx) { + conn_t *co = (conn_t *)cctx; + usrtc_node_t *node = NULL; + chnl_t *chan = NULL; + int r = 0; + sexp_t *lsx = NULL, *sx_iter = NULL; + sexp_t *sx_sublist = NULL, *sx_value = NULL; + ulong_t chnl_id = -1; + ulong_t msg_id = -1; + sexp_t *msg = NULL; + sxmsg_t *smsg = NULL; + int idx, opcode; + __DBGLINE; + /* get parameters from the message */ + if(sexp_list_cdr(sx, &lsx)) return EINVAL; + if(!SEXP_IS_LIST(lsx)) return EINVAL; __DBGLINE; + /* get parameters */ + SEXP_ITERATE_LIST(lsx, sx_iter, idx) { + if(SEXP_IS_LIST(sx_iter)) { + sx_sublist = sx_iter; + continue; + } else { + if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { + if(strcmp(sx_iter->val, ":chid")) { + continue; // ignore it + } + sx_value = sx_iter->next; + if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { + continue; + } + chnl_id = atol(sx_value->val); + } else continue; // ignore it + } + } + lsx = sx_sublist; + /* find message id */ + SEXP_ITERATE_LIST(lsx, sx_iter, idx) { + if(SEXP_IS_LIST(sx_iter)) { + msg = sx_iter; + continue; + } else { + if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { + if(strcmp(sx_iter->val, ":msgid")) { + continue; // ignore + } + sx_value = sx_iter->next; + if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { + continue; + } + msg_id = atol(sx_value->val); + } else continue; // ignore it + } + } + /* get opcode */ + sexp_list_car(msg, &lsx); + opcode = atoi(lsx->val); + + if(msg_id < 0 || chnl_id < 0) { + return EINVAL; + } + printf("chnl_id = %ld\n", chnl_id); + if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT; + else chan = (chnl_t *)usrtc_node_getdata(node); + /* lookup for the message */ + if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) { + /* TODO: return gently an opcode about absent message */ + } else { + smsg = (sxmsg_t *)usrtc_node_getdata(node); + smsg->opcode = opcode; + //destroy_sexp((sexp_t *)smsg->payload); + smsg->payload = NULL; + pthread_mutex_unlock(&(smsg->wait)); + } + return 0; } static int __default_msg_reply(void *cctx, sexp_t *sx) { - __DBGLINE; + conn_t *co = (conn_t *)cctx; + usrtc_node_t *node = NULL; + chnl_t *chan = NULL; + int r = 0; + sexp_t *lsx = NULL, *sx_iter = NULL; + sexp_t *sx_sublist = NULL, *sx_value = NULL; + ulong_t chnl_id = -1; + ulong_t msg_id = -1; + sexp_t *msg = NULL; + sxmsg_t *smsg = NULL; + int idx; + + /* get parameters from the message */ + if(sexp_list_cdr(sx, &lsx)) return EINVAL; + if(!SEXP_IS_LIST(lsx)) return EINVAL; + + /* get parameters */ + SEXP_ITERATE_LIST(lsx, sx_iter, idx) { + if(SEXP_IS_LIST(sx_iter)) { + sx_sublist = sx_iter; + continue; + } else { + if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { + if(strcmp(sx_iter->val, ":chid")) { + continue; // ignore it + } + sx_value = sx_iter->next; + if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { + continue; + } + chnl_id = atol(sx_value->val); + } else continue; // ignore it + } + } + lsx = sx_sublist; + /* find message id */ + SEXP_ITERATE_LIST(lsx, sx_iter, idx) { + if(SEXP_IS_LIST(sx_iter)) { + msg = sx_iter; + continue; + } else { + if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { + if(strcmp(sx_iter->val, ":msgid")) { + continue; // ignore + } + sx_value = sx_iter->next; + if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { + continue; + } + msg_id = atol(sx_value->val); + } else continue; // ignore it + } + } + + if(msg_id < 0 || chnl_id < 0) { + /* FIXME: ?? */ + return EINVAL; + } + + if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT; + else chan = (chnl_t *)usrtc_node_getdata(node); + /* lookup for the message */ + if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) { + /* TODO: return gently an opcode about absent message */ + } else { + smsg = (sxmsg_t *)usrtc_node_getdata(node); + smsg->opcode = SXOREPLYREQ; + //destroy_sexp((sexp_t *)smsg->payload); + smsg->payload = msg; + pthread_mutex_unlock(&(smsg->wait)); + } + return 0; } @@ -1041,7 +1195,7 @@ static void *__rmsg_queue_thread(void *ctx) if(!tmp) return NULL; while(1) { - r = pth_queue_get(co->mqueue, NULL, tmp); + r = pth_queue_get(co->rqueue, NULL, tmp); if(r) { free(tmp); return NULL; @@ -1070,9 +1224,11 @@ static void *__rmsg_queue_thread(void *ctx) if(!node) { printf("RPC call illegal!\n"); /* TODO: correct reply with an error code */ - } else rpccall = (cx_rpc_t *)usrtc_node_getdata(node); - /* call this ! */ - rpccall->rpcf((void *)msg, sx); + } else { + rpccall = (cx_rpc_t *)usrtc_node_getdata(node); + /* call this ! */ /* TODO: move this job to the queue under dynamic thread poll */ + rpccall->rpcf((void *)msg, sx); + } } } @@ -1740,12 +1896,12 @@ int connection_create(conn_t *co, int sck) return r; } -int connection_close(conn_t *co) +int connection_close(conn_t *co) /* TODO: */ { return 0; } -int connection_reinit(conn_t *co) +int connection_reinit(conn_t *co) /* TODO: */ { return 0; } @@ -2040,6 +2196,8 @@ static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec /* message assign */ m->opcode = 0; m->payload = (void *)sx; + /* assign initial sx */ + m->initial_sx = sx; /* put the message to the run queue */ r = pth_queue_add(co->mqueue, (void *)m, USR_MSG); @@ -2093,6 +2251,8 @@ static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcod r = pth_queue_add(co->mqueue, (void *)msg, USR_MSG); if(r) return r; /* FIXME: better give up */ + if(!sx) return 0; /* TODO: destroy a message */ + if(msg->flags & ESXMSG_PENDING) { if(!tio) pthread_mutex_lock(&(msg->wait)); else pthread_mutex_timedlock(&(msg->wait), tio);