diff --git a/include/sntl/connection.h b/include/sntl/connection.h index e16ec6f..e7a4f34 100644 --- a/include/sntl/connection.h +++ b/include/sntl/connection.h @@ -215,7 +215,7 @@ int connection_reinit(conn_t *co); /* channels */ int channel_open(conn_t *co, chnl_t **ch, int type); -int channel_close(conn_t *co); +int channel_close(chnl_t *chnl); /* message passing */ int msg_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg); diff --git a/lib/connection.c b/lib/connection.c index fde32b3..f369160 100644 --- a/lib/connection.c +++ b/lib/connection.c @@ -631,7 +631,168 @@ static int __default_ch_open_ret(void *cctx, sexp_t *sx) } static int __default_ch_close(void *cctx, sexp_t *sx) +{ + conn_t *co = (conn_t *)cctx; + usrtc_node_t *node; + char *val, *var, *buf; + int idx, llen, r; + ulong_t cid = -1; + sexp_t *lsx, *sx_iter, *sx_in; + chnl_t *channel; + + r = 0; + /* skip keyword itself */ + lsx = sx->list->next; + /* now we expect a list of lists */ + if(lsx->ty != SEXP_LIST) { + printf("%s:%d\n", __FUNCTION__, __LINE__); + r = EINVAL; + goto __send_repl; + } + + /* take length of the list */ + llen = sexp_list_length(lsx); + if(!llen) return 0; /* other side will not set any security attributes */ + SEXP_ITERATE_LIST(lsx, sx_iter, idx) { + if(SEXP_IS_LIST(sx_iter)) { + continue; + } + if(!SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { + printf("%s:%d\n", __FUNCTION__, __LINE__); + r = EINVAL; /* TODO: return correct error code, clean up*/ + goto __send_repl; + } else val = sx_iter->val; + + sx_in = sx_iter->next; + + if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { + r = EINVAL; /* TODO: return correct error code, clean up*/ + printf("%s:%d\n", __FUNCTION__, __LINE__); + goto __send_repl; + } else var = sx_in->val; + + /* ok, now we need to analyze parameters */ + if(*val != ':') { + r = EINVAL; /* TODO: clean up all the shit */ + goto __send_repl; + } else { + if(!strcmp((char *)(val + sizeof(char)), "id")) { + cid = atoll(var); + break; + } + } + } + + printf("%s(%ld)\n", __FUNCTION__, cid); + + /* additional check for type of the channel */ + node = usrtc_lookup(co->chnl_tree, &cid); + if(!node) { + r = ENOENT; + printf("there is no channel with id=%ld\n", cid); + goto __send_repl; + } + channel = (chnl_t *)usrtc_node_getdata(node); + if(!channel) { + r = ENOENT; + printf("there is no channel with id=%ld\n", cid); + goto __send_repl; + } + +// TODO free it correctly +// free(channel->uuid); +// pthread_rwlock_wrlock(&(co->chnl_lock)); +// idx_free(co->idx_ch, channel->cid); +// pthread_rwlock_unlock(&(co->chnl_lock)); +// free(channel); + usrtc_delete(co->chnl_tree, node); + +__send_repl: + buf = malloc(2048); + snprintf(buf, 2048, "(ch-close-ret ((:id %ld) (:error %d)))", + channel->cid, r); + int nbytes = SSL_write(co->ssl, buf, strlen(buf)); + printf("%s: replied %s (%d bytes)\n", __FUNCTION__, buf, nbytes); + free(buf); + + return 0; +} + +static int __default_ch_close_ret(void *cctx, sexp_t *sx) { + __DBGLINE; + conn_t *co = (conn_t *)cctx; + chnl_t *chan; + usrtc_node_t *node; + int err = 0, r, llen, idx; + ulong_t id; + char *val, *var; + sexp_t *lsx, *sx_iter, *sx_in; + sxmsg_t *sms = NULL; + + /* skip keyword itself */ + lsx = sx->list->next; + /* now we expect a list of lists */ + if(lsx->ty != SEXP_LIST) { + //printf("%s:%d\n", __FUNCTION__, __LINE__); + r = EINVAL; /* TODO: right opcode */ + goto __mark_msg; + } + /* take length of the list */ + llen = sexp_list_length(lsx); + if(!llen) return EINVAL; /* TODO: !! other side will not set any security attributes */ + __DBGLINE; + SEXP_ITERATE_LIST(lsx, sx_iter, idx) { + if(SEXP_IS_LIST(sx_iter)) { + sexp_list_car(sx_iter, &sx_in); + + if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { + r = EINVAL; /* TODO: return correct error code, clean up*/ + goto __mark_msg; + } else val = sx_in->val; + + if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */ + + sexp_list_cdr(sx_iter, &sx_in); + + if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { + r = EINVAL; /* TODO: return correct error code, clean up*/ + goto __mark_msg; + } else var = sx_in->val; + + /* ok, now we need to analyze parameters */ + if(*val != ':') { + r = EINVAL; /* TODO: clean up all the shit */ + goto __mark_msg; + } else { + if(!strcmp((char *)(val + sizeof(char)), "error")) + err = atoi(var); + else if(!strcmp((char *)(val + sizeof(char)), "id")) + id = atoll(var); + } + } else continue; /* ignore */ + } + __DBGLINE; + /* try to find desired channel to intercept message */ + pthread_rwlock_rdlock(&(co->chnl_lock)); + node = usrtc_lookup(co->chnl_tree, (void *)&id); + //printf("channels (%d)\n", usrtc_count(co->chnl_tree)); + pthread_rwlock_unlock(&(co->chnl_lock)); + if(node) { + printf("found channel!\n"); + chan = (chnl_t *)usrtc_node_getdata(node); + sms = chan->sysmsg; + } + __DBGLINE; + __mark_msg: + __DBGLINE; + if(!sms) return r; + sms->flags &= ~ESXMSG_PENDING; /* the message is done */ + sms->opcode = err; + + /* unlock mutex to wake up the waiting thread */ + pthread_mutex_unlock(&(sms->wait)); + __DBGLINE; return 0; } @@ -674,38 +835,59 @@ static int __default_msg(void *cctx, sexp_t *sx) conn_t *co = (conn_t *)cctx; usrtc_node_t *node = NULL; chnl_t *chan = NULL; - char *key; int r = 0; - sexp_t *lsx = NULL; - ulong_t chnl_id = 0; - ulong_t msg_id = 0x00; + int r = 0; + sexp_t *lsx = NULL, *sx_iter = NULL; + sexp_t *sx_sublist = NULL, *sx_value = NULL; + ulong_t chnl_id = -1; + ulong_t msg_id = -1; sexp_t *msg = NULL; sxmsg_t *smsg = NULL; + int idx; /* get parameters from the message */ if(sexp_list_cdr(sx, &lsx)) return EINVAL; if(!SEXP_IS_LIST(lsx)) return EINVAL; + // find channel id + SEXP_ITERATE_LIST(lsx, sx_iter, idx) { + if(SEXP_IS_LIST(sx_iter)) { + sx_sublist = sx_iter; + continue; + } else { + if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { + if(strcmp(sx_iter->val, ":chid")) { + continue; // ignore it + } + sx_value = sx_iter->next; + if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { + continue; + } + chnl_id = atol(sx_value->val); + } else continue; // ignore it + } + } + lsx = sx_sublist; + // find message id + SEXP_ITERATE_LIST(lsx, sx_iter, idx) { + if(SEXP_IS_LIST(sx_iter)) { + msg = sx_iter; + continue; + } else { + if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { + if(strcmp(sx_iter->val, ":msgid")) { + continue; // ignore + } + sx_value = sx_iter->next; + if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { + continue; + } + msg_id = atol(sx_value->val); + } else continue; // ignore it + } + } - /* FIXME: make it via iteraction, to cover the case with different arguments placement */ - key = lsx->list->val; - if(strcmp(key, ":chid")) return EINVAL; - lsx = lsx->list->next; - if(!lsx) return EINVAL; - if(!SEXP_IS_TYPE(lsx, SEXP_BASIC)) return EINVAL; - chnl_id = atol(lsx->val); - - lsx = lsx->next; - if(!SEXP_IS_LIST(lsx)) return EINVAL; - key = lsx->list->val; - if(strcmp(key, ":msgid")) return EINVAL; - lsx = lsx->list->next; - if(!lsx) return EINVAL; - if(!SEXP_IS_TYPE(lsx, SEXP_BASIC)) return EINVAL; - msg_id = atol(lsx->val); /* message id */ - - lsx = lsx->next; - if(!SEXP_IS_LIST(lsx)) return EINVAL; - msg = lsx; - + if(msg_id < 0 || chnl_id < 0) { + return EINVAL; + } /* find channel */ printf("chnl_id = %ld\n", chnl_id); if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT; @@ -726,7 +908,9 @@ static int __default_msg(void *cctx, sexp_t *sx) usrtc_insert(chan->msgs_tree, &(smsg->pendingq_node), &(smsg->mid)); pthread_rwlock_unlock(&(chan->msglock)); } else { - /* TODO: reply fdr this message with an error EEXIST */ + // TODO is it correct? + smsg = (sxmsg_t *)usrtc_node_getdata(node); + msg_return(smsg, EEXIST); return EEXIST; } @@ -743,11 +927,13 @@ static int __default_msg(void *cctx, sexp_t *sx) static int __default_msg_return(void *cctx, sexp_t *sx) { + __DBGLINE; return 0; } static int __default_msg_reply(void *cctx, sexp_t *sx) { + __DBGLINE; return 0; } @@ -764,6 +950,7 @@ static int __init_systemrpc_tree(usrtc_t *rtree) if(__insert_rpc_function(rtree, "ch-open", __default_ch_open)) goto __fail; if(__insert_rpc_function(rtree, "ch-open-ret", __default_ch_open_ret)) goto __fail; if(__insert_rpc_function(rtree, "ch-close", __default_ch_close)) goto __fail; + if(__insert_rpc_function(rtree, "ch-close-ret", __default_ch_close_ret)) goto __fail; /* messaging functions */ if(__insert_rpc_function(rtree, "ch-msg-pulse", __default_msg_pulse)) goto __fail; if(__insert_rpc_function(rtree, "ch-msg-pulse-ret", __default_msg_pulse_ret)) goto __fail; @@ -812,7 +999,8 @@ static void *__cxslave_thread_listener(void *wctx) int r; while((r = __conn_read(co, buf, 4096)) != -1) { - if(r) printf("Got the message %s \n", buf); + buf[r] = '\0'; + if(r) printf("Got the message %s (%d bytes)\n", buf, r); r = __eval_cstr(buf, conn_sys->system_rpc, co); } @@ -828,7 +1016,8 @@ static void *__cxmaster_thread_listener(void *wctx) int r; while((r = __conn_read(co, buf, 4096)) != -1) { - if(r) printf("Got the message %s \n", buf); + buf[r] = '\0'; + if(r) printf("Got the message %s (%d bytes)\n", buf, r); r = __eval_cstr(buf, conn_sys->system_rpc, co); } @@ -1357,6 +1546,7 @@ int connection_create(conn_t *co, int sck) char *uuid; char *buf = NULL; usrtc_t *ch_tree, *rpc_tree; + pth_queue_t *rqueue = malloc(sizeof(pth_queue_t)); idx_allocator_t *idx_ch = malloc(sizeof(idx_allocator_t)); if(!co) return EINVAL; @@ -1382,6 +1572,10 @@ int connection_create(conn_t *co, int sck) usrtc_init(ch_tree, USRTC_REDBLACK, MAX_CHANNELS, __cmp_ulong); co->idx_ch = idx_ch; + + /* assign message queue */ + pth_queue_init(rqueue); /* TODO: check for initialization */ + co->rqueue = rqueue; /* init SSL certificates and context */ co->ctx = SSL_CTX_new(SSLv3_server_method()); @@ -1683,9 +1877,77 @@ int channel_open(conn_t *co, chnl_t **ch, int type) return r; } -int channel_close(conn_t *co) +int channel_close(chnl_t *chnl) { + char *uuid_ = __generate_uuid(); + usrtc_node_t *node = NULL; + int r; + + /* check unprocessed messages */ + if(!usrtc_isempty(chnl->msgs_tree)) { + fprintf(stderr, "Unable to close channel"); + return EINVAL; + } + + node = usrtc_lookup(chnl->connection->chnl_tree, &chnl->cid); + if(!node) { + fprintf(stderr, "No such channel\n"); + return ENOENT; + } + + sxpayload_t *pl = malloc(sizeof(sxpayload_t)); + if(!pl) return ENOMEM; + sxmsg_t *sms; + if(__create_sys_msg(&sms, uuid_, chnl, pl)) { + /* TODO: destroy the channel*/ + return ENOMEM; + } + + pl->sx = NULL; + if(!(pl->cstr = malloc(sizeof(char) * ESX_SYSMSG_SIZE))) { + free(pl); + return ENOMEM; + } + memset(pl->cstr, 0, sizeof(char) * ESX_SYSMSG_SIZE); + + /* put system message to the run queue */ + /* first form the message */ + snprintf(pl->cstr, sizeof(char) * ESX_SYSMSG_SIZE, + "(ch-close (:id %ld))", chnl->cid); + chnl->sysmsg = sms; /* assign system message to the channel */ + /* put it */ + if((r = pth_queue_add(conn_sys->ioqueue, (void *)sms, SYS_MSG))) { + return r; + } + if(!(sms->flags & ESXMSG_PENDING)) { + /* was processed too fast */ + goto __process_smsg; + } else pthread_mutex_lock(&(sms->wait)); /* will sleep until got a reply */ + +__process_smsg: + if(sms->opcode) { + r = sms->opcode; + return r; + } else r = 0; + chnl->flags &= ~ESXCHAN_PENDING; /* mark it as established */ + /* TODO: destroy system message in the channel */ + __DBGLINE; + /* remove channel from the search tree */ + pthread_rwlock_wrlock(&(chnl->connection->chnl_lock)); + usrtc_delete(chnl->connection->chnl_tree, &chnl->node); + pthread_rwlock_unlock(&(chnl->connection->chnl_lock)); + + free(pl->cstr); + free(pl); +// free(chnl->uuid); +// pthread_rwlock_wrlock(&(chnl->connection->chnl_lock)); +// idx_free(chnl->connection->idx_ch, chnl->cid); +// pthread_rwlock_unlock(&(chnl->connection->chnl_lock)); +// idx_free(chnl->connection->idx_ch, chnl->cid); +// free(chnl); + return 0; + } /* message passing */