From cdee55108b1b52f83654fd4d25f41f2a9e8f6056 Mon Sep 17 00:00:00 2001 From: Alexander Vdolainen Date: Wed, 3 Dec 2014 03:05:45 +0200 Subject: [PATCH] safe commit; --- include/sntl/connection.h | 6 +++ lib/connection.c | 107 +++++++++++++++++++++++++++++++++++--- 2 files changed, 105 insertions(+), 8 deletions(-) diff --git a/include/sntl/connection.h b/include/sntl/connection.h index 58aa50f..572558c 100644 --- a/include/sntl/connection.h +++ b/include/sntl/connection.h @@ -22,6 +22,11 @@ #include +/* error codes */ +#define SXOREPLYREQ 44 /* protocol require reply with expression, + * or expression return for the request */ +#define SXOTIMEDOUT 45 /* timedout */ + /* sexp helpers */ #define SEXP_IS_LIST(sx) \ ((sx)->ty == SEXP_LIST) ? 1 : 0 @@ -116,6 +121,7 @@ typedef struct __sexp_payload_t { #define ESXMSG_PULSE (1 << 6) #define ESXMSG_NOWAIT (1 << 7) #define ESXMSG_ISREPLY (1 << 8) +#define ESXMSG_CLOSURE (1 << 9) typedef struct __message_t { chnl_t *pch; /** < channel of the message(if applicable) */ diff --git a/lib/connection.c b/lib/connection.c index cedb4f8..b32d2a8 100644 --- a/lib/connection.c +++ b/lib/connection.c @@ -780,8 +780,21 @@ static void *__msg_queue_thread(void *ctx) /* now we need to complete the request */ sx = (sexp_t *)msg->payload; tb = buf; if(!(msg->flags & ESXMSG_PULSE)) { /* %s))) */ - snprintf(buf, 4096, "(ch-msg (:chid %lu (:msgid %lu ", ch->cid, - msg->mid); + if(!(msg->flags & ESXMSG_ISREPLY)) + snprintf(buf, 4096, "(ch-msg (:chid %lu (:msgid %lu ", ch->cid, + msg->mid); + else { + if(!sx) { + snprintf(buf, 4096, "(ch-msg-rete (:chid %lu (:msgid %lu (%d))))", ch->cid, + msg->mid, msg->opcode); + /* mark it to close */ + msg->flags |= ESXMSG_CLOSURE; + /* ok, here we will write it and wait, destroying dialog while reply */ + goto __ssl_write; + } else snprintf(buf, 4096, "(ch-msg-repl (:chid %lu (:msgid %lu ", ch->cid, + msg->mid); + } + len = strlen(buf); tb += len*sizeof(char); if(print_sexp(tb, 4096 - (len + 4*sizeof(char)), sx)) { @@ -798,11 +811,31 @@ static void *__msg_queue_thread(void *ctx) } len = strlen(tb); tb += len*sizeof(char); - strcat(tb, ")))\0"); + strcat(tb, ")))"); } else { /* pulse messages */ - tb = tb; + /* here we're shouldn't process reply procedure */ + snprintf(buf, 4096, "(ch-msg-pulse (:chid %lu (:msgid %lu ", ch->cid, + msg->mid); + len = strlen(buf); /* FIXME: code double shit ! */ + tb += len*sizeof(char); + if(print_sexp(tb, 4096 - (len + 4*sizeof(char)), sx)) { + msg->opcode = ENOMEM; + /* we don't need to wake up anybody */ + if(msg->flags & ESXMSG_TIMEDOUT) { + /* clean up all the shit: + * 1. remove from message tree + * 2. destroy message itself + */ + /* TODO: destroy the message */ + } else + pthread_mutex_unlock(&(msg->wait)); + } + len = strlen(tb); + tb += len*sizeof(char); + strcat(tb, ")))"); } + __ssl_write: /* write it */ pthread_mutex_lock(&(co->oplock)); /* exclusive write */ SSL_write(co->ssl, (void *)buf, strlen(buf) + sizeof(char)); /* TODO: SSL*/ @@ -1493,7 +1526,7 @@ int channel_open(conn_t *co, chnl_t **ch, int type) } else { /* put the channel to the channels search tree */ pthread_rwlock_wrlock(&(co->chnl_lock)); - printf("inserting cid = %d\n", nch->cid); + printf("inserting cid = %d\n", nch->cid); usrtc_insert(co->chnl_tree, &nch->node, &nch->cid); pthread_rwlock_unlock(&(co->chnl_lock)); @@ -1557,6 +1590,9 @@ static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec { int r = 0; sxmsg_t *m = NULL; + conn_t *co = ch->connection; + + *msg = NULL; r = __create_reg_msg(&m, ch); if(r) return r; @@ -1565,7 +1601,30 @@ static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec pthread_rwlock_wrlock(&(ch->msglock)); usrtc_insert(ch->msgs_tree, &(m->pendingq_node), &(m->mid)); pthread_rwlock_unlock(&(ch->msglock)); + + /* message assign */ + m->opcode = 0; + m->payload = (void *)sx; + /* put the message to the run queue */ + r = pth_queue_add(co->mqueue, (void *)m, USR_MSG); + if(r) return r; /* FIXME: better give up */ + + if(m->flags & ESXMSG_PENDING) { + if(!tio) pthread_mutex_lock(&(m->wait)); + else pthread_mutex_timedlock(&(m->wait), tio); + } + + if(tio && (m->flags & ESXMSG_PENDING)) + return SXOTIMEDOUT; + + if(!m->payload) { + /* TODO: destroy the message */ + r = m->opcode; + } else { + *msg = m; + r = SXOREPLYREQ; + } } return r; @@ -1581,19 +1640,51 @@ int msg_send_timed(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec *tio) return __message_send(ch, sx, msg, tio); } +static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcode) +{ + int r = 0; + chnl_t *ch = msg->pch; + conn_t *co = ch->connection; + + msg->payload = sx; + msg->opcode = opcode; + msg->flags |= ESXMSG_PENDING; /* pending */ + msg->flags |= ESXMSG_ISREPLY; /* this is a reply */ + + /* put the message to the queue */ + r = pth_queue_add(co->mqueue, (void *)msg, USR_MSG); + if(r) return r; /* FIXME: better give up */ + + if(msg->flags & ESXMSG_PENDING) { + if(!tio) pthread_mutex_lock(&(msg->wait)); + else pthread_mutex_timedlock(&(msg->wait), tio); + } + + if(tio && (msg->flags & ESXMSG_PENDING)) + return SXOTIMEDOUT; + + r = msg->opcode; + + if(msg->flags & ESXMSG_CLOSURE) { + /* TODO: destroy message */ + } + + return r; +} + int msg_return(sxmsg_t *msg, int opcode) { - return 0; + return __msg_reply(msg, NULL, NULL, opcode); } int msg_reply(sxmsg_t *msg, sexp_t *sx) { - return 0; + return __msg_reply(msg, sx, NULL, 0); } int msg_reply_timed(sxmsg_t *msg, sexp_t *sx, struct timespec *tio) { - return 0; + return __msg_reply(msg, sx, tio, 0); } int msg_send_pulse(chnl_t *ch, sexp_t *sx)