From 0c458e081975dafadb95bad8197158e31eab1879 Mon Sep 17 00:00:00 2001 From: Alexander Vdolainen Date: Mon, 2 Feb 2015 13:44:23 +0200 Subject: [PATCH] fixed race condition bug, added few entries to destroy connection; --- lib/connection.c | 59 ++++++++++++++++++++++++++++++++++++++---------- lib/queue.c | 34 ++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+), 12 deletions(-) diff --git a/lib/connection.c b/lib/connection.c index 6e0ec82..c314eca 100644 --- a/lib/connection.c +++ b/lib/connection.c @@ -64,8 +64,10 @@ static int __rpc_callback(void *data) { struct __rpc_job *job = (struct __rpc_job *)data; int rc = 0; + rc = job->rpcf((void *)(job->msg), job->sx); free(job); + return rc; } @@ -302,6 +304,35 @@ static void __wake_up_waiters(conn_t *co, int opcode) return; } +/* (!) NOTE: this call use only after all threads are dead ! */ +static void __destroy_all_channels(conn_t *co) +{ + usrtc_node_t *node = NULL; + chnl_t *ch; + + for(node = usrtc_first(co->chnl_tree); node != NULL; node = + usrtc_first(co->chnl_tree)) { + ch = (chnl_t *)usrtc_node_getdata(node); + + /* free allocated resources */ + if(ch->uuid) free(ch->uuid); + idx_allocator_destroy(ch->idx_msg); /* allocator */ + free(ch->idx_msg); + free(ch->msgs_tree); + /* locks */ + pthread_mutex_destroy(&(ch->oplock)); + pthread_rwlock_destroy(&(ch->msglock)); + + /* remove it */ + usrtc_delete(co->chnl_tree, node); + + /* free */ + free(ch); + } + + return; +} + static int __default_auth_set_context(void *cctx, sexp_t *sx) { conn_t *co = (conn_t *)cctx; @@ -1506,11 +1537,16 @@ static void *__rmsg_queue_thread(void *ctx) sx = (sexp_t *)msg->payload; /* get the function name */ if(sx->ty == SEXP_LIST) rpcf = sx->list->val; - else rpcf = sx->val; + else { + //rpcf = sx->val; + r = ESXRCBADPROT; + goto __err_ret; + } node = usrtc_lookup(ch->rpc_list->rpc_tree, rpcf); if(!node) { r = ENOENT; + __err_ret: msg_return(msg, r); } else { rpccall = (cx_rpc_t *)usrtc_node_getdata(node); @@ -1550,7 +1586,6 @@ static void *__msg_queue_thread(void *ctx) free(tmp); return NULL; } - ; /* message workout */ msg = tmp->data; @@ -1559,7 +1594,7 @@ static void *__msg_queue_thread(void *ctx) if(!(msg->flags & ESXMSG_USR)) { /* not a regular message */ msg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */ msg->flags &= ~ESXMSG_PENDING; - ; + pthread_mutex_unlock(&(msg->wait)); /* wake up the waitee */ continue; } else { @@ -1631,13 +1666,6 @@ static void *__msg_queue_thread(void *ctx) strcat(tb, ")))"); __ssl_write: - /* write it */ - if(__conn_write(co, (void *)buf, strlen(buf) + sizeof(char)) < 0) { - co->flags &= ~CXCONN_ESTABL; - co->flags |= CXCONN_BROKEN; - __wake_up_waiters(co, ESXNOCONNECT); - } - if(msg->flags & ESXMSG_CLOSURE) { /* first remove the message from tree */ pthread_rwlock_wrlock(&(ch->msglock)); @@ -1649,6 +1677,13 @@ static void *__msg_queue_thread(void *ctx) destroy_sexp((sexp_t *)msg->payload); __destroy_msg(msg); } + + /* write it */ + if(__conn_write(co, (void *)buf, strlen(buf) + sizeof(char)) < 0) { + co->flags &= ~CXCONN_ESTABL; + co->flags |= CXCONN_BROKEN; + __wake_up_waiters(co, ESXNOCONNECT); + } } len = 0; @@ -2275,9 +2310,9 @@ int connection_close(conn_t *co) /* TODO: */ return 0; } -int connection_reinit(conn_t *co) /* TODO: */ +int connection_reinit(conn_t *co) /* TODO: the next version */ { - return 0; + return ENOSYS; } static sxmsg_t *__allocate_msg(int *res) diff --git a/lib/queue.c b/lib/queue.c index f59a3f6..d270dc6 100644 --- a/lib/queue.c +++ b/lib/queue.c @@ -232,6 +232,13 @@ static void *__poll_thread(void *poll) while(1) { r = pth_queue_get(q, NULL, &msgbuf); pthread_rwlock_wrlock(&(p->stats_lock)); + if(p->flags & DQTPOLL_DEADSTAGE) { /* poll going to be killed */ + pthread_rwlock_unlock(&(p->stats_lock)); + idx_free(p->idx, myid); + p->poll_value--; + + return NULL; + } if(r) { p->sleep_value++; pthread_rwlock_unlock(&(p->stats_lock)); @@ -390,6 +397,33 @@ int pth_dqtpoll_add(pth_dqtpoll_t *tpoll, void *job, unsigned int type) int pth_dqtpoll_destroy(pth_dqtpoll_t *tpoll, int force) { int r = 0; + pth_msg_t tmpmsg; + + pthread_rwlock_wrlock(&(tpoll->stats_lock)); + tpoll->flags |= DQTPOLL_DEADSTAGE; + pthread_rwlock_unlock(&(tpoll->stats_lock)); + + /* now we need to wait */ + while(1) { + pthread_rwlock_rdlock(&(tpoll->stats_lock)); + if(!tpoll->poll_value) { + pthread_rwlock_unlock(&(tpoll->stats_lock)); + break; + } else { + pthread_rwlock_unlock(&(tpoll->stats_lock)); + pth_queue_add(tpoll->queue, &tmpmsg, 0); /* spurious */ + } + } + + /* free all */ + pth_queue_destroy(tpoll->queue, 0, NULL); + idx_allocator_destroy(tpoll->idx); + pthread_rwlock_destroy(&(tpoll->stats_lock)); + + free(tpoll->poll); + free(tpoll->queue); + free(tpoll->idx); + return r; }