From c86846e81b3f9abd7427297dc9cb82e91a8d93a5 Mon Sep 17 00:00:00 2001 From: Alexander Vdolainen Date: Thu, 18 Dec 2014 01:55:14 +0200 Subject: [PATCH] bug fixes, added implementation, some debug; --- include/sntl/connection.h | 10 +++ lib/connection.c | 136 +++++++++++++++++++++++++++++++++++--- 2 files changed, 136 insertions(+), 10 deletions(-) diff --git a/include/sntl/connection.h b/include/sntl/connection.h index c9a05de..e16ec6f 100644 --- a/include/sntl/connection.h +++ b/include/sntl/connection.h @@ -66,6 +66,11 @@ typedef struct __perm_context_type { #define CXCONN_SLAVE (1 << 2) #define CXCONN_ESTABL (1 << 3) +/* + * älä jätä kommentteja omalla kielellä! yksinkertaisia englanti sijaan! + * i found somebody who write comments and messages in non-english, + * itäs a fucking practice - forget it. + */ typedef struct __connection_t { char *uuid; /** < uuid of the connection */ idx_allocator_t *idx_ch; /** < index allocation for channels */ @@ -76,8 +81,10 @@ typedef struct __connection_t { int ssl_data_index; /** < SSL index for the custom data */ perm_ctx_t *pctx; /** < higher layer authentification context */ pthread_t cthread; /** < thread for listening the connection socket */ + pthread_t rmsgthread; /** < thread for message queue (1) */ pthread_t msgthread; /** < thread for message queue (2) */ pth_queue_t *mqueue; /** < message queue (2) */ + pth_queue_t *rqueue; /** < message queue (1) */ pthread_mutex_t oplock; /** < mutex used to sync operations on connection */ pthread_rwlock_t chnl_lock; /** < rwlock used to sync ops with channels */ int flags; /** < flags of the connection */ @@ -242,5 +249,8 @@ int sntl_rpclist_add_function(usrtc_t *tree, int type, const char *fu_name, int sntl_rpclist_filter(usrtc_t *source, usrtc_t **dest, int flag, int *filter); +/* for DEBUG purposes */ +#define __DBGLINE fprintf(stderr, "%s:%d at %s\n", __FILE__, __LINE__, __FUNCTION__) + #endif /* __ESXC_CONNECTION_H_ */ diff --git a/lib/connection.c b/lib/connection.c index a4f0b5a..fde32b3 100644 --- a/lib/connection.c +++ b/lib/connection.c @@ -31,6 +31,10 @@ conn_sys_t *conn_sys = NULL; static long __cmp_ulong(const void *a, const void *b); +/* message alloc and destroy */ +static sxmsg_t *__allocate_msg(int *res); +static void __destroy_msg(sxmsg_t *msg); + int __alloc_channel(ulong_t cid, conn_t *co, rpc_typed_list_t *rlist, chnl_t **channel) { int r = 0; @@ -631,6 +635,30 @@ static int __default_ch_close(void *cctx, sexp_t *sx) return 0; } +/* create a nould of the message */ +static int __create_reg_msg_mould(sxmsg_t **msg, chnl_t *ch, ulong_t mid) +{ + int r = 0; + sxmsg_t *sm = __allocate_msg(&r); + + if(r) return r; + else { + sm->pch = ch; + sm->flags = (ESXMSG_USR | ESXMSG_PENDING); + sm->mid = mid; + + /* ok reserve message ID */ + pthread_mutex_lock(&(ch->oplock)); + idx_reserve(ch->idx_msg, mid); + pthread_mutex_unlock(&(ch->oplock)); + + pthread_mutex_lock(&(sm->wait)); + *msg = sm; + } + + return 0; +} + static int __default_msg_pulse(void *cctx, sexp_t *sx) { return 0; @@ -643,7 +671,74 @@ static int __default_msg_pulse_ret(void *cctx, sexp_t *sx) static int __default_msg(void *cctx, sexp_t *sx) { - return 0; + conn_t *co = (conn_t *)cctx; + usrtc_node_t *node = NULL; + chnl_t *chan = NULL; + char *key; int r = 0; + sexp_t *lsx = NULL; + ulong_t chnl_id = 0; + ulong_t msg_id = 0x00; + sexp_t *msg = NULL; + sxmsg_t *smsg = NULL; + + /* get parameters from the message */ + if(sexp_list_cdr(sx, &lsx)) return EINVAL; + if(!SEXP_IS_LIST(lsx)) return EINVAL; + + /* FIXME: make it via iteraction, to cover the case with different arguments placement */ + key = lsx->list->val; + if(strcmp(key, ":chid")) return EINVAL; + lsx = lsx->list->next; + if(!lsx) return EINVAL; + if(!SEXP_IS_TYPE(lsx, SEXP_BASIC)) return EINVAL; + chnl_id = atol(lsx->val); + + lsx = lsx->next; + if(!SEXP_IS_LIST(lsx)) return EINVAL; + key = lsx->list->val; + if(strcmp(key, ":msgid")) return EINVAL; + lsx = lsx->list->next; + if(!lsx) return EINVAL; + if(!SEXP_IS_TYPE(lsx, SEXP_BASIC)) return EINVAL; + msg_id = atol(lsx->val); /* message id */ + + lsx = lsx->next; + if(!SEXP_IS_LIST(lsx)) return EINVAL; + msg = lsx; + + /* find channel */ + printf("chnl_id = %ld\n", chnl_id); + if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT; + else chan = (chnl_t *)usrtc_node_getdata(node); + /* lookup for the message */ + if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) { + /* here, rpc lookup has no sense, just put this job to the queue */ + /* btw, we're need to create a message first */ + r = __create_reg_msg_mould(&smsg, chan, msg_id); + if(r) return r; /* TODO: reply with right error code */ + + /* assign the message */ + smsg->opcode = 0; + smsg->payload = (void *)msg; + + /* put the message to the search tree */ + pthread_rwlock_wrlock(&(chan->msglock)); + usrtc_insert(chan->msgs_tree, &(smsg->pendingq_node), &(smsg->mid)); + pthread_rwlock_unlock(&(chan->msglock)); + } else { + /* TODO: reply fdr this message with an error EEXIST */ + return EEXIST; + } + + /* put job to the queue and give up */ + r = pth_queue_add(co->rqueue, (void *)msg, USR_MSG); + if(r) { /* cannot put job to the queue */ + /* TODO: remove message and reply with error code */ + return r; + } + + /* put to the IN queue */ + return r; } static int __default_msg_return(void *cctx, sexp_t *sx) @@ -705,7 +800,7 @@ static int __eval_cstr(char *cstr, cx_rpc_list_t *rpc_list, void *ctx) printf("rentry->rpcf = %p\n", rentry->rpcf); r = rentry->rpcf(ctx, sx); /* free s-expression */ - destroy_sexp(sx); + //destroy_sexp(sx); FIXME: i guess rpc call should take care of sx return r; } @@ -757,7 +852,7 @@ static void *__msg_queue_thread(void *ctx) if(buf) free(buf); return NULL; } - + __DBGLINE; while(1) { r = pth_queue_get(co->mqueue, NULL, tmp); if(r) { @@ -765,6 +860,7 @@ static void *__msg_queue_thread(void *ctx) free(tmp); return NULL; } + __DBGLINE; /* message workout */ msg = tmp->data; @@ -773,6 +869,7 @@ static void *__msg_queue_thread(void *ctx) if(!(msg->flags & ESXMSG_USR)) { /* not a regular message */ msg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */ msg->flags &= ~ESXMSG_PENDING; + __DBGLINE; pthread_mutex_unlock(&(msg->wait)); /* wake up the waitee */ continue; } else { @@ -790,14 +887,15 @@ static void *__msg_queue_thread(void *ctx) /* mark it to close */ msg->flags |= ESXMSG_CLOSURE; /* ok, here we will write it and wait, destroying dialog while reply */ + __DBGLINE; goto __ssl_write; } else snprintf(buf, 4096, "(ch-msg-repl (:chid %lu (:msgid %lu ", ch->cid, msg->mid); } - + __DBGLINE; len = strlen(buf); tb += len*sizeof(char); - if(print_sexp(tb, 4096 - (len + 4*sizeof(char)), sx)) { + if(print_sexp(tb, 4096 - (len + 4*sizeof(char)), sx) == -1) { msg->opcode = ENOMEM; /* we don't need to wake up anybody */ if(msg->flags & ESXMSG_TIMEDOUT) { @@ -806,19 +904,22 @@ static void *__msg_queue_thread(void *ctx) * 2. destroy message itself */ /* TODO: destroy the message */ - } else + } else { + __DBGLINE; pthread_mutex_unlock(&(msg->wait)); + } } len = strlen(tb); tb += len*sizeof(char); strcat(tb, ")))"); + __DBGLINE; } else { /* pulse messages */ /* here we're shouldn't process reply procedure */ snprintf(buf, 4096, "(ch-msg-pulse (:chid %lu (:msgid %lu ", ch->cid, msg->mid); len = strlen(buf); /* FIXME: code double shit ! */ tb += len*sizeof(char); - if(print_sexp(tb, 4096 - (len + 4*sizeof(char)), sx)) { + if(print_sexp(tb, 4096 - (len + 4*sizeof(char)), sx) == -1) { msg->opcode = ENOMEM; /* we don't need to wake up anybody */ if(msg->flags & ESXMSG_TIMEDOUT) { @@ -836,6 +937,7 @@ static void *__msg_queue_thread(void *ctx) } __ssl_write: + __DBGLINE; /* write it */ pthread_mutex_lock(&(co->oplock)); /* exclusive write */ SSL_write(co->ssl, (void *)buf, strlen(buf) + sizeof(char)); /* TODO: SSL*/ @@ -1094,8 +1196,10 @@ int connection_initiate(conn_t *co, const char *host, int port, struct hostent *host_; struct sockaddr_in addr; usrtc_t *ch_tree, *rpc_tree; + pth_queue_t *mqueue = malloc(sizeof(pth_queue_t)); idx_allocator_t *idx_ch = malloc(sizeof(idx_allocator_t)); + /* TODO: check for mqueue allocation */ if(!co) return EINVAL; if(!host) return EINVAL; if(!SSL_cert) return EINVAL; @@ -1124,6 +1228,10 @@ int connection_initiate(conn_t *co, const char *host, int port, co->idx_ch = idx_ch; + /* assign message queue */ + pth_queue_init(mqueue); /* TODO: check for initialization */ + co->mqueue = mqueue; + /* init SSL certificates and context */ co->ctx = SSL_CTX_new(SSLv3_client_method()); if(!co->ctx) { ERR_print_errors_fp(stderr); @@ -1224,8 +1332,11 @@ int connection_initiate(conn_t *co, const char *host, int port, pthread_rwlock_wrlock(&conn_sys->rwlock); usrtc_insert(conn_sys->connections, &co->csnode, (void *)co->uuid); pthread_rwlock_unlock(&conn_sys->rwlock); - return 0; + //return 0; /* FIXME: */ } + r = pthread_create(&co->msgthread, NULL, __msg_queue_thread, (void *)co); + /* TODO: check for thread creation */ + return 0; } __fail_3: @@ -1594,9 +1705,11 @@ static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec *msg = NULL; + __DBGLINE; r = __create_reg_msg(&m, ch); if(r) return r; else { + __DBGLINE; /* put the message to the search tree */ pthread_rwlock_wrlock(&(ch->msglock)); usrtc_insert(ch->msgs_tree, &(m->pendingq_node), &(m->mid)); @@ -1608,16 +1721,19 @@ static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec /* put the message to the run queue */ r = pth_queue_add(co->mqueue, (void *)m, USR_MSG); + __DBGLINE; if(r) return r; /* FIXME: better give up */ if(m->flags & ESXMSG_PENDING) { + __DBGLINE; if(!tio) pthread_mutex_lock(&(m->wait)); else pthread_mutex_timedlock(&(m->wait), tio); + __DBGLINE; } - + __DBGLINE; if(tio && (m->flags & ESXMSG_PENDING)) return SXOTIMEDOUT; - + __DBGLINE; if(!m->payload) { /* TODO: destroy the message */ r = m->opcode;