diff --git a/include/sntl/connection.h b/include/sntl/connection.h index c5cd7b5..e8a2bcd 100644 --- a/include/sntl/connection.h +++ b/include/sntl/connection.h @@ -23,12 +23,13 @@ #include /* error codes */ -#define SXOREPLYREQ 44 /* protocol require reply with expression, - * or expression return for the request */ -#define SXOTIMEDOUT 45 /* timedout */ -#define ESXRCBADPROT 46 /* invalid protocol */ -#define ESXNOCONNECT 47 /* connection is lost */ -#define ESXNOCHANSUP 48 +#define ESXOREPLYREQ 44 /* protocol require reply with expression, + * or expression return for the request */ +#define ESXOTIMEDOUT 45 /* timedout */ +#define ESXRCBADPROT 46 /* invalid protocol */ +#define ESXNOCONNECT 47 /* connection is lost */ +#define ESXNOCHANSUP 48 +#define ESXRAPIDREPLY 49 /* sexp helpers */ #define SEXP_IS_LIST(sx) \ @@ -136,6 +137,7 @@ typedef struct __sexp_payload_t { #define ESXMSG_CLOSURE (1 << 9) #define ESXMSG_RMONRETR (1 << 10) #define ESXMSG_KILLTHRD (1 << 11) +#define ESXMSG_ISRAPID (1 << 12) /** * \brief Message used in sntl message passing @@ -147,8 +149,6 @@ typedef struct __message_t { chnl_t *pch; /** < channel of the message(if applicable) */ ulong_t mid; /** < unique ID within connection context */ char *uuid; /** < UUID of the message, used for special messages */ - usrtc_node_t chnl_node; /** < node for channel search tree */ - usrtc_node_t poll_node; /** < node for the poll of the messages */ usrtc_node_t pendingq_node; /** < node for the pending queue */ pthread_mutex_t wait; /** < special wait mutex, used for sync */ void *payload; /** < payload */ @@ -251,6 +251,12 @@ int msg_reply(sxmsg_t *msg, sexp_t *sx); int msg_reply_timed(sxmsg_t *msg, sexp_t *sx, struct timespec *tio); +/* reply with S expression without confirmation of delivery and applying */ +int msg_reply_rapid(sxmsg_t *msg, sexp_t *sx); + +/* this is required to clean the message in case if it's a rapid message */ +int msg_rapid_clean(sxmsg_t *msg); + int msg_send_pulse(chnl_t *ch, sexp_t *sx); int msg_send_pulse_timed(chnl_t *ch, sexp_t *sx, struct timespec *tio); diff --git a/lib/connection.c b/lib/connection.c index 4b94228..e5989c9 100644 --- a/lib/connection.c +++ b/lib/connection.c @@ -70,6 +70,7 @@ static int __rpc_callback(void *data) int rc = 0; rc = job->rpcf((void *)(job->msg), job->sx); + free(job); return rc; @@ -81,47 +82,61 @@ extern int __alloc_channel(ulong_t cid, conn_t *co, rpc_typed_list_t *rlist, static int __conn_read(conn_t *co, void *buf, size_t buf_len) { int rfd = SSL_get_fd(co->ssl), r; - fd_set readset; - - //fprintf(stderr, "\tListening ... on %s\n", co->uuid); - /* get prepare to select */ - FD_ZERO(&readset); - FD_SET(rfd, &readset); - /* waits until something will be ready to read */ - r = select(FD_SETSIZE, &readset, NULL, NULL, NULL); - if(r < 0) { - printf("select (%d)\n", errno); - return -1; - } - if(!r) { - printf("Nothing to wait for\n"); - return 0; + fd_set readset, writeset; + int ofcmode, read_blocked = 0, read_blocked_on_write = 0; + + /* First we make the socket nonblocking */ + ofcmode = fcntl(rfd, F_GETFL,0); + ofcmode |= O_NDELAY; + if(fcntl(rfd, F_SETFL, ofcmode)) + fprintf(stderr, "Couldn't make socket nonblocking"); + + __retry: + + do { + r = SSL_read(co->ssl, buf, (int)buf_len); + switch(SSL_get_error (co->ssl, r)) { + case SSL_ERROR_NONE: + return r; + break; + case SSL_ERROR_WANT_READ: + /* get prepare to select */ + read_blocked = 1; + break; + case SSL_ERROR_WANT_WRITE: /* here we blocked on write */ + read_blocked_on_write = 1; + break; + default: + fprintf(stderr, "(RD)Unknown error on %s\n", co->uuid); + return -1; + } + } while(SSL_pending(co->ssl) && !read_blocked); + + if(read_blocked) { + FD_ZERO(&readset); + FD_SET(rfd, &readset); + /* waits until something will be ready to read */ + r = select(rfd + 1, &readset, NULL, NULL, NULL); + if(r < 0) { + printf("select (%d)\n", errno); + return -1; + } + if(!r) { + printf("Nothing to wait for\n"); + return 0; + } + read_blocked = 0; + if(r && FD_ISSET(rfd, &readset)) goto __retry; /* try to read again */ } + if(read_blocked_on_write) { /* we was blocked on write */ + FD_ZERO(&readset); + FD_ZERO(&writeset); + FD_SET(rfd, &readset); + FD_SET(rfd, &writeset); - if(r && FD_ISSET(rfd, &readset)) { - do { - //pthread_mutex_lock(&(co->oplock)); - /* ok, now we're ready to perform SSL_read */ - r = SSL_read(co->ssl, buf, (int)buf_len); - switch(SSL_get_error(co->ssl, r)) { - case SSL_ERROR_NONE: - //printf("Read done (f:%d)\n", rfd); - /* this is means we're get ridden it all */ - return r; break; - case SSL_ERROR_ZERO_RETURN: - printf("No data to read\n"); - /* no data to read ... */ - return 0; break; - case SSL_ERROR_WANT_READ: - case SSL_ERROR_WANT_WRITE: - printf("Bypass until SSL buffer not ready.\n"); - return 0; - default: /* seems the connection lost */ - fprintf(stderr, "(RD)Unknown error on %s\n", co->uuid); - return -1; - } - //pthread_mutex_unlock(&(co->oplock)); - } while(SSL_pending(co->ssl)); + r = select(rfd + 1, &readset, &writeset, NULL, NULL); + read_blocked_on_write = 0; + if(r && FD_ISSET(rfd, &writeset)) goto __retry; } return 0; @@ -129,24 +144,29 @@ static int __conn_read(conn_t *co, void *buf, size_t buf_len) static int __conn_write(conn_t *co, void *buf, size_t buf_len) { - int r; + int r, rfd = SSL_get_fd(co->ssl); + fd_set writeset; - pthread_mutex_lock(&(co->oplock)); + //pthread_mutex_lock(&(co->oplock)); __retry: r = SSL_write(co->ssl, buf, (int)buf_len); switch(SSL_get_error(co->ssl, r)) { case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_WRITE: - goto __retry; + /* here we should block */ + FD_ZERO(&writeset); + FD_SET(rfd, &writeset); + r = select(rfd + 1, NULL, &writeset, NULL, NULL); + if(r && FD_ISSET(rfd, &writeset)) goto __retry; break; default: - pthread_mutex_unlock(&(co->oplock)); + //pthread_mutex_unlock(&(co->oplock)); if(r < 0) { fprintf(stderr, "(WR)Unknown error on %s (%d)\n", co->uuid, r); return -1; } else return 0; } - pthread_mutex_unlock(&(co->oplock)); + //pthread_mutex_unlock(&(co->oplock)); return 0; } @@ -861,7 +881,7 @@ static int __default_ch_close_ret(void *cctx, sexp_t *sx) /* take length of the list */ llen = sexp_list_length(lsx); if(!llen) return EINVAL; /* !! other side will not set any security attributes */ - ; + SEXP_ITERATE_LIST(lsx, sx_iter, idx) { if(SEXP_IS_LIST(sx_iter)) { sexp_list_car(sx_iter, &sx_in); @@ -892,7 +912,7 @@ static int __default_ch_close_ret(void *cctx, sexp_t *sx) } } else continue; /* ignore */ } - ; + /* try to find desired channel to intercept message */ pthread_rwlock_rdlock(&(co->chnl_lock)); node = usrtc_lookup(co->chnl_tree, (void *)&id); @@ -902,9 +922,9 @@ static int __default_ch_close_ret(void *cctx, sexp_t *sx) chan = (chnl_t *)usrtc_node_getdata(node); sms = chan->sysmsg; } - ; + __mark_msg: - ; + if(!sms) return r; sms->flags &= ~ESXMSG_PENDING; /* the message is done */ sms->opcode = err; @@ -1134,7 +1154,6 @@ static int __default_msg(void *cctx, sexp_t *sx) usrtc_insert(chan->msgs_tree, &(smsg->pendingq_node), &(smsg->mid)); pthread_rwlock_unlock(&(chan->msglock)); } else { - //printf(">>>>>>>>>>>>>>>>>>>msg_id = %lu\n", msg_id); pthread_rwlock_unlock(&(chan->msglock)); smsg = (sxmsg_t *)usrtc_node_getdata(node); msg_return(smsg, EEXIST); @@ -1230,7 +1249,7 @@ static int __default_msg_return(void *cctx, sexp_t *sx) r = ESXRCBADPROT; goto __finish; } - //printf("chnl_id = %ld\n", chnl_id); + if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT; else chan = (chnl_t *)usrtc_node_getdata(node); /* lookup for the message */ @@ -1256,8 +1275,114 @@ static int __default_msg_return(void *cctx, sexp_t *sx) usrtc_delete(chan->msgs_tree, &(smsg->pendingq_node)); pthread_rwlock_unlock(&(chan->msglock)); } - if(smsg->flags & ESXMSG_PENDING) pthread_mutex_unlock(&(smsg->wait)); - else { + if(smsg->flags & ESXMSG_PENDING) { + destroy_sexp(sx); + pthread_mutex_unlock(&(smsg->wait)); + return r; + } else { + /* nobody want it */ + destroy_sexp(smsg->initial_sx); + __destroy_msg(smsg); + } + } + +__finish: + destroy_sexp(sx); + return r; +} + +static int __default_msg_rapid(void *cctx, sexp_t *sx) +{ + conn_t *co = (conn_t *)cctx; + usrtc_node_t *node = NULL; + chnl_t *chan = NULL; + int r = 0; + sexp_t *lsx = NULL, *sx_iter = NULL; + sexp_t *sx_sublist = NULL, *sx_value = NULL; + ulong_t chnl_id = -1; + ulong_t msg_id = -1; + sexp_t *msg = NULL; + sxmsg_t *smsg = NULL; + int idx; + + /* get parameters from the message */ + if(sexp_list_cdr(sx, &lsx)) { + r = ESXRCBADPROT; + goto __finish; + } + if(!SEXP_IS_LIST(lsx)) { + r = ESXRCBADPROT; + goto __finish; + } + + /* get parameters */ + SEXP_ITERATE_LIST(lsx, sx_iter, idx) { + if(SEXP_IS_LIST(sx_iter)) { + sx_sublist = sx_iter; + continue; + } else { + if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { + if(strcmp(sx_iter->val, ":chid")) { + continue; // ignore it + } + sx_value = sx_iter->next; + if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { + continue; + } + chnl_id = atol(sx_value->val); + } else continue; // ignore it + } + } + lsx = sx_sublist; + /* find message id */ + SEXP_ITERATE_LIST(lsx, sx_iter, idx) { + if(SEXP_IS_LIST(sx_iter)) { + msg = sx_iter; + continue; + } else { + if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) { + if(strcmp(sx_iter->val, ":msgid")) { + continue; // ignore + } + sx_value = sx_iter->next; + if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) { + continue; + } + msg_id = atol(sx_value->val); + } else continue; // ignore it + } + } + + if(msg_id < 0 || chnl_id < 0) { + r = ESXRCBADPROT; + goto __finish; + } + + if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT; + else chan = (chnl_t *)usrtc_node_getdata(node); + /* lookup for the message */ + pthread_rwlock_rdlock(&(chan->msglock)); + if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) { + pthread_rwlock_unlock(&(chan->msglock)); + r = ENOENT; + goto __finish; + } else { + pthread_rwlock_unlock(&(chan->msglock)); + smsg = (sxmsg_t *)usrtc_node_getdata(node); + smsg->opcode = ESXRAPIDREPLY; + smsg->payload = copy_sexp(msg); + smsg->flags |= ESXMSG_ISREPLY; + /* are we'are ready to remove this ? */ + if(smsg->flags & ESXMSG_RMONRETR) { + pthread_rwlock_wrlock(&(chan->msglock)); + usrtc_delete(chan->msgs_tree, &(smsg->pendingq_node)); + pthread_rwlock_unlock(&(chan->msglock)); + } + if(smsg->flags & ESXMSG_PENDING) { + destroy_sexp(sx); + pthread_mutex_unlock(&(smsg->wait)); + return r; + } else { /* nobody want it */ destroy_sexp(smsg->initial_sx); __destroy_msg(smsg); @@ -1347,7 +1472,7 @@ static int __default_msg_reply(void *cctx, sexp_t *sx) } else { pthread_rwlock_unlock(&(chan->msglock)); smsg = (sxmsg_t *)usrtc_node_getdata(node); - smsg->opcode = SXOREPLYREQ; + smsg->opcode = ESXOREPLYREQ; smsg->payload = copy_sexp(msg); smsg->flags |= ESXMSG_ISREPLY; pthread_mutex_unlock(&(smsg->wait)); @@ -1377,6 +1502,7 @@ static int __init_systemrpc_tree(usrtc_t *rtree) if(__insert_rpc_function(rtree, "ch-msg-pulse-ret", __default_msg_pulse_ret)) goto __fail; if(__insert_rpc_function(rtree, "ch-msg", __default_msg)) goto __fail; if(__insert_rpc_function(rtree, "ch-msg-rete", __default_msg_return)) goto __fail; + if(__insert_rpc_function(rtree, "ch-msg-rapid", __default_msg_rapid)) goto __fail; if(__insert_rpc_function(rtree, "ch-msg-repl", __default_msg_reply)) goto __fail; return 0; @@ -1442,7 +1568,6 @@ static void *__cxmaster_thread_listener(void *wctx) while((r = __conn_read(co, buf, 4096)) != -1) { buf[r] = '\0'; - // if(r) printf("Got the message %s (%d bytes)\n", buf, r); r = __eval_cstr(buf, conn_sys->system_rpc, co); } co->flags &= ~CXCONN_ESTABL; @@ -1499,7 +1624,6 @@ static void *__rmsg_queue_thread(void *ctx) /* get the function name */ if(sx->ty == SEXP_LIST) rpcf = sx->list->val; else { - //rpcf = sx->val; r = ESXRCBADPROT; goto __err_ret; } @@ -1562,6 +1686,7 @@ static void *__msg_queue_thread(void *ctx) continue; } else { ch = msg->pch; + /* now we need to complete the request */ sx = (sexp_t *)msg->payload; tb = buf; if(!(msg->flags & ESXMSG_PULSE)) { /* %s))) */ @@ -1577,8 +1702,15 @@ static void *__msg_queue_thread(void *ctx) /* ok, here we will write it and wait, destroying dialog while reply */ goto __ssl_write; - } else snprintf(buf, 4096, "(ch-msg-repl (:chid %lu (:msgid %lu ", ch->cid, - msg->mid); + } else { + if(msg->flags & ESXMSG_ISRAPID) { + msg->flags |= ESXMSG_CLOSURE; + snprintf(buf, 4096, "(ch-msg-rapid (:chid %lu (:msgid %lu ", ch->cid, + msg->mid); + } else + snprintf(buf, 4096, "(ch-msg-repl (:chid %lu (:msgid %lu ", ch->cid, + msg->mid); + } } len = strlen(buf); @@ -1629,13 +1761,15 @@ static void *__msg_queue_thread(void *ctx) __ssl_write: if(msg->flags & ESXMSG_CLOSURE) { + /* wake up it */ + pthread_mutex_unlock(&(msg->wait)); /* first remove the message from tree */ pthread_rwlock_wrlock(&(ch->msglock)); usrtc_delete(ch->msgs_tree, &(msg->pendingq_node)); pthread_rwlock_unlock(&(ch->msglock)); /* destroy */ destroy_sexp(msg->initial_sx); - if(msg->flags & ESXMSG_ISREPLY && msg->payload) + if(msg->flags & ESXMSG_ISREPLY && msg->payload) destroy_sexp((sexp_t *)msg->payload); __destroy_msg(msg); } @@ -1646,6 +1780,7 @@ static void *__msg_queue_thread(void *ctx) co->flags |= CXCONN_BROKEN; __wake_up_waiters(co, ESXNOCONNECT); } + } len = 0; @@ -2024,12 +2159,14 @@ int connection_initiate(conn_t *co, const char *host, int port, /* now we will create an SSL connection */ co->ssl = SSL_new(co->ctx); SSL_set_fd(co->ssl, sd); /* attach connected socket */ + BIO_set_nbio(SSL_get_rbio(co->ssl), 1); if(SSL_connect(co->ssl) == -1) { r = EBADE; free(buf); /* shutdown connection */ goto __fail_3; } /* if success we're ready to use established SSL channel */ + BIO_set_nbio(SSL_get_rbio(co->ssl), 1); /* auth and RPC contexts sync */ co->pctx = pctx; snprintf(buf, __TMPBUFLEN, "(auth-set-context ((:user \"%s\")(:passwd \"%s\")))", @@ -2201,6 +2338,7 @@ int connection_create(conn_t *co, int sck) SSL_set_fd(co->ssl, sck); /* attach connected socket */ /* set the context to verify ssl connection */ SSL_set_ex_data(co->ssl, conn_sys->ex_ssldata_index, (void *)co); + BIO_set_nbio(SSL_get_rbio(co->ssl), 1); if(SSL_accept(co->ssl) == -1) { r = EBADE; free(buf); @@ -2217,10 +2355,8 @@ int connection_create(conn_t *co, int sck) if(bytes > 0) { buf[bytes] = 0; r = __eval_cstr(buf, conn_sys->system_rpc, (void *)co); - printf("%s return %d (bytes %d)\n", buf, r, bytes); if(r) goto __fail_3; } else { - printf("bytes = %d\n", bytes); if(bytes < 0) { printf("Terminate SSL connection, the other end is lost.\n"); co->flags &= ~CXCONN_ESTABL; diff --git a/lib/message.c b/lib/message.c index 39818bf..9496f22 100644 --- a/lib/message.c +++ b/lib/message.c @@ -62,8 +62,6 @@ sxmsg_t *__allocate_msg(int *res) return NULL; } - usrtc_node_init(&(msg->chnl_node), msg); - usrtc_node_init(&(msg->poll_node), msg); usrtc_node_init(&(msg->pendingq_node), msg); } @@ -161,7 +159,7 @@ static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec else pthread_mutex_timedlock(&(m->wait), tio); } if(tio && (m->flags & ESXMSG_PENDING)) - return SXOTIMEDOUT; + return ESXOTIMEDOUT; if(!m->payload) { r = m->opcode; /* first remove the message from tree */ @@ -174,9 +172,16 @@ static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec __destroy_msg(m); } else { *msg = m; - if(m->opcode == ESXNOCONNECT) + if(m->opcode == ESXNOCONNECT || m->opcode == ESXRAPIDREPLY) r = m->opcode; - else r = SXOREPLYREQ; + else r = ESXOREPLYREQ; + /* FIXME: remove ugly code */ + if(m->opcode == ESXRAPIDREPLY) { + /* first remove the message from tree */ + pthread_rwlock_wrlock(&(ch->msglock)); + usrtc_delete(ch->msgs_tree, &(m->pendingq_node)); + pthread_rwlock_unlock(&(ch->msglock)); + } } } @@ -193,7 +198,8 @@ int msg_send_timed(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec *tio) return __message_send(ch, sx, msg, tio); } -static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcode) +static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcode, + int israpid) { int r = 0; chnl_t *ch = msg->pch; @@ -211,14 +217,19 @@ static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcod msg->opcode = opcode; msg->flags |= ESXMSG_PENDING; /* pending */ msg->flags |= ESXMSG_ISREPLY; /* this is a reply */ + if(israpid) msg->flags |= ESXMSG_ISRAPID; /* message is a rapid message */ - if(!sx) msg->flags &= ~ESXMSG_PENDING; + if(!sx || israpid) msg->flags &= ~ESXMSG_PENDING; else msg->flags |= ESXMSG_RMONRETR; /* put the message to the queue */ r = pth_queue_add(co->mqueue, (void *)msg, USR_MSG); if(r) return r; /* FIXME: better give up */ - if(!sx) return 0; + if(!sx || israpid) { + /* wait for write */ + pthread_mutex_lock(&(msg->wait)); + return 0; + } if(msg->flags & ESXMSG_PENDING) { if(!tio) pthread_mutex_lock(&(msg->wait)); @@ -227,7 +238,7 @@ static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcod if(tio && (msg->flags & ESXMSG_PENDING)) { msg->flags &= ~ESXMSG_PENDING; /* we will not wait for it */ - return SXOTIMEDOUT; + return ESXOTIMEDOUT; } r = msg->opcode; @@ -243,17 +254,22 @@ static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcod int msg_return(sxmsg_t *msg, int opcode) { - return __msg_reply(msg, NULL, NULL, opcode); + return __msg_reply(msg, NULL, NULL, opcode, 0); } int msg_reply(sxmsg_t *msg, sexp_t *sx) { - return __msg_reply(msg, sx, NULL, 0); + return __msg_reply(msg, sx, NULL, 0, 0); } int msg_reply_timed(sxmsg_t *msg, sexp_t *sx, struct timespec *tio) { - return __msg_reply(msg, sx, tio, 0); + return __msg_reply(msg, sx, tio, 0, 0); +} + +int msg_reply_rapid(sxmsg_t *msg, sexp_t *sx) +{ + return __msg_reply(msg, sx, NULL, 0, 1); } /* @@ -300,7 +316,7 @@ static int __message_send_pulse(chnl_t *ch, sexp_t *sx, struct timespec *tio, else pthread_mutex_timedlock(&(m->wait), tio); } if(tio && (m->flags & ESXMSG_PENDING)) - return SXOTIMEDOUT; + return ESXOTIMEDOUT; } if(!m->payload) { @@ -314,7 +330,7 @@ static int __message_send_pulse(chnl_t *ch, sexp_t *sx, struct timespec *tio, /* destroy */ __destroy_msg(m); } else { - r = SXOREPLYREQ; + r = ESXOREPLYREQ; } } @@ -335,3 +351,12 @@ int msg_send_pulse_nowait(chnl_t *ch, sexp_t *sx) { return __message_send_pulse(ch, sx, NULL, 1); } + +int msg_rapid_clean(sxmsg_t *msg) +{ + destroy_sexp(msg->initial_sx); + if(msg->payload) destroy_sexp(msg->payload); + __destroy_msg(msg); + + return 0; +}