diff --git a/examples/sntld.c b/examples/sntld.c index 3603a53..7915122 100644 --- a/examples/sntld.c +++ b/examples/sntld.c @@ -166,10 +166,16 @@ static int __ar_multiply(void *data, sexp_t *sx) /* define a little bit */ #define DEFAULT_PORT 13133 +static void sigpipe_handler(int a) +{ + //fprintf(stderr, "\n\n\n\n\nBroken pipe\n\n\n\n"); + return; +} + int main(int argc, char **argv) { // set detailed signal handler - struct sigaction sigact; + struct sigaction sigact, sigpipe; sigact.sa_flags = SA_SIGINFO; sigact.sa_sigaction = signal_error; sigemptyset(&sigact.sa_mask); @@ -177,6 +183,10 @@ int main(int argc, char **argv) sigaction(SIGILL, &sigact, 0); sigaction(SIGSEGV, &sigact, 0); sigaction(SIGBUS, &sigact, 0); + + memset(&sigpipe, 0, sizeof(struct sigaction)); + sigpipe.sa_handler = sigpipe_handler; + sigaction(SIGPIPE, &sigpipe, NULL); char *rootca = NULL, *cert = NULL; int port = DEFAULT_PORT; @@ -272,7 +282,9 @@ int main(int argc, char **argv) int client = accept(srv, (struct sockaddr*)&addr, &len); /* accept connection as usual */ opt = connection_create(co, client); /* create connection, that's all */ - if(opt) return opt; + if(opt) { + fprintf(stderr, "Cannot create connetion (%d)\n", opt); + } } return 0; diff --git a/include/sntl/connection.h b/include/sntl/connection.h index cca9908..d319a9d 100644 --- a/include/sntl/connection.h +++ b/include/sntl/connection.h @@ -68,6 +68,7 @@ typedef struct __perm_context_type { #define CXCONN_MASTER (1 << 1) #define CXCONN_SLAVE (1 << 2) #define CXCONN_ESTABL (1 << 3) +#define CXCONN_BROKEN (1 << 4) /* * älä jätä kommentteja omalla kielellä! yksinkertaisia englanti sijaan! @@ -133,6 +134,7 @@ typedef struct __sexp_payload_t { #define ESXMSG_NOWAIT (1 << 7) #define ESXMSG_ISREPLY (1 << 8) #define ESXMSG_CLOSURE (1 << 9) +#define ESXMSG_RMONRETR (1 << 10) /** * \brief Message used in sntl message passing diff --git a/lib/connection.c b/lib/connection.c index e830ddc..6e0ec82 100644 --- a/lib/connection.c +++ b/lib/connection.c @@ -43,6 +43,23 @@ static long __cmp_ulong(const void *a, const void *b); static sxmsg_t *__allocate_msg(int *res); static void __destroy_msg(sxmsg_t *msg); +/* examination */ +static inline int __exam_connection(conn_t *co) +{ + int r = 0; + + pthread_mutex_lock(&co->oplock); + if(co->flags | CXCONN_BROKEN) { + /* wake up all */ + /* destroy thread poll */ + /* free all memory and sync primitives */ + r = 1; + } + pthread_mutex_unlock(&co->oplock); + + return r; +} + static int __rpc_callback(void *data) { struct __rpc_job *job = (struct __rpc_job *)data; @@ -242,35 +259,47 @@ static int __insert_rpc_function(usrtc_t *tree, const char *name, int (*rpcf)(vo return 0; } +/* wake up all waiters on messages with given opcode */ static void __wake_up_waiters(conn_t *co, int opcode) { usrtc_node_t *node = NULL, *last_node = NULL; usrtc_node_t *msg_node = NULL, *last_msg_node = NULL; chnl_t *ch; sxmsg_t *smsg = NULL; - + pthread_rwlock_wrlock(&(co->chnl_lock)); node = usrtc_first(co->chnl_tree); last_node = usrtc_last(co->chnl_tree); + + /* going through channels tree */ while(!usrtc_isempty(co->chnl_tree)) { ch = (chnl_t *)usrtc_node_getdata(node); + pthread_rwlock_rdlock(&(ch->msglock)); msg_node = usrtc_first(ch->msgs_tree); last_msg_node = usrtc_last(ch->msgs_tree); - while(!usrtc_isempty(ch->msgs_tree)) { + + while(!usrtc_isempty(ch->msgs_tree)) { /* messages bypassing */ smsg = (sxmsg_t *)usrtc_node_getdata(msg_node); smsg->opcode = opcode; + + /* wake up waiting thread */ pthread_mutex_unlock(&(smsg->wait)); + if(msg_node == last_msg_node) break; msg_node = usrtc_next(ch->msgs_tree, msg_node); } + pthread_rwlock_unlock(&(ch->msglock)); + if(node == last_node) break; node = usrtc_next(co->chnl_tree, node); } pthread_rwlock_unlock(&(co->chnl_lock)); + + return; } static int __default_auth_set_context(void *cctx, sexp_t *sx) @@ -339,6 +368,7 @@ __reply: /* we will send it */ if(__conn_write(co, tbuf, strlen(tbuf)) < 0) { co->flags &= ~CXCONN_ESTABL; + co->flags |= CXCONN_BROKEN; __wake_up_waiters(co, ESXNOCONNECT); } destroy_sexp(sx); @@ -449,6 +479,7 @@ static int __default_ch_get_types(void *cctx, sexp_t *sx) /* reply to this rpc */ if(__conn_write(co, tbuf, strlen(tbuf)) < 0) { co->flags &= ~CXCONN_ESTABL; + co->flags |= CXCONN_BROKEN; __wake_up_waiters(co, ESXNOCONNECT); } free(tbuf); @@ -512,6 +543,7 @@ __send_reply: snprintf(buf, 1024, "(ch-gl-error (%d))", r); if(__conn_write(co, buf, strlen(buf)) < 0) { co->flags &= ~CXCONN_ESTABL; + co->flags |= CXCONN_BROKEN; __wake_up_waiters(co, ESXNOCONNECT); } destroy_sexp(sx); @@ -640,6 +672,7 @@ static int __default_ch_open(void *cctx, sexp_t *sx) uuid, cid); if(__conn_write(co, buf, strlen(buf)) < 0) { co->flags &= ~CXCONN_ESTABL; + co->flags |= CXCONN_BROKEN; __wake_up_waiters(co, ESXNOCONNECT); } destroy_sexp(sx); @@ -823,6 +856,7 @@ __send_repl: destroy_sexp(sx); if(__conn_write(co, buf, strlen(buf)) < 0) { co->flags &= ~CXCONN_ESTABL; + co->flags |= CXCONN_BROKEN; __wake_up_waiters(co, ESXNOCONNECT); } free(buf); @@ -1241,12 +1275,13 @@ static int __default_msg_return(void *cctx, sexp_t *sx) smsg->flags |= ESXMSG_CLOSURE; /* Q: can we remove the message from the tree there??? */ - /* A: yep */ + /* A: actually no */ /* first remove the message from tree */ - pthread_rwlock_wrlock(&(chan->msglock)); - usrtc_delete(chan->msgs_tree, &(smsg->pendingq_node)); - pthread_rwlock_unlock(&(chan->msglock)); - + if(smsg->flags & ESXMSG_RMONRETR) { + pthread_rwlock_wrlock(&(chan->msglock)); + usrtc_delete(chan->msgs_tree, &(smsg->pendingq_node)); + pthread_rwlock_unlock(&(chan->msglock)); + } pthread_mutex_unlock(&(smsg->wait)); } @@ -1403,10 +1438,10 @@ static void *__cxslave_thread_listener(void *wctx) while((r = __conn_read(co, buf, 4096)) != -1) { buf[r] = '\0'; - //if(r) printf("Got the message %s (%d bytes)\n", buf, r); r = __eval_cstr(buf, conn_sys->system_rpc, co); } co->flags &= ~CXCONN_ESTABL; + co->flags |= CXCONN_BROKEN; __wake_up_waiters(co, ESXNOCONNECT); free(buf); @@ -1426,6 +1461,7 @@ static void *__cxmaster_thread_listener(void *wctx) r = __eval_cstr(buf, conn_sys->system_rpc, co); } co->flags &= ~CXCONN_ESTABL; + co->flags |= CXCONN_BROKEN; __wake_up_waiters(co, ESXNOCONNECT); free(buf); @@ -1598,6 +1634,7 @@ static void *__msg_queue_thread(void *ctx) /* write it */ if(__conn_write(co, (void *)buf, strlen(buf) + sizeof(char)) < 0) { co->flags &= ~CXCONN_ESTABL; + co->flags |= CXCONN_BROKEN; __wake_up_waiters(co, ESXNOCONNECT); } @@ -1671,6 +1708,8 @@ static int __verify_certcall(int preverify_ok, X509_STORE_CTX *ctx) X509_STORE_CTX_set_error(ctx, err); } + if(!preverify_ok) return 0; + /* ok, now we're on top of SSL (depth == 0) certs chain, * and we can validate client certificate */ if(!depth) { @@ -1688,6 +1727,12 @@ static int __verify_certcall(int preverify_ok, X509_STORE_CTX *ctx) return preverify_ok; } +/* dummy just to check the server side */ +static int __verify_certcall_dummy(int preverify_ok, X509_STORE_CTX *ctx) +{ + return preverify_ok; +} + /* subsystem: here u can told me about how it's ugly to use global pointers, * yep, it's a business of fucking morons, btw it works (heh, openssl uses this * ancient shit method too, many many and many others too, trust me ...). @@ -1731,6 +1776,7 @@ void *__system_queue_listener(void *data) /* write the buf */ if(__conn_write(co, (void *)payload->cstr, strlen(payload->cstr) + 1) < 0) { co->flags &= ~CXCONN_ESTABL; + co->flags |= CXCONN_BROKEN; __wake_up_waiters(co, ESXNOCONNECT); } } @@ -1919,10 +1965,14 @@ int connection_initiate(conn_t *co, const char *host, int port, co->mqueue = mqueue; /* init SSL certificates and context */ - co->ctx = SSL_CTX_new(SSLv3_client_method()); + co->ctx = SSL_CTX_new(TLSv1_2_client_method()); if(!co->ctx) { ERR_print_errors_fp(stderr); r = EINVAL; goto __fail_3; } - else SSL_CTX_set_verify_depth(co->ctx, 1); /* FIXME: use configuration */ + else { + SSL_CTX_set_verify(co->ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, + __verify_certcall_dummy); + SSL_CTX_set_verify_depth(co->ctx, 1); /* FIXME: use configuration */ + } /* load certificates */ SSL_CTX_load_verify_locations(co->ctx, conn_sys->rootca, NULL); @@ -1996,7 +2046,8 @@ int connection_initiate(conn_t *co, const char *host, int port, // we've lost the connection co->flags &= ~CXCONN_ESTABL; r = ESXNOCONNECT; - __wake_up_waiters(co, ESXNOCONNECT); + co->flags |= CXCONN_BROKEN; + //__wake_up_waiters(co, ESXNOCONNECT); free(buf); /* shutdown connection */ goto __fail_3; @@ -2012,8 +2063,9 @@ int connection_initiate(conn_t *co, const char *host, int port, if(bytes == -1) { // we've lost the connection co->flags &= ~CXCONN_ESTABL; + co->flags |= CXCONN_BROKEN; r = ESXNOCONNECT; - __wake_up_waiters(co, ESXNOCONNECT); + //__wake_up_waiters(co, ESXNOCONNECT); free(buf); /* shutdown connection */ goto __fail_3; @@ -2104,7 +2156,7 @@ int connection_create(conn_t *co, int sck) co->mqueue = mqueue; /* init SSL certificates and context */ - co->ctx = SSL_CTX_new(SSLv3_server_method()); + co->ctx = SSL_CTX_new(TLSv1_2_server_method()); if(!co->ctx) { r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__);goto __fail_3; } else { /* set verify context */ @@ -2174,7 +2226,8 @@ int connection_create(conn_t *co, int sck) if(bytes < 0) { printf("Terminate SSL connection, the other end is lost.\n"); co->flags &= ~CXCONN_ESTABL; - __wake_up_waiters(co, ESXNOCONNECT); + co->flags |= CXCONN_BROKEN; + //__wake_up_waiters(co, ESXNOCONNECT); r = ESXNOCONNECT; goto __fail_3; } @@ -2539,7 +2592,7 @@ static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec int r = 0; sxmsg_t *m = NULL; conn_t *co = ch->connection; - + if(!(co->flags & CXCONN_ESTABL)) { destroy_sexp(sx); return ESXNOCONNECT; @@ -2605,7 +2658,7 @@ static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcod int r = 0; chnl_t *ch = msg->pch; conn_t *co = ch->connection; - + if(!(co->flags & CXCONN_ESTABL)) { destroy_sexp(sx); return ESXNOCONNECT; @@ -2620,6 +2673,7 @@ static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcod msg->flags |= ESXMSG_ISREPLY; /* this is a reply */ if(!sx) msg->flags &= ~ESXMSG_PENDING; + else msg->flags |= ESXMSG_RMONRETR; /* put the message to the queue */ r = pth_queue_add(co->mqueue, (void *)msg, USR_MSG); @@ -2641,7 +2695,6 @@ static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcod destroy_sexp(msg->initial_sx); __destroy_msg(msg); } - return r; }