diff --git a/include/Makefile.am b/include/Makefile.am index 7af89e1..cafb1ff 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -1 +1 @@ -nobase_include_HEADERS = sntl/pth_queue.h sntl/connection.h +nobase_include_HEADERS = sntl/sntllv2.h sntl/errno.h sntl/limits.h diff --git a/include/sntl/pth_queue.h b/include/sntl/pth_queue.h deleted file mode 100644 index 521e849..0000000 --- a/include/sntl/pth_queue.h +++ /dev/null @@ -1,117 +0,0 @@ -/* - * This is a proprietary software. See COPYING for further details. - * - * (c) 2013 Copyright Askele, inc. - * (c) 2013 Copyright Askele Ingria, inc. - * (c) 2014 Copyright Confident, inc. (granted permission to use in commercial software) - */ - -/** - * @file pth_queue.h - * @author Alexander Vdolainen - * @date 4 Nov 2013, 20 Dec 2014 (dynamic polls) - * @brief queue implementation for threads intercommunication - * - */ - -#ifndef __PTH_QUEUE_H__ -#define __PTH_QUEUE_H__ - -#include -#include - -/* possible message types, ones with POLL_ prefix valid on for pth_dqtpoll_* */ -#define SYS_MSG 0x0f0affee -#define USR_MSG 0x0afeeffe -#define POLL_DECREASE 0x0afafafe -#define POLL_INCREASE 0x0afaffff -#define NIL_MSG 0x0 -#define END_MSG 0xdead0000 - -/* max amount of threads within the poll */ -#define MAX_POLL_VALUE 32 - -typedef struct pth_msg_s { - void *data; /** < message payload */ - unsigned int msgtype; /** < message type ID */ - unsigned int qlength; /** < current queue length (actual on add moment), - * it makes no sense with few readers */ - usrtc_node_t node; -} pth_msg_t; - -typedef struct pth_queue_s { - unsigned int length; - /* sync */ - pthread_mutex_t mutex; - pthread_cond_t cond; - /* queue data */ - usrtc_t qtree; - /* cache */ - usrtc_t msgcache; -} pth_queue_t; - -int pth_queue_init(pth_queue_t *queue); - -int pth_queue_add(pth_queue_t *queue, void *data, unsigned int msgtype); - -int pth_queue_get(pth_queue_t *queue, const struct timespec *timeout, - pth_msg_t *msg); - -unsigned int pth_queue_length(pth_queue_t *queue); - -int pth_queue_destroy(pth_queue_t *queue, int freedata, - void (*free_msg)(void *)); - -/* dynamic queue thread poll ... bbrrr .... ok, ok with beer - * Dynamic queue thread poll is a queue like pth_queue, - * but also it has itäs own mamagement for threads - that's - * why dynamic. - * Ideally, the model is trying to achieve the following: - * 1. one thread in queue while no or very small amount of jobs in the queue - * 2. grow until max threads is reached while too many requests - * 3. gently slide down volume of threads after job heat - * 4. minimal additional drawbacks (i hate something periodically running, - * it's bad practice) - * The model is quite simple, we should make spurious wakeups equal to zero, - * if no - decrease poll value, and, if we don't have thread available - - * create it. - */ -typedef struct pth_dqtpoll_s { - pth_queue_t *queue; /** < Job queue */ - pthread_t *poll; /** < Thread descriptors */ - int (*jobdata_callback)(void *); /** < Callback to have a deal with data */ - int flags; /** < Flags */ - idx_allocator_t *idx; /** < index allocator for the poll threads */ - pthread_rwlock_t stats_lock; /** < rwlock for stats data */ - unsigned long spurious_wakeups; /** < amount of spurios wakeups */ - int poll_value; /** < value of the poll (totally) */ - struct timeval sched_time; - int msgop; -} pth_dqtpoll_t; - -/* flags for poll */ -#define DQTPOLL_RUNNING (1 << 1) /* poll is running */ -#define DQTPOLL_DEADSTAGE (1 << 2) /* poll in the stage of destroy */ - -/* keep it stupid */ -#define DQTPOLL_DELTAMS 500000 -#define DQTPOLL_DELTASE 0 - -/* init poll, structure must be allocated */ -int pth_dqtpoll_init(pth_dqtpoll_t*, int (*jobdata_callback)(void *)); - -/* run poll: poll */ -int pth_dqtpoll_run(pth_dqtpoll_t*); - -/* add the job to the queue: poll, job data, message type */ -int pth_dqtpoll_add(pth_dqtpoll_t*, void*, unsigned int); - -/* destroy the poll: poll, force flag - * if force flag is set (!= 0), give up - * about jobs, if no, do the job, but don't - * accept the new ones, and destroy all poll - * with last thread. - */ -int pth_dqtpoll_destroy(pth_dqtpoll_t*, int); - -#endif /* __PTH_QUEUE_H__ */ diff --git a/lib/Makefile.am b/lib/Makefile.am index bda15f2..5e9e6a7 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -14,7 +14,7 @@ lib_LTLIBRARIES = libsntllv2.la libsntllv2_la_SOURCES = \ - connex.c sntllv2.c chansx.c messagesx.c rpclist.c + connex.c sntllv2.c chansx.c messagesx.c rpclist.c uuid.c libsntllv2_la_LDFLAGS = -Wl,--export-dynamic diff --git a/lib/chansx.c b/lib/chansx.c index 7f14707..cc2df7e 100644 --- a/lib/chansx.c +++ b/lib/chansx.c @@ -40,6 +40,8 @@ #include +#include "internal.h" + /* locally used functions */ uint8_t _channel_open(conn_t *co, uint16_t *chid) { diff --git a/lib/connection.c b/lib/connection.c deleted file mode 100644 index 5b4f5e5..0000000 --- a/lib/connection.c +++ /dev/null @@ -1,2477 +0,0 @@ -/* - * Secure Network Transport Layer Library v2 implementation. - * (sntllv2) it superseed all versions before due to the: - * - memory consumption - * - new features such as pulse emitting - * - performance optimization - * - * This is a proprietary software. See COPYING for further details. - * - * (c) Askele Group 2013-2015 - * - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#ifdef WIN32 -#include -#define EBADE 1 -#define NETDB_SUCCESS 0 -#else -#include -#include -#include -#include -#endif - -#include -#include - -#include -#include - -#include - -struct __rpc_job -{ - sxmsg_t *msg; - sexp_t *sx; - int (*rpcf) (void *, sexp_t *); -}; - -conn_sys_t *conn_sys = NULL; - -static int ex_ssldata_index; /** < index used to work with additional data - * provided to the special call during SSL handshake */ - -int sntl_init(void) -{ - /* init SSL library */ - SSL_library_init(); - - OpenSSL_add_all_algorithms(); - SSL_load_error_strings(); - - ex_ssldata_index = SSL_get_ex_new_index(0, "__ssldata index", NULL, NULL, NULL); - - return 0; -} - -static long __cmp_ulong(const void *a, const void *b); - -/* message alloc and destroy */ -extern sxmsg_t *__allocate_msg(int *res); -extern void __destroy_msg(sxmsg_t *msg); - -/* connections */ -static void __connections_subsystem_connection_remove(conn_t *); -static void __connection_free(conn_t *co); - -/* examination */ -static inline int __exam_connection(conn_t *co) -{ - int r = 0; - - pthread_mutex_lock(&co->oplock); - if(co->flags | CXCONN_BROKEN) { - /* wake up all */ - /* destroy thread poll */ - /* free all memory and sync primitives */ - r = 1; - } - pthread_mutex_unlock(&co->oplock); - - return r; -} - -static int __rpc_callback(void *data) -{ - struct __rpc_job *job = (struct __rpc_job *)data; - int rc = 0; - - rc = job->rpcf((void *)(job->msg), job->sx); - - free(job); - - return rc; -} - -extern int __alloc_channel(ulong_t cid, conn_t *co, rpc_typed_list_t *rlist, - chnl_t **channel); - -static int __conn_read(conn_t *co, void *buf, size_t buf_len) -{ - int rfd = SSL_get_fd(co->ssl), r; - fd_set readset, writeset; - int ofcmode, read_blocked = 0, read_blocked_on_write = 0; - - /* First we make the socket nonblocking */ -#ifndef WIN32 - ofcmode = fcntl(rfd, F_GETFL,0); - ofcmode |= O_NDELAY; - if(fcntl(rfd, F_SETFL, ofcmode)) - fprintf(stderr, "Couldn't make socket nonblocking"); -#endif - - __retry: - - do { - __try_again: - r = SSL_read(co->ssl, buf, (int)buf_len); - switch(SSL_get_error (co->ssl, r)) { - case SSL_ERROR_NONE: - return r; - break; - case SSL_ERROR_WANT_READ: - /* get prepare to select */ - read_blocked = 1; - break; - case SSL_ERROR_WANT_WRITE: /* here we blocked on write */ - read_blocked_on_write = 1; - break; - case SSL_ERROR_SYSCALL: - if(errno == EAGAIN || errno == EINTR) goto __try_again; - else { - fprintf(stderr, "SSL syscall error.\n"); - goto __close_conn; - } - break; - case SSL_ERROR_WANT_CONNECT: - case SSL_ERROR_WANT_ACCEPT: - fprintf(stderr, "SSL negotiation required. Trying again.\n"); - goto __try_again; - break; - case SSL_ERROR_SSL: - fprintf(stderr, "SSL error occured. Connection will be closed.\n"); - goto __close_conn; - break; - case SSL_ERROR_ZERO_RETURN: - fprintf(stderr, "SSL connection is cleary closed.\n"); - default: - __close_conn: - fprintf(stderr, "(RD)Unknown error on %s (errno = %d)\n", co->uuid, errno); - return -1; - } - } while(SSL_pending(co->ssl) && !read_blocked); - - __select_retry: - - if(read_blocked) { - FD_ZERO(&readset); - FD_SET(rfd, &readset); - /* waits until something will be ready to read */ - r = select(rfd + 1, &readset, NULL, NULL, NULL); - if(r < 0) { - if(errno == EINTR || errno == EAGAIN) goto __select_retry; - printf("(RD) select (%d) on %s\n", errno, co->uuid); - return -1; - } - if(!r) { - printf("Nothing to wait for\n"); - return 0; - } - read_blocked = 0; - if(r && FD_ISSET(rfd, &readset)) goto __retry; /* try to read again */ - } - if(read_blocked_on_write) { /* we was blocked on write */ - FD_ZERO(&readset); - FD_ZERO(&writeset); - FD_SET(rfd, &readset); - FD_SET(rfd, &writeset); - - r = select(rfd + 1, &readset, &writeset, NULL, NULL); - read_blocked_on_write = 0; - if(r && FD_ISSET(rfd, &writeset)) goto __retry; - } - - return 0; -} - -static int __conn_write(conn_t *co, void *buf, size_t buf_len) -{ - int r, rfd = SSL_get_fd(co->ssl); - fd_set writeset; - - pthread_mutex_lock(&(co->oplock)); - __retry: - r = SSL_write(co->ssl, buf, (int)buf_len); - switch(SSL_get_error(co->ssl, r)) { - case SSL_ERROR_WANT_READ: - case SSL_ERROR_WANT_WRITE: - /* here we should block */ - FD_ZERO(&writeset); - FD_SET(rfd, &writeset); - r = select(rfd + 1, NULL, &writeset, NULL, NULL); - if(r && FD_ISSET(rfd, &writeset)) goto __retry; - break; - case SSL_ERROR_SYSCALL: - if(errno == EAGAIN || errno == EINTR) goto __retry; - else goto __close_conn; - break; - default: - pthread_mutex_unlock(&(co->oplock)); - __close_conn: - if(r < 0) { - fprintf(stderr, "(WR)Unknown error on %s (%d)\n", co->uuid, r); - return -1; - } else return 0; - } - pthread_mutex_unlock(&(co->oplock)); - - return 0; -} - -static long __cmp_cstr(const void *a, const void *b) -{ - return strcmp((char *)a, (char *)b); -} - -static long __cmp_int(const void *a, const void *b) -{ - return *(int *)a - *(int *)b; -} - -static long __cmp_ulong(const void *a, const void *b) -{ - return *(ulong_t *)a - *(ulong_t *)b; -} - -extern int __resolvehost(const char *hostname, char *buf, int buf_len, - struct hostent **rhp); - -static void __destroy_rpc_list_tree(usrtc_t *tree) -{ - usrtc_node_t *node; - cx_rpc_t *ent; - - for(node = usrtc_first(tree); node != NULL; node = usrtc_first(tree)) { - ent = (cx_rpc_t *)usrtc_node_getdata(node); - usrtc_delete(tree, node); - free(ent->name); - free(ent); - } - - return; -} - -static int __insert_rpc_function(usrtc_t *tree, const char *name, int (*rpcf)(void *, sexp_t *)) -{ - cx_rpc_t *ent = malloc(sizeof(cx_rpc_t)); - usrtc_node_t *node; - - if(!ent) return ENOMEM; - else node = &ent->node; - - if(!(ent->name = strdup(name))) { - free(ent); - return ENOMEM; - } else ent->rpcf = rpcf; - - usrtc_node_init(node, ent); - usrtc_insert(tree, node, ent->name); - - return 0; -} - -/* wake up all waiters on messages with given opcode */ -static void __wake_up_waiters(conn_t *co, int opcode) -{ - usrtc_node_t *node = NULL, *last_node = NULL; - usrtc_node_t *msg_node = NULL, *last_msg_node = NULL; - chnl_t *ch; - sxmsg_t *smsg = NULL; - - pthread_rwlock_wrlock(&(co->chnl_lock)); - - if(!co->chnl_tree) goto __skip; - node = usrtc_first(co->chnl_tree); - last_node = usrtc_last(co->chnl_tree); - - /* going through channels tree */ - while(!usrtc_isempty(co->chnl_tree)) { - ch = (chnl_t *)usrtc_node_getdata(node); - - pthread_rwlock_rdlock(&(ch->msglock)); - msg_node = usrtc_first(ch->msgs_tree); - last_msg_node = usrtc_last(ch->msgs_tree); - - while(!usrtc_isempty(ch->msgs_tree)) { /* messages bypassing */ - smsg = (sxmsg_t *)usrtc_node_getdata(msg_node); - smsg->opcode = opcode; - - /* wake up waiting thread */ - pthread_mutex_unlock(&(smsg->wait)); - - if(msg_node == last_msg_node) break; - msg_node = usrtc_next(ch->msgs_tree, msg_node); - } - - pthread_rwlock_unlock(&(ch->msglock)); - - if(node == last_node) break; - node = usrtc_next(co->chnl_tree, node); - } - - __skip: - pthread_rwlock_unlock(&(co->chnl_lock)); - - return; -} - -/* (!) NOTE: this call use only after all threads are dead ! */ -static void __destroy_all_channels(conn_t *co) -{ - usrtc_node_t *node = NULL; - chnl_t *ch; - - for(node = usrtc_first(co->chnl_tree); node != NULL; node = - usrtc_first(co->chnl_tree)) { - ch = (chnl_t *)usrtc_node_getdata(node); - - /* free allocated resources */ - if(ch->uuid) free(ch->uuid); - idx_allocator_destroy(ch->idx_msg); /* allocator */ - free(ch->idx_msg); - free(ch->msgs_tree); - /* locks */ - pthread_mutex_destroy(&(ch->oplock)); - pthread_rwlock_destroy(&(ch->msglock)); - - /* remove it */ - usrtc_delete(co->chnl_tree, node); - - /* free */ - free(ch); - } - - return; -} - -static int __default_auth_set_context(void *cctx, sexp_t *sx) -{ - conn_t *co = (conn_t *)cctx; - conn_sys_t *ssys = co->ssys; - char *val, *var, *tbuf = NULL; - sexp_t *lsx, *sx_iter, *sx_in; - int llen, idx, err = 0; - - //co->pctx = malloc(sizeof(perm_ctx_t)); - - /* skip keyword itself */ - lsx = sx->list->next; - /* now we expect a list of lists */ - if(lsx->ty != SEXP_LIST) { - err = ESXRCBADPROT; - goto __reply; - } - /* take length of the list */ - llen = sexp_list_length(lsx); - if(!llen) return 0; /* other side will not set any security attributes */ - SEXP_ITERATE_LIST(lsx, sx_iter, idx) { - if(SEXP_IS_LIST(sx_iter)) { - sexp_list_car(sx_iter, &sx_in); - - if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { - err = ESXRCBADPROT; - goto __reply; - } else val = sx_in->val; - - if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */ - - sexp_list_cdr(sx_iter, &sx_in); - if(!SEXP_IS_TYPE(sx_in, SEXP_DQUOTE)) { - err = ESXRCBADPROT; - goto __reply; - } else var = sx_in->val; - - /* ok, now we need to analyze parameters */ - if(!strcmp(val, ":user") && var) { - co->pctx->login = strdup(var); - } else if(!strcmp(val, ":passwd") && var) { - co->pctx->passwd = strdup(var); - } else { - /* just ignore in default implementation */ - } - } else continue; /* ignore */ - } - - /* ok, now we need to fill security context */ - tbuf = malloc(2048); - if(!tbuf) { - err = ENOMEM; - goto __reply; - } - if(ssys->secure_check) - err = ssys->secure_check(co); - -__reply: - if(err) { - snprintf(tbuf, 2048, "(auth-set-error (%d))", err); - } else { - snprintf(tbuf, 2048, "(auth-set-attr (:attr %d)(:uid %ld)(:gid %ld))", - co->pctx->p_attr, co->pctx->uid, co->pctx->gid); - } - /* we will send it */ - if(__conn_write(co, tbuf, strlen(tbuf)) < 0) { - co->flags &= ~CXCONN_ESTABL; - co->flags |= CXCONN_BROKEN; - __wake_up_waiters(co, ESXNOCONNECT); - } - destroy_sexp(sx); - - free(tbuf); - return err; -} - -static int __default_auth_set_attr(void *cctx, sexp_t *sx) -{ - conn_t *co = (conn_t *)cctx; - char *val, *var; - sexp_t *lsx, *sx_iter, *sx_in; - int llen, idx, r = 0; - - /* skip keyword itself */ - lsx = sx->list->next; - /* now we expect a list of lists */ - if(lsx->ty != SEXP_LIST) { - r = ESXRCBADPROT; - goto __finish; - } - /* take length of the list */ - llen = sexp_list_length(lsx); - if(!llen) return 0; /* other side will not set any security attributes */ - SEXP_ITERATE_LIST(lsx, sx_iter, idx) { - if(SEXP_IS_LIST(sx_iter)) { - sexp_list_car(sx_iter, &sx_in); - - if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { - r = ESXRCBADPROT; - goto __finish; - } else val = sx_in->val; - - if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */ - - sexp_list_cdr(sx_iter, &sx_in); - - if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { - r = ESXRCBADPROT; - goto __finish; - } else var = sx_in->val; - - /* ok, now we need to analyze parameters */ - if(!strcmp(val, ":attr")) { - co->pctx->p_attr = atoi(var); - } else if(!strcmp(val, ":uid")) { - co->pctx->uid = (ulong_t)atoll(var); - } else if(!strcmp(val, ":gid")) { - co->pctx->gid = (ulong_t)atoll(var); - } else { - /* just ignore in default implementation */ - } - } else continue; /* ignore */ - } - -__finish: - destroy_sexp(sx); - return r; -} - -static int __default_auth_set_error(void *cctx, sexp_t *sx) -{ - char *errstr = NULL; - int r; - - /* skip keyword itself */ - sx->list = sx->list->next; - /* be sure - this is a list */ - if(sx->ty != SEXP_LIST) return ESXRCBADPROT; - else sx = sx->list; /* get it */ - errstr = sx->list->val; - r = atoi(errstr); - - destroy_sexp(sx); - return r; -} - -static int __default_ch_get_types(void *cctx, sexp_t *sx) -{ - conn_t *co = (conn_t *)cctx; - conn_sys_t *ssys = co->ssys; - usrtc_node_t *node; - rpc_typed_list_t *list_ent; - char *tbuf = malloc(4096), *tt; - int err = 0; - - /* if we cannot allocate anything ... */ - if(!tbuf) return ENOMEM; - /* ok here we go */ - co->rpc_list = ssys->get_rpc_typed_list_tree(co); - /* ok, here we're don't need to parse anything */ - if(!usrtc_count(co->rpc_list)) { - err = ENXIO; - snprintf(tbuf, 4096, "(ch-gl-error (%d))", err); - } else { - tt = tbuf; - snprintf(tt, 4096, "(ch-set-types ("); - tt += strlen(tt); - for(node = usrtc_first(co->rpc_list); node != NULL; - node = usrtc_next(co->rpc_list, node), tt += strlen(tt)) { - list_ent = (rpc_typed_list_t *)usrtc_node_getdata(node); - snprintf(tt, 4096, "(:%d \"%s\")", list_ent->type_id, list_ent->description); - } - snprintf(tt, 4096, "))"); - } - - /* reply to this rpc */ - if(__conn_write(co, tbuf, strlen(tbuf)) < 0) { - co->flags &= ~CXCONN_ESTABL; - co->flags |= CXCONN_BROKEN; - __wake_up_waiters(co, ESXNOCONNECT); - } - free(tbuf); - destroy_sexp(sx); - - return err; -} - -static int __default_ch_set_types(void *cctx, sexp_t *sx) -{ - conn_t *co = (conn_t *)cctx; - conn_sys_t *ssys = co->ssys; - char buf[1024], *val, *var; - int r = 0, llen, typeid, idx; - sexp_t *lsx, *sx_iter, *sx_in; - - /* skip keyword itself */ - lsx = sx->list->next; - /* now we expect a list of lists */ - if(lsx->ty != SEXP_LIST) { - r = ESXRCBADPROT; - goto __send_reply; - } - - /* take length of the list */ - llen = sexp_list_length(lsx); - - if(!llen) return 0; /* other side will not set any security attributes */ - SEXP_ITERATE_LIST(lsx, sx_iter, idx) { - if(SEXP_IS_LIST(sx_iter)) { - sexp_list_car(sx_iter, &sx_in); - - if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { - r = ESXRCBADPROT; - goto __send_reply; - } else val = sx_in->val; - - if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */ - - sexp_list_cdr(sx_iter, &sx_in); - - if(!SEXP_IS_TYPE(sx_in, SEXP_DQUOTE)) { - r = ESXRCBADPROT; - goto __send_reply; - } else var = sx_in->val; - - /* ok, now we need to analyze parameters */ - if(*val != ':') { - r = ESXRCBADPROT; - goto __send_reply; - } else { - if(ssys->set_typed_list_callback) { - typeid = atoi((char *)(val + sizeof(char))); - if(ssys->set_typed_list_callback(co, typeid, var)) { - destroy_sexp(sx); - return ENXIO; - } - } /* FIXME: if no function, accept or decline ? */ - } - } else continue; /* ignore */ - } - - __send_reply: - snprintf(buf, 1024, "(ch-gl-error (%d))", r); - - if(__conn_write(co, buf, strlen(buf)) < 0) { - co->flags &= ~CXCONN_ESTABL; - co->flags |= CXCONN_BROKEN; - __wake_up_waiters(co, ESXNOCONNECT); - } - destroy_sexp(sx); - - return r; -} - -static int __default_ch_gl_error(void *cctx, sexp_t *sx) -{ - int r; - char *errstr; - conn_t *co = (conn_t *)cctx; - - if(co->flags & CXCONN_ESTABL) return EINVAL; /* error, we're already have channels list */ - - /* skip keyword itself */ - sx->list = sx->list->next; - /* be sure - this is a list */ - if(sx->ty != SEXP_LIST) return ESXRCBADPROT; - else sx = sx->list; /* get it */ - errstr = sx->list->val; - r = atoi(errstr); - - if(!r) co->flags |= CXCONN_ESTABL; - - return r; -} - -static int __default_ch_open(void *cctx, sexp_t *sx) -{ - conn_t *co = (conn_t *)cctx; - usrtc_node_t *node; - char *val, *var, *uuid = NULL, *buf; - int typ = -1, idx, llen, r; - ulong_t cid; - sexp_t *lsx, *sx_iter, *sx_in; - rpc_typed_list_t *rlist; - chnl_t *channel; - - /* skip keyword itself */ - lsx = sx->list->next; - /* now we expect a list of lists */ - if(lsx->ty != SEXP_LIST) { - printf("%s:%d\n", __FUNCTION__, __LINE__); - r = ESXRCBADPROT; - goto __send_repl; - } - - /* take length of the list */ - llen = sexp_list_length(lsx); - if(!llen) return 0; /* other side will not set any security attributes */ - SEXP_ITERATE_LIST(lsx, sx_iter, idx) { - if(SEXP_IS_LIST(sx_iter)) { - sexp_list_car(sx_iter, &sx_in); - - if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { - printf("%s:%d\n", __FUNCTION__, __LINE__); - r = ESXRCBADPROT; - goto __send_repl; - } else val = sx_in->val; - - if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */ - - sexp_list_cdr(sx_iter, &sx_in); - - if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { - r = ESXRCBADPROT; - printf("%s:%d\n", __FUNCTION__, __LINE__); - goto __send_repl; - } else var = sx_in->val; - - /* ok, now we need to analyze parameters */ - if(*val != ':') { - r = ESXRCBADPROT; - goto __send_repl; - } else { - if(!strcmp((char *)(val + sizeof(char)), "type")) - typ = atoi(var); - else if(!strcmp((char *)(val + sizeof(char)), "id")) - cid = atoll(var); - else if(!strcmp((char *)(val + sizeof(char)), "uuid")) - uuid = var; - } - } else continue; /* ignore */ - } - - /* additional check for type of the channel */ - node = usrtc_lookup(co->rpc_list, &typ); - if(!node) { - r = ESXNOCHANSUP; - /* printf("%s:%d (usrtc count: %d) (typ %d)\n", __FUNCTION__, __LINE__, - usrtc_count(co->rpc_list), typ);*/ - node = usrtc_first(co->rpc_list); - rlist = (rpc_typed_list_t *)usrtc_node_getdata(node); - printf("---- rlist->type_id = %d\n", rlist->type_id); - goto __send_repl; - } else rlist = (rpc_typed_list_t *)usrtc_node_getdata(node); - - /* now we need to check up the channel */ - pthread_rwlock_rdlock(&(co->chnl_lock)); - node = usrtc_lookup(co->chnl_tree, &cid); - if(node) { - pthread_rwlock_unlock(&(co->chnl_lock)); - r = EEXIST; - goto __send_repl; - } else { - idx_reserve(co->idx_ch, cid); - pthread_rwlock_unlock(&(co->chnl_lock)); /* now we should alloc channel */ - if((r = __alloc_channel(cid, co, rlist, &channel))) { - pthread_rwlock_wrlock(&(co->chnl_lock)); - idx_free(co->idx_ch, cid); - pthread_rwlock_unlock(&(co->chnl_lock)); - goto __send_repl; - } else { - /* now we ready to confirm channel creation */ - pthread_rwlock_wrlock(&(co->chnl_lock)); - usrtc_insert(co->chnl_tree, &(channel->node), &(channel->cid)); - pthread_rwlock_unlock(&(co->chnl_lock)); - r = 0; - } - } - - __send_repl: - buf = malloc(2048); - snprintf(buf, 2048, "(ch-open-ret ((:error %d)(:uuid %s)(:id %ld)))", r, - uuid, cid); - if(__conn_write(co, buf, strlen(buf)) < 0) { - co->flags &= ~CXCONN_ESTABL; - co->flags |= CXCONN_BROKEN; - __wake_up_waiters(co, ESXNOCONNECT); - } - destroy_sexp(sx); - free(buf); - - return r; -} - -static int __default_ch_open_ret(void *cctx, sexp_t *sx) -{ - conn_t *co = (conn_t *)cctx; - chnl_t *chan; - usrtc_node_t *node; - int err = 0, r, llen, idx; - ulong_t id; - char *val, *var; - sexp_t *lsx, *sx_iter, *sx_in; - sxmsg_t *sms = NULL; - - /* skip keyword itself */ - lsx = sx->list->next; - /* now we expect a list of lists */ - if(lsx->ty != SEXP_LIST) { - //printf("%s:%d\n", __FUNCTION__, __LINE__); - r = ESXRCBADPROT; - goto __mark_msg; - } - /* take length of the list */ - llen = sexp_list_length(lsx); - if(!llen) return EINVAL; /* !! other side will not set any security attributes */ - SEXP_ITERATE_LIST(lsx, sx_iter, idx) { - if(SEXP_IS_LIST(sx_iter)) { - sexp_list_car(sx_iter, &sx_in); - - if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { - r = ESXRCBADPROT; - goto __mark_msg; - } else val = sx_in->val; - - if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */ - - sexp_list_cdr(sx_iter, &sx_in); - - if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { - r = ESXRCBADPROT; - goto __mark_msg; - } else var = sx_in->val; - - /* ok, now we need to analyze parameters */ - if(*val != ':') { - r = ESXRCBADPROT; - goto __mark_msg; - } else { - if(!strcmp((char *)(val + sizeof(char)), "error")) - err = atoi(var); - else if(!strcmp((char *)(val + sizeof(char)), "id")) - id = atoll(var); - } - } else continue; /* ignore */ - } - - /* try to find desired channel to intercept message */ - pthread_rwlock_rdlock(&(co->chnl_lock)); - node = usrtc_lookup(co->chnl_tree, (void *)&id); - //printf("channels (%d)\n", usrtc_count(co->chnl_tree)); - pthread_rwlock_unlock(&(co->chnl_lock)); - if(node) { - chan = (chnl_t *)usrtc_node_getdata(node); - sms = chan->sysmsg; - } - - __mark_msg: - if(!sms) return r; - sms->flags &= ~ESXMSG_PENDING; /* the message is done */ - sms->opcode = err; - - destroy_sexp(sx); - - /* unlock mutex to wake up the waiting thread */ - pthread_mutex_unlock(&(sms->wait)); - - return 0; -} - -static int __default_ch_close(void *cctx, sexp_t *sx) -{ - conn_t *co = (conn_t *)cctx; - usrtc_node_t *node; - char *val, *var, *buf; - int idx, llen, r; - ulong_t cid = -1; - sexp_t *lsx, *sx_iter, *sx_in; - chnl_t *channel = NULL; - - r = 0; - /* skip keyword itself */ - lsx = sx->list->next; - /* now we expect a list of lists */ - if(lsx->ty != SEXP_LIST) { - printf("%s:%d\n", __FUNCTION__, __LINE__); - r = ESXRCBADPROT; - goto __send_repl; - } - - /* take length of the list */ - llen = sexp_list_length(lsx); - if(!llen) return 0; /* other side will not set any security attributes */ - SEXP_ITERATE_LIST(lsx, sx_iter, idx) { - if(SEXP_IS_LIST(sx_iter)) { - continue; - } - if(!SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { - printf("%s:%d\n", __FUNCTION__, __LINE__); - r = ESXRCBADPROT; - goto __send_repl; - } else val = sx_iter->val; - - sx_in = sx_iter->next; - - if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { - r = ESXRCBADPROT; - printf("%s:%d\n", __FUNCTION__, __LINE__); - goto __send_repl; - } else var = sx_in->val; - - /* ok, now we need to analyze parameters */ - if(*val != ':') { - r = ESXRCBADPROT; - goto __send_repl; - } else { - if(!strcmp((char *)(val + sizeof(char)), "id")) { - cid = atoll(var); - break; - } - } - } - - /* additional check for type of the channel */ - pthread_rwlock_rdlock(&(co->chnl_lock)); - node = usrtc_lookup(co->chnl_tree, &cid); - pthread_rwlock_unlock(&(co->chnl_lock)); - if(!node) { - r = ENOENT; - /* there are no such channel exist */ - destroy_sexp(sx); - goto __send_repl; - } - channel = (chnl_t *)usrtc_node_getdata(node); - - /* check up the message queue */ - pthread_rwlock_rdlock(&(channel->msglock)); - if(usrtc_count(channel->msgs_tree)) { - /* we have some undelivered messages in the queue */ - destroy_sexp(sx); - r = EBUSY; - goto __send_repl; - } - pthread_rwlock_unlock(&(channel->msglock)); - - /* remove channel from the search tree */ - pthread_rwlock_wrlock(&(co->chnl_lock)); - usrtc_delete(co->chnl_tree, &(channel->node)); - /* free index */ - idx_free(co->idx_ch, channel->cid); - pthread_rwlock_unlock(&(co->chnl_lock)); - - idx_allocator_destroy(channel->idx_msg); - free(channel->idx_msg); - free(channel->msgs_tree); - pthread_mutex_destroy(&(channel->oplock)); - pthread_rwlock_destroy(&(channel->msglock)); - free(channel); - - destroy_sexp(sx); - - __send_repl: - buf = malloc(2048); - snprintf(buf, 2048, "(ch-close-ret ((:id %ld) (:error %d)))", cid, r); - - if(__conn_write(co, buf, strlen(buf)) < 0) { - co->flags &= ~CXCONN_ESTABL; - co->flags |= CXCONN_BROKEN; - __wake_up_waiters(co, ESXNOCONNECT); - } - free(buf); - - return 0; -} - -static int __default_ch_close_ret(void *cctx, sexp_t *sx) -{ - conn_t *co = (conn_t *)cctx; - chnl_t *chan; - usrtc_node_t *node; - int err = 0, r, llen, idx; - ulong_t id; - char *val, *var; - sexp_t *lsx, *sx_iter, *sx_in; - sxmsg_t *sms = NULL; - - /* skip keyword itself */ - lsx = sx->list->next; - /* now we expect a list of lists */ - if(lsx->ty != SEXP_LIST) { - r = ESXRCBADPROT; - goto __mark_msg; - } - /* take length of the list */ - llen = sexp_list_length(lsx); - if(!llen) return EINVAL; /* !! other side will not set any security attributes */ - - SEXP_ITERATE_LIST(lsx, sx_iter, idx) { - if(SEXP_IS_LIST(sx_iter)) { - sexp_list_car(sx_iter, &sx_in); - - if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { - r = ESXRCBADPROT; - goto __mark_msg; - } else val = sx_in->val; - - if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */ - - sexp_list_cdr(sx_iter, &sx_in); - - if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { - r = ESXRCBADPROT; - goto __mark_msg; - } else var = sx_in->val; - - /* ok, now we need to analyze parameters */ - if(*val != ':') { - r = ESXRCBADPROT; - goto __mark_msg; - } else { - if(!strcmp((char *)(val + sizeof(char)), "error")) - err = atoi(var); - else if(!strcmp((char *)(val + sizeof(char)), "id")) - id = atoll(var); - } - } else continue; /* ignore */ - } - - /* try to find desired channel to intercept message */ - pthread_rwlock_rdlock(&(co->chnl_lock)); - node = usrtc_lookup(co->chnl_tree, (void *)&id); - pthread_rwlock_unlock(&(co->chnl_lock)); - - if(node) { - chan = (chnl_t *)usrtc_node_getdata(node); - sms = chan->sysmsg; - } - - __mark_msg: - - if(!sms) return r; - sms->flags &= ~ESXMSG_PENDING; /* the message is done */ - sms->opcode = err; - - destroy_sexp(sx); - /* unlock mutex to wake up the waiting thread */ - pthread_mutex_unlock(&(sms->wait)); - - return 0; -} - -/* create a nould of the message */ -static int __create_reg_msg_mould(sxmsg_t **msg, chnl_t *ch, ulong_t mid) -{ - int r = 0; - sxmsg_t *sm = __allocate_msg(&r); - - if(r) return r; - else { - sm->pch = ch; - sm->flags = (ESXMSG_USR | ESXMSG_PENDING); - sm->mid = mid; - - /* ok reserve message ID */ - pthread_mutex_lock(&(ch->oplock)); - idx_reserve(ch->idx_msg, mid); - pthread_mutex_unlock(&(ch->oplock)); - - pthread_mutex_lock(&(sm->wait)); - *msg = sm; - } - - return 0; -} - -static int __default_msg(void *cctx, sexp_t *sx) -{ - conn_t *co = (conn_t *)cctx; - usrtc_node_t *node = NULL; - chnl_t *chan = NULL; - int r = 0; - sexp_t *lsx = NULL, *sx_iter = NULL; - sexp_t *sx_sublist = NULL, *sx_value = NULL; - ulong_t chnl_id = -1; - ulong_t msg_id = -1; - sexp_t *msg = NULL; - sxmsg_t *smsg = NULL; - int idx; - - /* get parameters from the message */ - if(sexp_list_cdr(sx, &lsx)) return ESXRCBADPROT; - if(!SEXP_IS_LIST(lsx)) return ESXRCBADPROT; - - /* find channel id */ - SEXP_ITERATE_LIST(lsx, sx_iter, idx) { - if(SEXP_IS_LIST(sx_iter)) { - sx_sublist = sx_iter; - continue; - } else { - if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { - if(strcmp(sx_iter->val, ":chid")) { - continue; // ignore it - } - sx_value = sx_iter->next; - if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { - continue; - } - chnl_id = atol(sx_value->val); - } else continue; // ignore it - } - } - lsx = sx_sublist; - /* find message id */ - SEXP_ITERATE_LIST(lsx, sx_iter, idx) { - if(SEXP_IS_LIST(sx_iter)) { - msg = sx_iter; - continue; - } else { - if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { - if(strcmp(sx_iter->val, ":msgid")) { - continue; // ignore - } - sx_value = sx_iter->next; - if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { - continue; - } - msg_id = atol(sx_value->val); - } else continue; // ignore it - } - } - - if(msg_id < 0 || chnl_id < 0) { - return ESXRCBADPROT; - } - /* find channel */ - if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT; - else chan = (chnl_t *)usrtc_node_getdata(node); - /* lookup for the message */ - pthread_rwlock_rdlock(&(chan->msglock)); - if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) { - pthread_rwlock_unlock(&(chan->msglock)); - /* here, rpc lookup has no sense, just put this job to the queue */ - /* btw, we're need to create a message first */ - r = __create_reg_msg_mould(&smsg, chan, msg_id); - if(r) return r; - - /* assign the message */ - smsg->opcode = 0; - smsg->payload = (void *)msg; - - /* assign initial S-expression structure */ - smsg->initial_sx = sx; - - /* put the message to the search tree */ - pthread_rwlock_wrlock(&(chan->msglock)); - usrtc_insert(chan->msgs_tree, &(smsg->pendingq_node), &(smsg->mid)); - pthread_rwlock_unlock(&(chan->msglock)); - } else { - pthread_rwlock_unlock(&(chan->msglock)); - smsg = (sxmsg_t *)usrtc_node_getdata(node); - msg_return(smsg, EEXIST); - return EEXIST; - } - /* put job to the queue and give up */ - r = pth_queue_add(co->rqueue, (void *)smsg, USR_MSG); - if(r) { /* cannot put job to the queue */ - msg_return(smsg, r); - pthread_rwlock_wrlock(&(chan->msglock)); - usrtc_delete(chan->msgs_tree, &(smsg->pendingq_node)); - pthread_rwlock_unlock(&(chan->msglock)); - __destroy_msg(smsg); - return r; - } - - /* put to the IN queue */ - return r; -} - -/* TODO: optimize amount of code: (must be first) - * __default_msg_return - * __default_msg_reply - * there are many copy-n-paste code!!!!! - */ - -static int __default_msg_return(void *cctx, sexp_t *sx) -{ - conn_t *co = (conn_t *)cctx; - usrtc_node_t *node = NULL; - chnl_t *chan = NULL; - int r = 0; - sexp_t *lsx = NULL, *sx_iter = NULL; - sexp_t *sx_sublist = NULL, *sx_value = NULL; - ulong_t chnl_id = -1; - ulong_t msg_id = -1; - sexp_t *msg = NULL; - sxmsg_t *smsg = NULL; - int idx, opcode; - - /* get parameters from the message */ - if(sexp_list_cdr(sx, &lsx)) { - r = ESXRCBADPROT; - goto __finish; - } - if(!SEXP_IS_LIST(lsx)) { - r = ESXRCBADPROT; - goto __finish; - } - - /* get parameters */ - SEXP_ITERATE_LIST(lsx, sx_iter, idx) { - if(SEXP_IS_LIST(sx_iter)) { - sx_sublist = sx_iter; - continue; - } else { - if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { - if(strcmp(sx_iter->val, ":chid")) { - continue; // ignore it - } - sx_value = sx_iter->next; - if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { - continue; - } - chnl_id = atol(sx_value->val); - } else continue; // ignore it - } - } - lsx = sx_sublist; - /* find message id */ - SEXP_ITERATE_LIST(lsx, sx_iter, idx) { - if(SEXP_IS_LIST(sx_iter)) { - msg = sx_iter; - continue; - } else { - if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { - if(strcmp(sx_iter->val, ":msgid")) { - continue; // ignore - } - sx_value = sx_iter->next; - if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { - continue; - } - msg_id = atol(sx_value->val); - } else continue; // ignore it - } - } - /* get opcode */ - sexp_list_car(msg, &lsx); - opcode = atoi(lsx->val); - - if(msg_id < 0 || chnl_id < 0) { - r = ESXRCBADPROT; - goto __finish; - } - - if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT; - else chan = (chnl_t *)usrtc_node_getdata(node); - /* lookup for the message */ - pthread_rwlock_rdlock(&(chan->msglock)); - if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) { - pthread_rwlock_unlock(&(chan->msglock)); - r = ENOENT; - goto __finish; - } else { - pthread_rwlock_unlock(&(chan->msglock)); - smsg = (sxmsg_t *)usrtc_node_getdata(node); - smsg->opcode = opcode; - if(smsg->flags & ESXMSG_ISREPLY) - destroy_sexp((sexp_t *)smsg->payload); - smsg->payload = NULL; - smsg->flags |= ESXMSG_CLOSURE; - - /* Q: can we remove the message from the tree there??? */ - /* A: actually no */ - /* first remove the message from tree */ - if(smsg->flags & ESXMSG_RMONRETR) { - pthread_rwlock_wrlock(&(chan->msglock)); - usrtc_delete(chan->msgs_tree, &(smsg->pendingq_node)); - pthread_rwlock_unlock(&(chan->msglock)); - } - if(smsg->flags & ESXMSG_PENDING) { - destroy_sexp(sx); - pthread_mutex_unlock(&(smsg->wait)); - return r; - } else { - /* nobody want it */ - destroy_sexp(smsg->initial_sx); - __destroy_msg(smsg); - } - } - -__finish: - destroy_sexp(sx); - return r; -} - -static int __default_msg_rapid(void *cctx, sexp_t *sx) -{ - conn_t *co = (conn_t *)cctx; - usrtc_node_t *node = NULL; - chnl_t *chan = NULL; - int r = 0; - sexp_t *lsx = NULL, *sx_iter = NULL; - sexp_t *sx_sublist = NULL, *sx_value = NULL; - ulong_t chnl_id = -1; - ulong_t msg_id = -1; - sexp_t *msg = NULL; - sxmsg_t *smsg = NULL; - int idx; - - /* get parameters from the message */ - if(sexp_list_cdr(sx, &lsx)) { - r = ESXRCBADPROT; - goto __finish; - } - if(!SEXP_IS_LIST(lsx)) { - r = ESXRCBADPROT; - goto __finish; - } - - /* get parameters */ - SEXP_ITERATE_LIST(lsx, sx_iter, idx) { - if(SEXP_IS_LIST(sx_iter)) { - sx_sublist = sx_iter; - continue; - } else { - if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { - if(strcmp(sx_iter->val, ":chid")) { - continue; // ignore it - } - sx_value = sx_iter->next; - if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { - continue; - } - chnl_id = atol(sx_value->val); - } else continue; // ignore it - } - } - lsx = sx_sublist; - /* find message id */ - SEXP_ITERATE_LIST(lsx, sx_iter, idx) { - if(SEXP_IS_LIST(sx_iter)) { - msg = sx_iter; - continue; - } else { - if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { - if(strcmp(sx_iter->val, ":msgid")) { - continue; // ignore - } - sx_value = sx_iter->next; - if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { - continue; - } - msg_id = atol(sx_value->val); - } else continue; // ignore it - } - } - - if(msg_id < 0 || chnl_id < 0) { - r = ESXRCBADPROT; - goto __finish; - } - - if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT; - else chan = (chnl_t *)usrtc_node_getdata(node); - /* lookup for the message */ - pthread_rwlock_rdlock(&(chan->msglock)); - if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) { - pthread_rwlock_unlock(&(chan->msglock)); - r = ENOENT; - goto __finish; - } else { - pthread_rwlock_unlock(&(chan->msglock)); - smsg = (sxmsg_t *)usrtc_node_getdata(node); - smsg->opcode = ESXRAPIDREPLY; - smsg->payload = copy_sexp(msg); - smsg->flags |= ESXMSG_ISREPLY; - /* are we'are ready to remove this ? */ - if(smsg->flags & ESXMSG_RMONRETR) { - pthread_rwlock_wrlock(&(chan->msglock)); - usrtc_delete(chan->msgs_tree, &(smsg->pendingq_node)); - pthread_rwlock_unlock(&(chan->msglock)); - } - if(smsg->flags & ESXMSG_PENDING) { - destroy_sexp(sx); - pthread_mutex_unlock(&(smsg->wait)); - return r; - } else { - /* nobody want it */ - destroy_sexp(smsg->initial_sx); - __destroy_msg(smsg); - } - } - -__finish: - destroy_sexp(sx); - return r; -} - -static int __default_msg_reply(void *cctx, sexp_t *sx) -{ - conn_t *co = (conn_t *)cctx; - usrtc_node_t *node = NULL; - chnl_t *chan = NULL; - int r = 0; - sexp_t *lsx = NULL, *sx_iter = NULL; - sexp_t *sx_sublist = NULL, *sx_value = NULL; - ulong_t chnl_id = -1; - ulong_t msg_id = -1; - sexp_t *msg = NULL; - sxmsg_t *smsg = NULL; - int idx; - - /* get parameters from the message */ - if(sexp_list_cdr(sx, &lsx)) { - r = ESXRCBADPROT; - goto __finish; - } - if(!SEXP_IS_LIST(lsx)) { - r = ESXRCBADPROT; - goto __finish; - } - - /* get parameters */ - SEXP_ITERATE_LIST(lsx, sx_iter, idx) { - if(SEXP_IS_LIST(sx_iter)) { - sx_sublist = sx_iter; - continue; - } else { - if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { - if(strcmp(sx_iter->val, ":chid")) { - continue; // ignore it - } - sx_value = sx_iter->next; - if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { - continue; - } - chnl_id = atol(sx_value->val); - } else continue; // ignore it - } - } - lsx = sx_sublist; - /* find message id */ - SEXP_ITERATE_LIST(lsx, sx_iter, idx) { - if(SEXP_IS_LIST(sx_iter)) { - msg = sx_iter; - continue; - } else { - if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { - if(strcmp(sx_iter->val, ":msgid")) { - continue; // ignore - } - sx_value = sx_iter->next; - if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { - continue; - } - msg_id = atol(sx_value->val); - } else continue; // ignore it - } - } - - if(msg_id < 0 || chnl_id < 0) { - r = ESXRCBADPROT; - goto __finish; - } - - if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT; - else chan = (chnl_t *)usrtc_node_getdata(node); - /* lookup for the message */ - pthread_rwlock_rdlock(&(chan->msglock)); - if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) { - pthread_rwlock_unlock(&(chan->msglock)); - r = ENOENT; - goto __finish; - } else { - pthread_rwlock_unlock(&(chan->msglock)); - smsg = (sxmsg_t *)usrtc_node_getdata(node); - smsg->opcode = ESXOREPLYREQ; - smsg->payload = copy_sexp(msg); - smsg->flags |= ESXMSG_ISREPLY; - pthread_mutex_unlock(&(smsg->wait)); - } - -__finish: - destroy_sexp(sx); - return r; -} - -static int __init_systemrpc_tree(usrtc_t *rtree) -{ - /* security context functions */ - if(__insert_rpc_function(rtree, "auth-set-context", __default_auth_set_context)) goto __fail; - if(__insert_rpc_function(rtree, "auth-set-attr", __default_auth_set_attr)) goto __fail; - if(__insert_rpc_function(rtree, "auth-set-error", __default_auth_set_error)) goto __fail; - /* channels negotiation ops */ - if(__insert_rpc_function(rtree, "ch-get-types", __default_ch_get_types)) goto __fail; - if(__insert_rpc_function(rtree, "ch-gl-error", __default_ch_gl_error)) goto __fail; - if(__insert_rpc_function(rtree, "ch-set-types", __default_ch_set_types)) goto __fail; - if(__insert_rpc_function(rtree, "ch-open", __default_ch_open)) goto __fail; - if(__insert_rpc_function(rtree, "ch-open-ret", __default_ch_open_ret)) goto __fail; - if(__insert_rpc_function(rtree, "ch-close", __default_ch_close)) goto __fail; - if(__insert_rpc_function(rtree, "ch-close-ret", __default_ch_close_ret)) goto __fail; - /* messaging functions */ - if(__insert_rpc_function(rtree, "ch-msg", __default_msg)) goto __fail; - if(__insert_rpc_function(rtree, "ch-msg-rete", __default_msg_return)) goto __fail; - if(__insert_rpc_function(rtree, "ch-msg-rapid", __default_msg_rapid)) goto __fail; - if(__insert_rpc_function(rtree, "ch-msg-repl", __default_msg_reply)) goto __fail; - - return 0; - - __fail: - __destroy_rpc_list_tree(rtree); - return ENOMEM; -} - -static int __eval_cstr(char *cstr, cx_rpc_list_t *rpc_list, void *ctx) -{ - int r = ENOENT; - sexp_t *sx; - usrtc_node_t *node; - cx_rpc_t *rentry; - char *rpcf; - - if(!(sx = parse_sexp(cstr, strlen(cstr)))) return EBADE; - - if(sx->ty == SEXP_LIST) - rpcf = sx->list->val; - else goto __enoent; - - /* find an appropriate function */ - node = usrtc_lookup(rpc_list->rpc_tree, rpcf); - - if(!node) { - __enoent: - fprintf(stderr, "Invalid S-expression catched.\n"); - destroy_sexp(sx); - return ENOENT; - } - else rentry = (cx_rpc_t *)usrtc_node_getdata(node); - - /* call it */ - r = rentry->rpcf(ctx, sx); - //if(r) destroy_sexp(sx); - - return r; -} - -static void *__cxslave_thread_listener(void *wctx) -{ - conn_t *co = (conn_t *)wctx; - conn_sys_t *ssys = co->ssys; - char *buf = malloc(4096); - int r; - - while((r = __conn_read(co, buf, 4096)) != -1) { - buf[r] = '\0'; - r = __eval_cstr(buf, ssys->system_rpc, co); - } - co->flags &= ~CXCONN_ESTABL; - co->flags |= CXCONN_BROKEN; - - /* ok the first of all we're need to wake up all */ - __wake_up_waiters(co, ESXNOCONNECT); - /* now we need to end the poll */ - pth_dqtpoll_destroy(co->tpoll, 1); /* force */ - - __connection_free(co); - - free(buf); - - return NULL; -} - -static void *__cxmaster_thread_listener(void *wctx) -{ - conn_t *co = (conn_t *)wctx; - conn_sys_t *ssys = co->ssys; - char *buf = malloc(4096); - int r; - - while((r = __conn_read(co, buf, 4096)) != -1) { - buf[r] = '\0'; - r = __eval_cstr(buf, ssys->system_rpc, co); - } - co->flags &= ~CXCONN_ESTABL; - co->flags |= CXCONN_BROKEN; - - /* ok the first of all we're need to wake up all */ - __wake_up_waiters(co, ESXNOCONNECT); - /* now we need to end the poll */ - pth_dqtpoll_destroy(co->tpoll, 1); /* force */ - - __connection_free(co); - - free(buf); - - return NULL; -} - -static void *__rmsg_queue_thread(void *ctx) -{ - conn_t *co = (conn_t *)ctx; - pth_msg_t *tmp = malloc(sizeof(pth_msg_t)); - sxmsg_t *msg; - chnl_t *ch; - int r = 0; - char *rpcf; - sexp_t *sx; - usrtc_node_t *node = NULL; - cx_rpc_t *rpccall; - struct __rpc_job *rjob = NULL; - - if(!tmp) return NULL; - - while(1) { - r = pth_queue_get(co->rqueue, NULL, tmp); - if(r) { - __fini: - free(tmp); - return NULL; - } else if(tmp->msgtype == END_MSG) goto __fini; - msg = tmp->data; - if(!msg) continue; /* spurious !! */ - - /* check to right job */ - if(!(msg->flags & ESXMSG_USR)) { /* not a regular message */ - msg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */ - msg->flags &= ~ESXMSG_PENDING; - - pthread_mutex_unlock(&(msg->wait)); /* wake up the waitee */ - continue; - } else { - /* now we're need to have a deal with the rpc calling, other - we don't care */ - ch = msg->pch; - sx = (sexp_t *)msg->payload; - - if(!sx) { - r = ESXRCBADPROT; - goto __err_ret; - } - /* get the function name */ - if((sx->ty == SEXP_LIST) && (sx->list != NULL)) - rpcf = sx->list->val; - else { - r = ESXRCBADPROT; - goto __err_ret; - } - - node = usrtc_lookup(ch->rpc_list->rpc_tree, rpcf); - if(!node) { - r = ENOENT; - __err_ret: - msg_return(msg, r); - } else { - rpccall = (cx_rpc_t *)usrtc_node_getdata(node); - /* call this ! */ - rjob = malloc(sizeof(struct __rpc_job)); // TODO: check it - rjob->msg = msg; - rjob->sx = sx; - rjob->rpcf = rpccall->rpcf; - pth_dqtpoll_add(co->tpoll, (void *)rjob, USR_MSG); // TODO: check it - } - } - } - - return NULL; -} - -static void *__msg_queue_thread(void *ctx) -{ - conn_t *co = (conn_t *)ctx; - pth_msg_t *tmp = malloc(sizeof(pth_msg_t)); - sxmsg_t *msg; - chnl_t *ch; - int r = 0, len; - char *buf = malloc(4096), *tb; - sexp_t *sx; - - if(!tmp || !buf) { - if(tmp) free(tmp); - if(buf) free(buf); - return NULL; - } - - while(1) { - r = pth_queue_get(co->mqueue, NULL, tmp); - if(r) { - __fini: - free(buf); - free(tmp); - return NULL; - } - - /* message workout */ - if(tmp->msgtype == END_MSG) goto __fini; - msg = tmp->data; - if(!msg) continue; /* spurious message */ - - if(!(msg->flags & ESXMSG_USR)) { /* not a regular message */ - msg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */ - msg->flags &= ~ESXMSG_PENDING; - - pthread_mutex_unlock(&(msg->wait)); /* wake up the waitee */ - continue; - } else { - ch = msg->pch; - - /* now we need to complete the request */ - sx = (sexp_t *)msg->payload; tb = buf; - if(!(msg->flags & ESXMSG_PULSE)) { /* %s))) */ - if(!(msg->flags & ESXMSG_ISREPLY)) - snprintf(buf, 4096, "(ch-msg (:chid %lu (:msgid %lu ", ch->cid, - msg->mid); - else { - if(!sx) { - snprintf(buf, 4096, "(ch-msg-rete (:chid %lu (:msgid %lu (%d))))", ch->cid, - msg->mid, msg->opcode); - /* mark it to close */ - msg->flags |= ESXMSG_CLOSURE; - /* ok, here we will write it and wait, destroying dialog while reply */ - - goto __ssl_write; - } else { - if(msg->flags & ESXMSG_ISRAPID) { - msg->flags |= ESXMSG_CLOSURE; - pthread_mutex_unlock(&(msg->wait)); /* wake it up */ - snprintf(buf, 4096, "(ch-msg-rapid (:chid %lu (:msgid %lu ", ch->cid, - msg->mid); - } else - snprintf(buf, 4096, "(ch-msg-repl (:chid %lu (:msgid %lu ", ch->cid, - msg->mid); - } - } - - len = strlen(buf); - tb += len*sizeof(char); - if(print_sexp(tb, 4096 - (len + 4*sizeof(char)), sx) == -1) { - msg->opcode = ENOMEM; - /* we don't need to wake up anybody */ - if(msg->flags & ESXMSG_TIMEDOUT) { - /* clean up all the shit: - * 1. remove from message tree - * 2. destroy message itself - */ - destroy_sexp(msg->initial_sx); - msg->initial_sx = NULL; - msg->payload = NULL; - if(msg->flags & ESXMSG_ISREPLY) - destroy_sexp(msg->payload); - } else { - pthread_mutex_unlock(&(msg->wait)); - } - } - } - - len = strlen(tb); - tb += len*sizeof(char); - strcat(tb, ")))"); - - __ssl_write: - if(msg->flags & ESXMSG_CLOSURE) { - /* wake up it */ - pthread_mutex_unlock(&(msg->wait)); - /* first remove the message from tree */ - pthread_rwlock_wrlock(&(ch->msglock)); - usrtc_delete(ch->msgs_tree, &(msg->pendingq_node)); - pthread_rwlock_unlock(&(ch->msglock)); - /* destroy */ - destroy_sexp(msg->initial_sx); - if(msg->flags & ESXMSG_ISREPLY && msg->payload) - destroy_sexp((sexp_t *)msg->payload); - __destroy_msg(msg); - } - - /* write it */ - if(__conn_write(co, (void *)buf, strlen(buf) + sizeof(char)) < 0) { - co->flags &= ~CXCONN_ESTABL; - co->flags |= CXCONN_BROKEN; - __wake_up_waiters(co, ESXNOCONNECT); - } - - } - - len = 0; - } - - free(buf); - - return NULL; -} - -/* this function is an ugly implementation to get C string with uuid */ -extern char *__generate_uuid(void); - -/* this is a callback to perform a custom SSL certs chain validation, - * as I promised here the comments, a lot of ... - * The first shit: 0 means validation failed, 1 otherwise - * The second shit: X509 API, I guess u will love it ;-) - * openssl calls this function for each certificate in chain, - * since our case is a simple (depth of chain is one, since we're - * don't care for public certificates lists or I cannot find any reasons to - * do it ...), amount of calls reduced, and in this case we're interested - * only in top of chain i.e. actual certificate used on client side, - * the validity of signing for other certificates within chain is - * guaranteed by the ssl itself. - * u know, we need to lookup in database, or elsewhere... some information - * about client certificate, and decide - is it valid, or not?, if so - * yep I mean it's valid, we can assign it's long fucking number to - * security context, to use in ongoing full scaled connection handshaking. - */ -static int __verify_certcall(int preverify_ok, X509_STORE_CTX *ctx) -{ - // X509 *cert = X509_STORE_CTX_get_current_cert(ctx); - int err = X509_STORE_CTX_get_error(ctx), depth = X509_STORE_CTX_get_error_depth(ctx); - SSL *ssl = X509_STORE_CTX_get_ex_data(ctx, SSL_get_ex_data_X509_STORE_CTX_idx()); - conn_t *co = SSL_get_ex_data(ssl, ex_ssldata_index); /* this is a custom data we're set before */ - conn_sys_t *ssys = co->ssys; - - /* now we need to check for certificates with a long chain, - * so since we have a short one, reject long ones */ - if(depth > VERIFY_DEPTH) { /* longer than we expect */ - preverify_ok = 0; /* yep, 0 means error for those function callback in openssl, fucking set */ - err = X509_V_ERR_CERT_CHAIN_TOO_LONG; - X509_STORE_CTX_set_error(ctx, err); - } - - if(!preverify_ok) return 0; - - /* ok, now we're on top of SSL (depth == 0) certs chain, - * and we can validate client certificate */ - if(!depth) { - co->pctx->certid = - ASN1_INTEGER_get((const ASN1_INTEGER *)X509_get_serialNumber(ctx->current_cert)); - /* now we're need to check the ssl cert */ - if(ssys->validate_sslpem) { - if(ssys->validate_sslpem(co)) return 0; - else return 1; - } else return 0; - } - - return preverify_ok; -} - -/* dummy just to check the server side */ -static int __verify_certcall_dummy(int preverify_ok, X509_STORE_CTX *ctx) -{ - return preverify_ok; -} - -/* thread serve the whole connections set i.e. subsystem - * actually it works with system messages - */ -static void *__system_queue_listener(void *data) -{ - conn_sys_t *ssys = (conn_sys_t *)data; - int r; - pth_msg_t *tmp = malloc(sizeof(pth_msg_t)); - sxmsg_t *sysmsg; - sxpayload_t *payload; - chnl_t *chan; - conn_t *co; - - if(!tmp) return NULL; - - while(1) { - r = pth_queue_get(ssys->ioqueue, NULL, tmp); - if(r) { - free(tmp); - return NULL; - } - - /* ok message is delivered */ - sysmsg = tmp->data; - if(!sysmsg) continue; /* ignore dummy messages */ - - if(!(sysmsg->flags & ESXMSG_SYS)) { /* not a system message */ - sysmsg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */ - sysmsg->flags &= ~ESXMSG_PENDING; - pthread_mutex_unlock(&(sysmsg->wait)); /* wake up the waitee */ - continue; - } else { - chan = sysmsg->pch; - co = chan->connection; - payload = (sxpayload_t *)sysmsg->payload; - /* write the buf */ - if(__conn_write(co, (void *)payload->cstr, strlen(payload->cstr) + 1) < 0) { - co->flags &= ~CXCONN_ESTABL; - co->flags |= CXCONN_BROKEN; - __wake_up_waiters(co, ESXNOCONNECT); - } - } - } - - return NULL; -} - -/* general initialization must be called within app uses connection layer */ -int connections_init(conn_sys_t *ssys) -{ - int r = 0; - - if(!ssys) return EINVAL; - else if(!(ssys->connections = malloc(sizeof(usrtc_t)))) { - r = ENOMEM; - goto __fail; - } - - /* zeroing */ - ssys->rootca = ssys->certkey = ssys->certpem = NULL; - ssys->validate_sslpem = NULL; - ssys->secure_check = NULL; - ssys->on_destroy = NULL; - /* init connections list */ - usrtc_init(ssys->connections, USRTC_REDBLACK, MAX_CONNECTIONS, - __cmp_cstr); - if((r = pthread_rwlock_init(&(ssys->rwlock), NULL))) - goto __fail_1; - - /* init queues */ - if(!(ssys->ioq = malloc(sizeof(pth_queue_t)))) { /* general io queue */ - r = ENOMEM; - goto __fail_2; - } - if((r = pth_queue_init(ssys->ioq))) goto __fail_3; - if(!(ssys->ioqueue = malloc(sizeof(pth_queue_t)))) { /* system io queue */ - r = ENOMEM; - goto __fail_2; - } - if((r = pth_queue_init(ssys->ioqueue))) goto __fail_3_1; - - /* init SSL certificates checking functions */ - /* init RPC list related functions */ - if(!(ssys->system_rpc = malloc(sizeof(cx_rpc_list_t)))) { - r = ENOMEM; - goto __fail_3; - } else { - if(!(ssys->system_rpc->rpc_tree = malloc(sizeof(usrtc_t)))) { - r = ENOMEM; - __fail_rpc: - free(ssys->system_rpc); - goto __fail_3_1; - } - usrtc_init(ssys->system_rpc->rpc_tree, USRTC_SPLAY, 256, __cmp_cstr); - r = __init_systemrpc_tree(ssys->system_rpc->rpc_tree); - if(r) { - free(ssys->system_rpc->rpc_tree); - goto __fail_rpc; - } - } - - /* init SSL library */ - SSL_library_init(); - - OpenSSL_add_all_algorithms(); - SSL_load_error_strings(); - - /* create threads for queue */ - if((r = pthread_create(&ssys->ios_thread, NULL, __system_queue_listener, (void *)ssys))) { - goto __fail_rpc; - } - - return 0; - - __fail_3_1: - free(ssys->ioqueue); - __fail_3: - free(ssys->ioq); - __fail_2: - pthread_rwlock_destroy(&(ssys->rwlock)); - __fail_1: - free(ssys->connections); - __fail: - - return r; -} - -/* load certificates */ -int connections_setsslserts(conn_sys_t *ssys, const char *rootca, - const char *certpem, const char *certkey) -{ - int r = ENOMEM; - - if(!ssys) return EINVAL; - /* simply copying */ - if(!(ssys->rootca = strdup(rootca))) return ENOMEM; - if(!(ssys->certkey = strdup(certkey))) goto __fail; - if(!(ssys->certpem = strdup(certpem))) goto __fail; - - r = 0; - return 0; - __fail: - if(ssys->rootca) free(ssys->rootca); - if(ssys->certkey) free(ssys->certkey); - if(ssys->certpem) free(ssys->certpem); - - return r; -} - -int connections_setrpclist_function(conn_sys_t *ssys, - usrtc_t* (*get_rpc_typed_list_tree) - (conn_t *)) -{ - if(!ssys) return EINVAL; - ssys->get_rpc_typed_list_tree = get_rpc_typed_list_tree; - - return 0; -} - -static void __connections_subsystem_connection_remove(conn_t *co) -{ - pthread_rwlock_wrlock(&(co->ssys->rwlock)); - usrtc_delete(co->ssys->connections, &(co->csnode)); - pthread_rwlock_unlock(&(co->ssys->rwlock)); - - return; -} - -#define __TMPBUFLEN 2048 - -/* connection_initiate: perform a connection thru the socket to the - * host with master certificate, i.e. it's a slave one for client. - */ -int connection_initiate_m(conn_sys_t *ssys, conn_t *co, const char *host, - int port, const char *SSL_cert, perm_ctx_t *pctx) -{ - int r = 0, sd; - int bytes = 0; - char *uuid; - char *buf = NULL; - struct hostent *host_; - struct sockaddr_in addr; -#ifdef WIN32 - WSADATA wsaData; -#endif - usrtc_t *ch_tree, *rpc_tree; - pth_queue_t *mqueue = malloc(sizeof(pth_queue_t)); - pth_queue_t *rqueue = malloc(sizeof(pth_queue_t)); - pth_dqtpoll_t *tpoll = malloc(sizeof(pth_dqtpoll_t)); - idx_allocator_t *idx_ch = malloc(sizeof(idx_allocator_t)); - - if(!mqueue) { - __fallenomem: - r = ENOMEM; - __fall0: - if(rqueue) free(rqueue); - if(mqueue) free(mqueue); - if(tpoll) free(tpoll); - if(idx_ch) free(idx_ch); - return r; - } - -#ifdef WIN32 - WSAStartup(MAKEWORD(2, 2), &wsaData); -#endif - - if(!tpoll) goto __fallenomem; - if(!idx_ch) goto __fallenomem; - - if(!co) { - __falleinval: - r = EINVAL; - goto __fall0; - } else if(!ssys) goto __falleinval; - - if(!host) goto __falleinval; - if(!SSL_cert) goto __falleinval; - if(!pctx) goto __falleinval; - - memset(co, 0, sizeof(conn_t)); - /* setup connections set */ - co->ssys = ssys; - - - pth_dqtpoll_init(tpoll, __rpc_callback); - - if(!idx_ch) return ENOMEM; - else r = idx_allocator_init(idx_ch, MAX_CHANNELS*MAX_MULTI, 0); - if(r) return r; - - if(!(uuid = __generate_uuid())) { - return ENOMEM; - } - - if(!(ch_tree = malloc(sizeof(usrtc_t)))) { - r = ENOMEM; - goto __fail; - } - - if(!(rpc_tree = malloc(sizeof(usrtc_t)))) { - r = ENOMEM; - goto __fail_1; - } - - if((r = pthread_mutex_init(&co->oplock, NULL))) goto __fail_2; - if((r = pthread_rwlock_init(&co->chnl_lock, NULL))) goto __fail_3; - - usrtc_init(rpc_tree, USRTC_REDBLACK, MAX_RPC_LIST, __cmp_int); - usrtc_init(ch_tree, USRTC_REDBLACK, MAX_CHANNELS, __cmp_ulong); - - co->idx_ch = idx_ch; - - /* assign message queue */ - if((r = pth_queue_init(mqueue))) goto __fail_3; - co->mqueue = mqueue; - /* assign repl message queue */ - if((r = pth_queue_init(rqueue))) goto __fail_3; - co->rqueue = rqueue; - - /* init SSL certificates and context */ - co->ctx = SSL_CTX_new(TLSv1_2_client_method()); - if(!co->ctx) { ERR_print_errors_fp(stderr); - r = EINVAL; goto __fail_3; } - else { - SSL_CTX_set_verify(co->ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, - __verify_certcall_dummy); - SSL_CTX_set_verify_depth(co->ctx, 1); /* FIXME: use configuration */ - } - - /* load certificates */ - SSL_CTX_load_verify_locations(co->ctx, ssys->rootca, NULL); - /* set the local certificate from CertFile */ - if(SSL_CTX_use_certificate_file(co->ctx, SSL_cert, - SSL_FILETYPE_PEM)<=0) { - r = EINVAL; - goto __fail_3; - } - /* set the private key from KeyFile (may be the same as CertFile) */ - - if(SSL_CTX_use_PrivateKey_file(co->ctx, SSL_cert, - SSL_FILETYPE_PEM)<=0) { - r = EINVAL; - goto __fail_3; - } - - /* verify private key */ - if (!SSL_CTX_check_private_key(co->ctx)) { - r = EINVAL; - goto __fail_3; - } - - /* assign allocated memory */ - co->rpc_list = rpc_tree; - co->chnl_tree = ch_tree; - co->uuid = uuid; - - /* connect to the pointed server */ - /* resolve host */ - if(!(buf = malloc(__TMPBUFLEN))) { - r = ENOMEM; - goto __fail_3; - } - - //r=__resolvehost(host, buf, __TMPBUFLEN, &host_); -#ifdef WIN32 - host_ = gethostbyname(host); -#else - r = __resolvehost(host, buf, __TMPBUFLEN, &host_); -#endif - - if(r) { - r = ENOENT; - free(buf); - goto __fail_3; - } - - // /* create a socket */ - sd = socket(PF_INET, SOCK_STREAM, 0); - bzero(&addr, sizeof(addr)); - - /* try to connect it */ - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - addr.sin_addr.s_addr = *(uint32_t*)(host_->h_addr); - r=connect(sd, (struct sockaddr*)&addr, sizeof(addr)); - - if ( r != 0) { - printf("connect error %d %s \n",r, strerror(errno)); - close(sd); - free(buf); - r = ENOENT; /* couldn't connect to the desired host */ - goto __fail_3; - } - - /* now we will create an SSL connection */ - co->ssl = SSL_new(co->ctx); - SSL_set_fd(co->ssl, sd); /* attach connected socket */ - // BIO_set_nbio(SSL_get_rbio(co->ssl), 1); - SSL_set_connect_state(co->ssl); - if(SSL_connect(co->ssl) == -1) { - r = EBADE; - free(buf); - /* shutdown connection */ - goto __fail_3; - } /* if success we're ready to use established SSL channel */ - - /* auth and RPC contexts sync */ - co->pctx = pctx; - snprintf(buf, __TMPBUFLEN, "(auth-set-context ((:user \"%s\")(:passwd \"%s\")))", - pctx->login, pctx->passwd); - /* send an auth request */ - if(__conn_write(co, buf, strlen(buf) + sizeof(char))) { - __finalize: - co->flags &= ~CXCONN_ESTABL; - co->flags |= CXCONN_BROKEN; - r = ESXNOCONNECT; - free(buf); - __retry_shut: - if(!SSL_shutdown(co->ssl)) { - usleep(100); - goto __retry_shut; - } - /* shutdown connection */ - goto __fail_3; - } - /* read the message reply */ - - bytes = __conn_read(co, buf, __TMPBUFLEN); - if(bytes == -1) { - // we've lost the connection - co->flags &= ~CXCONN_ESTABL; - r = ESXNOCONNECT; - co->flags |= CXCONN_BROKEN; - free(buf); - /* shutdown connection */ - goto __fail_3; - } - - buf[bytes] = 0; - - /* perform an rpc call */ - r = __eval_cstr(buf, ssys->system_rpc, (void *)co); - - if(!r) { /* all is fine security context is good */ - snprintf(buf, __TMPBUFLEN, "(ch-get-types)"); /* now we should receive possible channel types */ - if(__conn_write(co, buf, strlen(buf) + sizeof(char))) { - goto __finalize; - } - - /* read the message reply */ - bytes = __conn_read(co, buf, __TMPBUFLEN); - if(bytes == -1) { - goto __finalize; - } - - buf[bytes] = 0; - /* perform an rpc call */ - r = __eval_cstr(buf, ssys->system_rpc, (void *)co); - } - - free(buf); /* now we can free the temporary buffer */ - /* a listening thread creation (incoming messages) */ - if(!r) { /* success let's start a listening thread */ - r = pthread_create(&co->cthread, NULL, __cxslave_thread_listener, (void *)co); - if(!r) { - /* add connection to the list */ - usrtc_node_init(&co->csnode, co); - co->flags = (CXCONN_SLAVE | CXCONN_ESTABL); /* set the right flags */ - pthread_rwlock_wrlock(&ssys->rwlock); - usrtc_insert(ssys->connections, &co->csnode, (void *)co->uuid); - pthread_rwlock_unlock(&ssys->rwlock); - } - r = pthread_create(&co->msgthread, NULL, __msg_queue_thread, (void *)co); - if(r) goto __finalize; - r = pthread_create(&co->rmsgthread, NULL, __rmsg_queue_thread, (void *)co); - if(r) goto __finalize; - - pth_dqtpoll_run(tpoll); - co->tpoll = tpoll; - - return 0; - } - - __fail_3: - pthread_mutex_destroy(&co->oplock); - __fail_2: - free(rpc_tree); - __fail_1: - free(ch_tree); - __fail: - free(uuid); - return r; -} - -int connection_create_fapi_m(conn_sys_t *ssys, conn_t *co, int sck, - struct in_addr *addr) -{ - int r = 0; - int bytes = 0; - char *uuid; - char *buf = NULL; - usrtc_t *ch_tree, *rpc_tree; - pth_queue_t *rqueue = malloc(sizeof(pth_queue_t)); - pth_queue_t *mqueue = malloc(sizeof(pth_queue_t)); - pth_dqtpoll_t *tpoll = malloc(sizeof(pth_dqtpoll_t)); - if(!tpoll) return ENOMEM; // TODO: fallback - idx_allocator_t *idx_ch = malloc(sizeof(idx_allocator_t)); - - if(!co) return EINVAL; - else memset(co, 0, sizeof(conn_t)); - - pth_dqtpoll_init(tpoll, __rpc_callback); // TODO: check it - - if(!idx_ch) return ENOMEM; - else r = idx_allocator_init(idx_ch, MAX_CHANNELS*MAX_MULTI, 0); - if(r) return r; - - if(!(uuid = __generate_uuid())) return ENOMEM; - if(!(ch_tree = malloc(sizeof(usrtc_t)))) { - r = ENOMEM; - goto __fail; - } - if(!(rpc_tree = malloc(sizeof(usrtc_t)))) { - r = ENOMEM; - goto __fail_1; - } - if((r = pthread_mutex_init(&co->oplock, NULL))) goto __fail_2; - if((r = pthread_rwlock_init(&co->chnl_lock, NULL))) goto __fail_3; - - usrtc_init(rpc_tree, USRTC_REDBLACK, MAX_RPC_LIST, __cmp_int); - usrtc_init(ch_tree, USRTC_REDBLACK, MAX_CHANNELS, __cmp_ulong); - - co->idx_ch = idx_ch; - - /* setup connections set */ - co->ssys = ssys; - - /* assign message queue */ - r = pth_queue_init(rqueue); - if(r) goto __fail_3; - co->rqueue = rqueue; - /* assigned outbone message queue master also has this one */ - r = pth_queue_init(mqueue); - if(r) goto __fail_3; - co->mqueue = mqueue; - - /* init SSL certificates and context */ - co->ctx = SSL_CTX_new(TLSv1_2_server_method()); - if(!co->ctx) { r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__);goto __fail_3; } - else { - /* set verify context */ - SSL_CTX_set_verify(co->ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, - __verify_certcall); - /* set verify depth */ - SSL_CTX_set_verify_depth(co->ctx, VERIFY_DEPTH); - } - - /* load certificates */ - SSL_CTX_load_verify_locations(co->ctx, ssys->rootca, NULL); - /* set the local certificate from CertFile */ - if(SSL_CTX_use_certificate_file(co->ctx, ssys->certpem, - SSL_FILETYPE_PEM)<=0) { - printf("certpem1 = %s\n", ssys->certpem); - ERR_print_errors_fp(stderr); - r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__); - goto __fail_3; - } - /* set the private key from KeyFile (may be the same as CertFile) */ - if(SSL_CTX_use_PrivateKey_file(co->ctx, ssys->certkey, - SSL_FILETYPE_PEM)<=0) { - r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__); - goto __fail_3; - } - /* verify private key */ - if (!SSL_CTX_check_private_key(co->ctx)) { - r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__); - goto __fail_3; - } - - /* assign allocated memory */ - co->rpc_list = rpc_tree; - co->chnl_tree = ch_tree; - co->uuid = uuid; - - if(!(buf = malloc(__TMPBUFLEN))) { - r = ENOMEM; - goto __fail_3; - } - - /* now we will create an SSL connection */ - co->ssl = SSL_new(co->ctx); - co->pctx = malloc(sizeof(perm_ctx_t)); - SSL_set_fd(co->ssl, sck); /* attach connected socket */ - /* ok now we need to initialize address */ - if(addr) { - co->pctx->addr = malloc(sizeof(struct in_addr)); - memcpy(co->pctx->addr, addr, sizeof(struct in_addr)); - } else co->pctx->addr = NULL; - - SSL_set_accept_state(co->ssl); - /* set the context to verify ssl connection */ - SSL_set_ex_data(co->ssl, ex_ssldata_index, (void *)co); - //BIO_set_nbio(SSL_get_rbio(co->ssl), 1); - SSL_set_accept_state(co->ssl); - if(SSL_accept(co->ssl) == -1) { - r = EBADE; - free(buf); - /* shutdown connection */ - goto __fail_3; - } /* if success we're ready to use established SSL channel */ - - /*******************************************/ - /*-=Protocol part of connection establish=-*/ - /*******************************************/ - while(!(co->flags & CXCONN_ESTABL)) { /* read the initiation stage connections */ - bytes = __conn_read(co, buf, __TMPBUFLEN); - if(bytes > 0) { - buf[bytes] = 0; - r = __eval_cstr(buf, ssys->system_rpc, (void *)co); - if(r) { - fprintf(stderr, "Initiation func return %d\n", r); - free(buf); - SSL_shutdown(co->ssl); - goto __fail_3; - } - } else { - if(bytes < 0) { - printf("Terminate SSL connection, the other end is lost.\n"); - co->flags &= ~CXCONN_ESTABL; - co->flags |= CXCONN_BROKEN; - free(buf); - if(ssys->on_destroy) ssys->on_destroy(co); - SSL_shutdown(co->ssl); - r = ESXNOCONNECT; - goto __fail_3; - } - } - } - - /* before it will be done assign rpc list */ - if(ssys->get_rpc_typed_list_tree) - co->rpc_list = ssys->get_rpc_typed_list_tree(co); - - free(buf); - r = pthread_create(&co->cthread, NULL, __cxmaster_thread_listener, (void *)co); - if(!r) { - /* add connection to the list */ - usrtc_node_init(&co->csnode, co); - co->flags |= CXCONN_MASTER; /* set the right flags */ - pthread_rwlock_wrlock(&ssys->rwlock); - usrtc_insert(ssys->connections, &co->csnode, (void *)co->uuid); - pthread_rwlock_unlock(&ssys->rwlock); - /* threads poll --- */ - r = pthread_create(&co->msgthread, NULL, __msg_queue_thread, (void *)co); - if(r) goto __fail_3; - r = pthread_create(&co->rmsgthread, NULL, __rmsg_queue_thread, (void *)co); - if(r) goto __fail_3; - - pth_dqtpoll_run(tpoll); - co->tpoll = tpoll; - } - - return r; - - __fail_3: - pthread_mutex_destroy(&co->oplock); - __fail_2: - free(rpc_tree); - __fail_1: - free(ch_tree); - __fail: - free(uuid); - return r; -} - -int connection_close(conn_t *co) -{ - void *nil; - - pthread_cancel(co->cthread); - /* wait for the main listener */ - pthread_join(co->cthread, &nil); - /* ok the first of all we're need to wake up all */ - __wake_up_waiters(co, ESXNOCONNECT); - /* now we need to end the poll */ - pth_dqtpoll_destroy(co->tpoll, 1); /* force */ - - __connection_free(co); - - return 0; -} - -extern int __create_reg_msg(sxmsg_t **msg, chnl_t *ch); - -extern int __create_sys_msg(sxmsg_t **msg, char *uuid, chnl_t *ch, sxpayload_t *data); - -static void __connection_free(conn_t *co) -{ - pth_msg_t msg; - void *nil; - - /* since we doesn't works with IN queue and job dispatching - * close the thread - */ - msg.msgtype = END_MSG; - msg.data = NULL; - pth_queue_add(co->rqueue, &msg, END_MSG); /* it will drop the thread */ - pthread_join(co->rmsgthread, &nil); /* wait for it */ - /* message sending dispatch thread must be finished too */ - pth_queue_add(co->mqueue, &msg, END_MSG); /* it will drop the thread */ - pthread_join(co->msgthread, &nil); /* wait for it */ - /* since we don't have any threads working with channels destroy them */ - __destroy_all_channels(co); - /* permission context and callback of exists */ - if(co->ssys->on_destroy) co->ssys->on_destroy(co); - else { /* we don't have a handler */ - if(co->pctx->login) free(co->pctx->login); - if(co->pctx->passwd) free(co->pctx->passwd); - } - __connections_subsystem_connection_remove(co); - /* now we're ready to free other resources */ - if(co->uuid) free(co->uuid); - /* idx allocator */ - idx_allocator_destroy(co->idx_ch); - free(co->idx_ch); - free(co->chnl_tree); - /* kill SSL context */ - SSL_shutdown(co->ssl); - close(SSL_get_fd(co->ssl)); - SSL_free(co->ssl); - SSL_CTX_free(co->ctx); - /* destroy queues */ - pth_queue_destroy(co->mqueue, 0, NULL); - pth_queue_destroy(co->rqueue, 0, NULL); - /* locks */ - pthread_rwlock_destroy(&(co->chnl_lock)); - pthread_mutex_destroy(&(co->oplock)); - /* kill permission context */ - if(co->pctx) free(co->pctx); - - return; -} diff --git a/lib/message.c b/lib/message.c deleted file mode 100644 index 9df5d3f..0000000 --- a/lib/message.c +++ /dev/null @@ -1,286 +0,0 @@ -/* - * Secure Network Transport Layer Library implementation. - * This is a proprietary software. See COPYING for further details. - * - * (c) Askele Group 2013-2015 - * - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include - - -#ifdef WIN32 -#include -#else -#include -#include -#include -#include -#endif - -#include -#include - -#include -#include - -#include - -void __destroy_msg(sxmsg_t *msg) -{ - chnl_t *ch = msg->pch; - - if(msg->flags & ESXMSG_USR) { - pthread_mutex_lock(&(ch->oplock)); - idx_free(ch->idx_msg, msg->mid); - pthread_mutex_unlock(&(ch->oplock)); - } else if(msg->flags & ESXMSG_SYS) { - //if(msg->uuid) free(msg->uuid); - } - - pthread_mutex_unlock(&(msg->wait)); - pthread_mutex_destroy(&(msg->wait)); - free(msg); - return; -} - -sxmsg_t *__allocate_msg(int *res) -{ - sxmsg_t *msg = malloc(sizeof(sxmsg_t)); - int r = 0; - - if(!msg) { - *res = ENOMEM; - return NULL; - } else { - memset(msg, 0, sizeof(sxmsg_t)); - if((r = pthread_mutex_init(&(msg->wait), NULL))) { - free(msg); - *res = r; - return NULL; - } - - usrtc_node_init(&(msg->pendingq_node), msg); - } - - *res = 0; - - return msg; -} - -int __create_reg_msg(sxmsg_t **msg, chnl_t *ch) -{ - int r = 0; - sxmsg_t *sm = __allocate_msg(&r); - - if(r) return r; - else { - sm->pch = ch; - sm->flags = (ESXMSG_USR | ESXMSG_PENDING); - - /* ok allocate message ID */ - pthread_mutex_lock(&(ch->oplock)); - sm->mid = idx_allocate(ch->idx_msg); - pthread_mutex_unlock(&(ch->oplock)); - - pthread_mutex_lock(&(sm->wait)); - *msg = sm; - } - - return 0; -} - -int __create_sys_msg(sxmsg_t **msg, char *uuid, chnl_t *ch, sxpayload_t *data) -{ - int r = 0; - sxmsg_t *m = __allocate_msg(&r); - - if(r) return r; - else { - /* fill values */ - m->pch = ch; - m->uuid = uuid; - m->payload = data; - /* set the right flags */ - m->flags = (ESXMSG_SYS | ESXMSG_PENDING); - /* we need to lock the wait mutex */ - pthread_mutex_lock(&(m->wait)); - - *msg = m; - } - - return 0; -} - -/* message passing */ - -/* - * How message sending works: - * 1. Create a message structure assigned to the channel, - * 2. Put S-expression context to it - * 3. Put the message to the queue - * 4. expect the result waiting on the lock mutex - */ -static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec *tio) -{ - int r = 0; - sxmsg_t *m = NULL; - conn_t *co = ch->connection; - - if(!(co->flags & CXCONN_ESTABL)) { - destroy_sexp(sx); - return ESXNOCONNECT; - } - - *msg = NULL; - - r = __create_reg_msg(&m, ch); - if(r) return r; - else { - /* put the message to the search tree */ - pthread_rwlock_wrlock(&(ch->msglock)); - usrtc_insert(ch->msgs_tree, &(m->pendingq_node), &(m->mid)); - pthread_rwlock_unlock(&(ch->msglock)); - - /* message assign */ - m->opcode = 0; - m->payload = (void *)sx; - /* assign initial sx */ - m->initial_sx = sx; - - /* put the message to the run queue */ - r = pth_queue_add(co->mqueue, (void *)m, USR_MSG); - if(r) return r; /* FIXME: better give up */ - - if(m->flags & ESXMSG_PENDING) { - if(!tio) pthread_mutex_lock(&(m->wait)); - else pthread_mutex_timedlock(&(m->wait), tio); - } - if(tio && (m->flags & ESXMSG_PENDING)) - return ESXOTIMEDOUT; - if(!m->payload) { - r = m->opcode; - /* first remove the message from tree */ - pthread_rwlock_wrlock(&(ch->msglock)); - usrtc_delete(ch->msgs_tree, &(m->pendingq_node)); - pthread_rwlock_unlock(&(ch->msglock)); - /* destroy s expression */ - destroy_sexp(m->initial_sx); - /* destroy */ - __destroy_msg(m); - } else { - *msg = m; - if(m->opcode == ESXNOCONNECT || m->opcode == ESXRAPIDREPLY) - r = m->opcode; - else r = ESXOREPLYREQ; - /* FIXME: remove ugly code */ - if(m->opcode == ESXRAPIDREPLY) { - /* first remove the message from tree */ - pthread_rwlock_wrlock(&(ch->msglock)); - usrtc_delete(ch->msgs_tree, &(m->pendingq_node)); - pthread_rwlock_unlock(&(ch->msglock)); - } - } - } - - return r; -} - -int msg_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg) -{ - return __message_send(ch, sx, msg, NULL); -} - -int msg_send_timed(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec *tio) -{ - return __message_send(ch, sx, msg, tio); -} - -static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcode, - int israpid) -{ - int r = 0; - chnl_t *ch = msg->pch; - conn_t *co = ch->connection; - - if(!(co->flags & CXCONN_ESTABL)) { - destroy_sexp(sx); - return ESXNOCONNECT; - } - - if(msg->flags & ESXMSG_ISREPLY) - destroy_sexp((sexp_t *)msg->payload); - - msg->payload = sx; - msg->opcode = opcode; - msg->flags |= ESXMSG_PENDING; /* pending */ - msg->flags |= ESXMSG_ISREPLY; /* this is a reply */ - if(israpid) msg->flags |= ESXMSG_ISRAPID; /* message is a rapid message */ - - if(!sx || israpid) msg->flags &= ~ESXMSG_PENDING; - else msg->flags |= ESXMSG_RMONRETR; - - /* put the message to the queue */ - r = pth_queue_add(co->mqueue, (void *)msg, USR_MSG); - if(r) return r; /* FIXME: better give up */ - if(!sx || israpid) { - /* wait for write */ - //pthread_mutex_lock(&(msg->wait)); - return 0; - } - - if(msg->flags & ESXMSG_PENDING) { - if(!tio) pthread_mutex_lock(&(msg->wait)); - else pthread_mutex_timedlock(&(msg->wait), tio); - } - - if(tio && (msg->flags & ESXMSG_PENDING)) { - msg->flags &= ~ESXMSG_PENDING; /* we will not wait for it */ - return ESXOTIMEDOUT; - } - - r = msg->opcode; - - if(msg->flags & ESXMSG_CLOSURE) { - __destroy_msg(msg); - } - - return r; -} - -int msg_return(sxmsg_t *msg, int opcode) -{ - return __msg_reply(msg, NULL, NULL, opcode, 0); -} - -int msg_reply(sxmsg_t *msg, sexp_t *sx) -{ - return __msg_reply(msg, sx, NULL, 0, 0); -} - -int msg_reply_timed(sxmsg_t *msg, sexp_t *sx, struct timespec *tio) -{ - return __msg_reply(msg, sx, tio, 0, 0); -} - -int msg_reply_rapid(sxmsg_t *msg, sexp_t *sx) -{ - return __msg_reply(msg, sx, NULL, 0, 1); -} - -int msg_rapid_clean(sxmsg_t *msg) -{ - destroy_sexp(msg->initial_sx); - if(msg->payload) destroy_sexp(msg->payload); - __destroy_msg(msg); - - return 0; -} diff --git a/lib/queue.c b/lib/queue.c deleted file mode 100644 index 2dce4e1..0000000 --- a/lib/queue.c +++ /dev/null @@ -1,465 +0,0 @@ -/* - * This is a proprietary software. See COPYING for further details. - * - * - * - * (c) Askele Group 2013-2015 - */ - -#include -#include -#include -#include -#include -#include -#include -/**/ -#ifdef WIN32 -#include -#else - - -#include - -#endif - -/**/ -// #include -#include -#include - -#include - -#include - -#define MAX_QUEUE_SIZE 4096 -#define MAX_QUEUE_POOL 256 - -static long __cmp_uint(const void *a, const void *b) -{ - return (long)(*(unsigned int *)a - *(unsigned int *)b); -} - -static inline pth_msg_t *__get_newmsg(pth_queue_t *queue) -{ - usrtc_t *tree = &queue->msgcache; - usrtc_node_t *node; - pth_msg_t *tmp; - - if(usrtc_count(tree)) { - node = usrtc_first(tree); - tmp = (pth_msg_t *)usrtc_node_getdata(node); - usrtc_delete(tree, node); - } else { - tmp = malloc(sizeof(pth_msg_t)); - tree = &queue->qtree; - node = &tmp->node; - usrtc_node_init(node, tmp); - } - /* insert it */ - tree = &queue->qtree; - tmp->qlength = usrtc_count(tree); - usrtc_insert(tree, node, (void *)(&tmp->qlength)); - - return tmp; -} - -static inline void __release_msg(pth_queue_t *queue, pth_msg_t *msg) -{ - usrtc_node_t *node = &msg->node; - usrtc_t *tree = &queue->qtree; - - tree = &queue->qtree; /* remove from queue */ - usrtc_delete(tree, node); - - tree = &queue->msgcache; - - if(usrtc_count(tree) >= MAX_QUEUE_POOL) - free(msg); - else { - msg->data = NULL; - msg->msgtype = NIL_MSG; - usrtc_insert(tree, node, (void *)&msg->qlength); - } - - return; -} - -int pth_queue_init(pth_queue_t *queue) -{ - int r = 0; - - memset(queue, 0, sizeof(pth_queue_t)); - if((r = pthread_cond_init(&queue->cond, NULL))) - return r; - - if((r = pthread_mutex_init(&queue->mutex, NULL))) { - pthread_cond_destroy(&queue->cond); - return r; - } - - usrtc_init(&queue->qtree, USRTC_AVL, MAX_QUEUE_SIZE, __cmp_uint); - usrtc_init(&queue->msgcache, USRTC_AVL, MAX_QUEUE_POOL, __cmp_uint); - - return r; -} - -int pth_queue_add(pth_queue_t *queue, void *data, unsigned int msgtype) -{ - pth_msg_t *newmsg; - - pthread_mutex_lock(&queue->mutex); - newmsg = __get_newmsg(queue); - if (newmsg == NULL) { - pthread_mutex_unlock(&queue->mutex); - return ENOMEM; - } - - newmsg->data = data; - newmsg->msgtype = msgtype; - - if(queue->length == 0) - pthread_cond_broadcast(&queue->cond); - queue->length++; - pthread_mutex_unlock(&queue->mutex); - - return 0; -} - -int pth_queue_get(pth_queue_t *queue, const struct timespec *timeout, pth_msg_t *msg) -{ - usrtc_t *tree; - usrtc_node_t *node = NULL; - pth_msg_t *tmp; - int r = 0; - struct timespec abstimeout; - - if (queue == NULL || msg == NULL) - return EINVAL; - else - tree = &queue->qtree; - - if (timeout) { /* setup timeout */ - struct timeval now; - - gettimeofday(&now, NULL); - abstimeout.tv_sec = now.tv_sec + timeout->tv_sec; - abstimeout.tv_nsec = (now.tv_usec * 1000) + timeout->tv_nsec; - if (abstimeout.tv_nsec >= 1000000000) { - abstimeout.tv_sec++; - abstimeout.tv_nsec -= 1000000000; - } - } - - pthread_mutex_lock(&queue->mutex); - - /* Will wait until awakened by a signal or broadcast */ - while ((node = usrtc_first(tree)) == NULL && r != ETIMEDOUT) { /* Need to loop to handle spurious wakeups */ - if (timeout) - r = pthread_cond_timedwait(&queue->cond, &queue->mutex, &abstimeout); - else - pthread_cond_wait(&queue->cond, &queue->mutex); - } - if (r == ETIMEDOUT) { - pthread_mutex_unlock(&queue->mutex); - return r; - } - - tmp = (pth_msg_t *)usrtc_node_getdata(node); - queue->length--; - - msg->data = tmp->data; - msg->msgtype = tmp->msgtype; - msg->qlength = tmp->qlength; /* we will hold the msg id instead of size here */ - - __release_msg(queue, tmp); - pthread_mutex_unlock(&queue->mutex); - - return 0; -} - -int pth_queue_destroy(pth_queue_t *queue, int freedata, void (*free_msg)(void *)) -{ - int r = 0; - usrtc_t *tree = &queue->qtree; - usrtc_node_t *node = NULL; - pth_msg_t *msg; - - if (queue == NULL) return EINVAL; - - pthread_mutex_lock(&queue->mutex); - - for (node = usrtc_first(tree); node != NULL; node = usrtc_first(tree)) { - usrtc_delete(tree, node); - msg = (pth_msg_t *)usrtc_node_getdata(node); - - if(freedata) free(msg->data); - else if(free_msg) free_msg(msg->data); - - free(msg); - } - /* free cache */ - tree = &queue->msgcache; - for (node = usrtc_first(tree); node != NULL; node = usrtc_first(tree)) { - usrtc_delete(tree, node); - free(usrtc_node_getdata(node)); - } - - pthread_mutex_unlock(&queue->mutex); - r = pthread_mutex_destroy(&queue->mutex); - pthread_cond_destroy(&queue->cond); - - return r; -} - -unsigned int pth_queue_length(pth_queue_t *queue) -{ - unsigned int c; - - pthread_mutex_lock(&queue->mutex); - c = queue->length; - pthread_mutex_unlock(&queue->mutex); - - return c; -} - -/* dynamic queue thread poll */ - -struct __pthrd_data { - pth_dqtpoll_t *pthrd; - int myid; -}; - -static void *__poll_thread(void *poll) -{ - int r = 0; - struct __pthrd_data *thrdata = (struct __pthrd_data *)poll; - struct __pthrd_data *npoll = NULL; - pth_msg_t msgbuf, sysbuf; - pth_dqtpoll_t *p = thrdata->pthrd; - pth_queue_t *q = p->queue; - ulong_t myid = thrdata->myid; - struct timeval now; - int resched = 0; - long tusec, si, mdrate; - - while(1) { - resched = 0; - r = pth_queue_get(q, NULL, &msgbuf); - pthread_rwlock_wrlock(&(p->stats_lock)); - if(p->flags & DQTPOLL_DEADSTAGE) { /* poll going to be killed */ - pthread_rwlock_unlock(&(p->stats_lock)); - idx_free(p->idx, myid); - p->poll_value--; - - return NULL; - } - - /* now get the time */ - gettimeofday(&now, NULL); - if((now.tv_sec >= p->sched_time.tv_sec) && - (now.tv_usec >= p->sched_time.tv_usec)) { - resched = 1; - /* set the new schedule time */ - si = 0; - tusec = DQTPOLL_DELTAMS + now.tv_usec; - if(tusec > 1000000) { - tusec -= 1000000; - si++; - } - p->sched_time.tv_sec += si + DQTPOLL_DELTASE; - p->sched_time.tv_usec = tusec; - } - - if(resched) { /* ok now we need to resched and descrease/increase thread poll volume */ - if(p->msgop) mdrate = ((DQTPOLL_DELTASE*1000000) + DQTPOLL_DELTAMS)/p->msgop; - else mdrate = 0; - if((mdrate > p->poll_value) && (p->poll_value < MAX_POLL_VALUE)) { /* increase ! */ - if((npoll = malloc(sizeof(struct __pthrd_data)))) { - npoll->myid = idx_allocate(p->idx); - npoll->pthrd = p; - p->poll_value++; - /* create thread here */ - if(pthread_create(&(p->poll[npoll->myid]), NULL, __poll_thread, npoll)) { - idx_free(p->idx, npoll->myid); - p->poll_value--; - free(npoll); - } - } - } else if((p->poll_value > 2) && (mdrate < p->poll_value)) /* decrease */ { - memset(&sysbuf, 0, sizeof(pth_msg_t)); - pth_queue_add(p->queue, &sysbuf, POLL_DECREASE); - } - - /* init all other stuff */ - p->msgop = 0; - } - - if(r) { - pthread_rwlock_unlock(&(p->stats_lock)); - continue; - } else p->msgop++; - pthread_rwlock_unlock(&(p->stats_lock)); - - switch(msgbuf.msgtype) { - case USR_MSG: - /* do the job */ - p->jobdata_callback(msgbuf.data); - break; - case POLL_DECREASE: - pthread_rwlock_rdlock(&(p->stats_lock)); - if(p->poll_value > 2) r = 1; /* exit now */ - pthread_rwlock_unlock(&(p->stats_lock)); - if(r) { - pthread_rwlock_wrlock(&(p->stats_lock)); - idx_free(p->idx, myid); - p->poll_value--; - pthread_rwlock_unlock(&(p->stats_lock)); - free(poll); - return NULL; - } - break; - default: - /* TODO: do something ... */ - break; - } - } - - return NULL; -} - -/* init poll, structure must be allocated */ -int pth_dqtpoll_init(pth_dqtpoll_t *tpoll, int (*jobdata_callback)(void *)) -{ - int r = 0; - pth_queue_t *queue = malloc(sizeof(pth_queue_t)); - pthread_t *poll = malloc(sizeof(pthread_t)*MAX_POLL_VALUE); - idx_allocator_t *idx = malloc(sizeof(idx_allocator_t)); - struct __pthrd_data *ndata = malloc(sizeof(struct __pthrd_data)); - - /* check it for allocation */ - if(!ndata) goto __enomem; - if(!idx) goto __enomem; - if(!queue) goto __enomem; - if(!poll) goto __enomem; - - /* init all the stuff */ - if(idx_allocator_init(idx, MAX_POLL_VALUE*16, 0)) { - __enomem: - r = ENOMEM; - goto __finish; - } - if(pth_queue_init(queue)) goto __enomem; /* init queue */ - if(pthread_rwlock_init(&(tpoll->stats_lock), NULL)) goto __enomem; - - /* set parameters */ - memset(poll, 0, sizeof(pthread_t)*MAX_POLL_VALUE); - tpoll->flags = 0; - tpoll->idx = idx; - tpoll->poll = poll; - tpoll->queue = queue; - tpoll->poll_value = 2; - tpoll->spurious_wakeups = 0; - tpoll->msgop = 0; - tpoll->jobdata_callback = jobdata_callback; - - /* first thread initiation */ - idx_reserve(idx, 0); - ndata->myid = 0; - ndata->pthrd = tpoll; - if(pthread_create(&(poll[0]), NULL, __poll_thread, ndata)) { - __eadd: - pthread_rwlock_destroy(&(tpoll->stats_lock)); - goto __enomem; - } - /* second thread initiation */ - ndata = malloc(sizeof(struct __pthrd_data)); - if(!ndata) goto __eadd; - idx_reserve(idx, 1); - ndata->myid = 1; - ndata->pthrd = tpoll; - if(pthread_create(&(poll[1]), NULL, __poll_thread, ndata)) { - pthread_rwlock_destroy(&(tpoll->stats_lock)); - goto __enomem; - } - - gettimeofday(&(tpoll->sched_time), NULL); - - __finish: - if(r) { - if(ndata) free(ndata); - if(idx) free(idx); - if(queue) { - pth_queue_destroy(queue, 0, NULL); - free(queue); - } - if(poll) free(poll); - } - return r; -} - -/* run poll: poll */ -int pth_dqtpoll_run(pth_dqtpoll_t *tpoll) -{ - int r = 0; - - pthread_rwlock_wrlock(&(tpoll->stats_lock)); - if((tpoll->flags & DQTPOLL_RUNNING) || (tpoll->flags & DQTPOLL_DEADSTAGE)) r = EINVAL; - else { - tpoll->flags |= DQTPOLL_RUNNING; - } - pthread_rwlock_unlock(&(tpoll->stats_lock)); - - return r; -} - -/* add the job to the queue: poll, job data, message type */ -int pth_dqtpoll_add(pth_dqtpoll_t *tpoll, void *job, unsigned int type) -{ - int r = 0; - - r = pth_queue_add(tpoll->queue, job, type); - - return r; -} - -/* destroy the poll: poll, force flag - * if force flag is set (!= 0), give up - * about jobs, if no, do the job, but don't - * accept the new ones, and destroy all poll - * with last thread. - */ -int pth_dqtpoll_destroy(pth_dqtpoll_t *tpoll, int force) -{ - int r = 0; - pth_msg_t tmpmsg; - - pthread_rwlock_wrlock(&(tpoll->stats_lock)); - tpoll->flags |= DQTPOLL_DEADSTAGE; - pthread_rwlock_unlock(&(tpoll->stats_lock)); - - /* now we need to wait */ - while(1) { - pthread_rwlock_rdlock(&(tpoll->stats_lock)); - if(!tpoll->poll_value) { - pthread_rwlock_unlock(&(tpoll->stats_lock)); - break; - } else { - pthread_rwlock_unlock(&(tpoll->stats_lock)); - pth_queue_add(tpoll->queue, &tmpmsg, 0); /* spurious */ - } - usleep(100); /* just to sleep and free timeslice to others */ - } - - /* free all */ - pth_queue_destroy(tpoll->queue, 0, NULL); - idx_allocator_destroy(tpoll->idx); - pthread_rwlock_destroy(&(tpoll->stats_lock)); - - free(tpoll->poll); - free(tpoll->queue); - free(tpoll->idx); - - return r; -} - diff --git a/lib/support.c b/lib/uuid.c similarity index 71% rename from lib/support.c rename to lib/uuid.c index a6e5883..873addd 100644 --- a/lib/support.c +++ b/lib/uuid.c @@ -31,14 +31,6 @@ #include -#include -#include - -#include -#include - -#include - #ifdef WIN32 #define UUID_T_LENGTH 16 @@ -101,25 +93,3 @@ char *__generate_uuid(void) #endif } -/* networking helpers */ -#ifndef WIN32 -int __resolvehost(const char *hostname, char *buf, int buf_len, - struct hostent **rhp) -{ - struct hostent *hostbuf ;//= malloc(sizeof(struct hostent)); - struct hostent *hp = *rhp = NULL; - int herr = 0, hres = 0; - - - hostbuf = malloc(sizeof(struct hostent)); - if(!hostbuf) return NO_ADDRESS; - hres = gethostbyname_r(hostname, hostbuf, - buf, buf_len, &hp, &herr); - - if(hres) return NO_ADDRESS; - *rhp = hp; - - return NETDB_SUCCESS; -} -#endif -