/* * Secure Network Transport Layer Library implementation. * This is a proprietary software. See COPYING for further details. * * (c) 2013-2014 Copyright Askele, inc. * (c) 2013-2014 Copyright Askele Ingria, inc. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include struct __rpc_job { sxmsg_t *msg; sexp_t *sx; int (*rpcf) (void *, sexp_t *); }; conn_sys_t *conn_sys = NULL; static long __cmp_ulong(const void *a, const void *b); /* message alloc and destroy */ static sxmsg_t *__allocate_msg(int *res); static void __destroy_msg(sxmsg_t *msg); static int __rpc_callback(void *data) { struct __rpc_job *job = (struct __rpc_job *)data; return job->rpcf((void *)(job->msg), job->sx); } int __alloc_channel(ulong_t cid, conn_t *co, rpc_typed_list_t *rlist, chnl_t **channel) { int r = 0; chnl_t *ch = malloc(sizeof(chnl_t)); usrtc_t *msg_tree = malloc(sizeof(usrtc_t)); idx_allocator_t *idx_msg = malloc(sizeof(idx_allocator_t)); if(!idx_msg) goto __fin_enomem; else if(idx_allocator_init(idx_msg, MAX_MSGINDEX, 0)) goto __fin_enomem; if(!ch || !msg_tree) { __fin_enomem: r = ENOMEM; goto __fin_up; } else { usrtc_init(msg_tree, USRTC_REDBLACK, MAX_PENDINGMSG, __cmp_ulong); ch->cid = cid; ch->flags = ch->use_count = 0; usrtc_node_init(&ch->node, ch); if(rlist) ch->rpc_list = rlist->rpc_list; /* init locks */ if(pthread_rwlock_init(&(ch->msglock), NULL)) { r = ENOMEM; goto __fin_up; } if(pthread_mutex_init(&(ch->oplock), NULL)) { pthread_rwlock_destroy(&(ch->msglock)); r = ENOMEM; goto __fin_up; } /* assign all the stuff */ ch->idx_msg = idx_msg; ch->msgs_tree = msg_tree; ch->connection = co; } __fin_up: if(r) { if(idx_msg) free(idx_msg); if(ch) free(ch); if(msg_tree) free(msg_tree); return r; } else { *channel = ch; return 0; } } static int __conn_read(conn_t *co, void *buf, size_t buf_len) { int rfd = SSL_get_fd(co->ssl), r; fd_set readset; fprintf(stderr, "\tListening ... 
on %s\n", co->uuid); /* get prepare to select */ FD_ZERO(&readset); FD_SET(rfd, &readset); /* waits until something will be ready to read */ r = select(FD_SETSIZE, &readset, NULL, NULL, NULL); if(r < 0) { printf("select (%d)\n", errno); return -1; } if(!r) { printf("Nothing to wait for\n"); return 0; } if(r && FD_ISSET(rfd, &readset)) { do { //pthread_mutex_lock(&(co->oplock)); /* ok, now we're ready to perform SSL_read */ r = SSL_read(co->ssl, buf, (int)buf_len); switch(SSL_get_error(co->ssl, r)) { case SSL_ERROR_NONE: //printf("Read done (f:%d)\n", rfd); /* this is means we're get ridden it all */ return r; break; case SSL_ERROR_ZERO_RETURN: printf("No data to read\n"); /* no data to read ... */ return 0; break; case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_WRITE: printf("Bypass until SSL buffer not ready.\n"); return 0; default: /* seems the connection lost */ fprintf(stderr, "(RD)Unknown error on %s\n", co->uuid); return -1; } //pthread_mutex_unlock(&(co->oplock)); } while(SSL_pending(co->ssl)); } return 0; } static int __conn_write(conn_t *co, void *buf, size_t buf_len) { int r; pthread_mutex_lock(&(co->oplock)); __retry: r = SSL_write(co->ssl, buf, (int)buf_len); switch(SSL_get_error(co->ssl, r)) { case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_WRITE: goto __retry; break; default: pthread_mutex_unlock(&(co->oplock)); if(r < 0) { fprintf(stderr, "(WR)Unknown error on %s (%d)\n", co->uuid, r); return -1; } else return 0; } pthread_mutex_unlock(&(co->oplock)); return 0; } static long __cmp_cstr(const void *a, const void *b) { return strcmp((char *)a, (char *)b); } static long __cmp_int(const void *a, const void *b) { return *(int *)a - *(int *)b; } static long __cmp_ulong(const void *a, const void *b) { //printf("(??cmp_ulong)a = %ld b = %ld\n", *(ulong_t *)a , *(ulong_t *)b); return *(ulong_t *)a - *(ulong_t *)b; } static int __resolvehost(const char *hostname, char *buf, int buf_len, struct hostent **rhp) { struct hostent *hostbuf = malloc(sizeof(struct 
hostent)); struct hostent *hp = *rhp = NULL; int herr = 0, hres = 0; if(!hostbuf) return NO_ADDRESS; hres = gethostbyname_r(hostname, hostbuf, buf, buf_len, &hp, &herr); if (!hp) return NO_ADDRESS; *rhp = hp; return NETDB_SUCCESS; } static void __destroy_rpc_list_tree(usrtc_t *tree) { usrtc_node_t *node; cx_rpc_t *ent; for(node = usrtc_first(tree); node != NULL; node = usrtc_first(tree)) { ent = (cx_rpc_t *)usrtc_node_getdata(node); usrtc_delete(tree, node); free(ent->name); free(ent); } return; } static int __insert_rpc_function(usrtc_t *tree, const char *name, int (*rpcf)(void *, sexp_t *)) { cx_rpc_t *ent = malloc(sizeof(cx_rpc_t)); usrtc_node_t *node; if(!ent) return ENOMEM; else node = &ent->node; if(!(ent->name = strdup(name))) { free(ent); return ENOMEM; } else ent->rpcf = rpcf; usrtc_node_init(node, ent); usrtc_insert(tree, node, ent->name); return 0; } static void __wake_up_waiters(conn_t *co, int opcode) { usrtc_node_t *node = NULL, *last_node = NULL; usrtc_node_t *msg_node = NULL, *last_msg_node = NULL; chnl_t *ch; sxmsg_t *smsg = NULL; pthread_rwlock_wrlock(&(co->chnl_lock)); node = usrtc_first(co->chnl_tree); last_node = usrtc_last(co->chnl_tree); while(!usrtc_isempty(co->chnl_tree)) { ch = (chnl_t *)usrtc_node_getdata(node); pthread_rwlock_rdlock(&(ch->msglock)); msg_node = usrtc_first(ch->msgs_tree); last_msg_node = usrtc_last(ch->msgs_tree); while(!usrtc_isempty(ch->msgs_tree)) { smsg = (sxmsg_t *)usrtc_node_getdata(msg_node); smsg->opcode = opcode; pthread_mutex_unlock(&(smsg->wait)); if(msg_node == last_msg_node) break; msg_node = usrtc_next(ch->msgs_tree, msg_node); } pthread_rwlock_unlock(&(ch->msglock)); if(node == last_node) break; node = usrtc_next(co->chnl_tree, node); } pthread_rwlock_unlock(&(co->chnl_lock)); } static int __default_auth_set_context(void *cctx, sexp_t *sx) { conn_t *co = (conn_t *)cctx; char *val, *var, *tbuf; sexp_t *lsx, *sx_iter, *sx_in; int llen, idx, err; //co->pctx = malloc(sizeof(perm_ctx_t)); /* skip keyword itself 
*/ lsx = sx->list->next; /* now we expect a list of lists */ if(lsx->ty != SEXP_LIST) { err = ESXRCBADPROT; goto __reply; } /* take length of the list */ llen = sexp_list_length(lsx); if(!llen) return 0; /* other side will not set any security attributes */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { sexp_list_car(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { err = ESXRCBADPROT; goto __reply; } else val = sx_in->val; if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */ sexp_list_cdr(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_DQUOTE)) { err = ESXRCBADPROT; goto __reply; } else var = sx_in->val; /* ok, now we need to analyze parameters */ if(!strcmp(val, ":user") && var) { co->pctx->login = strdup(var); } else if(!strcmp(val, ":passwd") && var) { co->pctx->passwd = strdup(var); } else { /* just ignore in default implementation */ } } else continue; /* ignore */ } /* ok, now we need to fill security context */ tbuf = malloc(2048); if(conn_sys->secure_check) err = conn_sys->secure_check(co); __reply: if(err) { snprintf(tbuf, 2048, "(auth-set-error (%d))", err); } else { snprintf(tbuf, 2048, "(auth-set-attr (:attr %d)(:uid %ld)(:gid %ld))", co->pctx->p_attr, co->pctx->uid, co->pctx->gid); } /* we will send it */ if(__conn_write(co, tbuf, strlen(tbuf)) < 0) { co->flags &= ~CXCONN_ESTABL; __wake_up_waiters(co, ESXNOCONNECT); } destroy_sexp(sx); free(tbuf); return err; } static int __default_auth_set_attr(void *cctx, sexp_t *sx) { conn_t *co = (conn_t *)cctx; char *val, *var; sexp_t *lsx, *sx_iter, *sx_in; int llen, idx, r = 0; /* skip keyword itself */ lsx = sx->list->next; /* now we expect a list of lists */ if(lsx->ty != SEXP_LIST) { // printf("%s:%d\n", __FUNCTION__, __LINE__); r = ESXRCBADPROT; goto __finish; } /* take length of the list */ llen = sexp_list_length(lsx); if(!llen) return 0; /* other side will not set any security attributes */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { 
sexp_list_car(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { r = ESXRCBADPROT; goto __finish; } else val = sx_in->val; if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */ sexp_list_cdr(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { r = ESXRCBADPROT; goto __finish; } else var = sx_in->val; /* ok, now we need to analyze parameters */ if(!strcmp(val, ":attr")) { co->pctx->p_attr = atoi(var); } else if(!strcmp(val, ":uid")) { co->pctx->uid = (ulong_t)atoll(var); } else if(!strcmp(val, ":gid")) { co->pctx->gid = (ulong_t)atoll(var); } else { /* just ignore in default implementation */ } } else continue; /* ignore */ } __finish: destroy_sexp(sx); return r; } static int __default_auth_set_error(void *cctx, sexp_t *sx) { char *errstr = NULL; int r; /* skip keyword itself */ sx->list = sx->list->next; /* be sure - this is a list */ if(sx->ty != SEXP_LIST) return ESXRCBADPROT; else sx = sx->list; /* get it */ errstr = sx->list->val; r = atoi(errstr); destroy_sexp(sx); return r; } static int __default_ch_get_types(void *cctx, sexp_t *sx) { conn_t *co = (conn_t *)cctx; usrtc_node_t *node; rpc_typed_list_t *list_ent; char *tbuf = malloc(4096), *tt; int err = 0; /* if we cannot allocate anything ... 
*/ if(!tbuf) return ENOMEM; /* ok here we go */ co->rpc_list = conn_sys->get_rpc_typed_list_tree(co); /* ok, here we're don't need to parse anything */ if(!usrtc_count(co->rpc_list)) { err = ENXIO; snprintf(tbuf, 4096, "(ch-gl-error (%d))", err); } else { tt = tbuf; snprintf(tt, 4096, "(ch-set-types ("); tt += strlen(tt); for(node = usrtc_first(co->rpc_list); node != NULL; node = usrtc_next(co->rpc_list, node), tt += strlen(tt)) { list_ent = (rpc_typed_list_t *)usrtc_node_getdata(node); snprintf(tt, 4096, "(:%d \"%s\")", list_ent->type_id, list_ent->description); } snprintf(tt, 4096, "))"); } /* reply to this rpc */ if(__conn_write(co, tbuf, strlen(tbuf)) < 0) { co->flags &= ~CXCONN_ESTABL; __wake_up_waiters(co, ESXNOCONNECT); } free(tbuf); destroy_sexp(sx); return err; } static int __default_ch_set_types(void *cctx, sexp_t *sx) { conn_t *co = (conn_t *)cctx; char buf[1024], *val, *var; int r = 0, llen, typeid, idx; sexp_t *lsx, *sx_iter, *sx_in; /* skip keyword itself */ lsx = sx->list->next; /* now we expect a list of lists */ if(lsx->ty != SEXP_LIST) { r = ESXRCBADPROT; goto __send_reply; } /* take length of the list */ llen = sexp_list_length(lsx); if(!llen) return 0; /* other side will not set any security attributes */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { sexp_list_car(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { r = ESXRCBADPROT; goto __send_reply; } else val = sx_in->val; if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */ sexp_list_cdr(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_DQUOTE)) { r = ESXRCBADPROT; goto __send_reply; } else var = sx_in->val; /* ok, now we need to analyze parameters */ if(*val != ':') { r = ESXRCBADPROT; goto __send_reply; } else { if(conn_sys->set_typed_list_callback) { typeid = atoi((char *)(val + sizeof(char))); if(conn_sys->set_typed_list_callback(co, typeid, var)) { destroy_sexp(sx); return ENXIO; } } /* FIXME: if no function, accept or decline ? 
*/ } } else continue; /* ignore */ } __send_reply: snprintf(buf, 1024, "(ch-gl-error (%d))", r); if(__conn_write(co, buf, strlen(buf)) < 0) { co->flags &= ~CXCONN_ESTABL; __wake_up_waiters(co, ESXNOCONNECT); } destroy_sexp(sx); return r; } static int __default_ch_gl_error(void *cctx, sexp_t *sx) { int r; char *errstr; conn_t *co = (conn_t *)cctx; if(co->flags & CXCONN_ESTABL) return EINVAL; /* error, we're already have channels list */ /* skip keyword itself */ sx->list = sx->list->next; /* be sure - this is a list */ if(sx->ty != SEXP_LIST) return ESXRCBADPROT; else sx = sx->list; /* get it */ errstr = sx->list->val; r = atoi(errstr); if(!r) co->flags |= CXCONN_ESTABL; return r; } static int __default_ch_open(void *cctx, sexp_t *sx) { conn_t *co = (conn_t *)cctx; usrtc_node_t *node; char *val, *var, *uuid = NULL, *buf; int typ = -1, idx, llen, r; ulong_t cid; sexp_t *lsx, *sx_iter, *sx_in; rpc_typed_list_t *rlist; chnl_t *channel; /* skip keyword itself */ lsx = sx->list->next; /* now we expect a list of lists */ if(lsx->ty != SEXP_LIST) { printf("%s:%d\n", __FUNCTION__, __LINE__); r = ESXRCBADPROT; goto __send_repl; } /* take length of the list */ llen = sexp_list_length(lsx); if(!llen) return 0; /* other side will not set any security attributes */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { sexp_list_car(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { printf("%s:%d\n", __FUNCTION__, __LINE__); r = ESXRCBADPROT; goto __send_repl; } else val = sx_in->val; if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */ sexp_list_cdr(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { r = ESXRCBADPROT; printf("%s:%d\n", __FUNCTION__, __LINE__); goto __send_repl; } else var = sx_in->val; /* ok, now we need to analyze parameters */ if(*val != ':') { r = ESXRCBADPROT; goto __send_repl; } else { if(!strcmp((char *)(val + sizeof(char)), "type")) typ = atoi(var); else if(!strcmp((char *)(val + sizeof(char)), "id")) cid = 
atoll(var); else if(!strcmp((char *)(val + sizeof(char)), "uuid")) uuid = var; } } else continue; /* ignore */ } /* additional check for type of the channel */ node = usrtc_lookup(co->rpc_list, &typ); if(!node) { r = ESXNOCHANSUP; /* printf("%s:%d (usrtc count: %d) (typ %d)\n", __FUNCTION__, __LINE__, usrtc_count(co->rpc_list), typ);*/ node = usrtc_first(co->rpc_list); rlist = (rpc_typed_list_t *)usrtc_node_getdata(node); printf("---- rlist->type_id = %d\n", rlist->type_id); goto __send_repl; } else rlist = (rpc_typed_list_t *)usrtc_node_getdata(node); /* now we need to check up the channel */ pthread_rwlock_rdlock(&(co->chnl_lock)); node = usrtc_lookup(co->chnl_tree, &cid); if(node) { pthread_rwlock_unlock(&(co->chnl_lock)); r = EEXIST; goto __send_repl; } else { idx_reserve(co->idx_ch, cid); pthread_rwlock_unlock(&(co->chnl_lock)); /* now we should alloc channel */ if((r = __alloc_channel(cid, co, rlist, &channel))) { pthread_rwlock_wrlock(&(co->chnl_lock)); idx_free(co->idx_ch, cid); pthread_rwlock_unlock(&(co->chnl_lock)); goto __send_repl; } else { /* now we ready to confirm channel creation */ pthread_rwlock_wrlock(&(co->chnl_lock)); usrtc_insert(co->chnl_tree, &(channel->node), &(channel->cid)); pthread_rwlock_unlock(&(co->chnl_lock)); r = 0; } } __send_repl: buf = malloc(2048); snprintf(buf, 2048, "(ch-open-ret ((:error %d)(:uuid %s)(:id %ld)))", r, uuid, cid); if(__conn_write(co, buf, strlen(buf)) < 0) { co->flags &= ~CXCONN_ESTABL; __wake_up_waiters(co, ESXNOCONNECT); } destroy_sexp(sx); free(buf); return r; } static int __default_ch_open_ret(void *cctx, sexp_t *sx) { conn_t *co = (conn_t *)cctx; chnl_t *chan; usrtc_node_t *node; int err = 0, r, llen, idx; ulong_t id; char *val, *var; sexp_t *lsx, *sx_iter, *sx_in; sxmsg_t *sms = NULL; /* skip keyword itself */ lsx = sx->list->next; /* now we expect a list of lists */ if(lsx->ty != SEXP_LIST) { //printf("%s:%d\n", __FUNCTION__, __LINE__); r = ESXRCBADPROT; goto __mark_msg; } /* take length of the list */ 
llen = sexp_list_length(lsx); if(!llen) return EINVAL; /* !! other side will not set any security attributes */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { sexp_list_car(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { r = ESXRCBADPROT; goto __mark_msg; } else val = sx_in->val; if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */ sexp_list_cdr(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { r = ESXRCBADPROT; goto __mark_msg; } else var = sx_in->val; /* ok, now we need to analyze parameters */ if(*val != ':') { r = ESXRCBADPROT; goto __mark_msg; } else { if(!strcmp((char *)(val + sizeof(char)), "error")) err = atoi(var); else if(!strcmp((char *)(val + sizeof(char)), "id")) id = atoll(var); } } else continue; /* ignore */ } /* try to find desired channel to intercept message */ pthread_rwlock_rdlock(&(co->chnl_lock)); node = usrtc_lookup(co->chnl_tree, (void *)&id); //printf("channels (%d)\n", usrtc_count(co->chnl_tree)); pthread_rwlock_unlock(&(co->chnl_lock)); if(node) { chan = (chnl_t *)usrtc_node_getdata(node); sms = chan->sysmsg; } __mark_msg: if(!sms) return r; sms->flags &= ~ESXMSG_PENDING; /* the message is done */ sms->opcode = err; destroy_sexp(sx); /* unlock mutex to wake up the waiting thread */ pthread_mutex_unlock(&(sms->wait)); return 0; } static int __default_ch_close(void *cctx, sexp_t *sx) { conn_t *co = (conn_t *)cctx; usrtc_node_t *node; char *val, *var, *buf; int idx, llen, r; ulong_t cid = -1; sexp_t *lsx, *sx_iter, *sx_in; chnl_t *channel = NULL; r = 0; /* skip keyword itself */ lsx = sx->list->next; /* now we expect a list of lists */ if(lsx->ty != SEXP_LIST) { printf("%s:%d\n", __FUNCTION__, __LINE__); r = ESXRCBADPROT; goto __send_repl; } /* take length of the list */ llen = sexp_list_length(lsx); if(!llen) return 0; /* other side will not set any security attributes */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { continue; } if(!SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { 
printf("%s:%d\n", __FUNCTION__, __LINE__); r = ESXRCBADPROT; goto __send_repl; } else val = sx_iter->val; sx_in = sx_iter->next; if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { r = ESXRCBADPROT; printf("%s:%d\n", __FUNCTION__, __LINE__); goto __send_repl; } else var = sx_in->val; /* ok, now we need to analyze parameters */ if(*val != ':') { r = ESXRCBADPROT; goto __send_repl; } else { if(!strcmp((char *)(val + sizeof(char)), "id")) { cid = atoll(var); break; } } } //printf("%s(%ld)\n", __FUNCTION__, cid); /* additional check for type of the channel */ pthread_rwlock_rdlock(&(co->chnl_lock)); node = usrtc_lookup(co->chnl_tree, &cid); pthread_rwlock_unlock(&(co->chnl_lock)); if(!node) { r = ENOENT; printf("there is no channel with id=%ld\n", cid); goto __send_repl; } channel = (chnl_t *)usrtc_node_getdata(node); __send_repl: buf = malloc(2048); snprintf(buf, 2048, "(ch-close-ret ((:id %ld) (:error %d)))", channel->cid, r); /* check up the message queue */ pthread_rwlock_rdlock(&(channel->msglock)); if(usrtc_count(channel->msgs_tree)) { fprintf(stderr, "Operation is not permitted. 
There are some " "undelivered messages in the message tree"); free(buf); destroy_sexp(sx); return EPERM; } pthread_rwlock_unlock(&(channel->msglock)); /* remove channel from the search tree */ pthread_rwlock_wrlock(&(co->chnl_lock)); usrtc_delete(co->chnl_tree, &(channel->node)); /* free index */ idx_free(co->idx_ch, channel->cid); pthread_rwlock_unlock(&(co->chnl_lock)); idx_allocator_destroy(channel->idx_msg); free(channel->idx_msg); free(channel->msgs_tree); pthread_mutex_destroy(&(channel->oplock)); pthread_rwlock_destroy(&(channel->msglock)); free(channel); destroy_sexp(sx); if(__conn_write(co, buf, strlen(buf)) < 0) { co->flags &= ~CXCONN_ESTABL; __wake_up_waiters(co, ESXNOCONNECT); } free(buf); return 0; } static int __default_ch_close_ret(void *cctx, sexp_t *sx) { ; conn_t *co = (conn_t *)cctx; chnl_t *chan; usrtc_node_t *node; int err = 0, r, llen, idx; ulong_t id; char *val, *var; sexp_t *lsx, *sx_iter, *sx_in; sxmsg_t *sms = NULL; /* skip keyword itself */ lsx = sx->list->next; /* now we expect a list of lists */ if(lsx->ty != SEXP_LIST) { //printf("%s:%d\n", __FUNCTION__, __LINE__); r = ESXRCBADPROT; goto __mark_msg; } /* take length of the list */ llen = sexp_list_length(lsx); if(!llen) return EINVAL; /* !! 
other side will not set any security attributes */ ; SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { sexp_list_car(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { r = ESXRCBADPROT; goto __mark_msg; } else val = sx_in->val; if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */ sexp_list_cdr(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { r = ESXRCBADPROT; goto __mark_msg; } else var = sx_in->val; /* ok, now we need to analyze parameters */ if(*val != ':') { r = ESXRCBADPROT; goto __mark_msg; } else { if(!strcmp((char *)(val + sizeof(char)), "error")) err = atoi(var); else if(!strcmp((char *)(val + sizeof(char)), "id")) id = atoll(var); } } else continue; /* ignore */ } ; /* try to find desired channel to intercept message */ pthread_rwlock_rdlock(&(co->chnl_lock)); node = usrtc_lookup(co->chnl_tree, (void *)&id); //printf("channels (%d)\n", usrtc_count(co->chnl_tree)); pthread_rwlock_unlock(&(co->chnl_lock)); if(node) { chan = (chnl_t *)usrtc_node_getdata(node); sms = chan->sysmsg; } ; __mark_msg: ; if(!sms) return r; sms->flags &= ~ESXMSG_PENDING; /* the message is done */ sms->opcode = err; destroy_sexp(sx); /* unlock mutex to wake up the waiting thread */ pthread_mutex_unlock(&(sms->wait)); return 0; } /* create a nould of the message */ static int __create_reg_msg_mould(sxmsg_t **msg, chnl_t *ch, ulong_t mid) { int r = 0; sxmsg_t *sm = __allocate_msg(&r); if(r) return r; else { sm->pch = ch; sm->flags = (ESXMSG_USR | ESXMSG_PENDING); sm->mid = mid; /* ok reserve message ID */ pthread_mutex_lock(&(ch->oplock)); idx_reserve(ch->idx_msg, mid); pthread_mutex_unlock(&(ch->oplock)); pthread_mutex_lock(&(sm->wait)); *msg = sm; } return 0; } // TODO: check and continue static int __default_msg_pulse(void *cctx, sexp_t *sx) { conn_t *co = (conn_t *)cctx; usrtc_node_t *node = NULL; chnl_t *chan = NULL; int r = 0; sexp_t *lsx = NULL, *sx_iter = NULL; sexp_t *sx_sublist = NULL, *sx_value = NULL; ulong_t chnl_id = -1; 
ulong_t msg_id = -1; sexp_t *msg = NULL; sxmsg_t *smsg = NULL; int idx; /* get parameters from the message */ if(sexp_list_cdr(sx, &lsx)) return ESXRCBADPROT; if(!SEXP_IS_LIST(lsx)) return ESXRCBADPROT; /* find channel id */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { sx_sublist = sx_iter; continue; } else { if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { if(strcmp(sx_iter->val, ":chid")) { continue; // ignore it } sx_value = sx_iter->next; if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { continue; } chnl_id = atol(sx_value->val); } else continue; // ignore it } } lsx = sx_sublist; /* find message id */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { msg = sx_iter; continue; } else { if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { if(strcmp(sx_iter->val, ":msgid")) { continue; // ignore } sx_value = sx_iter->next; if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { continue; } msg_id = atol(sx_value->val); } else continue; // ignore it } } if(msg_id < 0 || chnl_id < 0) { return ESXRCBADPROT; } /* find channel */ if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT; else chan = (chnl_t *)usrtc_node_getdata(node); /* lookup for the message */ pthread_rwlock_rdlock(&(chan->msglock)); if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) { pthread_rwlock_unlock(&(chan->msglock)); /* here, rpc lookup has no sense, just put this job to the queue */ /* btw, we're need to create a message first */ r = __create_reg_msg_mould(&smsg, chan, msg_id); if(r) return r; /* assign the message */ smsg->opcode = 0; smsg->payload = (void *)msg; /* assign initial S-expression structure */ smsg->initial_sx = sx; /* put the message to the search tree */ pthread_rwlock_wrlock(&(chan->msglock)); usrtc_insert(chan->msgs_tree, &(smsg->pendingq_node), &(smsg->mid)); pthread_rwlock_unlock(&(chan->msglock)); } else { //printf(">>>>>>>>>>>>>>>>>>>msg_id = %lu\n", msg_id); pthread_rwlock_unlock(&(chan->msglock)); smsg = (sxmsg_t 
*)usrtc_node_getdata(node); msg_return(smsg, EEXIST); return EEXIST; } /* put job to the queue and give up */ r = pth_queue_add(co->rqueue, (void *)smsg, USR_MSG); if(r) { /* cannot put job to the queue */ pthread_rwlock_wrlock(&(chan->msglock)); usrtc_delete(chan->msgs_tree, &(smsg->pendingq_node)); pthread_rwlock_unlock(&(chan->msglock)); __destroy_msg(smsg); return r; } //msg_return(smsg, r); /* put to the IN queue */ return r; } static int __default_msg_pulse_ret(void *cctx, sexp_t *sx) { return 0; } static int __default_msg(void *cctx, sexp_t *sx) { conn_t *co = (conn_t *)cctx; usrtc_node_t *node = NULL; chnl_t *chan = NULL; int r = 0; sexp_t *lsx = NULL, *sx_iter = NULL; sexp_t *sx_sublist = NULL, *sx_value = NULL; ulong_t chnl_id = -1; ulong_t msg_id = -1; sexp_t *msg = NULL; sxmsg_t *smsg = NULL; int idx; /* get parameters from the message */ if(sexp_list_cdr(sx, &lsx)) return ESXRCBADPROT; if(!SEXP_IS_LIST(lsx)) return ESXRCBADPROT; /* find channel id */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { sx_sublist = sx_iter; continue; } else { if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { if(strcmp(sx_iter->val, ":chid")) { continue; // ignore it } sx_value = sx_iter->next; if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { continue; } chnl_id = atol(sx_value->val); } else continue; // ignore it } } lsx = sx_sublist; /* find message id */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { msg = sx_iter; continue; } else { if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { if(strcmp(sx_iter->val, ":msgid")) { continue; // ignore } sx_value = sx_iter->next; if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { continue; } msg_id = atol(sx_value->val); } else continue; // ignore it } } if(msg_id < 0 || chnl_id < 0) { return ESXRCBADPROT; } /* find channel */ if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT; else chan = (chnl_t *)usrtc_node_getdata(node); /* lookup for the message */ 
pthread_rwlock_rdlock(&(chan->msglock)); if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) { pthread_rwlock_unlock(&(chan->msglock)); /* here, rpc lookup has no sense, just put this job to the queue */ /* btw, we're need to create a message first */ r = __create_reg_msg_mould(&smsg, chan, msg_id); if(r) return r; /* assign the message */ smsg->opcode = 0; smsg->payload = (void *)msg; /* assign initial S-expression structure */ smsg->initial_sx = sx; /* put the message to the search tree */ pthread_rwlock_wrlock(&(chan->msglock)); usrtc_insert(chan->msgs_tree, &(smsg->pendingq_node), &(smsg->mid)); pthread_rwlock_unlock(&(chan->msglock)); } else { //printf(">>>>>>>>>>>>>>>>>>>msg_id = %lu\n", msg_id); pthread_rwlock_unlock(&(chan->msglock)); smsg = (sxmsg_t *)usrtc_node_getdata(node); msg_return(smsg, EEXIST); return EEXIST; } /* put job to the queue and give up */ r = pth_queue_add(co->rqueue, (void *)smsg, USR_MSG); if(r) { /* cannot put job to the queue */ msg_return(smsg, r); pthread_rwlock_wrlock(&(chan->msglock)); usrtc_delete(chan->msgs_tree, &(smsg->pendingq_node)); pthread_rwlock_unlock(&(chan->msglock)); __destroy_msg(smsg); return r; } /* put to the IN queue */ return r; } /* TODO: optimize amount of code: (must be first) * __default_msg_return * __default_msg_reply * there are many copy-n-paste code!!!!! 
*/ static int __default_msg_return(void *cctx, sexp_t *sx) { conn_t *co = (conn_t *)cctx; usrtc_node_t *node = NULL; chnl_t *chan = NULL; int r = 0; sexp_t *lsx = NULL, *sx_iter = NULL; sexp_t *sx_sublist = NULL, *sx_value = NULL; ulong_t chnl_id = -1; ulong_t msg_id = -1; sexp_t *msg = NULL; sxmsg_t *smsg = NULL; int idx, opcode; /* get parameters from the message */ if(sexp_list_cdr(sx, &lsx)) { r = ESXRCBADPROT; goto __finish; } if(!SEXP_IS_LIST(lsx)) { r = ESXRCBADPROT; goto __finish; } /* get parameters */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { sx_sublist = sx_iter; continue; } else { if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { if(strcmp(sx_iter->val, ":chid")) { continue; // ignore it } sx_value = sx_iter->next; if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { continue; } chnl_id = atol(sx_value->val); } else continue; // ignore it } } lsx = sx_sublist; /* find message id */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { msg = sx_iter; continue; } else { if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { if(strcmp(sx_iter->val, ":msgid")) { continue; // ignore } sx_value = sx_iter->next; if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { continue; } msg_id = atol(sx_value->val); } else continue; // ignore it } } /* get opcode */ sexp_list_car(msg, &lsx); opcode = atoi(lsx->val); if(msg_id < 0 || chnl_id < 0) { r = ESXRCBADPROT; goto __finish; } //printf("chnl_id = %ld\n", chnl_id); if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT; else chan = (chnl_t *)usrtc_node_getdata(node); /* lookup for the message */ pthread_rwlock_rdlock(&(chan->msglock)); if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) { pthread_rwlock_unlock(&(chan->msglock)); r = ENOENT; goto __finish; } else { pthread_rwlock_unlock(&(chan->msglock)); smsg = (sxmsg_t *)usrtc_node_getdata(node); pthread_rwlock_wrlock(&(chan->msglock)); idx_free(chan->idx_msg, msg_id); usrtc_delete(chan->msgs_tree, node); 
pthread_rwlock_unlock(&(chan->msglock)); //smsg = (sxmsg_t *)usrtc_node_getdata(node); smsg->opcode = opcode; smsg->payload = NULL; pthread_mutex_unlock(&(smsg->wait)); } __finish: destroy_sexp(sx); return r; } static int __default_msg_reply(void *cctx, sexp_t *sx) { conn_t *co = (conn_t *)cctx; usrtc_node_t *node = NULL; chnl_t *chan = NULL; int r = 0; sexp_t *lsx = NULL, *sx_iter = NULL; sexp_t *sx_sublist = NULL, *sx_value = NULL; ulong_t chnl_id = -1; ulong_t msg_id = -1; sexp_t *msg = NULL; sxmsg_t *smsg = NULL; int idx; /* get parameters from the message */ if(sexp_list_cdr(sx, &lsx)) { r = ESXRCBADPROT; goto __finish; } if(!SEXP_IS_LIST(lsx)) { r = ESXRCBADPROT; goto __finish; } /* get parameters */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { sx_sublist = sx_iter; continue; } else { if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { if(strcmp(sx_iter->val, ":chid")) { continue; // ignore it } sx_value = sx_iter->next; if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { continue; } chnl_id = atol(sx_value->val); } else continue; // ignore it } } lsx = sx_sublist; /* find message id */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { msg = sx_iter; continue; } else { if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { if(strcmp(sx_iter->val, ":msgid")) { continue; // ignore } sx_value = sx_iter->next; if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { continue; } msg_id = atol(sx_value->val); } else continue; // ignore it } } if(msg_id < 0 || chnl_id < 0) { r = ESXRCBADPROT; goto __finish; } if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT; else chan = (chnl_t *)usrtc_node_getdata(node); /* lookup for the message */ pthread_rwlock_rdlock(&(chan->msglock)); if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) { pthread_rwlock_unlock(&(chan->msglock)); r = ENOENT; goto __finish; } else { pthread_rwlock_unlock(&(chan->msglock)); smsg = (sxmsg_t *)usrtc_node_getdata(node); smsg->opcode = SXOREPLYREQ; smsg->payload = 
copy_sexp(msg); smsg->flags |= ESXMSG_ISREPLY; pthread_mutex_unlock(&(smsg->wait)); } __finish: destroy_sexp(sx); return r; } static int __init_systemrpc_tree(usrtc_t *rtree) { /* security context functions */ if(__insert_rpc_function(rtree, "auth-set-context", __default_auth_set_context)) goto __fail; if(__insert_rpc_function(rtree, "auth-set-attr", __default_auth_set_attr)) goto __fail; if(__insert_rpc_function(rtree, "auth-set-error", __default_auth_set_error)) goto __fail; /* channels negotiation ops */ if(__insert_rpc_function(rtree, "ch-get-types", __default_ch_get_types)) goto __fail; if(__insert_rpc_function(rtree, "ch-gl-error", __default_ch_gl_error)) goto __fail; if(__insert_rpc_function(rtree, "ch-set-types", __default_ch_set_types)) goto __fail; if(__insert_rpc_function(rtree, "ch-open", __default_ch_open)) goto __fail; if(__insert_rpc_function(rtree, "ch-open-ret", __default_ch_open_ret)) goto __fail; if(__insert_rpc_function(rtree, "ch-close", __default_ch_close)) goto __fail; if(__insert_rpc_function(rtree, "ch-close-ret", __default_ch_close_ret)) goto __fail; /* messaging functions */ if(__insert_rpc_function(rtree, "ch-msg-pulse", __default_msg_pulse)) goto __fail; if(__insert_rpc_function(rtree, "ch-msg-pulse-ret", __default_msg_pulse_ret)) goto __fail; if(__insert_rpc_function(rtree, "ch-msg", __default_msg)) goto __fail; if(__insert_rpc_function(rtree, "ch-msg-rete", __default_msg_return)) goto __fail; if(__insert_rpc_function(rtree, "ch-msg-repl", __default_msg_reply)) goto __fail; return 0; __fail: __destroy_rpc_list_tree(rtree); return ENOMEM; } static int __eval_cstr(char *cstr, cx_rpc_list_t *rpc_list, void *ctx) { int r = ENOENT; sexp_t *sx; usrtc_node_t *node; cx_rpc_t *rentry; char *rpcf; if(!(sx = parse_sexp(cstr, strlen(cstr)))) return EBADE; if(sx->ty == SEXP_LIST) rpcf = sx->list->val; else rpcf = sx->val; /* find an appropriate function */ //printf("rpcf = %s (sx = %p)\n", rpcf, sx); node = usrtc_lookup(rpc_list->rpc_tree, rpcf); 
if(!node) return ENOENT; else rentry = (cx_rpc_t *)usrtc_node_getdata(node); /* call it */ //printf("rentry->rpcf = %p\n", rentry->rpcf); r = rentry->rpcf(ctx, sx); return r; } static void *__cxslave_thread_listener(void *wctx) { conn_t *co = (conn_t *)wctx; char *buf = malloc(4096); int r; while((r = __conn_read(co, buf, 4096)) != -1) { buf[r] = '\0'; if(r) printf("Got the message %s (%d bytes)\n", buf, r); r = __eval_cstr(buf, conn_sys->system_rpc, co); } co->flags &= ~CXCONN_ESTABL; __wake_up_waiters(co, ESXNOCONNECT); free(buf); return NULL; } static void *__cxmaster_thread_listener(void *wctx) { conn_t *co = (conn_t *)wctx; char *buf = malloc(4096); int r; while((r = __conn_read(co, buf, 4096)) != -1) { buf[r] = '\0'; if(r) printf("Got the message %s (%d bytes)\n", buf, r); r = __eval_cstr(buf, conn_sys->system_rpc, co); } co->flags &= ~CXCONN_ESTABL; __wake_up_waiters(co, ESXNOCONNECT); free(buf); return NULL; } static void *__rmsg_queue_thread(void *ctx) { conn_t *co = (conn_t *)ctx; pth_msg_t *tmp = malloc(sizeof(pth_msg_t)); sxmsg_t *msg; chnl_t *ch; int r = 0; char *rpcf; sexp_t *sx; usrtc_node_t *node = NULL; cx_rpc_t *rpccall; struct __rpc_job *rjob = NULL; if(!tmp) return NULL; while(1) { r = pth_queue_get(co->rqueue, NULL, tmp); if(r) { free(tmp); return NULL; } msg = tmp->data; if(!msg) continue; /* spurious !! 
*/ /* check to right job */ if(!(msg->flags & ESXMSG_USR)) { /* not a regular message */ msg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */ msg->flags &= ~ESXMSG_PENDING; ; pthread_mutex_unlock(&(msg->wait)); /* wake up the waitee */ continue; } else { /* now we're need to have a deal with the rpc calling, other - we don't care */ ch = msg->pch; sx = (sexp_t *)msg->payload; /* get the function name */ if(sx->ty == SEXP_LIST) rpcf = sx->list->val; else rpcf = sx->val; //printf("Inbound queue RPC call = '%s'\n", rpcf); node = usrtc_lookup(ch->rpc_list->rpc_tree, rpcf); if(!node) { printf("RPC call illegal!\n"); r = ENOENT; msg_return(msg, r); } else { rpccall = (cx_rpc_t *)usrtc_node_getdata(node); /* call this ! */ rjob = malloc(sizeof(struct __rpc_job)); // TODO: check it rjob->msg = msg; rjob->sx = sx; rjob->rpcf = rpccall->rpcf; pth_dqtpoll_add(co->tpoll, (void *)rjob, USR_MSG); // TODO: check it //rpccall->rpcf((void *)msg, sx); } } } return NULL; } static void *__msg_queue_thread(void *ctx) { conn_t *co = (conn_t *)ctx; pth_msg_t *tmp = malloc(sizeof(pth_msg_t)); sxmsg_t *msg; chnl_t *ch; int r = 0, len; char *buf = malloc(4096), *tb; sexp_t *sx; if(!tmp || !buf) { if(tmp) free(tmp); if(buf) free(buf); return NULL; } ; while(1) { r = pth_queue_get(co->mqueue, NULL, tmp); if(r) { free(buf); free(tmp); return NULL; } ; /* message workout */ msg = tmp->data; if(!msg) continue; /* spurious message */ if(!(msg->flags & ESXMSG_USR)) { /* not a regular message */ msg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */ msg->flags &= ~ESXMSG_PENDING; ; pthread_mutex_unlock(&(msg->wait)); /* wake up the waitee */ continue; } else { ch = msg->pch; /* now we need to complete the request */ sx = (sexp_t *)msg->payload; tb = buf; if(!(msg->flags & ESXMSG_PULSE)) { /* %s))) */ if(!(msg->flags & ESXMSG_ISREPLY)) snprintf(buf, 4096, "(ch-msg (:chid %lu (:msgid %lu ", ch->cid, msg->mid); else { if(!sx) { snprintf(buf, 4096, "(ch-msg-rete (:chid %lu (:msgid %lu 
(%d))))", ch->cid, msg->mid, msg->opcode); /* mark it to close */ msg->flags |= ESXMSG_CLOSURE; /* ok, here we will write it and wait, destroying dialog while reply */ ; goto __ssl_write; } else snprintf(buf, 4096, "(ch-msg-repl (:chid %lu (:msgid %lu ", ch->cid, msg->mid); } ; len = strlen(buf); tb += len*sizeof(char); if(print_sexp(tb, 4096 - (len + 4*sizeof(char)), sx) == -1) { msg->opcode = ENOMEM; /* we don't need to wake up anybody */ if(msg->flags & ESXMSG_TIMEDOUT) { /* clean up all the shit: * 1. remove from message tree * 2. destroy message itself */ destroy_sexp(msg->initial_sx); msg->initial_sx = NULL; msg->payload = NULL; destroy_sexp(msg->payload); } else { ; pthread_mutex_unlock(&(msg->wait)); } } } else { /* pulse messages */ /* here we're shouldn't process reply procedure */ snprintf(buf, 4096, "(ch-msg-pulse (:chid %lu (:msgid %lu ", ch->cid, msg->mid); len = strlen(buf); /* FIXME: code double shit ! */ tb += len*sizeof(char); if(print_sexp(tb, 4096 - (len + 4*sizeof(char)), sx) == -1) { msg->opcode = ENOMEM; /* we don't need to wake up anybody */ if(msg->flags & ESXMSG_TIMEDOUT) { /* clean up all the shit: * 1. remove from message tree * 2. 
destroy message itself */ destroy_sexp(msg->initial_sx); msg->initial_sx = NULL; msg->payload = NULL; __destroy_msg(msg); } else pthread_mutex_unlock(&(msg->wait)); } } len = strlen(tb); tb += len*sizeof(char); strcat(tb, ")))"); __ssl_write: /* write it */ if(__conn_write(co, (void *)buf, strlen(buf) + sizeof(char)) < 0) { co->flags &= ~CXCONN_ESTABL; __wake_up_waiters(co, ESXNOCONNECT); } if(msg->flags & ESXMSG_CLOSURE) { /* first remove the message from tree */ pthread_rwlock_wrlock(&(ch->msglock)); usrtc_delete(ch->msgs_tree, &(msg->pendingq_node)); pthread_rwlock_unlock(&(ch->msglock)); /* destroy */ destroy_sexp(msg->initial_sx); __destroy_msg(msg); } fprintf(stderr, "\t-->%s wrote %s\n", __FUNCTION__, buf); } len = 0; } free(buf); return NULL; } /* this function is an ugly implementation to get C string with uuid */ static char *__generate_uuid(void) { char *uuidc = NULL; uuid_t uuid; int len, i = 0; len = sizeof(char)*(sizeof(uuid_t)*2) + sizeof(char); if(!(uuidc = malloc(len))) return NULL; uuid_generate_time_safe(uuid); for(i = 0; i < sizeof(uuid_t); i++) snprintf(uuidc+(2*i*sizeof(char)), len, "%02x", uuid[i]); return uuidc; } /* this is a callback to perform a custom SSL certs chain validation, * as I promised here the comments, a lot of ... * The first shit: 0 means validation failed, 1 otherwise * The second shit: X509 API, I guess u will love it ;-) * openssl calls this function for each certificate in chain, * since our case is a simple (depth of chain is one, since we're * don't care for public certificates lists or I cannot find any reasons to * do it ...), amount of calls reduced, and in this case we're interested * only in top of chain i.e. actual certificate used on client side, * the validity of signing for other certificates within chain is * guaranteed by the ssl itself. * u know, we need to lookup in database, or elsewhere... 
some information * about client certificate, and decide - is it valid, or not?, if so * yep I mean it's valid, we can assign it's long fucking number to * security context, to use in ongoing full scaled connection handshaking. */ static int __verify_certcall(int preverify_ok, X509_STORE_CTX *ctx) { X509 *cert = X509_STORE_CTX_get_current_cert(ctx); int err = X509_STORE_CTX_get_error(ctx), depth = X509_STORE_CTX_get_error_depth(ctx); SSL *ssl = X509_STORE_CTX_get_ex_data(ctx, SSL_get_ex_data_X509_STORE_CTX_idx()); conn_t *co = SSL_get_ex_data(ssl, conn_sys->ex_ssldata_index); /* this is a custom data we're set before */ /* now we need to check for certificates with a long chain, * so since we have a short one, reject long ones */ if(depth > VERIFY_DEPTH) { /* longer than we expect */ preverify_ok = 0; /* yep, 0 means error for those function callback in openssl, fucking set */ err = X509_V_ERR_CERT_CHAIN_TOO_LONG; X509_STORE_CTX_set_error(ctx, err); } /* ok, now we're on top of SSL (depth == 0) certs chain, * and we can validate client certificate */ if(!depth) { /* TODO: check serial number and other stuff */ co->pctx = malloc(sizeof(perm_ctx_t)); co->pctx->certid = ASN1_INTEGER_get((const ASN1_INTEGER *)X509_get_serialNumber(ctx->current_cert)); printf("Certificate ID: %lu\n", co->pctx->certid); /* now we're need to check the ssl cert */ if(conn_sys->validate_sslpem) { if(conn_sys->validate_sslpem(co)) return 0; else return 1; } else return 0; } return preverify_ok; } /* subsystem: here u can told me about how it's ugly to use global pointers, * yep, it's a business of fucking morons, btw it works (heh, openssl uses this * ancient shit method too, many many and many others too, trust me ...). * subsystem required to define varios RPC lists, control list for connections, * general queues, certificates (all connections uses the same set of certificates * within application), general calls such as ... calls to get info about client * cert and ... many other things. 
*/ void *__system_queue_listener(void *data) { int r; pth_msg_t *tmp = malloc(sizeof(pth_msg_t)); sxmsg_t *sysmsg; sxpayload_t *payload; chnl_t *chan; conn_t *co; if(!tmp) return NULL; while(1) { r = pth_queue_get(conn_sys->ioqueue, NULL, tmp); if(r) { free(tmp); return NULL; } /* ok message is delivered */ sysmsg = tmp->data; if(!sysmsg) continue; /* ignore dummy messages */ if(!(sysmsg->flags & ESXMSG_SYS)) { /* not a system message */ sysmsg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */ sysmsg->flags &= ~ESXMSG_PENDING; pthread_mutex_unlock(&(sysmsg->wait)); /* wake up the waitee */ continue; } else { chan = sysmsg->pch; co = chan->connection; payload = (sxpayload_t *)sysmsg->payload; /* write the buf */ if(__conn_write(co, (void *)payload->cstr, strlen(payload->cstr) + 1) < 0) { co->flags &= ~CXCONN_ESTABL; __wake_up_waiters(co, ESXNOCONNECT); } } } return NULL; } /* general initialization must be called within app uses connection layer */ int connections_subsystem_init(void) { int r = 0; if(!(conn_sys = malloc(sizeof(conn_sys_t)))) return ENOMEM; else if(!(conn_sys->connections = malloc(sizeof(usrtc_t)))) { r = ENOMEM; goto __fail; } /* zeroing */ conn_sys->rootca = conn_sys->certkey = conn_sys->certpem = NULL; conn_sys->validate_sslpem = NULL; conn_sys->secure_check = NULL; /* init connections list */ usrtc_init(conn_sys->connections, USRTC_REDBLACK, MAX_CONNECTIONS, __cmp_cstr); if((r = pthread_rwlock_init(&(conn_sys->rwlock), NULL))) goto __fail_1; /* init queues */ if(!(conn_sys->ioq = malloc(sizeof(pth_queue_t)))) { /* general io queue */ r = ENOMEM; goto __fail_2; } if((r = pth_queue_init(conn_sys->ioq))) goto __fail_3; if(!(conn_sys->ioqueue = malloc(sizeof(pth_queue_t)))) { /* system io queue */ r = ENOMEM; goto __fail_2; } if((r = pth_queue_init(conn_sys->ioqueue))) goto __fail_3_1; /* init SSL certificates checking functions */ /* init RPC list related functions */ if(!(conn_sys->system_rpc = malloc(sizeof(cx_rpc_list_t)))) { r = ENOMEM; 
goto __fail_3; } else { if(!(conn_sys->system_rpc->rpc_tree = malloc(sizeof(usrtc_t)))) { r = ENOMEM; __fail_rpc: free(conn_sys->system_rpc); goto __fail_3_1; } usrtc_init(conn_sys->system_rpc->rpc_tree, USRTC_SPLAY, 256, __cmp_cstr); r = __init_systemrpc_tree(conn_sys->system_rpc->rpc_tree); if(r) { free(conn_sys->system_rpc->rpc_tree); goto __fail_rpc; } } /* init SSL library */ SSL_library_init(); OpenSSL_add_all_algorithms(); SSL_load_error_strings(); conn_sys->ex_ssldata_index = SSL_get_ex_new_index(0, "__ssldata index", NULL, NULL, NULL); /* create threads for queue */ if((r = pthread_create(&conn_sys->ios_thread, NULL, __system_queue_listener, NULL))) { goto __fail_rpc; } return 0; __fail_3_1: free(conn_sys->ioqueue); __fail_3: free(conn_sys->ioq); __fail_2: pthread_rwlock_destroy(&(conn_sys->rwlock)); __fail_1: free(conn_sys->connections); __fail: free(conn_sys); return r; } /* load certificates */ int connections_subsystem_setsslserts(const char *rootca, const char *certpem, const char *certkey) { int r = ENOMEM; if(!conn_sys) return EINVAL; /* simply copying */ if(!(conn_sys->rootca = strdup(rootca))) return ENOMEM; if(!(conn_sys->certkey = strdup(certkey))) goto __fail; if(!(conn_sys->certpem = strdup(certpem))) goto __fail; r = 0; return 0; __fail: if(conn_sys->rootca) free(conn_sys->rootca); if(conn_sys->certkey) free(conn_sys->certkey); if(conn_sys->certpem) free(conn_sys->certpem); return r; } int connections_subsystem_setrpclist_function(usrtc_t* (*get_rpc_typed_list_tree)(conn_t *)) { conn_sys->get_rpc_typed_list_tree = get_rpc_typed_list_tree; return 0; } #define __TMPBUFLEN 2048 /* connection_initiate: perform a connection thru the socket to the * host with master certificate, i.e. it's a slave one for client. 
*/ int connection_initiate(conn_t *co, const char *host, int port, const char *SSL_cert, perm_ctx_t *pctx) { int r = 0, sd; int bytes = 0; char *uuid; char *buf = NULL; struct hostent *host_; struct sockaddr_in addr; usrtc_t *ch_tree, *rpc_tree; pth_queue_t *mqueue = malloc(sizeof(pth_queue_t)); if(!mqueue) return ENOMEM; pth_dqtpoll_t *tpoll = malloc(sizeof(pth_dqtpoll_t)); if(!tpoll) return ENOMEM; // TODO: fallback idx_allocator_t *idx_ch = malloc(sizeof(idx_allocator_t)); if(!idx_ch) return ENOMEM; if(!co) return EINVAL; if(!host) return EINVAL; if(!SSL_cert) return EINVAL; if(!pctx) return EINVAL; memset(co, 0, sizeof(conn_t)); pth_dqtpoll_init(tpoll, __rpc_callback); if(!idx_ch) return ENOMEM; else r = idx_allocator_init(idx_ch, MAX_CHANNELS*MAX_MULTI, 0); if(r) return r; if(!(uuid = __generate_uuid())) return ENOMEM; if(!(ch_tree = malloc(sizeof(usrtc_t)))) { r = ENOMEM; goto __fail; } if(!(rpc_tree = malloc(sizeof(usrtc_t)))) { r = ENOMEM; goto __fail_1; } if((r = pthread_mutex_init(&co->oplock, NULL))) goto __fail_2; if((r = pthread_rwlock_init(&co->chnl_lock, NULL))) goto __fail_3; usrtc_init(rpc_tree, USRTC_REDBLACK, MAX_RPC_LIST, __cmp_int); usrtc_init(ch_tree, USRTC_REDBLACK, MAX_CHANNELS, __cmp_ulong); co->idx_ch = idx_ch; /* assign message queue */ if((r = pth_queue_init(mqueue))) goto __fail_3; co->mqueue = mqueue; /* init SSL certificates and context */ co->ctx = SSL_CTX_new(SSLv3_client_method()); if(!co->ctx) { ERR_print_errors_fp(stderr); r = EINVAL; goto __fail_3; } else SSL_CTX_set_verify_depth(co->ctx, 1); /* FIXME: use configuration */ /* load certificates */ SSL_CTX_load_verify_locations(co->ctx, conn_sys->rootca, NULL); /* set the local certificate from CertFile */ if(SSL_CTX_use_certificate_file(co->ctx, SSL_cert, SSL_FILETYPE_PEM)<=0) { r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__); goto __fail_3; } /* set the private key from KeyFile (may be the same as CertFile) */ if(SSL_CTX_use_PrivateKey_file(co->ctx, SSL_cert, 
SSL_FILETYPE_PEM)<=0) { r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__); goto __fail_3; } /* verify private key */ if (!SSL_CTX_check_private_key(co->ctx)) { r = EINVAL; goto __fail_3; } /* assign allocated memory */ co->rpc_list = rpc_tree; co->chnl_tree = ch_tree; co->uuid = uuid; /* connect to the pointed server */ /* resolve host */ if(!(buf = malloc(__TMPBUFLEN))) { r = ENOMEM; goto __fail_3; } if(__resolvehost(host, buf, __TMPBUFLEN, &host_) != NETDB_SUCCESS) { r = ENOENT; free(buf); goto __fail_3; } /* create a socket */ sd = socket(PF_INET, SOCK_STREAM, 0); bzero(&addr, sizeof(addr)); /* try to connect it */ addr.sin_family = AF_INET; addr.sin_port = htons(port); //printf("addr.sin_addr.s_addr = %p, host_ = %p\n", &addr.sin_addr.s_addr, host_); addr.sin_addr.s_addr = *(uint32_t*)(host_->h_addr); free(host_); if (connect(sd, (struct sockaddr*)&addr, sizeof(addr)) != 0) { close(sd); free(buf); r = ENOENT; /* couldn't connect to the desired host */ goto __fail_3; } /* now we will create an SSL connection */ co->ssl = SSL_new(co->ctx); SSL_set_fd(co->ssl, sd); /* attach connected socket */ if(SSL_connect(co->ssl) == -1) { r = EBADE; free(buf); /* shutdown connection */ goto __fail_3; } /* if success we're ready to use established SSL channel */ /* auth and RPC contexts sync */ co->pctx = pctx; snprintf(buf, __TMPBUFLEN, "(auth-set-context ((:user \"%s\")(:passwd \"%s\")))", pctx->login, pctx->passwd); /* send an auth request */ SSL_write(co->ssl, buf, strlen(buf) + sizeof(char)); /* read the message reply */ bytes = __conn_read(co, buf, __TMPBUFLEN); if(bytes == -1) { // we've lost the connection co->flags &= ~CXCONN_ESTABL; r = ESXNOCONNECT; __wake_up_waiters(co, ESXNOCONNECT); free(buf); /* shutdown connection */ goto __fail_3; } buf[bytes] = 0; /* perform an rpc call */ r = __eval_cstr(buf, conn_sys->system_rpc, (void *)co); if(!r) { /* all is fine security context is good */ snprintf(buf, __TMPBUFLEN, "(ch-get-types)"); /* now we should receive 
possible channel types */ SSL_write(co->ssl, buf, strlen(buf) + sizeof(char)); /* read the message reply */ bytes = __conn_read(co, buf, __TMPBUFLEN); if(bytes == -1) { // we've lost the connection co->flags &= ~CXCONN_ESTABL; r = ESXNOCONNECT; __wake_up_waiters(co, ESXNOCONNECT); free(buf); /* shutdown connection */ goto __fail_3; } buf[bytes] = 0; /* perform an rpc call */ r = __eval_cstr(buf, conn_sys->system_rpc, (void *)co); } free(buf); /* now we can free the temporary buffer */ /* a listening thread creation (incoming messages) */ printf("%s:%d r = %d\n", __FUNCTION__, __LINE__, r); if(!r) { /* success let's start a listening thread */ r = pthread_create(&co->cthread, NULL, __cxslave_thread_listener, (void *)co); if(!r) { /* add connection to the list */ usrtc_node_init(&co->csnode, co); co->flags = (CXCONN_SLAVE | CXCONN_ESTABL); /* set the right flags */ pthread_rwlock_wrlock(&conn_sys->rwlock); usrtc_insert(conn_sys->connections, &co->csnode, (void *)co->uuid); pthread_rwlock_unlock(&conn_sys->rwlock); //return 0; /* FIXME: */ } r = pthread_create(&co->msgthread, NULL, __msg_queue_thread, (void *)co); if(r) goto __fail_3; pth_dqtpoll_run(tpoll); co->tpoll = tpoll; return 0; } __fail_3: pthread_mutex_destroy(&co->oplock); __fail_2: free(rpc_tree); __fail_1: free(ch_tree); __fail: free(uuid); return r; } int connection_create(conn_t *co, int sck) { int r = 0, sd; int bytes = 0; char *uuid; char *buf = NULL; usrtc_t *ch_tree, *rpc_tree; pth_queue_t *rqueue = malloc(sizeof(pth_queue_t)); pth_queue_t *mqueue = malloc(sizeof(pth_queue_t)); pth_dqtpoll_t *tpoll = malloc(sizeof(pth_dqtpoll_t)); if(!tpoll) return ENOMEM; // TODO: fallback idx_allocator_t *idx_ch = malloc(sizeof(idx_allocator_t)); if(!co) return EINVAL; else memset(co, 0, sizeof(co)); pth_dqtpoll_init(tpoll, __rpc_callback); // TODO: check it if(!idx_ch) return ENOMEM; else r = idx_allocator_init(idx_ch, MAX_CHANNELS*MAX_MULTI, 0); if(r) return r; if(!(uuid = __generate_uuid())) return ENOMEM; 
if(!(ch_tree = malloc(sizeof(usrtc_t)))) { r = ENOMEM; goto __fail; } if(!(rpc_tree = malloc(sizeof(usrtc_t)))) { r = ENOMEM; goto __fail_1; } if((r = pthread_mutex_init(&co->oplock, NULL))) goto __fail_2; if((r = pthread_rwlock_init(&co->chnl_lock, NULL))) goto __fail_3; usrtc_init(rpc_tree, USRTC_REDBLACK, MAX_RPC_LIST, __cmp_int); usrtc_init(ch_tree, USRTC_REDBLACK, MAX_CHANNELS, __cmp_ulong); co->idx_ch = idx_ch; /* assign message queue */ r = pth_queue_init(rqueue); if(r) goto __fail_3; co->rqueue = rqueue; /* assigned outbone message queue master also has this one */ r = pth_queue_init(mqueue); if(r) goto __fail_3; co->mqueue = mqueue; /* init SSL certificates and context */ co->ctx = SSL_CTX_new(SSLv3_server_method()); if(!co->ctx) { r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__);goto __fail_3; } else { /* set verify context */ SSL_CTX_set_verify(co->ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, __verify_certcall); /* set verify depth */ SSL_CTX_set_verify_depth(co->ctx, VERIFY_DEPTH); } /* load certificates */ SSL_CTX_load_verify_locations(co->ctx, conn_sys->rootca, NULL); /* set the local certificate from CertFile */ if(SSL_CTX_use_certificate_file(co->ctx, conn_sys->certpem, SSL_FILETYPE_PEM)<=0) { printf("certpem1 = %s\n", conn_sys->certpem); ERR_print_errors_fp(stderr); r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__); goto __fail_3; } /* set the private key from KeyFile (may be the same as CertFile) */ if(SSL_CTX_use_PrivateKey_file(co->ctx, conn_sys->certkey, SSL_FILETYPE_PEM)<=0) { r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__); goto __fail_3; } /* verify private key */ if (!SSL_CTX_check_private_key(co->ctx)) { r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__); goto __fail_3; } /* assign allocated memory */ co->rpc_list = rpc_tree; co->chnl_tree = ch_tree; co->uuid = uuid; if(!(buf = malloc(__TMPBUFLEN))) { r = ENOMEM; goto __fail_3; } /* now we will create an SSL connection */ co->ssl = SSL_new(co->ctx); 
SSL_set_fd(co->ssl, sck); /* attach connected socket */ /* set the context to verify ssl connection */ SSL_set_ex_data(co->ssl, conn_sys->ex_ssldata_index, (void *)co); if(SSL_accept(co->ssl) == -1) { r = EBADE; free(buf); /* shutdown connection */ goto __fail_3; } /* if success we're ready to use established SSL channel */ printf("%s:%d\n", __FUNCTION__, __LINE__); BIO_set_nbio(SSL_get_rbio(co->ssl), 1); /*******************************************/ /*-=Protocol part of connection establish=-*/ /*******************************************/ while(!(co->flags & CXCONN_ESTABL)) { /* read the initiation stage connections */ bytes = __conn_read(co, buf, __TMPBUFLEN); if(bytes > 0) { buf[bytes] = 0; r = __eval_cstr(buf, conn_sys->system_rpc, (void *)co); printf("%s return %d (bytes %d)\n", buf, r, bytes); if(r) goto __fail_3; } else { printf("bytes = %d\n", bytes); if(bytes < 0) { printf("Terminate SSL connection, the other end is lost.\n"); co->flags &= ~CXCONN_ESTABL; __wake_up_waiters(co, ESXNOCONNECT); r = ESXNOCONNECT; goto __fail_3; } } } /* before it will be done assign rpc list */ if(conn_sys->get_rpc_typed_list_tree) co->rpc_list = conn_sys->get_rpc_typed_list_tree(co); free(buf); r = pthread_create(&co->cthread, NULL, __cxmaster_thread_listener, (void *)co); if(!r) { /* add connection to the list */ usrtc_node_init(&co->csnode, co); co->flags |= CXCONN_MASTER; /* set the right flags */ pthread_rwlock_wrlock(&conn_sys->rwlock); usrtc_insert(conn_sys->connections, &co->csnode, (void *)co->uuid); pthread_rwlock_unlock(&conn_sys->rwlock); /* threads poll --- */ r = pthread_create(&co->msgthread, NULL, __msg_queue_thread, (void *)co); if(r) goto __fail_3; r = pthread_create(&co->rmsgthread, NULL, __rmsg_queue_thread, (void *)co); if(r) goto __fail_3; pth_dqtpoll_run(tpoll); co->tpoll = tpoll; } return r; __fail_3: pthread_mutex_destroy(&co->oplock); __fail_2: free(rpc_tree); __fail_1: free(ch_tree); __fail: free(uuid); return r; } int connection_close(conn_t *co) 
/* TODO: */ { return 0; } int connection_reinit(conn_t *co) /* TODO: */ { return 0; } static sxmsg_t *__allocate_msg(int *res) { sxmsg_t *msg = malloc(sizeof(sxmsg_t)); int r = 0; if(!msg) { *res = ENOMEM; return NULL; } else { memset(msg, 0, sizeof(sxmsg_t)); if((r = pthread_mutex_init(&(msg->wait), NULL))) { free(msg); *res = r; return NULL; } usrtc_node_init(&(msg->chnl_node), msg); usrtc_node_init(&(msg->poll_node), msg); usrtc_node_init(&(msg->pendingq_node), msg); } *res = 0; return msg; } static void __destroy_msg(sxmsg_t *msg) { chnl_t *ch = msg->pch; if(msg->flags & ESXMSG_USR) { pthread_mutex_lock(&(ch->oplock)); idx_free(ch->idx_msg, msg->mid); pthread_mutex_unlock(&(ch->oplock)); } else if(msg->flags & ESXMSG_SYS) { //if(msg->uuid) free(msg->uuid); } pthread_mutex_unlock(&(msg->wait)); pthread_mutex_destroy(&(msg->wait)); free(msg); return; } static int __create_reg_msg(sxmsg_t **msg, chnl_t *ch) { int r = 0; sxmsg_t *sm = __allocate_msg(&r); if(r) return r; else { sm->pch = ch; sm->flags = (ESXMSG_USR | ESXMSG_PENDING); /* ok allocate message ID */ pthread_mutex_lock(&(ch->oplock)); sm->mid = idx_allocate(ch->idx_msg); pthread_mutex_unlock(&(ch->oplock)); pthread_mutex_lock(&(sm->wait)); *msg = sm; } return 0; } static int __create_sys_msg(sxmsg_t **msg, char *uuid, chnl_t *ch, sxpayload_t *data) { int r = 0; sxmsg_t *m = __allocate_msg(&r); if(r) return r; else { /* fill values */ m->pch = ch; m->uuid = uuid; m->payload = data; /* set the right flags */ m->flags = (ESXMSG_SYS | ESXMSG_PENDING); /* we need to lock the wait mutex */ pthread_mutex_lock(&(m->wait)); *msg = m; } return 0; } /* channels */ int channel_open(conn_t *co, chnl_t **ch, int type) { chnl_t *nch = NULL; int r = 0; char *uuid_; sxpayload_t *pl; ulong_t cid; rpc_typed_list_t *rpclist; usrtc_node_t *node; sxmsg_t *sms; if(!(co->flags & CXCONN_ESTABL)) { return ESXNOCONNECT; } uuid_ = __generate_uuid(); pl = malloc(sizeof(sxpayload_t)); node = usrtc_lookup(co->rpc_list, &type); /* 
if(!node) { r = EINVAL; printf("fuck\n"); goto __fini_op; } else rpclist = (rpc_typed_list_t *)usrtc_node_getdata(node); */ if(!uuid_) { if(pl) free(pl); return ENOMEM; } if(!pl) { __ffail: if(uuid_) free(uuid_); return ENOMEM; } else { pl->sx = NULL; if(!(pl->cstr = malloc(sizeof(char)*ESX_SYSMSG_SIZE))) { free(pl); goto __ffail; } else memset(pl->cstr, 0, sizeof(char)*ESX_SYSMSG_SIZE); } pthread_rwlock_wrlock(&(co->chnl_lock)); cid = idx_allocate(co->idx_ch); pthread_rwlock_unlock(&(co->chnl_lock)); if(cid == IDX_INVAL) { r = ENOMEM; goto __fini_op; } if((r = __alloc_channel(cid, co, rpclist, &nch))) { goto __fini_op; } else nch->flags |= ESXCHAN_PENDING; nch->uuid = uuid_; /* ok now we're ready to create a message and push channel to the list */ if((r = __create_sys_msg(&sms, uuid_, nch, pl))) { __fail_chan: /* destroy the channel*/ goto __fini_op; } else { /* put the channel to the channels search tree */ pthread_rwlock_wrlock(&(co->chnl_lock)); //printf("inserting cid = %d\n", nch->cid); usrtc_insert(co->chnl_tree, &nch->node, &nch->cid); pthread_rwlock_unlock(&(co->chnl_lock)); /* put system message to the run queue */ /* first form the message */ snprintf(pl->cstr, sizeof(char)*ESX_SYSMSG_SIZE, "(ch-open ((:id %ld)(:uuid %s)(:type %d)))", nch->cid, nch->uuid, type); nch->sysmsg = sms; /* assign system message to the channel */ /* put it */ if((r = pth_queue_add(conn_sys->ioqueue, (void *)sms, SYS_MSG))) { __fail_chan_r: /* remove it from the search tree */ pthread_rwlock_wrlock(&(co->chnl_lock)); usrtc_delete(co->chnl_tree, &nch->node); pthread_rwlock_unlock(&(co->chnl_lock)); goto __fail_chan; } if(!(sms->flags & ESXMSG_PENDING)) { /* was processed too fast */ goto __process_smsg; } else pthread_mutex_lock(&(sms->wait)); /* will sleep until got a reply */ __process_smsg: if(sms->opcode) { r = sms->opcode; goto __fail_chan_r; } else r = 0; nch->flags &= ~ESXCHAN_PENDING; /* mark it as established */ free(pl->cstr); free(pl); __destroy_msg(nch->sysmsg); } 
__fini_op: if(r) { if(uuid_) free(uuid_); if(pl) { if(pl->cstr) free(pl->cstr); free(pl); } pthread_rwlock_wrlock(&(co->chnl_lock)); idx_free(co->idx_ch, nch->cid); pthread_rwlock_unlock(&(co->chnl_lock)); idx_allocator_destroy(nch->idx_msg); free(nch->idx_msg); free(nch->msgs_tree); pthread_mutex_destroy(&(nch->oplock)); pthread_rwlock_destroy(&(nch->msglock)); free(nch); } else *ch = nch; return r; } int channel_close(chnl_t *chnl) { char *uuid_; usrtc_node_t *node = NULL; int r; conn_t *co = chnl->connection; sxmsg_t *sms; sxpayload_t *pl; if(!(co->flags & CXCONN_ESTABL)) { return ESXNOCONNECT; } uuid_ = __generate_uuid(); pthread_rwlock_rdlock(&(co->chnl_lock)); node = usrtc_lookup(co->chnl_tree, &chnl->cid); pthread_rwlock_unlock(&(co->chnl_lock)); if(!node) { fprintf(stderr, "No such channel\n"); return ENOENT; } pthread_rwlock_wrlock(&(chnl->msglock)); /* check unprocessed messages */ if(!usrtc_isempty(chnl->msgs_tree)) { pthread_rwlock_unlock(&(chnl->msglock)); fprintf(stderr, "Unable to close channel\n"); return EBUSY; } pl = malloc(sizeof(sxpayload_t)); if(!pl) return ENOMEM; if(__create_sys_msg(&sms, uuid_, chnl, pl)) { if(chnl->idx_msg) free(chnl->idx_msg); if(chnl->msgs_tree) free(chnl->msgs_tree); free(chnl); return ENOMEM; } pl->sx = NULL; if(!(pl->cstr = malloc(sizeof(char) * ESX_SYSMSG_SIZE))) { pthread_rwlock_unlock(&(chnl->msglock)); free(pl); return ENOMEM; } memset(pl->cstr, 0, sizeof(char) * ESX_SYSMSG_SIZE); /* put system message to the run queue */ /* first form the message */ snprintf(pl->cstr, sizeof(char) * ESX_SYSMSG_SIZE, "(ch-close (:id %ld))", chnl->cid); chnl->sysmsg = sms; /* assign system message to the channel */ /* put it */ if((r = pth_queue_add(conn_sys->ioqueue, (void *)sms, SYS_MSG))) { pthread_rwlock_unlock(&(chnl->msglock)); return r; } if(!(sms->flags & ESXMSG_PENDING)) { /* was processed too fast */ goto __process_smsg; } else pthread_mutex_lock(&(sms->wait)); /* will sleep until got a reply */ __process_smsg: 
if(sms->opcode) { r = sms->opcode; return r; } else r = 0; chnl->flags &= ~ESXCHAN_PENDING; /* mark it as established */ /* remove channel from the search tree */ pthread_rwlock_wrlock(&(chnl->connection->chnl_lock)); usrtc_delete(chnl->connection->chnl_tree, &chnl->node); /* free index */ idx_free(co->idx_ch, chnl->cid); pthread_rwlock_unlock(&(chnl->connection->chnl_lock)); pthread_rwlock_unlock(&(chnl->msglock)); __destroy_msg(chnl->sysmsg); free(uuid_); free(pl->cstr); free(pl); free(chnl->uuid); idx_allocator_destroy(chnl->idx_msg); free(chnl->idx_msg); free(chnl->msgs_tree); pthread_mutex_destroy(&(chnl->oplock)); pthread_rwlock_destroy(&(chnl->msglock)); free(chnl); return 0; } /* message passing */ /* * How message sending works: * 1. Create a message structure assigned to the channel, * 2. Put S-expression context to it * 3. Put the message to the queue * 4. expect the result waiting on the lock mutex */ static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec *tio) { int r = 0; sxmsg_t *m = NULL; conn_t *co = ch->connection; if(!(co->flags & CXCONN_ESTABL)) { destroy_sexp(sx); return ESXNOCONNECT; } *msg = NULL; r = __create_reg_msg(&m, ch); if(r) return r; else { /* put the message to the search tree */ pthread_rwlock_wrlock(&(ch->msglock)); usrtc_insert(ch->msgs_tree, &(m->pendingq_node), &(m->mid)); pthread_rwlock_unlock(&(ch->msglock)); /* message assign */ m->opcode = 0; m->payload = (void *)sx; /* assign initial sx */ m->initial_sx = sx; /* put the message to the run queue */ r = pth_queue_add(co->mqueue, (void *)m, USR_MSG); if(r) return r; /* FIXME: better give up */ if(m->flags & ESXMSG_PENDING) { if(!tio) pthread_mutex_lock(&(m->wait)); else pthread_mutex_timedlock(&(m->wait), tio); } if(tio && (m->flags & ESXMSG_PENDING)) return SXOTIMEDOUT; if(!m->payload) { r = m->opcode; /* first remove the message from tree */ pthread_rwlock_wrlock(&(ch->msglock)); usrtc_delete(ch->msgs_tree, &(m->pendingq_node)); 
pthread_rwlock_unlock(&(ch->msglock)); /* destroy s expression */ destroy_sexp(m->initial_sx); /* destroy */ __destroy_msg(m); } else { *msg = m; r = SXOREPLYREQ; } } return r; } int msg_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg) { return __message_send(ch, sx, msg, NULL); } int msg_send_timed(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec *tio) { return __message_send(ch, sx, msg, tio); } static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcode) { int r = 0; chnl_t *ch = msg->pch; conn_t *co = ch->connection; if(!(co->flags & CXCONN_ESTABL)) { destroy_sexp(sx); return ESXNOCONNECT; } msg->payload = sx; msg->opcode = opcode; msg->flags |= ESXMSG_PENDING; /* pending */ msg->flags |= ESXMSG_ISREPLY; /* this is a reply */ if(!sx) msg->flags &= ~ESXMSG_PENDING; /* put the message to the queue */ r = pth_queue_add(co->mqueue, (void *)msg, USR_MSG); if(r) return r; /* FIXME: better give up */ if(!sx) return 0; if(msg->flags & ESXMSG_PENDING) { if(!tio) pthread_mutex_lock(&(msg->wait)); else pthread_mutex_timedlock(&(msg->wait), tio); } if(tio && (msg->flags & ESXMSG_PENDING)) return SXOTIMEDOUT; r = msg->opcode; #if 0 if(msg->flags & ESXMSG_CLOSURE) { /* first remove the message from tree */ pthread_rwlock_wrlock(&(ch->msglock)); usrtc_delete(ch->msgs_tree, &(msg->pendingq_node)); pthread_rwlock_unlock(&(ch->msglock)); /* destroy */ destroy_sexp(msg->initial_sx); __destroy_msg(msg); } #endif return r; } int msg_return(sxmsg_t *msg, int opcode) { return __msg_reply(msg, NULL, NULL, opcode); } int msg_reply(sxmsg_t *msg, sexp_t *sx) { return __msg_reply(msg, sx, NULL, 0); } int msg_reply_timed(sxmsg_t *msg, sexp_t *sx, struct timespec *tio) { return __msg_reply(msg, sx, tio, 0); } //TODO: continue. Implement wait for delivery and queue addition /* * How message sending works: * 1. Create a message structure assigned to the channel, * 2. Put S-expression context to it * 3. Put the message to the queue * 4. 
Wait for job execution
 *
 * A pulse message has no dialog: the message object never leaves this
 * function. sx is owned by the function from the moment of the call.
 * tio, when not NULL, is the absolute time limit for the wait.
 *
 * Returns the opcode delivered by the peer, SXOREPLYREQ when the message
 * came back with a payload attached, SXOTIMEDOUT when the time limit
 * passed, ESXNOCONNECT without an established connection or the error of
 * a failed step.
 */
static int __message_send_pulse(chnl_t *ch, sexp_t *sx, struct timespec *tio)
{
  int r = 0, lr = 0;
  sxmsg_t *m = NULL;
  conn_t *co = ch->connection;

  if(!(co->flags & CXCONN_ESTABL)) {
    destroy_sexp(sx);
    return ESXNOCONNECT;
  }

  if((r = __create_reg_msg(&m, ch))) {
    destroy_sexp(sx);
    return r;
  }

  /* put the message to the search tree */
  pthread_rwlock_wrlock(&(ch->msglock));
  usrtc_insert(ch->msgs_tree, &(m->pendingq_node), &(m->mid));
  pthread_rwlock_unlock(&(ch->msglock));

  /* message assign */
  m->opcode = 0;
  m->payload = (void *)sx;
  /* assign initial sx */
  m->initial_sx = sx;
  m->flags |= ESXMSG_PULSE;

  /* put the message to the run queue */
  r = pth_queue_add(co->mqueue, (void *)m, USR_MSG);
  if(r) goto __release; /* it never reached the queue: give up */

  if(m->flags & ESXMSG_PENDING) {
    if(!tio) pthread_mutex_lock(&(m->wait));
    else lr = pthread_mutex_timedlock(&(m->wait), tio);
  }

  /* the result of the timed lock, not a flag, tells whether the limit
   * has passed; the message is marked so the queue thread may drop it */
  if(lr == ETIMEDOUT) {
    m->flags |= ESXMSG_TIMEDOUT;
    return SXOTIMEDOUT;
  }

  if(!m->payload) r = m->opcode;
  else {
    r = SXOREPLYREQ;
    /* nobody can continue the dialog of a pulse: a payload attached by
     * a reply is dropped here instead of being leaked */
    if(m->payload != (void *)m->initial_sx) destroy_sexp((sexp_t *)m->payload);
  }

 __release:
  /* first remove the message from tree */
  pthread_rwlock_wrlock(&(ch->msglock));
  usrtc_delete(ch->msgs_tree, &(m->pendingq_node));
  pthread_rwlock_unlock(&(ch->msglock));
  /* destroy s expression */
  destroy_sexp(m->initial_sx);
  /* destroy */
  __destroy_msg(m);

  return r;
}

/* msg_send_pulse: send a pulse and wait for its delivery without a limit */
int msg_send_pulse(chnl_t *ch, sexp_t *sx)
{
  return __message_send_pulse(ch, sx, NULL);
}

/* msg_send_pulse_timed: send a pulse and wait for its delivery until tio */
int msg_send_pulse_timed(chnl_t *ch, sexp_t *sx, struct timespec *tio)
{
  return __message_send_pulse(ch, sx, tio);
}

/* msg_send_pulse_nowait: not implemented yet; nothing is sent, sx is left
 * untouched and 0 is returned */
int msg_send_pulse_nowait(chnl_t *ch, sexp_t *sx)
{
  return 0;
}

/* sexp helpers */

/*
 * sexp_list_car: store the first element of the list expr in *sx.
 * Returns 0 on success and 1 when expr is not a list, is empty or does
 * not start with a value.
 */
int sexp_list_car(sexp_t *expr, sexp_t **sx)
{
  if (!expr || !sx) return 1;
  if (!SEXP_IS_LIST(expr) || !expr->list || expr->list->ty != SEXP_VALUE) return 1;

  *sx = expr->list;

  return 0;
}

/*
 * sexp_list_cdr: store the element following the first one of the list
 * expr in *sx (NULL when there is none); the rest of the list is reached
 * through the next pointers. Returns 0 on success and 1 when expr is not
 * a list, is empty or does not start with a value.
 */
int sexp_list_cdr(sexp_t *expr, sexp_t **sx)
{
  /* Dummy function. Can we do cdr properly? */
  if (!expr || !sx) return 1;
  if (!SEXP_IS_LIST(expr) || !expr->list || expr->list->ty != SEXP_VALUE) return 1;

  *sx = expr->list->next;

  return 0;
}