fixed race condition bug, added few entries to destroy connection;
This commit is contained in:
parent
647d7bb7b8
commit
0c458e0819
@ -64,8 +64,10 @@ static int __rpc_callback(void *data)
|
||||
{
|
||||
struct __rpc_job *job = (struct __rpc_job *)data;
|
||||
int rc = 0;
|
||||
|
||||
rc = job->rpcf((void *)(job->msg), job->sx);
|
||||
free(job);
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
@ -302,6 +304,35 @@ static void __wake_up_waiters(conn_t *co, int opcode)
|
||||
return;
|
||||
}
|
||||
|
||||
/* (!) NOTE: this call use only after all threads are dead ! */
|
||||
static void __destroy_all_channels(conn_t *co)
|
||||
{
|
||||
usrtc_node_t *node = NULL;
|
||||
chnl_t *ch;
|
||||
|
||||
for(node = usrtc_first(co->chnl_tree); node != NULL; node =
|
||||
usrtc_first(co->chnl_tree)) {
|
||||
ch = (chnl_t *)usrtc_node_getdata(node);
|
||||
|
||||
/* free allocated resources */
|
||||
if(ch->uuid) free(ch->uuid);
|
||||
idx_allocator_destroy(ch->idx_msg); /* allocator */
|
||||
free(ch->idx_msg);
|
||||
free(ch->msgs_tree);
|
||||
/* locks */
|
||||
pthread_mutex_destroy(&(ch->oplock));
|
||||
pthread_rwlock_destroy(&(ch->msglock));
|
||||
|
||||
/* remove it */
|
||||
usrtc_delete(co->chnl_tree, node);
|
||||
|
||||
/* free */
|
||||
free(ch);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
static int __default_auth_set_context(void *cctx, sexp_t *sx)
|
||||
{
|
||||
conn_t *co = (conn_t *)cctx;
|
||||
@ -1506,11 +1537,16 @@ static void *__rmsg_queue_thread(void *ctx)
|
||||
sx = (sexp_t *)msg->payload;
|
||||
/* get the function name */
|
||||
if(sx->ty == SEXP_LIST) rpcf = sx->list->val;
|
||||
else rpcf = sx->val;
|
||||
else {
|
||||
//rpcf = sx->val;
|
||||
r = ESXRCBADPROT;
|
||||
goto __err_ret;
|
||||
}
|
||||
|
||||
node = usrtc_lookup(ch->rpc_list->rpc_tree, rpcf);
|
||||
if(!node) {
|
||||
r = ENOENT;
|
||||
__err_ret:
|
||||
msg_return(msg, r);
|
||||
} else {
|
||||
rpccall = (cx_rpc_t *)usrtc_node_getdata(node);
|
||||
@ -1550,7 +1586,6 @@ static void *__msg_queue_thread(void *ctx)
|
||||
free(tmp);
|
||||
return NULL;
|
||||
}
|
||||
;
|
||||
|
||||
/* message workout */
|
||||
msg = tmp->data;
|
||||
@ -1559,7 +1594,7 @@ static void *__msg_queue_thread(void *ctx)
|
||||
if(!(msg->flags & ESXMSG_USR)) { /* not a regular message */
|
||||
msg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */
|
||||
msg->flags &= ~ESXMSG_PENDING;
|
||||
;
|
||||
|
||||
pthread_mutex_unlock(&(msg->wait)); /* wake up the waitee */
|
||||
continue;
|
||||
} else {
|
||||
@ -1631,13 +1666,6 @@ static void *__msg_queue_thread(void *ctx)
|
||||
strcat(tb, ")))");
|
||||
|
||||
__ssl_write:
|
||||
/* write it */
|
||||
if(__conn_write(co, (void *)buf, strlen(buf) + sizeof(char)) < 0) {
|
||||
co->flags &= ~CXCONN_ESTABL;
|
||||
co->flags |= CXCONN_BROKEN;
|
||||
__wake_up_waiters(co, ESXNOCONNECT);
|
||||
}
|
||||
|
||||
if(msg->flags & ESXMSG_CLOSURE) {
|
||||
/* first remove the message from tree */
|
||||
pthread_rwlock_wrlock(&(ch->msglock));
|
||||
@ -1649,6 +1677,13 @@ static void *__msg_queue_thread(void *ctx)
|
||||
destroy_sexp((sexp_t *)msg->payload);
|
||||
__destroy_msg(msg);
|
||||
}
|
||||
|
||||
/* write it */
|
||||
if(__conn_write(co, (void *)buf, strlen(buf) + sizeof(char)) < 0) {
|
||||
co->flags &= ~CXCONN_ESTABL;
|
||||
co->flags |= CXCONN_BROKEN;
|
||||
__wake_up_waiters(co, ESXNOCONNECT);
|
||||
}
|
||||
}
|
||||
|
||||
len = 0;
|
||||
@ -2275,9 +2310,9 @@ int connection_close(conn_t *co) /* TODO: */
|
||||
return 0;
|
||||
}
|
||||
|
||||
int connection_reinit(conn_t *co) /* TODO: */
|
||||
int connection_reinit(conn_t *co) /* TODO: the next version */
|
||||
{
|
||||
return 0;
|
||||
return ENOSYS;
|
||||
}
|
||||
|
||||
static sxmsg_t *__allocate_msg(int *res)
|
||||
|
34
lib/queue.c
34
lib/queue.c
@ -232,6 +232,13 @@ static void *__poll_thread(void *poll)
|
||||
while(1) {
|
||||
r = pth_queue_get(q, NULL, &msgbuf);
|
||||
pthread_rwlock_wrlock(&(p->stats_lock));
|
||||
if(p->flags & DQTPOLL_DEADSTAGE) { /* poll going to be killed */
|
||||
pthread_rwlock_unlock(&(p->stats_lock));
|
||||
idx_free(p->idx, myid);
|
||||
p->poll_value--;
|
||||
|
||||
return NULL;
|
||||
}
|
||||
if(r) {
|
||||
p->sleep_value++;
|
||||
pthread_rwlock_unlock(&(p->stats_lock));
|
||||
@ -390,6 +397,33 @@ int pth_dqtpoll_add(pth_dqtpoll_t *tpoll, void *job, unsigned int type)
|
||||
int pth_dqtpoll_destroy(pth_dqtpoll_t *tpoll, int force)
|
||||
{
|
||||
int r = 0;
|
||||
pth_msg_t tmpmsg;
|
||||
|
||||
pthread_rwlock_wrlock(&(tpoll->stats_lock));
|
||||
tpoll->flags |= DQTPOLL_DEADSTAGE;
|
||||
pthread_rwlock_unlock(&(tpoll->stats_lock));
|
||||
|
||||
/* now we need to wait */
|
||||
while(1) {
|
||||
pthread_rwlock_rdlock(&(tpoll->stats_lock));
|
||||
if(!tpoll->poll_value) {
|
||||
pthread_rwlock_unlock(&(tpoll->stats_lock));
|
||||
break;
|
||||
} else {
|
||||
pthread_rwlock_unlock(&(tpoll->stats_lock));
|
||||
pth_queue_add(tpoll->queue, &tmpmsg, 0); /* spurious */
|
||||
}
|
||||
}
|
||||
|
||||
/* free all */
|
||||
pth_queue_destroy(tpoll->queue, 0, NULL);
|
||||
idx_allocator_destroy(tpoll->idx);
|
||||
pthread_rwlock_destroy(&(tpoll->stats_lock));
|
||||
|
||||
free(tpoll->poll);
|
||||
free(tpoll->queue);
|
||||
free(tpoll->idx);
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user