diff --git a/include/sntl/connection.h b/include/sntl/connection.h index 054843a..58aa50f 100644 --- a/include/sntl/connection.h +++ b/include/sntl/connection.h @@ -71,6 +71,8 @@ typedef struct __connection_t { int ssl_data_index; /** < SSL index for the custom data */ perm_ctx_t *pctx; /** < higher layer authentification context */ pthread_t cthread; /** < thread for listening the connection socket */ + pthread_t msgthread; /** < thread for message queue (2) */ + pth_queue_t *mqueue; /** < message queue (2) */ pthread_mutex_t oplock; /** < mutex used to sync operations on connection */ pthread_rwlock_t chnl_lock; /** < rwlock used to sync ops with channels */ int flags; /** < flags of the connection */ @@ -106,10 +108,14 @@ typedef struct __sexp_payload_t { #define ESX_SYSMSG_SIZE 512 -#define ESXMSG_SYS (1 << 1) -#define ESXMSG_USR (1 << 2) -#define ESXMSG_PENDING (1 << 3) -#define ESXMSG_NOWAY (1 << 4) +#define ESXMSG_SYS (1 << 1) +#define ESXMSG_USR (1 << 2) +#define ESXMSG_PENDING (1 << 3) +#define ESXMSG_NOWAY (1 << 4) +#define ESXMSG_TIMEDOUT (1 << 5) +#define ESXMSG_PULSE (1 << 6) +#define ESXMSG_NOWAIT (1 << 7) +#define ESXMSG_ISREPLY (1 << 8) typedef struct __message_t { chnl_t *pch; /** < channel of the message(if applicable) */ diff --git a/lib/connection.c b/lib/connection.c index 6f1608d..dff7ccd 100644 --- a/lib/connection.c +++ b/lib/connection.c @@ -395,15 +395,15 @@ static int __default_ch_set_types(void *cctx, sexp_t *sx) if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) { return EINVAL; /* TODO: return correct error code, clean up*/ } else val = sx_in->val; - + if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */ - + sexp_list_cdr(sx_iter, &sx_in); - + if(!SEXP_IS_TYPE(sx_in, SEXP_DQUOTE)) { return EINVAL; /* TODO: return correct error code, clean up*/ } else var = sx_in->val; - + /* ok, now we need to analyze parameters */ if(*val != ':') { return EINVAL; /* TODO: clean up all the shit */ @@ -564,7 +564,7 @@ static int __default_ch_open_ret(void *cctx, sexp_t *sx) lsx = sx->list->next; /* now we expect a list of lists */ if(lsx->ty != SEXP_LIST) { - printf("%s:%d\n", __FUNCTION__, __LINE__); + //printf("%s:%d\n", __FUNCTION__, __LINE__); r = EINVAL; /* TODO: right opcode */ goto __mark_msg; } @@ -716,8 +716,6 @@ static void *__cxslave_thread_listener(void *wctx) char *buf = malloc(4096); int r; - printf("Slave listening thread\n"); - while((r = __conn_read(co, buf, 4096)) != -1) { if(r) printf("Got the message %s \n", buf); r = __eval_cstr(buf, conn_sys->system_rpc, co); @@ -734,8 +732,6 @@ static void *__cxmaster_thread_listener(void *wctx) char *buf = malloc(4096); int r; - printf("Master listening thread\n"); - while((r = __conn_read(co, buf, 4096)) != -1) { if(r) printf("Got the message %s \n", buf); r = __eval_cstr(buf, conn_sys->system_rpc, co); @@ -746,6 +742,81 @@ static void *__cxmaster_thread_listener(void *wctx) return NULL; } +static void *__msg_queue_thread(void *ctx) +{ + conn_t *co = (conn_t *)ctx; + pth_msg_t *tmp = malloc(sizeof(pth_msg_t)); + sxmsg_t *msg; + chnl_t *ch; + int r = 0, len; + char *buf = malloc(4096), *tb; + sexp_t *sx; + + if(!tmp || !buf) { + if(tmp) free(tmp); + if(buf) free(buf); + return NULL; + } + + while(1) { + r = pth_queue_get(co->mqueue, NULL, tmp); + if(r) { + free(buf); + free(tmp); + return NULL; + } + + /* message workout */ + msg = tmp->data; + if(!msg) continue; /* spurious message */ + + if(!(sysmsg->flags & ESXMSG_USR)) { /* not a regular message */ + msg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */ + msg->flags &= ~ESXMSG_PENDING; + pthread_mutex_unlock(&(msg->wait)); /* wake up the waitee */ + continue; + } else { + ch = msg->pch; + /* now we need to complete the request */ + sx = (sexp_t *)msg->payload; tb = buf; + if(!(msg->flags & ESXMSG_PULSE)) { /* %s))) */ + snprintf(buf, 4096, "(ch-msg (:chid %lu (:msgid %lu ", ch->cid, + msg->mid); + len = strlen(buf); + tb += len*sizeof(char); + if(print_sexp(tb, 4096 - (len + 4*sizeof(char)), sx)) { + sx->opcode = ENOMEM; + /* we don't need to wake up anybody */ + if(sx->flags & ESXMSG_TIMEDOUT) { + /* clean up all the shit: + * 1. remove from message tree + * 2. destroy message itself + */ + /* TODO: destroy the message */ + } else + pthread_mutex_unlock(&(msg->wait)); + } + len = strlen(tb); + tb += len*sizeof(char); + strcat(tb, ")))\0"); + } else { /* pulse messages */ + } + + /* write it */ + pthread_mutex_lock(&(co->oplock)); /* exclusive write */ + SSL_write(co->ssl, (void *)buf, strlen(buf) + sizeof(char); /* TODO: SSL*/ + pthread_mutex_unlock(&(co->oplock)); + } + + len = 0; + } + + free(msg); + free(buf); + + return NULL; +} + /* this function is an ugly implementation to get C string with uuid */ static char *__generate_uuid(void) { @@ -846,7 +917,7 @@ void *__system_queue_listener(void *data) if(!(sysmsg->flags & ESXMSG_SYS)) { /* not a system message */ sysmsg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */ - sysmsg->flags &= ~ESXMSG_PENDING; + sysmsg->flags &= ~ESXMSG_PENDING; pthread_mutex_unlock(&(sysmsg->wait)); /* wake up the waitee */ continue; } else { @@ -919,7 +990,6 @@ int connections_subsystem_init(void) /* init SSL library */ SSL_library_init(); - printf("here\n"); OpenSSL_add_all_algorithms(); SSL_load_error_strings(); @@ -1307,6 +1377,44 @@ static sxmsg_t *__allocate_msg(int *res) return msg; } +static void __destroy_msg(sxmsg_t *msg) +{ + chnl_t *ch = msg->nch; + + if(sm->flags & ESXMSG_USR) { + pthread_mutex_lock(&(ch->oplock)); + idx_free(ch->idx_msg, sm->mid); + pthread_mutex_unlock(&(ch->oplock)); + } + + pthread_mutex_unlock(&(msg->wait)); + pthread_mutex_destroy(&(msg->wait)); + free(msg); + return; +} + +static int __create_reg_msg(sxmsg_t **msg, chnl_t *ch) +{ + int r = 0; + sxmsg_t *sm = __allocate_msg(&r); + + if(r) return r; + else { + sm->pch = ch; + sm->flags = (ESXMSG_USR | ESXMSG_PENDING); + + /* ok allocate message ID */ + pthread_mutex_lock(&(ch->oplock)); + sm->mid = idx_allocate(ch->idx_msg); + pthread_mutex_unlock(&(ch->oplock)); + + pthread_mutex_lock(&(sm->wait)); + *msg = sm; + } + + return 0; +} + static int __create_sys_msg(sxmsg_t **msg, char *uuid, chnl_t *ch, sxpayload_t *data) { int r = 0; @@ -1436,11 +1544,29 @@ int channel_close(conn_t *co) } /* message passing */ + +/* + * How message sending works: + * 1. Create a message structure assigned to the channel, + * 2. Put S-expression context to it + * 3. Put the message to the queue + * 4. expect the result waiting on the lock mutex + */ static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec *tio) { int r = 0; sxmsg_t *m = NULL; + r = __create_reg_msg(&m, ch); + if(r) return r; + else { + /* put the message to the search tree */ + pthread_rwlock_wrlock(&(ch->msglock)); + usrtc_insert(ch->msgs_tree, &(m->pendinq_node), &(m->mid)); + pthread_rwlock_unlock(&(ch->msglock)); + /* put the message to the run queue */ + } + return r; }