diff --git a/include/sntl/connection.h b/include/sntl/connection.h index e6ee696..f1b65f7 100644 --- a/include/sntl/connection.h +++ b/include/sntl/connection.h @@ -33,17 +33,9 @@ #include #include +#include #include -/* TODO: remove to the special separate file - error codes */ -#define ESXOREPLYREQ 44 /* protocol require reply with expression, - * or expression return for the request */ -#define ESXOTIMEDOUT 45 /* timedout */ -#define ESXRCBADPROT 46 /* invalid protocol */ -#define ESXNOCONNECT 47 /* connection is lost */ -#define ESXNOCHANSUP 48 -#define ESXRAPIDREPLY 49 - #define VERIFY_DEPTH 1 /* FIXME: */ #define MAX_CONNECTIONS 32768 @@ -88,9 +80,9 @@ typedef struct __connection_t { struct __connections_subsys_type *ssys; /* < connections subsystem */ char *uuid; /** < uuid of the connection */ /* Channels section */ - idx_allocator_t *idx_ch; /** < index allocation for channels */ + idx_allocator_t idx_ch; /** < index allocation for channels */ pthread_mutex_t idx_ch_lock; /** < mutex for allocating and deallocating channels */ - struct __channel_t **channels; /** < channels O(1) storage */ + volatile struct __channel_t **channels; /** < channels O(1) storage */ /* RPC section */ usrtc_t *rpc_list; /** < search tree of possible RPC typed lists */ /* SSL related section */ @@ -101,15 +93,15 @@ typedef struct __connection_t { /* Security section */ perm_ctx_t *pctx; /** < higher layer authentification context */ /* Messages section */ - struct __message_t **messages; /** < messages O(1) storage */ - idx_allocator_t *idx_msg; + struct __message_t** volatile messages; /** < messages O(1) storage */ + idx_allocator_t idx_msg; pthread_mutex_t idx_msg_lock; list_head_t write_pending; /** < list of messages waiting for write */ pthread_mutex_t write_pending_lock; - uint8_t unused_messages; /** < unused message count */ + volatile uint8_t unused_messages; /** < unused message count */ /* Other stuff */ pthread_t thrd_poll[8]; - uint8_t flags; /** < flags of the connection */ + volatile uint8_t flags; /** < flags of the connection */ usrtc_node_t csnode; /** < node to store the connection within list */ } conn_t; @@ -155,11 +147,8 @@ typedef struct __sexp_payload_t { typedef struct __message_t { chnl_t *pch; /** < channel of the message(if applicable) */ pthread_mutex_t wait; /** < special wait mutex, used for pending list and sync */ + sntllv2_head_t mhead; void *payload; /** < payload */ - uint16_t payload_length; /** < payload length */ - uint8_t opcode; /** < opcode for system and pulse messages */ - uint16_t flags; /** < flags of the message (type, state etc ...)*/ - uint16_t idx; } sxmsg_t; typedef struct __connection_rpc_entry_type { diff --git a/include/sntl/errno.h b/include/sntl/errno.h index 0572936..e6e0f4c 100644 --- a/include/sntl/errno.h +++ b/include/sntl/errno.h @@ -32,6 +32,12 @@ #define SNE_IGNORED 213 #define SNE_REPLYREQ 214 #define SNE_RAPIDMSG 215 +#define SNE_ESSL 216 +#define SNE_NOCHANNELS 217 +#define SNE_MCHANNELS 218 +#define SNE_MMESSAGES 219 +#define SNE_LINKBROKEN 220 +#define SNE_INVALINDEX 221 /* some old errors for compatibility */ #define ESXOREPLYREQ SNE_REPLYREQ /* protocol require reply with expression, diff --git a/lib/connex.c b/lib/connex.c index bb846e7..d3570e5 100644 --- a/lib/connex.c +++ b/lib/connex.c @@ -126,6 +126,11 @@ static int __get_channels_list(void *cctx, sexp_t *sx) /* we will avoid S-exp scanning here */ + /* call the function */ + if(ssys->get_rpc_typed_list_tree) + co->rpc_list = ssys->get_rpc_typed_list_tree(co); + if(!co->rpc_list) return SNE_EPERM; + buf += sizeof(sntllv2_head_t); ulen += snprintf(buf + ulen, maxlen - ulen, "(set-channels-list "); for(node = usrtc_first(co->rpc_list); node != NULL; diff --git a/lib/sntllv2.c b/lib/sntllv2.c index d1c41a0..3381197 100644 --- a/lib/sntllv2.c +++ b/lib/sntllv2.c @@ -40,6 +40,36 @@ #include +typedef struct __sntll_bundle_type { + void *buf; + conn_t *conn; +} sntllv2_bundle_t; + +static sntllv2_bundle_t *__sntll_bundle_create(conn_t *co) +{ + sntllv2_bundle_t *n = malloc(sizeof(sntllv2_bundle_t)); + + if(!n) return NULL; + else memset(n, 0, sizeof(sntllv2_bundle_t)); + + n->buf = mmap(NULL, 65536, PROT_READ|PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); + if(n->buf == MAP_FAILED) { + free(n); + return NULL; + } + + n->conn = co; + + return n; +} + +static void __sntll_bundle_destroy(sntllv2_bundle_t *n) +{ + munmap(n->buf, 65536) + free(n); + return; +} + static int ex_ssldata_index; /** < index used to work with additional data * provided to the special call during SSL handshake */ @@ -150,6 +180,49 @@ conn_t *__connection_minimal_alloc(struct in_addr *addr) return NULL; } +static int __connection_second_alloc(conn_t *co) +{ + usrtc_node_init(&co->csnode, co); + + if((idx_allocator_init(&co->idx_ch, 512, 0))) goto __fail; + if((idx_allocator_init(&co->idx_msg, 1024, 0))) goto __fail; + + if(!(co->channels = malloc(sizeof(uintptr_t)*512))) goto __fail; + else memset(co->channels, 0, sizeof(uintptr_t)*512); + + /* init mutexes */ + co->idx_ch_lock = PTHREAD_MUTEX_INITIALIZER; + co->idx_msg_lock = PTHREAD_MUTEX_INITIALIZER; + co->write_pending_lock = PTHREAD_MUTEX_INITIALIZER; + co->sslinout[0] = PTHREAD_MUTEX_INITIALIZER; + co->sslinout[1] = PTHREAD_MUTEX_INITIALIZER; + + /* init list */ + list_head_init(&co->write_pending); + + return SNE_SUCCESS; + + __fail: + idx_allocator_destroy(&co->idx_msg); + idx_allocator_destroy(&co->idx_ch); + return SNE_ENOMEM; +} + +static void __connection_second_free(conn_t *co) +{ + if(co->channels) free(co->channels); + idx_allocator_destroy(&co->idx_msg); + idx_allocator_destroy(&co->idx_ch); + + pthread_mutex_destroy(co->idx_ch_lock); + pthread_mutex_destroy(co->idx_msg_lock); + pthread_mutex_destroy(co->write_pending_lock); + pthread_mutex_destroy(co->sslinout[0]); + pthread_mutex_destroy(co->sslinout[1]); + + return; +} + static void __connection_minimal_free(conn_t *co) { if(co) { @@ -186,6 +259,227 @@ static int __eval_syssexp(conn_t *co, sexp_t *sx) return rentry->rpcf((void *)co, sx); } +static void *__sntll_thread(void *b) +{ + sntllv2_bundle_t *bun = (sntllv2_bundle_t *)b; + conn_t *co = bun->co; + void *buf = bun->buf; + char *bbuf = (char*)buf; + sntllv2_head_t *mhead = (sntllv2_head_t *)buf; + sxmsg_t *msg; + chnl_t *channel; + pthread_t self = pthread_self(); + int dispatch = 0; + + /* byte buffer is following head */ + bbuf += sizeof(sntllv2_head_t); + + __wait_alive: + /* flag test - FIXME: make it atomic (it will works atomically on x86, btw on others not) */ + if(!(co->flags & SNSX_ALIVE)) { + if(co->flags & SNSX_CLOSED) goto __finish; + else { + usleep(20); + goto __wait_alive; + } + } + + /* check up a thread */ + if(pthread_equal(self, co->thrd_poll[7])) /* dispatcher */ + dispatch = 1; + + /* the following logic : (except dispatcher) + * 1. check up pending write -> if exists write one and start again., otherwise go next + * 2. read from ssl connection (we will sleep if other already acquire the lock) + */ + while(1) { + __again: + pthread_mutex_lock(&(co->sslinout[0])); + rd = __conn_read(co, mhead, sizeof(sntllv2_head_t)); + if(rd != sizeof(sntllv2_bundle_t)) { + __sslproto_error: + co->flags |= SNSX_CLOSED; + pthread_mutex_unlock(&(co->sslinout[0])); + goto __finish; + } else { + /* check up if we can read or not */ + if(mhead->payload_length) { + rd = __conn_read(co, bbuf, mhead->payload_length); + if(rd < 0) goto __sslproto_error; + else pthread_mutex_unlock(&(co->sslinout[0])); + if(rd != mhead->payload_length) { + /* if we're need to do something */ + if(mhead->msgid >= 1024) { + mhead->opcode = SNE_INVALINDEX; + goto __return_error; + } else msg = co->messages[mhead->msgid]; + if(!msg) { + if(mhead->attr & SXMSG_OPEN) mhead->opcode = SNE_BADPROTO; + else { + if((mhead->attr & SXMSG_PROTO) || (mhead->attr & SXMSG_LINK)) + mhead->opcode = SNE_BADPROTO; + else mhead->opcode = SNE_NOSUCHMSG; + } + } + __return_error: + mhead->attr |= SXMSG_CLOSED; + mhead->payload_length = 0; + pthread_mutex_lock(&(co->sslinout[1])); + wr = __conn_write(co, mhead, sizeof(sntllv2_head_t)); + pthread_mutex_unlock(&(co->sslinout[1])); + if(wr < 0) goto __finish; + else goto __again; + } + } else pthread_mutex_unlock(&(co->sslinout[0])); + /* take a message */ + if(mhead->attr & SXMSG_PROTO) { /* protocol message i.e. channel open/close */ + /* ok, check up the side */ + if(mhead->attr & SXMSG_REPLYREQ) { /* means we're not initiators and we don't need to allocate a message */ + if(mhead->attr & SXMSG_OPEN) + mhead->opcode = _channel_open(co, &mhead->reserve); + else mhead->opcode = _channel_close(co, mhead->reserve); + + /* set flags */ + mhead->payload_length = 0; + mhead->attr &= ~SXMSG_REPLYREQ; + pthread_mutex_lock(&(co->sslinout[1])); + wr = __conn_write(co, mhead, sizeof(sntllv2_head_t)); + pthread_mutex_unlock(&(co->sslinout[1])); + if(wr < 0) goto __finish; + } else { /* it's came back */ + /* reply came ... */ + if(mhead->msgid >= 1024) { + __inval_idx_nor: + fprintf(stderr, "[sntllv2] Invalid index of the message.\n"); + goto __again; + } + msg = co->messages[mhead->msgid]; + if(!msg) goto __inval_idx_nor; + + /* ok now we'are copy data and unlock wait mutex */ + memcpy(&msg->mhead, mhead, sizeof(sntllv2_head_t)); + pthread_mutex_unlock(&msg->wait); + } + } else if(mhead->attr & SXMSG_LINK) { /* link layer messages */ + if(mhead->attr & SXMSG_CLOSE) goto __finish; /* close the link */ + if(mhead->attr & SXMSG_PULSE) { /* it's a link pulse messages */ + /* TODO: syncronization and so on */ + } + } else { /* regular messages */ + if((mhead->attr & SXMSG_OPEN) && (mhead->attr & SXMSG_REPLYREQ)) { /* dialog initiation */ + channel = co->channels[mhead->reserve]; + if(!channel) { /* ok, we'are failed */ + mhead->opcode = SNE_NOSUCHCHAN; + __ret_regerr: + mhead->payload_length = 0; + mhead->attr &= ~SXMSG_REPLYREQ; + mhead->attr &= ~SXMSG_OPEN; + mhead->attr |= SXMSG_CLOSE; + pthread_mutex_lock(&(co->sslinout[1])); + wr = __conn_write(co, mhead, sizeof(sntllv2_head_t)); + pthread_mutex_unlock(&(co->sslinout[1])); + if(wr < 0) goto __finish; + else goto __again; + } + /* if message is busy - fails */ + msg = co->messages[mhead->msgid]; + if(msg) { mhead->opcode = SNE_EBUSY; goto __ret_regerr; } + + /* now we will take a deal */ + if(!(msg = malloc(sizeof(sxmsg_t)))) { + mhead->opcode = SNE_ENOMEM; goto __ret_regerr; + } else { + /* set mutex and channel */ + msg->wait = PTHREAD_MUTEX_INITIALIZER; + msg->pch = channel; + /* copy header only */ + memcpy(&msg->mhead, mhead, sizeof(sntllv2_head_t)); + msg->payload = bbuf; + } + + pthread_mutex_lock(&co->idx_ch_lock); + idx_reserve(&co->idx_ch, mhead->msgid); + co->messages[mhead->msgid] = msg; + pthread_mutex_unlock(&co->idx_ch_lock); + + /* now we are able to process the message */ + __message_process(msg); + } else if(mhead->attr & SXMSG_CLOSE) { + /* check for the message */ + if(mhead->msgid >= 1024) goto __inval_idx_nor; + msg = co->messages[mhead->msgid]; + if(!msg) goto __inval_idx_nor; + + if(msg->mhead.attr & SXMSG_TIMEDOUT) { /* nobody wait for it */ + pthread_mutex_lock(&co->idx_ch_lock); + idx_free(&co->idx_ch, mhead->msgid); + co->messages[mhead->msgid] = NULL; + pthread_mutex_unlock(&co->idx_ch_lock); + + /* now just free it */ + pthread_mutex_destroy(&msg->wait); + free(msg); + } else { + memcpy(&msg->mhead, mhead, sizeof(sntllv2_head_t)); + if(mhead->payload_length) { + msg->payload = malloc(mhead->payload_length); + if(msg->payload) memcpy(msg->payload, bbuf, mhead->payload_length); + else msg->mhead.opcode = SNE_ENOMEM; + } + pthread_mutex_unlock(&msg->wait); /* wake up thread waiting for */ + } + } else if((!(mhead->attr & SXMSG_CLOSE) && !(mhead->attr & SXMSG_OPEN)) && + (mhead->attr & SXMSG_REPLYREQ)) { /* ongoing dialog */ + /* check for the message */ + if(mhead->msgid >= 1024) goto __inval_idx_nor; + msg = co->messages[mhead->msgid]; + if(!msg) goto __inval_idx_nor; + + if(msg->mhead.attr & SXMSG_TIMEDOUT) { /* nobody wait for it */ + pthread_mutex_lock(&co->idx_ch_lock); + idx_free(&co->idx_ch, mhead->msgid); + co->messages[mhead->msgid] = NULL; + pthread_mutex_unlock(&co->idx_ch_lock); + + /* now just free it */ + pthread_mutex_destroy(&msg->wait); + free(msg); + + /* we must reply */ + mhead->opcode = SNE_ETIMEDOUT; + goto __ret_regerr; + } else { + memcpy(&msg->mhead, mhead, sizeof(sntllv2_head_t)); + if(mhead->payload_length) { + msg->payload = malloc(mhead->payload_length); + if(msg->payload) memcpy(msg->payload, bbuf, mhead->payload_length); + else { + mhead->opcode = msg->mhead.opcode = SNE_ENOMEM; /* we will return it to waitee */ + msg->mhead.attr &= ~SXMSG_REPLYREQ; /* doesn't need to reply */ + /* reply here now */ + mhead->payload_length = 0; + mhead->attr &= ~SXMSG_REPLYREQ; + mhead->attr &= ~SXMSG_OPEN; + mhead->attr |= SXMSG_CLOSE; + pthread_mutex_lock(&(co->sslinout[1])); + wr = __conn_write(co, mhead, sizeof(sntllv2_head_t)); + pthread_mutex_unlock(&(co->sslinout[1])); + if(wr < 0) goto __finish; + } + } + pthread_mutex_unlock(&msg->wait); /* wake up thread waiting for */ + } else + { mhead->opcode = SNE_BADPROTO; goto __ret_regerr; } + } + } + } + } + + __finish: + __sntll_bundle_destroy(b); /* destroy bundle */ + return NULL; +} + int connection_create_fapi_m(conn_sys_t *ssys, conn_t *co, int sck, struct in_addr *addr) { @@ -194,7 +488,9 @@ int connection_create_fapi_m(conn_sys_t *ssys, conn_t *co, int sck, conn_t *coc = __connection_minimal_alloc(addr); sx_msg_t *msg = NULL; sntllv2_head_t *head; + sntllv2_bundle_t *bundle; size_t rd; + int r = SNE_FAILED; if(!coc) return SNE_ENOMEM; @@ -203,7 +499,7 @@ int connection_create_fapi_m(conn_sys_t *ssys, conn_t *co, int sck, /* init SSL certificates and context */ co->ctx = SSL_CTX_new(TLSv1_2_server_method()); - if(!co->ctx) { goto __fail; } + if(!co->ctx) { r = SNE_ENOMEM; goto __fail; } else { /* set verify context */ SSL_CTX_set_verify(co->ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, @@ -218,37 +514,40 @@ int connection_create_fapi_m(conn_sys_t *ssys, conn_t *co, int sck, if(SSL_CTX_use_certificate_file(co->ctx, ssys->certpem, SSL_FILETYPE_PEM)<=0) { ERR_print_errors_fp(stderr); + r = SNE_ESSL; goto __fail; } /* set the private key from KeyFile (may be the same as CertFile) */ if(SSL_CTX_use_PrivateKey_file(co->ctx, ssys->certkey, SSL_FILETYPE_PEM)<=0) { + r = SNE_ESSL; goto __fail; } /* verify private key */ if (!SSL_CTX_check_private_key(co->ctx)) { + r = SNE_ESSL; goto __fail; } /* now we will create an SSL connection */ co->ssl = SSL_new(co->ctx); - if(!co->ssl) goto __fail; + if(!co->ssl) { r = SNE_ENOMEM; goto __fail; } else SSL_set_fd(co->ssl, sck); /* attach connected socket */ SSL_set_accept_state(co->ssl); /* set the context to verify ssl connection */ SSL_set_ex_data(co->ssl, ex_ssldata_index, (void *)co); SSL_set_accept_state(co->ssl); - if(SSL_accept(co->ssl) == -1) goto __fail; + if(SSL_accept(co->ssl) == -1) { r = SNE_EPERM; goto __fail; } /* ok, now we are able to allocate and so on */ /* set connection to the batch mode */ co->flags |= SNSX_BATCHMODE; /* allocate our first buffer */ buf = mmap(NULL, 65536, PROT_READ|PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); - if(buf == MAP_FAILED) goto __fail2; + if(buf == MAP_FAILED) { r = SNE_ENOMEM; goto __fail2; } /* allocate first message */ - if(!(msg = malloc(sizeof(sx_msg_t)))) goto __fail2; + if(!(msg = malloc(sizeof(sx_msg_t)))) { r = SNE_ENOMEM; goto __fail2; } else { memset(msg, 0, sizeof(sx_msg_t)); coc->messages[0] = msg; @@ -263,7 +562,7 @@ int connection_create_fapi_m(conn_sys_t *ssys, conn_t *co, int sck, head = (sntllv2_head_t *)buf; /* check for returns */ - if(head->opcode != SNE_SUCCESS) goto __fail3; + if(head->opcode != SNE_SUCCESS) { r = head->opcode; goto __fail3; } else { /* opcode is fine */ /* if we're ready for messaging mode, turn off batch mode */ if(co->flags & SNSX_MESSAGINGMODE) co->flags &= ~SNSX_BATCHMODE; @@ -272,7 +571,7 @@ int connection_create_fapi_m(conn_sys_t *ssys, conn_t *co, int sck, if(!head->payload_length) continue; /* pass the following check up */ rd = __conn_read(co, bbuf, head->payload_length); - if(rd != head->payload_length) goto __fail3; + if(rd != head->payload_length) { r = SNE_LINKERROR; goto __fail3; } bbuf[rd] = '\0'; sx = parse_sexp(bbuf, rd); if(!sx) goto __fail3; @@ -296,17 +595,35 @@ int connection_create_fapi_m(conn_sys_t *ssys, conn_t *co, int sck, } destroy_sexp(sx); - } else goto __fail3; + } else { r = SNE_LINKERROR; goto __fail3; } } /* if we're there - negotiation is done, going to init messaging mode */ r = __connection_second_alloc(co); - if(r != SNE_SUCCESS) goto __fail3; + if(r != SNE_SUCCESS) goto __fail3; /* and now we're need to create a thread poll */ + if(!(bundle = malloc(sizeof(sntllv2_bundle_t)))) { r = SNE_ENOMEM; goto __fail4; } + else { + bundle->buf = buf; + bundle->conn = co; + } + for(i = 0; i < 8; i++) { + if(bundle == 0xdead) bundle = __sntll_bundle_create(co); + if(!bundle) goto __fail5; + r = pthread_create(&thrd_poll[i], NULL, __sntll_thread, bundle); + if(r) goto __fail5; + else bundle = 0xdead; + } + + /* all is done, connection now ready */ + co->flags |= SNSX_ALIVE; return SNE_SUCCESS; + __fail5: + r = SNE_ENOMEM; + /* bundles will be freed by the threads when SSL_read will fails. */ __fail4: __connection_second_free(co); __fail3: @@ -322,5 +639,5 @@ int connection_create_fapi_m(conn_sys_t *ssys, conn_t *co, int sck, __connection_minimal_free(coc); } close(sck); - return SNE_FAILED; + return r; }