safe commit: severe fixes. channle close implementation;

v0.5.xx
Alexander Vdolainen 10 years ago
parent afeca23eac
commit 5ea003dfad

@ -215,7 +215,7 @@ int connection_reinit(conn_t *co);
/* channels */
int channel_open(conn_t *co, chnl_t **ch, int type);
int channel_close(conn_t *co);
int channel_close(chnl_t *chnl);
/* message passing */
int msg_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg);

@ -631,7 +631,168 @@ static int __default_ch_open_ret(void *cctx, sexp_t *sx)
}
static int __default_ch_close(void *cctx, sexp_t *sx)
{
conn_t *co = (conn_t *)cctx;
usrtc_node_t *node;
char *val, *var, *buf;
int idx, llen, r;
ulong_t cid = -1;
sexp_t *lsx, *sx_iter, *sx_in;
chnl_t *channel;
r = 0;
/* skip keyword itself */
lsx = sx->list->next;
/* now we expect a list of lists */
if(lsx->ty != SEXP_LIST) {
printf("%s:%d\n", __FUNCTION__, __LINE__);
r = EINVAL;
goto __send_repl;
}
/* take length of the list */
llen = sexp_list_length(lsx);
if(!llen) return 0; /* other side will not set any security attributes */
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
continue;
}
if(!SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) {
printf("%s:%d\n", __FUNCTION__, __LINE__);
r = EINVAL; /* TODO: return correct error code, clean up*/
goto __send_repl;
} else val = sx_iter->val;
sx_in = sx_iter->next;
if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) {
r = EINVAL; /* TODO: return correct error code, clean up*/
printf("%s:%d\n", __FUNCTION__, __LINE__);
goto __send_repl;
} else var = sx_in->val;
/* ok, now we need to analyze parameters */
if(*val != ':') {
r = EINVAL; /* TODO: clean up all the shit */
goto __send_repl;
} else {
if(!strcmp((char *)(val + sizeof(char)), "id")) {
cid = atoll(var);
break;
}
}
}
printf("%s(%ld)\n", __FUNCTION__, cid);
/* additional check for type of the channel */
node = usrtc_lookup(co->chnl_tree, &cid);
if(!node) {
r = ENOENT;
printf("there is no channel with id=%ld\n", cid);
goto __send_repl;
}
channel = (chnl_t *)usrtc_node_getdata(node);
if(!channel) {
r = ENOENT;
printf("there is no channel with id=%ld\n", cid);
goto __send_repl;
}
// TODO free it correctly
// free(channel->uuid);
// pthread_rwlock_wrlock(&(co->chnl_lock));
// idx_free(co->idx_ch, channel->cid);
// pthread_rwlock_unlock(&(co->chnl_lock));
// free(channel);
usrtc_delete(co->chnl_tree, node);
__send_repl:
buf = malloc(2048);
snprintf(buf, 2048, "(ch-close-ret ((:id %ld) (:error %d)))",
channel->cid, r);
int nbytes = SSL_write(co->ssl, buf, strlen(buf));
printf("%s: replied %s (%d bytes)\n", __FUNCTION__, buf, nbytes);
free(buf);
return 0;
}
static int __default_ch_close_ret(void *cctx, sexp_t *sx)
{
__DBGLINE;
conn_t *co = (conn_t *)cctx;
chnl_t *chan;
usrtc_node_t *node;
int err = 0, r, llen, idx;
ulong_t id;
char *val, *var;
sexp_t *lsx, *sx_iter, *sx_in;
sxmsg_t *sms = NULL;
/* skip keyword itself */
lsx = sx->list->next;
/* now we expect a list of lists */
if(lsx->ty != SEXP_LIST) {
//printf("%s:%d\n", __FUNCTION__, __LINE__);
r = EINVAL; /* TODO: right opcode */
goto __mark_msg;
}
/* take length of the list */
llen = sexp_list_length(lsx);
if(!llen) return EINVAL; /* TODO: !! other side will not set any security attributes */
__DBGLINE;
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
sexp_list_car(sx_iter, &sx_in);
if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) {
r = EINVAL; /* TODO: return correct error code, clean up*/
goto __mark_msg;
} else val = sx_in->val;
if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */
sexp_list_cdr(sx_iter, &sx_in);
if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) {
r = EINVAL; /* TODO: return correct error code, clean up*/
goto __mark_msg;
} else var = sx_in->val;
/* ok, now we need to analyze parameters */
if(*val != ':') {
r = EINVAL; /* TODO: clean up all the shit */
goto __mark_msg;
} else {
if(!strcmp((char *)(val + sizeof(char)), "error"))
err = atoi(var);
else if(!strcmp((char *)(val + sizeof(char)), "id"))
id = atoll(var);
}
} else continue; /* ignore */
}
__DBGLINE;
/* try to find desired channel to intercept message */
pthread_rwlock_rdlock(&(co->chnl_lock));
node = usrtc_lookup(co->chnl_tree, (void *)&id);
//printf("channels (%d)\n", usrtc_count(co->chnl_tree));
pthread_rwlock_unlock(&(co->chnl_lock));
if(node) {
printf("found channel!\n");
chan = (chnl_t *)usrtc_node_getdata(node);
sms = chan->sysmsg;
}
__DBGLINE;
__mark_msg:
__DBGLINE;
if(!sms) return r;
sms->flags &= ~ESXMSG_PENDING; /* the message is done */
sms->opcode = err;
/* unlock mutex to wake up the waiting thread */
pthread_mutex_unlock(&(sms->wait));
__DBGLINE;
return 0;
}
@ -674,38 +835,59 @@ static int __default_msg(void *cctx, sexp_t *sx)
conn_t *co = (conn_t *)cctx;
usrtc_node_t *node = NULL;
chnl_t *chan = NULL;
char *key; int r = 0;
sexp_t *lsx = NULL;
ulong_t chnl_id = 0;
ulong_t msg_id = 0x00;
int r = 0;
sexp_t *lsx = NULL, *sx_iter = NULL;
sexp_t *sx_sublist = NULL, *sx_value = NULL;
ulong_t chnl_id = -1;
ulong_t msg_id = -1;
sexp_t *msg = NULL;
sxmsg_t *smsg = NULL;
int idx;
/* get parameters from the message */
if(sexp_list_cdr(sx, &lsx)) return EINVAL;
if(!SEXP_IS_LIST(lsx)) return EINVAL;
// find channel id
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
sx_sublist = sx_iter;
continue;
} else {
if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) {
if(strcmp(sx_iter->val, ":chid")) {
continue; // ignore it
}
sx_value = sx_iter->next;
if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) {
continue;
}
chnl_id = atol(sx_value->val);
} else continue; // ignore it
}
}
lsx = sx_sublist;
// find message id
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
msg = sx_iter;
continue;
} else {
if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) {
if(strcmp(sx_iter->val, ":msgid")) {
continue; // ignore
}
sx_value = sx_iter->next;
if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) {
continue;
}
msg_id = atol(sx_value->val);
} else continue; // ignore it
}
}
/* FIXME: make it via iteraction, to cover the case with different arguments placement */
key = lsx->list->val;
if(strcmp(key, ":chid")) return EINVAL;
lsx = lsx->list->next;
if(!lsx) return EINVAL;
if(!SEXP_IS_TYPE(lsx, SEXP_BASIC)) return EINVAL;
chnl_id = atol(lsx->val);
lsx = lsx->next;
if(!SEXP_IS_LIST(lsx)) return EINVAL;
key = lsx->list->val;
if(strcmp(key, ":msgid")) return EINVAL;
lsx = lsx->list->next;
if(!lsx) return EINVAL;
if(!SEXP_IS_TYPE(lsx, SEXP_BASIC)) return EINVAL;
msg_id = atol(lsx->val); /* message id */
lsx = lsx->next;
if(!SEXP_IS_LIST(lsx)) return EINVAL;
msg = lsx;
if(msg_id < 0 || chnl_id < 0) {
return EINVAL;
}
/* find channel */
printf("chnl_id = %ld\n", chnl_id);
if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT;
@ -726,7 +908,9 @@ static int __default_msg(void *cctx, sexp_t *sx)
usrtc_insert(chan->msgs_tree, &(smsg->pendingq_node), &(smsg->mid));
pthread_rwlock_unlock(&(chan->msglock));
} else {
/* TODO: reply fdr this message with an error EEXIST */
// TODO is it correct?
smsg = (sxmsg_t *)usrtc_node_getdata(node);
msg_return(smsg, EEXIST);
return EEXIST;
}
@ -743,11 +927,13 @@ static int __default_msg(void *cctx, sexp_t *sx)
static int __default_msg_return(void *cctx, sexp_t *sx)
{
__DBGLINE;
return 0;
}
static int __default_msg_reply(void *cctx, sexp_t *sx)
{
__DBGLINE;
return 0;
}
@ -764,6 +950,7 @@ static int __init_systemrpc_tree(usrtc_t *rtree)
if(__insert_rpc_function(rtree, "ch-open", __default_ch_open)) goto __fail;
if(__insert_rpc_function(rtree, "ch-open-ret", __default_ch_open_ret)) goto __fail;
if(__insert_rpc_function(rtree, "ch-close", __default_ch_close)) goto __fail;
if(__insert_rpc_function(rtree, "ch-close-ret", __default_ch_close_ret)) goto __fail;
/* messaging functions */
if(__insert_rpc_function(rtree, "ch-msg-pulse", __default_msg_pulse)) goto __fail;
if(__insert_rpc_function(rtree, "ch-msg-pulse-ret", __default_msg_pulse_ret)) goto __fail;
@ -812,7 +999,8 @@ static void *__cxslave_thread_listener(void *wctx)
int r;
while((r = __conn_read(co, buf, 4096)) != -1) {
if(r) printf("Got the message %s \n", buf);
buf[r] = '\0';
if(r) printf("Got the message %s (%d bytes)\n", buf, r);
r = __eval_cstr(buf, conn_sys->system_rpc, co);
}
@ -828,7 +1016,8 @@ static void *__cxmaster_thread_listener(void *wctx)
int r;
while((r = __conn_read(co, buf, 4096)) != -1) {
if(r) printf("Got the message %s \n", buf);
buf[r] = '\0';
if(r) printf("Got the message %s (%d bytes)\n", buf, r);
r = __eval_cstr(buf, conn_sys->system_rpc, co);
}
@ -1357,6 +1546,7 @@ int connection_create(conn_t *co, int sck)
char *uuid;
char *buf = NULL;
usrtc_t *ch_tree, *rpc_tree;
pth_queue_t *rqueue = malloc(sizeof(pth_queue_t));
idx_allocator_t *idx_ch = malloc(sizeof(idx_allocator_t));
if(!co) return EINVAL;
@ -1382,6 +1572,10 @@ int connection_create(conn_t *co, int sck)
usrtc_init(ch_tree, USRTC_REDBLACK, MAX_CHANNELS, __cmp_ulong);
co->idx_ch = idx_ch;
/* assign message queue */
pth_queue_init(rqueue); /* TODO: check for initialization */
co->rqueue = rqueue;
/* init SSL certificates and context */
co->ctx = SSL_CTX_new(SSLv3_server_method());
@ -1683,9 +1877,77 @@ int channel_open(conn_t *co, chnl_t **ch, int type)
return r;
}
int channel_close(conn_t *co)
int channel_close(chnl_t *chnl)
{
char *uuid_ = __generate_uuid();
usrtc_node_t *node = NULL;
int r;
/* check unprocessed messages */
if(!usrtc_isempty(chnl->msgs_tree)) {
fprintf(stderr, "Unable to close channel");
return EINVAL;
}
node = usrtc_lookup(chnl->connection->chnl_tree, &chnl->cid);
if(!node) {
fprintf(stderr, "No such channel\n");
return ENOENT;
}
sxpayload_t *pl = malloc(sizeof(sxpayload_t));
if(!pl) return ENOMEM;
sxmsg_t *sms;
if(__create_sys_msg(&sms, uuid_, chnl, pl)) {
/* TODO: destroy the channel*/
return ENOMEM;
}
pl->sx = NULL;
if(!(pl->cstr = malloc(sizeof(char) * ESX_SYSMSG_SIZE))) {
free(pl);
return ENOMEM;
}
memset(pl->cstr, 0, sizeof(char) * ESX_SYSMSG_SIZE);
/* put system message to the run queue */
/* first form the message */
snprintf(pl->cstr, sizeof(char) * ESX_SYSMSG_SIZE,
"(ch-close (:id %ld))", chnl->cid);
chnl->sysmsg = sms; /* assign system message to the channel */
/* put it */
if((r = pth_queue_add(conn_sys->ioqueue, (void *)sms, SYS_MSG))) {
return r;
}
if(!(sms->flags & ESXMSG_PENDING)) {
/* was processed too fast */
goto __process_smsg;
} else pthread_mutex_lock(&(sms->wait)); /* will sleep until got a reply */
__process_smsg:
if(sms->opcode) {
r = sms->opcode;
return r;
} else r = 0;
chnl->flags &= ~ESXCHAN_PENDING; /* mark it as established */
/* TODO: destroy system message in the channel */
__DBGLINE;
/* remove channel from the search tree */
pthread_rwlock_wrlock(&(chnl->connection->chnl_lock));
usrtc_delete(chnl->connection->chnl_tree, &chnl->node);
pthread_rwlock_unlock(&(chnl->connection->chnl_lock));
free(pl->cstr);
free(pl);
// free(chnl->uuid);
// pthread_rwlock_wrlock(&(chnl->connection->chnl_lock));
// idx_free(chnl->connection->idx_ch, chnl->cid);
// pthread_rwlock_unlock(&(chnl->connection->chnl_lock));
// idx_free(chnl->connection->idx_ch, chnl->cid);
// free(chnl);
return 0;
}
/* message passing */

Loading…
Cancel
Save