/* * Secure Network Transport Layer Library implementation. * This is a proprietary software. See COPYING for further details. * * (c) 2013-2014 Copyright Askele, inc. * (c) 2013-2014 Copyright Askele Ingria, inc. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include struct __rpc_job { sxmsg_t *msg; sexp_t *sx; int (*rpcf) (void *, sexp_t *); }; conn_sys_t *conn_sys = NULL; static long __cmp_ulong(const void *a, const void *b); /* message alloc and destroy */ extern sxmsg_t *__allocate_msg(int *res); extern void __destroy_msg(sxmsg_t *msg); /* examination */ static inline int __exam_connection(conn_t *co) { int r = 0; pthread_mutex_lock(&co->oplock); if(co->flags | CXCONN_BROKEN) { /* wake up all */ /* destroy thread poll */ /* free all memory and sync primitives */ r = 1; } pthread_mutex_unlock(&co->oplock); return r; } static int __rpc_callback(void *data) { struct __rpc_job *job = (struct __rpc_job *)data; int rc = 0; rc = job->rpcf((void *)(job->msg), job->sx); free(job); return rc; } extern int __alloc_channel(ulong_t cid, conn_t *co, rpc_typed_list_t *rlist, chnl_t **channel); static int __conn_read(conn_t *co, void *buf, size_t buf_len) { int rfd = SSL_get_fd(co->ssl), r; fd_set readset; fprintf(stderr, "\tListening ... on %s\n", co->uuid); /* get prepare to select */ FD_ZERO(&readset); FD_SET(rfd, &readset); /* waits until something will be ready to read */ r = select(FD_SETSIZE, &readset, NULL, NULL, NULL); if(r < 0) { printf("select (%d)\n", errno); return -1; } if(!r) { printf("Nothing to wait for\n"); return 0; } if(r && FD_ISSET(rfd, &readset)) { do { //pthread_mutex_lock(&(co->oplock)); /* ok, now we're ready to perform SSL_read */ r = SSL_read(co->ssl, buf, (int)buf_len); switch(SSL_get_error(co->ssl, r)) { case SSL_ERROR_NONE: //printf("Read done (f:%d)\n", rfd); /* this is means we're get ridden it all */ return r; break; case SSL_ERROR_ZERO_RETURN: printf("No data to read\n"); /* no data to read ... */ return 0; break; case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_WRITE: printf("Bypass until SSL buffer not ready.\n"); return 0; default: /* seems the connection lost */ fprintf(stderr, "(RD)Unknown error on %s\n", co->uuid); return -1; } //pthread_mutex_unlock(&(co->oplock)); } while(SSL_pending(co->ssl)); } return 0; } static int __conn_write(conn_t *co, void *buf, size_t buf_len) { int r; pthread_mutex_lock(&(co->oplock)); __retry: r = SSL_write(co->ssl, buf, (int)buf_len); switch(SSL_get_error(co->ssl, r)) { case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_WRITE: goto __retry; break; default: pthread_mutex_unlock(&(co->oplock)); if(r < 0) { fprintf(stderr, "(WR)Unknown error on %s (%d)\n", co->uuid, r); return -1; } else return 0; } pthread_mutex_unlock(&(co->oplock)); return 0; } static long __cmp_cstr(const void *a, const void *b) { return strcmp((char *)a, (char *)b); } static long __cmp_int(const void *a, const void *b) { return *(int *)a - *(int *)b; } static long __cmp_ulong(const void *a, const void *b) { return *(ulong_t *)a - *(ulong_t *)b; } static int __resolvehost(const char *hostname, char *buf, int buf_len, struct hostent **rhp) { struct hostent *hostbuf = malloc(sizeof(struct hostent)); struct hostent *hp = *rhp = NULL; int herr = 0, hres = 0; if(!hostbuf) return NO_ADDRESS; hres = gethostbyname_r(hostname, hostbuf, buf, buf_len, &hp, &herr); if (!hp) return NO_ADDRESS; if(hres) return NO_ADDRESS; *rhp = hp; return NETDB_SUCCESS; } static void __destroy_rpc_list_tree(usrtc_t *tree) { usrtc_node_t *node; cx_rpc_t *ent; for(node = usrtc_first(tree); node != NULL; node = usrtc_first(tree)) { ent = (cx_rpc_t *)usrtc_node_getdata(node); usrtc_delete(tree, node); free(ent->name); free(ent); } return; } static int __insert_rpc_function(usrtc_t *tree, const char *name, int (*rpcf)(void *, sexp_t *)) { cx_rpc_t *ent = malloc(sizeof(cx_rpc_t)); usrtc_node_t *node; if(!ent) return ENOMEM; else node = &ent->node; if(!(ent->name = strdup(name))) { free(ent); return ENOMEM; } else ent->rpcf = rpcf; usrtc_node_init(node, ent); usrtc_insert(tree, node, ent->name); return 0; } /* wake up all waiters on messages with given opcode */ static void __wake_up_waiters(conn_t *co, int opcode) { usrtc_node_t *node = NULL, *last_node = NULL; usrtc_node_t *msg_node = NULL, *last_msg_node = NULL; chnl_t *ch; sxmsg_t *smsg = NULL; pthread_rwlock_wrlock(&(co->chnl_lock)); node = usrtc_first(co->chnl_tree); last_node = usrtc_last(co->chnl_tree); /* going through channels tree */ while(!usrtc_isempty(co->chnl_tree)) { ch = (chnl_t *)usrtc_node_getdata(node); pthread_rwlock_rdlock(&(ch->msglock)); msg_node = usrtc_first(ch->msgs_tree); last_msg_node = usrtc_last(ch->msgs_tree); while(!usrtc_isempty(ch->msgs_tree)) { /* messages bypassing */ smsg = (sxmsg_t *)usrtc_node_getdata(msg_node); smsg->opcode = opcode; /* wake up waiting thread */ pthread_mutex_unlock(&(smsg->wait)); if(msg_node == last_msg_node) break; msg_node = usrtc_next(ch->msgs_tree, msg_node); } pthread_rwlock_unlock(&(ch->msglock)); if(node == last_node) break; node = usrtc_next(co->chnl_tree, node); } pthread_rwlock_unlock(&(co->chnl_lock)); return; } /* (!) NOTE: this call use only after all threads are dead ! */ static void __destroy_all_channels(conn_t *co) { usrtc_node_t *node = NULL; chnl_t *ch; for(node = usrtc_first(co->chnl_tree); node != NULL; node = usrtc_first(co->chnl_tree)) { ch = (chnl_t *)usrtc_node_getdata(node); /* free allocated resources */ if(ch->uuid) free(ch->uuid); idx_allocator_destroy(ch->idx_msg); /* allocator */ free(ch->idx_msg); free(ch->msgs_tree); /* locks */ pthread_mutex_destroy(&(ch->oplock)); pthread_rwlock_destroy(&(ch->msglock)); /* remove it */ usrtc_delete(co->chnl_tree, node); /* free */ free(ch); } return; } static int __default_auth_set_context(void *cctx, sexp_t *sx) { conn_t *co = (conn_t *)cctx; char *val, *var, *tbuf = NULL; sexp_t *lsx, *sx_iter, *sx_in; int llen, idx, err = 0; //co->pctx = malloc(sizeof(perm_ctx_t)); /* skip keyword itself */ lsx = sx->list->next; /* now we expect a list of lists */ if(lsx->ty != SEXP_LIST) { err = ESXRCBADPROT; goto __reply; } /* take length of the list */ llen = sexp_list_length(lsx); if(!llen) return 0; /* other side will not set any security attributes */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { sexp_list_car(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { err = ESXRCBADPROT; goto __reply; } else val = sx_in->val; if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */ sexp_list_cdr(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_DQUOTE)) { err = ESXRCBADPROT; goto __reply; } else var = sx_in->val; /* ok, now we need to analyze parameters */ if(!strcmp(val, ":user") && var) { co->pctx->login = strdup(var); } else if(!strcmp(val, ":passwd") && var) { co->pctx->passwd = strdup(var); } else { /* just ignore in default implementation */ } } else continue; /* ignore */ } /* ok, now we need to fill security context */ tbuf = malloc(2048); if(!tbuf) { err = ENOMEM; goto __reply; } if(conn_sys->secure_check) err = conn_sys->secure_check(co); __reply: if(err) { snprintf(tbuf, 2048, "(auth-set-error (%d))", err); } else { snprintf(tbuf, 2048, "(auth-set-attr (:attr %d)(:uid %ld)(:gid %ld))", co->pctx->p_attr, co->pctx->uid, co->pctx->gid); } /* we will send it */ if(__conn_write(co, tbuf, strlen(tbuf)) < 0) { co->flags &= ~CXCONN_ESTABL; co->flags |= CXCONN_BROKEN; __wake_up_waiters(co, ESXNOCONNECT); } destroy_sexp(sx); free(tbuf); return err; } static int __default_auth_set_attr(void *cctx, sexp_t *sx) { conn_t *co = (conn_t *)cctx; char *val, *var; sexp_t *lsx, *sx_iter, *sx_in; int llen, idx, r = 0; /* skip keyword itself */ lsx = sx->list->next; /* now we expect a list of lists */ if(lsx->ty != SEXP_LIST) { // printf("%s:%d\n", __FUNCTION__, __LINE__); r = ESXRCBADPROT; goto __finish; } /* take length of the list */ llen = sexp_list_length(lsx); if(!llen) return 0; /* other side will not set any security attributes */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { sexp_list_car(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { r = ESXRCBADPROT; goto __finish; } else val = sx_in->val; if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */ sexp_list_cdr(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { r = ESXRCBADPROT; goto __finish; } else var = sx_in->val; /* ok, now we need to analyze parameters */ if(!strcmp(val, ":attr")) { co->pctx->p_attr = atoi(var); } else if(!strcmp(val, ":uid")) { co->pctx->uid = (ulong_t)atoll(var); } else if(!strcmp(val, ":gid")) { co->pctx->gid = (ulong_t)atoll(var); } else { /* just ignore in default implementation */ } } else continue; /* ignore */ } __finish: destroy_sexp(sx); return r; } static int __default_auth_set_error(void *cctx, sexp_t *sx) { char *errstr = NULL; int r; /* skip keyword itself */ sx->list = sx->list->next; /* be sure - this is a list */ if(sx->ty != SEXP_LIST) return ESXRCBADPROT; else sx = sx->list; /* get it */ errstr = sx->list->val; r = atoi(errstr); destroy_sexp(sx); return r; } static int __default_ch_get_types(void *cctx, sexp_t *sx) { conn_t *co = (conn_t *)cctx; usrtc_node_t *node; rpc_typed_list_t *list_ent; char *tbuf = malloc(4096), *tt; int err = 0; /* if we cannot allocate anything ... */ if(!tbuf) return ENOMEM; /* ok here we go */ co->rpc_list = conn_sys->get_rpc_typed_list_tree(co); /* ok, here we're don't need to parse anything */ if(!usrtc_count(co->rpc_list)) { err = ENXIO; snprintf(tbuf, 4096, "(ch-gl-error (%d))", err); } else { tt = tbuf; snprintf(tt, 4096, "(ch-set-types ("); tt += strlen(tt); for(node = usrtc_first(co->rpc_list); node != NULL; node = usrtc_next(co->rpc_list, node), tt += strlen(tt)) { list_ent = (rpc_typed_list_t *)usrtc_node_getdata(node); snprintf(tt, 4096, "(:%d \"%s\")", list_ent->type_id, list_ent->description); } snprintf(tt, 4096, "))"); } /* reply to this rpc */ if(__conn_write(co, tbuf, strlen(tbuf)) < 0) { co->flags &= ~CXCONN_ESTABL; co->flags |= CXCONN_BROKEN; __wake_up_waiters(co, ESXNOCONNECT); } free(tbuf); destroy_sexp(sx); return err; } static int __default_ch_set_types(void *cctx, sexp_t *sx) { conn_t *co = (conn_t *)cctx; char buf[1024], *val, *var; int r = 0, llen, typeid, idx; sexp_t *lsx, *sx_iter, *sx_in; /* skip keyword itself */ lsx = sx->list->next; /* now we expect a list of lists */ if(lsx->ty != SEXP_LIST) { r = ESXRCBADPROT; goto __send_reply; } /* take length of the list */ llen = sexp_list_length(lsx); if(!llen) return 0; /* other side will not set any security attributes */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { sexp_list_car(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { r = ESXRCBADPROT; goto __send_reply; } else val = sx_in->val; if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */ sexp_list_cdr(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_DQUOTE)) { r = ESXRCBADPROT; goto __send_reply; } else var = sx_in->val; /* ok, now we need to analyze parameters */ if(*val != ':') { r = ESXRCBADPROT; goto __send_reply; } else { if(conn_sys->set_typed_list_callback) { typeid = atoi((char *)(val + sizeof(char))); if(conn_sys->set_typed_list_callback(co, typeid, var)) { destroy_sexp(sx); return ENXIO; } } /* FIXME: if no function, accept or decline ? */ } } else continue; /* ignore */ } __send_reply: snprintf(buf, 1024, "(ch-gl-error (%d))", r); if(__conn_write(co, buf, strlen(buf)) < 0) { co->flags &= ~CXCONN_ESTABL; co->flags |= CXCONN_BROKEN; __wake_up_waiters(co, ESXNOCONNECT); } destroy_sexp(sx); return r; } static int __default_ch_gl_error(void *cctx, sexp_t *sx) { int r; char *errstr; conn_t *co = (conn_t *)cctx; if(co->flags & CXCONN_ESTABL) return EINVAL; /* error, we're already have channels list */ /* skip keyword itself */ sx->list = sx->list->next; /* be sure - this is a list */ if(sx->ty != SEXP_LIST) return ESXRCBADPROT; else sx = sx->list; /* get it */ errstr = sx->list->val; r = atoi(errstr); if(!r) co->flags |= CXCONN_ESTABL; return r; } static int __default_ch_open(void *cctx, sexp_t *sx) { conn_t *co = (conn_t *)cctx; usrtc_node_t *node; char *val, *var, *uuid = NULL, *buf; int typ = -1, idx, llen, r; ulong_t cid; sexp_t *lsx, *sx_iter, *sx_in; rpc_typed_list_t *rlist; chnl_t *channel; /* skip keyword itself */ lsx = sx->list->next; /* now we expect a list of lists */ if(lsx->ty != SEXP_LIST) { printf("%s:%d\n", __FUNCTION__, __LINE__); r = ESXRCBADPROT; goto __send_repl; } /* take length of the list */ llen = sexp_list_length(lsx); if(!llen) return 0; /* other side will not set any security attributes */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { sexp_list_car(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { printf("%s:%d\n", __FUNCTION__, __LINE__); r = ESXRCBADPROT; goto __send_repl; } else val = sx_in->val; if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */ sexp_list_cdr(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { r = ESXRCBADPROT; printf("%s:%d\n", __FUNCTION__, __LINE__); goto __send_repl; } else var = sx_in->val; /* ok, now we need to analyze parameters */ if(*val != ':') { r = ESXRCBADPROT; goto __send_repl; } else { if(!strcmp((char *)(val + sizeof(char)), "type")) typ = atoi(var); else if(!strcmp((char *)(val + sizeof(char)), "id")) cid = atoll(var); else if(!strcmp((char *)(val + sizeof(char)), "uuid")) uuid = var; } } else continue; /* ignore */ } /* additional check for type of the channel */ node = usrtc_lookup(co->rpc_list, &typ); if(!node) { r = ESXNOCHANSUP; /* printf("%s:%d (usrtc count: %d) (typ %d)\n", __FUNCTION__, __LINE__, usrtc_count(co->rpc_list), typ);*/ node = usrtc_first(co->rpc_list); rlist = (rpc_typed_list_t *)usrtc_node_getdata(node); printf("---- rlist->type_id = %d\n", rlist->type_id); goto __send_repl; } else rlist = (rpc_typed_list_t *)usrtc_node_getdata(node); /* now we need to check up the channel */ pthread_rwlock_rdlock(&(co->chnl_lock)); node = usrtc_lookup(co->chnl_tree, &cid); if(node) { pthread_rwlock_unlock(&(co->chnl_lock)); r = EEXIST; goto __send_repl; } else { idx_reserve(co->idx_ch, cid); pthread_rwlock_unlock(&(co->chnl_lock)); /* now we should alloc channel */ if((r = __alloc_channel(cid, co, rlist, &channel))) { pthread_rwlock_wrlock(&(co->chnl_lock)); idx_free(co->idx_ch, cid); pthread_rwlock_unlock(&(co->chnl_lock)); goto __send_repl; } else { /* now we ready to confirm channel creation */ pthread_rwlock_wrlock(&(co->chnl_lock)); usrtc_insert(co->chnl_tree, &(channel->node), &(channel->cid)); pthread_rwlock_unlock(&(co->chnl_lock)); r = 0; } } __send_repl: buf = malloc(2048); snprintf(buf, 2048, "(ch-open-ret ((:error %d)(:uuid %s)(:id %ld)))", r, uuid, cid); if(__conn_write(co, buf, strlen(buf)) < 0) { co->flags &= ~CXCONN_ESTABL; co->flags |= CXCONN_BROKEN; __wake_up_waiters(co, ESXNOCONNECT); } destroy_sexp(sx); free(buf); return r; } static int __default_ch_open_ret(void *cctx, sexp_t *sx) { conn_t *co = (conn_t *)cctx; chnl_t *chan; usrtc_node_t *node; int err = 0, r, llen, idx; ulong_t id; char *val, *var; sexp_t *lsx, *sx_iter, *sx_in; sxmsg_t *sms = NULL; /* skip keyword itself */ lsx = sx->list->next; /* now we expect a list of lists */ if(lsx->ty != SEXP_LIST) { //printf("%s:%d\n", __FUNCTION__, __LINE__); r = ESXRCBADPROT; goto __mark_msg; } /* take length of the list */ llen = sexp_list_length(lsx); if(!llen) return EINVAL; /* !! other side will not set any security attributes */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { sexp_list_car(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { r = ESXRCBADPROT; goto __mark_msg; } else val = sx_in->val; if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */ sexp_list_cdr(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { r = ESXRCBADPROT; goto __mark_msg; } else var = sx_in->val; /* ok, now we need to analyze parameters */ if(*val != ':') { r = ESXRCBADPROT; goto __mark_msg; } else { if(!strcmp((char *)(val + sizeof(char)), "error")) err = atoi(var); else if(!strcmp((char *)(val + sizeof(char)), "id")) id = atoll(var); } } else continue; /* ignore */ } /* try to find desired channel to intercept message */ pthread_rwlock_rdlock(&(co->chnl_lock)); node = usrtc_lookup(co->chnl_tree, (void *)&id); //printf("channels (%d)\n", usrtc_count(co->chnl_tree)); pthread_rwlock_unlock(&(co->chnl_lock)); if(node) { chan = (chnl_t *)usrtc_node_getdata(node); sms = chan->sysmsg; } __mark_msg: if(!sms) return r; sms->flags &= ~ESXMSG_PENDING; /* the message is done */ sms->opcode = err; destroy_sexp(sx); /* unlock mutex to wake up the waiting thread */ pthread_mutex_unlock(&(sms->wait)); return 0; } static int __default_ch_close(void *cctx, sexp_t *sx) { conn_t *co = (conn_t *)cctx; usrtc_node_t *node; char *val, *var, *buf; int idx, llen, r; ulong_t cid = -1; sexp_t *lsx, *sx_iter, *sx_in; chnl_t *channel = NULL; r = 0; /* skip keyword itself */ lsx = sx->list->next; /* now we expect a list of lists */ if(lsx->ty != SEXP_LIST) { printf("%s:%d\n", __FUNCTION__, __LINE__); r = ESXRCBADPROT; goto __send_repl; } /* take length of the list */ llen = sexp_list_length(lsx); if(!llen) return 0; /* other side will not set any security attributes */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { continue; } if(!SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { printf("%s:%d\n", __FUNCTION__, __LINE__); r = ESXRCBADPROT; goto __send_repl; } else val = sx_iter->val; sx_in = sx_iter->next; if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { r = ESXRCBADPROT; printf("%s:%d\n", __FUNCTION__, __LINE__); goto __send_repl; } else var = sx_in->val; /* ok, now we need to analyze parameters */ if(*val != ':') { r = ESXRCBADPROT; goto __send_repl; } else { if(!strcmp((char *)(val + sizeof(char)), "id")) { cid = atoll(var); break; } } } //printf("%s(%ld)\n", __FUNCTION__, cid); /* additional check for type of the channel */ pthread_rwlock_rdlock(&(co->chnl_lock)); node = usrtc_lookup(co->chnl_tree, &cid); pthread_rwlock_unlock(&(co->chnl_lock)); if(!node) { r = ENOENT; printf("there is no channel with id=%ld\n", cid); goto __send_repl; } channel = (chnl_t *)usrtc_node_getdata(node); __send_repl: buf = malloc(2048); snprintf(buf, 2048, "(ch-close-ret ((:id %ld) (:error %d)))", channel->cid, r); /* check up the message queue */ pthread_rwlock_rdlock(&(channel->msglock)); if(usrtc_count(channel->msgs_tree)) { fprintf(stderr, "Operation is not permitted. There are some " "undelivered messages in the message tree"); free(buf); destroy_sexp(sx); return EPERM; } pthread_rwlock_unlock(&(channel->msglock)); /* remove channel from the search tree */ pthread_rwlock_wrlock(&(co->chnl_lock)); usrtc_delete(co->chnl_tree, &(channel->node)); /* free index */ idx_free(co->idx_ch, channel->cid); pthread_rwlock_unlock(&(co->chnl_lock)); idx_allocator_destroy(channel->idx_msg); free(channel->idx_msg); free(channel->msgs_tree); pthread_mutex_destroy(&(channel->oplock)); pthread_rwlock_destroy(&(channel->msglock)); free(channel); destroy_sexp(sx); if(__conn_write(co, buf, strlen(buf)) < 0) { co->flags &= ~CXCONN_ESTABL; co->flags |= CXCONN_BROKEN; __wake_up_waiters(co, ESXNOCONNECT); } free(buf); return 0; } static int __default_ch_close_ret(void *cctx, sexp_t *sx) { ; conn_t *co = (conn_t *)cctx; chnl_t *chan; usrtc_node_t *node; int err = 0, r, llen, idx; ulong_t id; char *val, *var; sexp_t *lsx, *sx_iter, *sx_in; sxmsg_t *sms = NULL; /* skip keyword itself */ lsx = sx->list->next; /* now we expect a list of lists */ if(lsx->ty != SEXP_LIST) { //printf("%s:%d\n", __FUNCTION__, __LINE__); r = ESXRCBADPROT; goto __mark_msg; } /* take length of the list */ llen = sexp_list_length(lsx); if(!llen) return EINVAL; /* !! other side will not set any security attributes */ ; SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { sexp_list_car(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { r = ESXRCBADPROT; goto __mark_msg; } else val = sx_in->val; if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */ sexp_list_cdr(sx_iter, &sx_in); if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { r = ESXRCBADPROT; goto __mark_msg; } else var = sx_in->val; /* ok, now we need to analyze parameters */ if(*val != ':') { r = ESXRCBADPROT; goto __mark_msg; } else { if(!strcmp((char *)(val + sizeof(char)), "error")) err = atoi(var); else if(!strcmp((char *)(val + sizeof(char)), "id")) id = atoll(var); } } else continue; /* ignore */ } ; /* try to find desired channel to intercept message */ pthread_rwlock_rdlock(&(co->chnl_lock)); node = usrtc_lookup(co->chnl_tree, (void *)&id); //printf("channels (%d)\n", usrtc_count(co->chnl_tree)); pthread_rwlock_unlock(&(co->chnl_lock)); if(node) { chan = (chnl_t *)usrtc_node_getdata(node); sms = chan->sysmsg; } ; __mark_msg: ; if(!sms) return r; sms->flags &= ~ESXMSG_PENDING; /* the message is done */ sms->opcode = err; destroy_sexp(sx); /* unlock mutex to wake up the waiting thread */ pthread_mutex_unlock(&(sms->wait)); return 0; } /* create a nould of the message */ static int __create_reg_msg_mould(sxmsg_t **msg, chnl_t *ch, ulong_t mid) { int r = 0; sxmsg_t *sm = __allocate_msg(&r); if(r) return r; else { sm->pch = ch; sm->flags = (ESXMSG_USR | ESXMSG_PENDING); sm->mid = mid; /* ok reserve message ID */ pthread_mutex_lock(&(ch->oplock)); idx_reserve(ch->idx_msg, mid); pthread_mutex_unlock(&(ch->oplock)); pthread_mutex_lock(&(sm->wait)); *msg = sm; } return 0; } // TODO: check and continue static int __default_msg_pulse(void *cctx, sexp_t *sx) { conn_t *co = (conn_t *)cctx; usrtc_node_t *node = NULL; chnl_t *chan = NULL; int r = 0; sexp_t *lsx = NULL, *sx_iter = NULL; sexp_t *sx_sublist = NULL, *sx_value = NULL; ulong_t chnl_id = -1; ulong_t msg_id = -1; sexp_t *msg = NULL; sxmsg_t *smsg = NULL; int idx; /* get parameters from the message */ if(sexp_list_cdr(sx, &lsx)) return ESXRCBADPROT; if(!SEXP_IS_LIST(lsx)) return ESXRCBADPROT; /* find channel id */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { sx_sublist = sx_iter; continue; } else { if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { if(strcmp(sx_iter->val, ":chid")) { continue; // ignore it } sx_value = sx_iter->next; if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { continue; } chnl_id = atol(sx_value->val); } else continue; // ignore it } } lsx = sx_sublist; /* find message id */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { msg = sx_iter; continue; } else { if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { if(strcmp(sx_iter->val, ":msgid")) { continue; // ignore } sx_value = sx_iter->next; if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { continue; } msg_id = atol(sx_value->val); } else continue; // ignore it } } if(msg_id < 0 || chnl_id < 0) { return ESXRCBADPROT; } /* find channel */ if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT; else chan = (chnl_t *)usrtc_node_getdata(node); /* lookup for the message */ pthread_rwlock_rdlock(&(chan->msglock)); if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) { pthread_rwlock_unlock(&(chan->msglock)); /* here, rpc lookup has no sense, just put this job to the queue */ /* btw, we're need to create a message first */ r = __create_reg_msg_mould(&smsg, chan, msg_id); if(r) return r; /* assign the message */ smsg->opcode = 0; smsg->payload = (void *)msg; /* assign initial S-expression structure */ smsg->initial_sx = sx; /* put the message to the search tree */ pthread_rwlock_wrlock(&(chan->msglock)); usrtc_insert(chan->msgs_tree, &(smsg->pendingq_node), &(smsg->mid)); pthread_rwlock_unlock(&(chan->msglock)); } else { //printf(">>>>>>>>>>>>>>>>>>>msg_id = %lu\n", msg_id); pthread_rwlock_unlock(&(chan->msglock)); smsg = (sxmsg_t *)usrtc_node_getdata(node); msg_return(smsg, EEXIST); return EEXIST; } /* put job to the queue and give up */ r = pth_queue_add(co->rqueue, (void *)smsg, USR_MSG); if(r) { /* cannot put job to the queue */ pthread_rwlock_wrlock(&(chan->msglock)); usrtc_delete(chan->msgs_tree, &(smsg->pendingq_node)); pthread_rwlock_unlock(&(chan->msglock)); __destroy_msg(smsg); return r; } //msg_return(smsg, r); /* put to the IN queue */ return r; } static int __default_msg_pulse_ret(void *cctx, sexp_t *sx) { return 0; } static int __default_msg(void *cctx, sexp_t *sx) { conn_t *co = (conn_t *)cctx; usrtc_node_t *node = NULL; chnl_t *chan = NULL; int r = 0; sexp_t *lsx = NULL, *sx_iter = NULL; sexp_t *sx_sublist = NULL, *sx_value = NULL; ulong_t chnl_id = -1; ulong_t msg_id = -1; sexp_t *msg = NULL; sxmsg_t *smsg = NULL; int idx; /* get parameters from the message */ if(sexp_list_cdr(sx, &lsx)) return ESXRCBADPROT; if(!SEXP_IS_LIST(lsx)) return ESXRCBADPROT; /* find channel id */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { sx_sublist = sx_iter; continue; } else { if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { if(strcmp(sx_iter->val, ":chid")) { continue; // ignore it } sx_value = sx_iter->next; if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { continue; } chnl_id = atol(sx_value->val); } else continue; // ignore it } } lsx = sx_sublist; /* find message id */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { msg = sx_iter; continue; } else { if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { if(strcmp(sx_iter->val, ":msgid")) { continue; // ignore } sx_value = sx_iter->next; if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { continue; } msg_id = atol(sx_value->val); } else continue; // ignore it } } if(msg_id < 0 || chnl_id < 0) { return ESXRCBADPROT; } /* find channel */ if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT; else chan = (chnl_t *)usrtc_node_getdata(node); /* lookup for the message */ pthread_rwlock_rdlock(&(chan->msglock)); if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) { pthread_rwlock_unlock(&(chan->msglock)); /* here, rpc lookup has no sense, just put this job to the queue */ /* btw, we're need to create a message first */ r = __create_reg_msg_mould(&smsg, chan, msg_id); if(r) return r; /* assign the message */ smsg->opcode = 0; smsg->payload = (void *)msg; /* assign initial S-expression structure */ smsg->initial_sx = sx; /* put the message to the search tree */ pthread_rwlock_wrlock(&(chan->msglock)); usrtc_insert(chan->msgs_tree, &(smsg->pendingq_node), &(smsg->mid)); pthread_rwlock_unlock(&(chan->msglock)); } else { //printf(">>>>>>>>>>>>>>>>>>>msg_id = %lu\n", msg_id); pthread_rwlock_unlock(&(chan->msglock)); smsg = (sxmsg_t *)usrtc_node_getdata(node); msg_return(smsg, EEXIST); return EEXIST; } /* put job to the queue and give up */ r = pth_queue_add(co->rqueue, (void *)smsg, USR_MSG); if(r) { /* cannot put job to the queue */ msg_return(smsg, r); pthread_rwlock_wrlock(&(chan->msglock)); usrtc_delete(chan->msgs_tree, &(smsg->pendingq_node)); pthread_rwlock_unlock(&(chan->msglock)); __destroy_msg(smsg); return r; } /* put to the IN queue */ return r; } /* TODO: optimize amount of code: (must be first) * __default_msg_return * __default_msg_reply * there are many copy-n-paste code!!!!! */ static int __default_msg_return(void *cctx, sexp_t *sx) { conn_t *co = (conn_t *)cctx; usrtc_node_t *node = NULL; chnl_t *chan = NULL; int r = 0; sexp_t *lsx = NULL, *sx_iter = NULL; sexp_t *sx_sublist = NULL, *sx_value = NULL; ulong_t chnl_id = -1; ulong_t msg_id = -1; sexp_t *msg = NULL; sxmsg_t *smsg = NULL; int idx, opcode; /* get parameters from the message */ if(sexp_list_cdr(sx, &lsx)) { r = ESXRCBADPROT; goto __finish; } if(!SEXP_IS_LIST(lsx)) { r = ESXRCBADPROT; goto __finish; } /* get parameters */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { sx_sublist = sx_iter; continue; } else { if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { if(strcmp(sx_iter->val, ":chid")) { continue; // ignore it } sx_value = sx_iter->next; if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { continue; } chnl_id = atol(sx_value->val); } else continue; // ignore it } } lsx = sx_sublist; /* find message id */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { msg = sx_iter; continue; } else { if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { if(strcmp(sx_iter->val, ":msgid")) { continue; // ignore } sx_value = sx_iter->next; if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { continue; } msg_id = atol(sx_value->val); } else continue; // ignore it } } /* get opcode */ sexp_list_car(msg, &lsx); opcode = atoi(lsx->val); if(msg_id < 0 || chnl_id < 0) { r = ESXRCBADPROT; goto __finish; } //printf("chnl_id = %ld\n", chnl_id); if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT; else chan = (chnl_t *)usrtc_node_getdata(node); /* lookup for the message */ pthread_rwlock_rdlock(&(chan->msglock)); if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) { pthread_rwlock_unlock(&(chan->msglock)); r = ENOENT; goto __finish; } else { pthread_rwlock_unlock(&(chan->msglock)); smsg = (sxmsg_t *)usrtc_node_getdata(node); smsg->opcode = opcode; if(smsg->flags & ESXMSG_ISREPLY) destroy_sexp((sexp_t *)smsg->payload); smsg->payload = NULL; smsg->flags |= ESXMSG_CLOSURE; /* Q: can we remove the message from the tree there??? */ /* A: actually no */ /* first remove the message from tree */ if(smsg->flags & ESXMSG_RMONRETR) { pthread_rwlock_wrlock(&(chan->msglock)); usrtc_delete(chan->msgs_tree, &(smsg->pendingq_node)); pthread_rwlock_unlock(&(chan->msglock)); } pthread_mutex_unlock(&(smsg->wait)); } __finish: destroy_sexp(sx); return r; } static int __default_msg_reply(void *cctx, sexp_t *sx) { conn_t *co = (conn_t *)cctx; usrtc_node_t *node = NULL; chnl_t *chan = NULL; int r = 0; sexp_t *lsx = NULL, *sx_iter = NULL; sexp_t *sx_sublist = NULL, *sx_value = NULL; ulong_t chnl_id = -1; ulong_t msg_id = -1; sexp_t *msg = NULL; sxmsg_t *smsg = NULL; int idx; /* get parameters from the message */ if(sexp_list_cdr(sx, &lsx)) { r = ESXRCBADPROT; goto __finish; } if(!SEXP_IS_LIST(lsx)) { r = ESXRCBADPROT; goto __finish; } /* get parameters */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { sx_sublist = sx_iter; continue; } else { if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { if(strcmp(sx_iter->val, ":chid")) { continue; // ignore it } sx_value = sx_iter->next; if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { continue; } chnl_id = atol(sx_value->val); } else continue; // ignore it } } lsx = sx_sublist; /* find message id */ SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { msg = sx_iter; continue; } else { if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { if(strcmp(sx_iter->val, ":msgid")) { continue; // ignore } sx_value = sx_iter->next; if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { continue; } msg_id = atol(sx_value->val); } else continue; // ignore it } } if(msg_id < 0 || chnl_id < 0) { r = ESXRCBADPROT; goto __finish; } if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT; else chan = (chnl_t *)usrtc_node_getdata(node); /* lookup for the message */ pthread_rwlock_rdlock(&(chan->msglock)); if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) { pthread_rwlock_unlock(&(chan->msglock)); r = ENOENT; goto __finish; } else { pthread_rwlock_unlock(&(chan->msglock)); smsg = (sxmsg_t *)usrtc_node_getdata(node); smsg->opcode = SXOREPLYREQ; smsg->payload = copy_sexp(msg); smsg->flags |= ESXMSG_ISREPLY; pthread_mutex_unlock(&(smsg->wait)); } __finish: destroy_sexp(sx); return r; } static int __init_systemrpc_tree(usrtc_t *rtree) { /* security context functions */ if(__insert_rpc_function(rtree, "auth-set-context", __default_auth_set_context)) goto __fail; if(__insert_rpc_function(rtree, "auth-set-attr", __default_auth_set_attr)) goto __fail; if(__insert_rpc_function(rtree, "auth-set-error", __default_auth_set_error)) goto __fail; /* channels negotiation ops */ if(__insert_rpc_function(rtree, "ch-get-types", __default_ch_get_types)) goto __fail; if(__insert_rpc_function(rtree, "ch-gl-error", __default_ch_gl_error)) goto __fail; if(__insert_rpc_function(rtree, "ch-set-types", __default_ch_set_types)) goto __fail; if(__insert_rpc_function(rtree, "ch-open", __default_ch_open)) goto __fail; if(__insert_rpc_function(rtree, "ch-open-ret", __default_ch_open_ret)) goto __fail; if(__insert_rpc_function(rtree, "ch-close", __default_ch_close)) goto __fail; if(__insert_rpc_function(rtree, "ch-close-ret", __default_ch_close_ret)) goto __fail; /* messaging functions */ if(__insert_rpc_function(rtree, "ch-msg-pulse", __default_msg_pulse)) goto __fail; if(__insert_rpc_function(rtree, "ch-msg-pulse-ret", __default_msg_pulse_ret)) goto __fail; if(__insert_rpc_function(rtree, "ch-msg", __default_msg)) goto __fail; if(__insert_rpc_function(rtree, "ch-msg-rete", __default_msg_return)) goto __fail; if(__insert_rpc_function(rtree, "ch-msg-repl", __default_msg_reply)) goto __fail; return 0; __fail: __destroy_rpc_list_tree(rtree); return ENOMEM; } static int __eval_cstr(char *cstr, cx_rpc_list_t *rpc_list, void *ctx) { int r = ENOENT; sexp_t *sx; usrtc_node_t *node; cx_rpc_t *rentry; char *rpcf; if(!(sx = parse_sexp(cstr, strlen(cstr)))) return EBADE; if(sx->ty == SEXP_LIST) rpcf = sx->list->val; else rpcf = sx->val; /* find an appropriate function */ node = usrtc_lookup(rpc_list->rpc_tree, rpcf); if(!node) return ENOENT; else rentry = (cx_rpc_t *)usrtc_node_getdata(node); /* call it */ r = rentry->rpcf(ctx, sx); return r; } static void *__cxslave_thread_listener(void *wctx) { conn_t *co = (conn_t *)wctx; char *buf = malloc(4096); int r; while((r = __conn_read(co, buf, 4096)) != -1) { buf[r] = '\0'; r = __eval_cstr(buf, conn_sys->system_rpc, co); } co->flags &= ~CXCONN_ESTABL; co->flags |= CXCONN_BROKEN; __wake_up_waiters(co, ESXNOCONNECT); free(buf); return NULL; } static void *__cxmaster_thread_listener(void *wctx) { conn_t *co = (conn_t *)wctx; char *buf = malloc(4096); int r; while((r = __conn_read(co, buf, 4096)) != -1) { buf[r] = '\0'; // if(r) printf("Got the message %s (%d bytes)\n", buf, r); r = __eval_cstr(buf, conn_sys->system_rpc, co); } co->flags &= ~CXCONN_ESTABL; co->flags |= CXCONN_BROKEN; __wake_up_waiters(co, ESXNOCONNECT); free(buf); return NULL; } static void *__rmsg_queue_thread(void *ctx) { conn_t *co = (conn_t *)ctx; pth_msg_t *tmp = malloc(sizeof(pth_msg_t)); sxmsg_t *msg; chnl_t *ch; int r = 0; char *rpcf; sexp_t *sx; usrtc_node_t *node = NULL; cx_rpc_t *rpccall; struct __rpc_job *rjob = NULL; if(!tmp) return NULL; while(1) { r = pth_queue_get(co->rqueue, NULL, tmp); if(r) { free(tmp); return NULL; } msg = tmp->data; if(!msg) continue; /* spurious !! */ /* check to right job */ if(!(msg->flags & ESXMSG_USR)) { /* not a regular message */ msg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */ msg->flags &= ~ESXMSG_PENDING; pthread_mutex_unlock(&(msg->wait)); /* wake up the waitee */ continue; } else { /* now we're need to have a deal with the rpc calling, other - we don't care */ ch = msg->pch; sx = (sexp_t *)msg->payload; /* get the function name */ if(sx->ty == SEXP_LIST) rpcf = sx->list->val; else { //rpcf = sx->val; r = ESXRCBADPROT; goto __err_ret; } node = usrtc_lookup(ch->rpc_list->rpc_tree, rpcf); if(!node) { r = ENOENT; __err_ret: msg_return(msg, r); } else { rpccall = (cx_rpc_t *)usrtc_node_getdata(node); /* call this ! */ rjob = malloc(sizeof(struct __rpc_job)); // TODO: check it rjob->msg = msg; rjob->sx = sx; rjob->rpcf = rpccall->rpcf; pth_dqtpoll_add(co->tpoll, (void *)rjob, USR_MSG); // TODO: check it } } } return NULL; } static void *__msg_queue_thread(void *ctx) { conn_t *co = (conn_t *)ctx; pth_msg_t *tmp = malloc(sizeof(pth_msg_t)); sxmsg_t *msg; chnl_t *ch; int r = 0, len; char *buf = malloc(4096), *tb; sexp_t *sx; if(!tmp || !buf) { if(tmp) free(tmp); if(buf) free(buf); return NULL; } while(1) { r = pth_queue_get(co->mqueue, NULL, tmp); if(r) { free(buf); free(tmp); return NULL; } /* message workout */ msg = tmp->data; if(!msg) continue; /* spurious message */ if(!(msg->flags & ESXMSG_USR)) { /* not a regular message */ msg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */ msg->flags &= ~ESXMSG_PENDING; pthread_mutex_unlock(&(msg->wait)); /* wake up the waitee */ continue; } else { ch = msg->pch; /* now we need to complete the request */ sx = (sexp_t *)msg->payload; tb = buf; if(!(msg->flags & ESXMSG_PULSE)) { /* %s))) */ if(!(msg->flags & ESXMSG_ISREPLY)) snprintf(buf, 4096, "(ch-msg (:chid %lu (:msgid %lu ", ch->cid, msg->mid); else { if(!sx) { snprintf(buf, 4096, "(ch-msg-rete (:chid %lu (:msgid %lu (%d))))", ch->cid, msg->mid, msg->opcode); /* mark it to close */ msg->flags |= ESXMSG_CLOSURE; /* ok, here we will write it and wait, destroying dialog while reply */ ; goto __ssl_write; } else snprintf(buf, 4096, "(ch-msg-repl (:chid %lu (:msgid %lu ", ch->cid, msg->mid); } ; len = strlen(buf); tb += len*sizeof(char); if(print_sexp(tb, 4096 - (len + 4*sizeof(char)), sx) == -1) { msg->opcode = ENOMEM; /* we don't need to wake up anybody */ if(msg->flags & ESXMSG_TIMEDOUT) { /* clean up all the shit: * 1. remove from message tree * 2. destroy message itself */ destroy_sexp(msg->initial_sx); msg->initial_sx = NULL; msg->payload = NULL; if(msg->flags & ESXMSG_ISREPLY) destroy_sexp(msg->payload); } else { ; pthread_mutex_unlock(&(msg->wait)); } } } else { /* pulse messages */ /* here we're shouldn't process reply procedure */ snprintf(buf, 4096, "(ch-msg-pulse (:chid %lu (:msgid %lu ", ch->cid, msg->mid); len = strlen(buf); /* FIXME: code double shit ! */ tb += len*sizeof(char); if(print_sexp(tb, 4096 - (len + 4*sizeof(char)), sx) == -1) { msg->opcode = ENOMEM; /* we don't need to wake up anybody */ if(msg->flags & ESXMSG_TIMEDOUT) { /* clean up all the shit: * 1. remove from message tree * 2. destroy message itself */ destroy_sexp(msg->initial_sx); msg->initial_sx = NULL; msg->payload = NULL; __destroy_msg(msg); } else pthread_mutex_unlock(&(msg->wait)); } } len = strlen(tb); tb += len*sizeof(char); strcat(tb, ")))"); __ssl_write: if(msg->flags & ESXMSG_CLOSURE) { /* first remove the message from tree */ pthread_rwlock_wrlock(&(ch->msglock)); usrtc_delete(ch->msgs_tree, &(msg->pendingq_node)); pthread_rwlock_unlock(&(ch->msglock)); /* destroy */ destroy_sexp(msg->initial_sx); if(msg->flags & ESXMSG_ISREPLY && msg->payload) destroy_sexp((sexp_t *)msg->payload); __destroy_msg(msg); } /* write it */ if(__conn_write(co, (void *)buf, strlen(buf) + sizeof(char)) < 0) { co->flags &= ~CXCONN_ESTABL; co->flags |= CXCONN_BROKEN; __wake_up_waiters(co, ESXNOCONNECT); } } len = 0; } free(buf); return NULL; } /* this function is an ugly implementation to get C string with uuid */ extern char *__generate_uuid(void); /* this is a callback to perform a custom SSL certs chain validation, * as I promised here the comments, a lot of ... * The first shit: 0 means validation failed, 1 otherwise * The second shit: X509 API, I guess u will love it ;-) * openssl calls this function for each certificate in chain, * since our case is a simple (depth of chain is one, since we're * don't care for public certificates lists or I cannot find any reasons to * do it ...), amount of calls reduced, and in this case we're interested * only in top of chain i.e. actual certificate used on client side, * the validity of signing for other certificates within chain is * guaranteed by the ssl itself. * u know, we need to lookup in database, or elsewhere... some information * about client certificate, and decide - is it valid, or not?, if so * yep I mean it's valid, we can assign it's long fucking number to * security context, to use in ongoing full scaled connection handshaking. */ static int __verify_certcall(int preverify_ok, X509_STORE_CTX *ctx) { // X509 *cert = X509_STORE_CTX_get_current_cert(ctx); int err = X509_STORE_CTX_get_error(ctx), depth = X509_STORE_CTX_get_error_depth(ctx); SSL *ssl = X509_STORE_CTX_get_ex_data(ctx, SSL_get_ex_data_X509_STORE_CTX_idx()); conn_t *co = SSL_get_ex_data(ssl, conn_sys->ex_ssldata_index); /* this is a custom data we're set before */ /* now we need to check for certificates with a long chain, * so since we have a short one, reject long ones */ if(depth > VERIFY_DEPTH) { /* longer than we expect */ preverify_ok = 0; /* yep, 0 means error for those function callback in openssl, fucking set */ err = X509_V_ERR_CERT_CHAIN_TOO_LONG; X509_STORE_CTX_set_error(ctx, err); } if(!preverify_ok) return 0; /* ok, now we're on top of SSL (depth == 0) certs chain, * and we can validate client certificate */ if(!depth) { co->pctx = malloc(sizeof(perm_ctx_t)); co->pctx->certid = ASN1_INTEGER_get((const ASN1_INTEGER *)X509_get_serialNumber(ctx->current_cert)); //printf("Certificate ID: %lu\n", co->pctx->certid); /* now we're need to check the ssl cert */ if(conn_sys->validate_sslpem) { if(conn_sys->validate_sslpem(co)) return 0; else return 1; } else return 0; } return preverify_ok; } /* dummy just to check the server side */ static int __verify_certcall_dummy(int preverify_ok, X509_STORE_CTX *ctx) { return preverify_ok; } /* subsystem: here u can told me about how it's ugly to use global pointers, * yep, it's a business of fucking morons, btw it works (heh, openssl uses this * ancient shit method too, many many and many others too, trust me ...). * subsystem required to define varios RPC lists, control list for connections, * general queues, certificates (all connections uses the same set of certificates * within application), general calls such as ... calls to get info about client * cert and ... many other things. */ void *__system_queue_listener(void *data) { int r; pth_msg_t *tmp = malloc(sizeof(pth_msg_t)); sxmsg_t *sysmsg; sxpayload_t *payload; chnl_t *chan; conn_t *co; if(!tmp) return NULL; while(1) { r = pth_queue_get(conn_sys->ioqueue, NULL, tmp); if(r) { free(tmp); return NULL; } /* ok message is delivered */ sysmsg = tmp->data; if(!sysmsg) continue; /* ignore dummy messages */ if(!(sysmsg->flags & ESXMSG_SYS)) { /* not a system message */ sysmsg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */ sysmsg->flags &= ~ESXMSG_PENDING; pthread_mutex_unlock(&(sysmsg->wait)); /* wake up the waitee */ continue; } else { chan = sysmsg->pch; co = chan->connection; payload = (sxpayload_t *)sysmsg->payload; /* write the buf */ if(__conn_write(co, (void *)payload->cstr, strlen(payload->cstr) + 1) < 0) { co->flags &= ~CXCONN_ESTABL; co->flags |= CXCONN_BROKEN; __wake_up_waiters(co, ESXNOCONNECT); } } } return NULL; } /* general initialization must be called within app uses connection layer */ int connections_subsystem_init(void) { int r = 0; if(!(conn_sys = malloc(sizeof(conn_sys_t)))) return ENOMEM; else if(!(conn_sys->connections = malloc(sizeof(usrtc_t)))) { r = ENOMEM; goto __fail; } /* zeroing */ conn_sys->rootca = conn_sys->certkey = conn_sys->certpem = NULL; conn_sys->validate_sslpem = NULL; conn_sys->secure_check = NULL; /* init connections list */ usrtc_init(conn_sys->connections, USRTC_REDBLACK, MAX_CONNECTIONS, __cmp_cstr); if((r = pthread_rwlock_init(&(conn_sys->rwlock), NULL))) goto __fail_1; /* init queues */ if(!(conn_sys->ioq = malloc(sizeof(pth_queue_t)))) { /* general io queue */ r = ENOMEM; goto __fail_2; } if((r = pth_queue_init(conn_sys->ioq))) goto __fail_3; if(!(conn_sys->ioqueue = malloc(sizeof(pth_queue_t)))) { /* system io queue */ r = ENOMEM; goto __fail_2; } if((r = pth_queue_init(conn_sys->ioqueue))) goto __fail_3_1; /* init SSL certificates checking functions */ /* init RPC list related functions */ if(!(conn_sys->system_rpc = malloc(sizeof(cx_rpc_list_t)))) { r = ENOMEM; goto __fail_3; } else { if(!(conn_sys->system_rpc->rpc_tree = malloc(sizeof(usrtc_t)))) { r = ENOMEM; __fail_rpc: free(conn_sys->system_rpc); goto __fail_3_1; } usrtc_init(conn_sys->system_rpc->rpc_tree, USRTC_SPLAY, 256, __cmp_cstr); r = __init_systemrpc_tree(conn_sys->system_rpc->rpc_tree); if(r) { free(conn_sys->system_rpc->rpc_tree); goto __fail_rpc; } } /* init SSL library */ SSL_library_init(); OpenSSL_add_all_algorithms(); SSL_load_error_strings(); conn_sys->ex_ssldata_index = SSL_get_ex_new_index(0, "__ssldata index", NULL, NULL, NULL); /* create threads for queue */ if((r = pthread_create(&conn_sys->ios_thread, NULL, __system_queue_listener, NULL))) { goto __fail_rpc; } return 0; __fail_3_1: free(conn_sys->ioqueue); __fail_3: free(conn_sys->ioq); __fail_2: pthread_rwlock_destroy(&(conn_sys->rwlock)); __fail_1: free(conn_sys->connections); __fail: free(conn_sys); return r; } /* load certificates */ int connections_subsystem_setsslserts(const char *rootca, const char *certpem, const char *certkey) { int r = ENOMEM; if(!conn_sys) return EINVAL; /* simply copying */ if(!(conn_sys->rootca = strdup(rootca))) return ENOMEM; if(!(conn_sys->certkey = strdup(certkey))) goto __fail; if(!(conn_sys->certpem = strdup(certpem))) goto __fail; r = 0; return 0; __fail: if(conn_sys->rootca) free(conn_sys->rootca); if(conn_sys->certkey) free(conn_sys->certkey); if(conn_sys->certpem) free(conn_sys->certpem); return r; } int connections_subsystem_setrpclist_function(usrtc_t* (*get_rpc_typed_list_tree)(conn_t *)) { conn_sys->get_rpc_typed_list_tree = get_rpc_typed_list_tree; return 0; } #define __TMPBUFLEN 2048 /* connection_initiate: perform a connection thru the socket to the * host with master certificate, i.e. it's a slave one for client. */ int connection_initiate(conn_t *co, const char *host, int port, const char *SSL_cert, perm_ctx_t *pctx) { int r = 0, sd; int bytes = 0; char *uuid; char *buf = NULL; struct hostent *host_; struct sockaddr_in addr; usrtc_t *ch_tree, *rpc_tree; pth_queue_t *mqueue = malloc(sizeof(pth_queue_t)); pth_dqtpoll_t *tpoll = malloc(sizeof(pth_dqtpoll_t)); idx_allocator_t *idx_ch = malloc(sizeof(idx_allocator_t)); if(!mqueue) { __fallenomem: r = ENOMEM; __fall0: if(mqueue) free(mqueue); if(tpoll) free(tpoll); if(idx_ch) free(idx_ch); return r; } if(!tpoll) goto __fallenomem; if(!idx_ch) goto __fallenomem; if(!co) { __falleinval: r = EINVAL; goto __fall0; } if(!host) goto __falleinval; if(!SSL_cert) goto __falleinval; if(!pctx) goto __falleinval; memset(co, 0, sizeof(conn_t)); pth_dqtpoll_init(tpoll, __rpc_callback); if(!idx_ch) return ENOMEM; else r = idx_allocator_init(idx_ch, MAX_CHANNELS*MAX_MULTI, 0); if(r) return r; if(!(uuid = __generate_uuid())) return ENOMEM; if(!(ch_tree = malloc(sizeof(usrtc_t)))) { r = ENOMEM; goto __fail; } if(!(rpc_tree = malloc(sizeof(usrtc_t)))) { r = ENOMEM; goto __fail_1; } if((r = pthread_mutex_init(&co->oplock, NULL))) goto __fail_2; if((r = pthread_rwlock_init(&co->chnl_lock, NULL))) goto __fail_3; usrtc_init(rpc_tree, USRTC_REDBLACK, MAX_RPC_LIST, __cmp_int); usrtc_init(ch_tree, USRTC_REDBLACK, MAX_CHANNELS, __cmp_ulong); co->idx_ch = idx_ch; /* assign message queue */ if((r = pth_queue_init(mqueue))) goto __fail_3; co->mqueue = mqueue; /* init SSL certificates and context */ co->ctx = SSL_CTX_new(TLSv1_2_client_method()); if(!co->ctx) { ERR_print_errors_fp(stderr); r = EINVAL; goto __fail_3; } else { SSL_CTX_set_verify(co->ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, __verify_certcall_dummy); SSL_CTX_set_verify_depth(co->ctx, 1); /* FIXME: use configuration */ } /* load certificates */ SSL_CTX_load_verify_locations(co->ctx, conn_sys->rootca, NULL); /* set the local certificate from CertFile */ if(SSL_CTX_use_certificate_file(co->ctx, SSL_cert, SSL_FILETYPE_PEM)<=0) { r = EINVAL; goto __fail_3; } /* set the private key from KeyFile (may be the same as CertFile) */ if(SSL_CTX_use_PrivateKey_file(co->ctx, SSL_cert, SSL_FILETYPE_PEM)<=0) { r = EINVAL; goto __fail_3; } /* verify private key */ if (!SSL_CTX_check_private_key(co->ctx)) { r = EINVAL; goto __fail_3; } /* assign allocated memory */ co->rpc_list = rpc_tree; co->chnl_tree = ch_tree; co->uuid = uuid; /* connect to the pointed server */ /* resolve host */ if(!(buf = malloc(__TMPBUFLEN))) { r = ENOMEM; goto __fail_3; } if(__resolvehost(host, buf, __TMPBUFLEN, &host_) != NETDB_SUCCESS) { r = ENOENT; free(buf); goto __fail_3; } /* create a socket */ sd = socket(PF_INET, SOCK_STREAM, 0); bzero(&addr, sizeof(addr)); /* try to connect it */ addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = *(uint32_t*)(host_->h_addr); free(host_); if (connect(sd, (struct sockaddr*)&addr, sizeof(addr)) != 0) { close(sd); free(buf); r = ENOENT; /* couldn't connect to the desired host */ goto __fail_3; } /* now we will create an SSL connection */ co->ssl = SSL_new(co->ctx); SSL_set_fd(co->ssl, sd); /* attach connected socket */ if(SSL_connect(co->ssl) == -1) { r = EBADE; free(buf); /* shutdown connection */ goto __fail_3; } /* if success we're ready to use established SSL channel */ /* auth and RPC contexts sync */ co->pctx = pctx; snprintf(buf, __TMPBUFLEN, "(auth-set-context ((:user \"%s\")(:passwd \"%s\")))", pctx->login, pctx->passwd); /* send an auth request */ SSL_write(co->ssl, buf, strlen(buf) + sizeof(char)); /* read the message reply */ bytes = __conn_read(co, buf, __TMPBUFLEN); if(bytes == -1) { // we've lost the connection co->flags &= ~CXCONN_ESTABL; r = ESXNOCONNECT; co->flags |= CXCONN_BROKEN; //__wake_up_waiters(co, ESXNOCONNECT); free(buf); /* shutdown connection */ goto __fail_3; } buf[bytes] = 0; /* perform an rpc call */ r = __eval_cstr(buf, conn_sys->system_rpc, (void *)co); if(!r) { /* all is fine security context is good */ snprintf(buf, __TMPBUFLEN, "(ch-get-types)"); /* now we should receive possible channel types */ SSL_write(co->ssl, buf, strlen(buf) + sizeof(char)); /* read the message reply */ bytes = __conn_read(co, buf, __TMPBUFLEN); if(bytes == -1) { // we've lost the connection co->flags &= ~CXCONN_ESTABL; co->flags |= CXCONN_BROKEN; r = ESXNOCONNECT; //__wake_up_waiters(co, ESXNOCONNECT); free(buf); /* shutdown connection */ goto __fail_3; } buf[bytes] = 0; /* perform an rpc call */ r = __eval_cstr(buf, conn_sys->system_rpc, (void *)co); } free(buf); /* now we can free the temporary buffer */ /* a listening thread creation (incoming messages) */ if(!r) { /* success let's start a listening thread */ r = pthread_create(&co->cthread, NULL, __cxslave_thread_listener, (void *)co); if(!r) { /* add connection to the list */ usrtc_node_init(&co->csnode, co); co->flags = (CXCONN_SLAVE | CXCONN_ESTABL); /* set the right flags */ pthread_rwlock_wrlock(&conn_sys->rwlock); usrtc_insert(conn_sys->connections, &co->csnode, (void *)co->uuid); pthread_rwlock_unlock(&conn_sys->rwlock); } r = pthread_create(&co->msgthread, NULL, __msg_queue_thread, (void *)co); if(r) goto __fail_3; pth_dqtpoll_run(tpoll); co->tpoll = tpoll; return 0; } __fail_3: pthread_mutex_destroy(&co->oplock); __fail_2: free(rpc_tree); __fail_1: free(ch_tree); __fail: free(uuid); return r; } int connection_create(conn_t *co, int sck) { int r = 0; int bytes = 0; char *uuid; char *buf = NULL; usrtc_t *ch_tree, *rpc_tree; pth_queue_t *rqueue = malloc(sizeof(pth_queue_t)); pth_queue_t *mqueue = malloc(sizeof(pth_queue_t)); pth_dqtpoll_t *tpoll = malloc(sizeof(pth_dqtpoll_t)); if(!tpoll) return ENOMEM; // TODO: fallback idx_allocator_t *idx_ch = malloc(sizeof(idx_allocator_t)); if(!co) return EINVAL; else memset(co, 0, sizeof(conn_t)); pth_dqtpoll_init(tpoll, __rpc_callback); // TODO: check it if(!idx_ch) return ENOMEM; else r = idx_allocator_init(idx_ch, MAX_CHANNELS*MAX_MULTI, 0); if(r) return r; if(!(uuid = __generate_uuid())) return ENOMEM; if(!(ch_tree = malloc(sizeof(usrtc_t)))) { r = ENOMEM; goto __fail; } if(!(rpc_tree = malloc(sizeof(usrtc_t)))) { r = ENOMEM; goto __fail_1; } if((r = pthread_mutex_init(&co->oplock, NULL))) goto __fail_2; if((r = pthread_rwlock_init(&co->chnl_lock, NULL))) goto __fail_3; usrtc_init(rpc_tree, USRTC_REDBLACK, MAX_RPC_LIST, __cmp_int); usrtc_init(ch_tree, USRTC_REDBLACK, MAX_CHANNELS, __cmp_ulong); co->idx_ch = idx_ch; /* assign message queue */ r = pth_queue_init(rqueue); if(r) goto __fail_3; co->rqueue = rqueue; /* assigned outbone message queue master also has this one */ r = pth_queue_init(mqueue); if(r) goto __fail_3; co->mqueue = mqueue; /* init SSL certificates and context */ co->ctx = SSL_CTX_new(TLSv1_2_server_method()); if(!co->ctx) { r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__);goto __fail_3; } else { /* set verify context */ SSL_CTX_set_verify(co->ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, __verify_certcall); /* set verify depth */ SSL_CTX_set_verify_depth(co->ctx, VERIFY_DEPTH); } /* load certificates */ SSL_CTX_load_verify_locations(co->ctx, conn_sys->rootca, NULL); /* set the local certificate from CertFile */ if(SSL_CTX_use_certificate_file(co->ctx, conn_sys->certpem, SSL_FILETYPE_PEM)<=0) { printf("certpem1 = %s\n", conn_sys->certpem); ERR_print_errors_fp(stderr); r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__); goto __fail_3; } /* set the private key from KeyFile (may be the same as CertFile) */ if(SSL_CTX_use_PrivateKey_file(co->ctx, conn_sys->certkey, SSL_FILETYPE_PEM)<=0) { r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__); goto __fail_3; } /* verify private key */ if (!SSL_CTX_check_private_key(co->ctx)) { r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__); goto __fail_3; } /* assign allocated memory */ co->rpc_list = rpc_tree; co->chnl_tree = ch_tree; co->uuid = uuid; if(!(buf = malloc(__TMPBUFLEN))) { r = ENOMEM; goto __fail_3; } /* now we will create an SSL connection */ co->ssl = SSL_new(co->ctx); SSL_set_fd(co->ssl, sck); /* attach connected socket */ /* set the context to verify ssl connection */ SSL_set_ex_data(co->ssl, conn_sys->ex_ssldata_index, (void *)co); if(SSL_accept(co->ssl) == -1) { r = EBADE; free(buf); /* shutdown connection */ goto __fail_3; } /* if success we're ready to use established SSL channel */ printf("%s:%d\n", __FUNCTION__, __LINE__); BIO_set_nbio(SSL_get_rbio(co->ssl), 1); /*******************************************/ /*-=Protocol part of connection establish=-*/ /*******************************************/ while(!(co->flags & CXCONN_ESTABL)) { /* read the initiation stage connections */ bytes = __conn_read(co, buf, __TMPBUFLEN); if(bytes > 0) { buf[bytes] = 0; r = __eval_cstr(buf, conn_sys->system_rpc, (void *)co); printf("%s return %d (bytes %d)\n", buf, r, bytes); if(r) goto __fail_3; } else { printf("bytes = %d\n", bytes); if(bytes < 0) { printf("Terminate SSL connection, the other end is lost.\n"); co->flags &= ~CXCONN_ESTABL; co->flags |= CXCONN_BROKEN; //__wake_up_waiters(co, ESXNOCONNECT); r = ESXNOCONNECT; goto __fail_3; } } } /* before it will be done assign rpc list */ if(conn_sys->get_rpc_typed_list_tree) co->rpc_list = conn_sys->get_rpc_typed_list_tree(co); free(buf); r = pthread_create(&co->cthread, NULL, __cxmaster_thread_listener, (void *)co); if(!r) { /* add connection to the list */ usrtc_node_init(&co->csnode, co); co->flags |= CXCONN_MASTER; /* set the right flags */ pthread_rwlock_wrlock(&conn_sys->rwlock); usrtc_insert(conn_sys->connections, &co->csnode, (void *)co->uuid); pthread_rwlock_unlock(&conn_sys->rwlock); /* threads poll --- */ r = pthread_create(&co->msgthread, NULL, __msg_queue_thread, (void *)co); if(r) goto __fail_3; r = pthread_create(&co->rmsgthread, NULL, __rmsg_queue_thread, (void *)co); if(r) goto __fail_3; pth_dqtpoll_run(tpoll); co->tpoll = tpoll; } return r; __fail_3: pthread_mutex_destroy(&co->oplock); __fail_2: free(rpc_tree); __fail_1: free(ch_tree); __fail: free(uuid); return r; } int connection_close(conn_t *co) /* TODO: */ { return 0; } int connection_reinit(conn_t *co) /* TODO: the next version */ { return ENOSYS; } extern int __create_reg_msg(sxmsg_t **msg, chnl_t *ch); extern int __create_sys_msg(sxmsg_t **msg, char *uuid, chnl_t *ch, sxpayload_t *data);