numerious bug fixes, memory leaks fixes;

v0.5.xx
Alexander Vdolainen 10 years ago
parent 00406e054c
commit 2b0df25d2e

@ -87,12 +87,10 @@ static int __conn_read(conn_t *co, void *buf, size_t buf_len)
{
int rfd = SSL_get_fd(co->ssl), r;
fd_set readset;
printf("__conn_read\n");
fprintf(stderr, "\tListening ... on %s\n", co->uuid);
/* get prepare to select */
FD_ZERO(&readset);
FD_SET(rfd, &readset);
/* waits until something will be ready to read */
r = select(FD_SETSIZE, &readset, NULL, NULL, NULL);
if(r < 0) {
@ -106,12 +104,12 @@ static int __conn_read(conn_t *co, void *buf, size_t buf_len)
if(r && FD_ISSET(rfd, &readset)) {
do {
//pthread_mutex_lock(&(co->oplock));
/* ok, now we're ready to perform SSL_read */
r = SSL_read(co->ssl, buf, (int)buf_len);
switch(SSL_get_error(co->ssl, r)) {
case SSL_ERROR_NONE:
printf("Read done (f:%d)\n", rfd);
//printf("Read done (f:%d)\n", rfd);
/* this is means we're get ridden it all */
return r; break;
case SSL_ERROR_ZERO_RETURN:
@ -123,9 +121,10 @@ static int __conn_read(conn_t *co, void *buf, size_t buf_len)
printf("Bypass until SSL buffer not ready.\n");
return 0;
default: /* seems the connection lost */
printf("Unknown error!\n");
fprintf(stderr, "(RD)Unknown error on %s\n", co->uuid);
return -1;
}
//pthread_mutex_unlock(&(co->oplock));
} while(SSL_pending(co->ssl));
}
@ -134,6 +133,24 @@ static int __conn_read(conn_t *co, void *buf, size_t buf_len)
static int __conn_write(conn_t *co, void *buf, size_t buf_len)
{
int r;
__retry:
pthread_mutex_lock(&(co->oplock));
r = SSL_write(co->ssl, buf, (int)buf_len);
switch(SSL_get_error(co->ssl, r)) {
case SSL_ERROR_WANT_READ:
case SSL_ERROR_WANT_WRITE:
printf("--------------NEED to retry the write\n");
goto __retry;
break;
default:
pthread_mutex_unlock(&(co->oplock));
fprintf(stderr, "(WR)Unknown error on %s (%d)\n", co->uuid, r);
return -1;
}
pthread_mutex_unlock(&(co->oplock));
return 0;
}
@ -149,7 +166,7 @@ static long __cmp_int(const void *a, const void *b)
static long __cmp_ulong(const void *a, const void *b)
{
printf("(??cmp_ulong)a = %ld b = %ld\n", *(ulong_t *)a , *(ulong_t *)b);
//printf("(??cmp_ulong)a = %ld b = %ld\n", *(ulong_t *)a , *(ulong_t *)b);
return *(ulong_t *)a - *(ulong_t *)b;
}
@ -547,7 +564,8 @@ static int __default_ch_open(void *cctx, sexp_t *sx)
buf = malloc(2048);
snprintf(buf, 2048, "(ch-open-ret ((:error %d)(:uuid %s)(:id %ld)))", r,
uuid, cid);
SSL_write(co->ssl, buf, strlen(buf));
__conn_write(co, buf, strlen(buf));
destroy_sexp(sx);
free(buf);
return r;
@ -614,7 +632,6 @@ static int __default_ch_open_ret(void *cctx, sexp_t *sx)
//printf("channels (%d)\n", usrtc_count(co->chnl_tree));
pthread_rwlock_unlock(&(co->chnl_lock));
if(node) {
printf("found channel!\n");
chan = (chnl_t *)usrtc_node_getdata(node);
sms = chan->sysmsg;
}
@ -624,6 +641,8 @@ static int __default_ch_open_ret(void *cctx, sexp_t *sx)
sms->flags &= ~ESXMSG_PENDING; /* the message is done */
sms->opcode = err;
destroy_sexp(sx);
/* unlock mutex to wake up the waiting thread */
pthread_mutex_unlock(&(sms->wait));
@ -631,7 +650,7 @@ static int __default_ch_open_ret(void *cctx, sexp_t *sx)
}
static int __default_ch_close(void *cctx, sexp_t *sx)
{
{
conn_t *co = (conn_t *)cctx;
usrtc_node_t *node;
char *val, *var, *buf;
@ -670,7 +689,7 @@ static int __default_ch_close(void *cctx, sexp_t *sx)
printf("%s:%d\n", __FUNCTION__, __LINE__);
goto __send_repl;
} else var = sx_in->val;
/* ok, now we need to analyze parameters */
if(*val != ':') {
r = EINVAL; /* TODO: clean up all the shit */
@ -683,8 +702,8 @@ static int __default_ch_close(void *cctx, sexp_t *sx)
}
}
printf("%s(%ld)\n", __FUNCTION__, cid);
//printf("%s(%ld)\n", __FUNCTION__, cid);
/* additional check for type of the channel */
node = usrtc_lookup(co->chnl_tree, &cid);
if(!node) {
@ -698,29 +717,36 @@ static int __default_ch_close(void *cctx, sexp_t *sx)
printf("there is no channel with id=%ld\n", cid);
goto __send_repl;
}
// TODO free it correctly
// free(channel->uuid);
// pthread_rwlock_wrlock(&(co->chnl_lock));
// idx_free(co->idx_ch, channel->cid);
// pthread_rwlock_unlock(&(co->chnl_lock));
// free(channel);
usrtc_delete(co->chnl_tree, node);
__send_repl:
buf = malloc(2048);
snprintf(buf, 2048, "(ch-close-ret ((:id %ld) (:error %d)))",
channel->cid, r);
int nbytes = SSL_write(co->ssl, buf, strlen(buf));
printf("%s: replied %s (%d bytes)\n", __FUNCTION__, buf, nbytes);
__conn_write(co, buf, strlen(buf));
//printf("%s: replied %s (%d bytes)\n", __FUNCTION__, buf, nbytes);
free(buf);
/* remove channel from the search tree */
pthread_rwlock_wrlock(&(co->chnl_lock));
usrtc_delete(co->chnl_tree, &(channel->node));
/* free index */
idx_free(co->idx_ch, channel->cid);
pthread_rwlock_unlock(&(co->chnl_lock));
idx_allocator_destroy(channel->idx_msg);
free(channel->msgs_tree);
pthread_mutex_destroy(&(channel->oplock));
pthread_rwlock_destroy(&(channel->msglock));
free(channel);
destroy_sexp(sx);
return 0;
}
static int __default_ch_close_ret(void *cctx, sexp_t *sx)
{
__DBGLINE;
;
conn_t *co = (conn_t *)cctx;
chnl_t *chan;
usrtc_node_t *node;
@ -741,7 +767,7 @@ static int __default_ch_close_ret(void *cctx, sexp_t *sx)
/* take length of the list */
llen = sexp_list_length(lsx);
if(!llen) return EINVAL; /* TODO: !! other side will not set any security attributes */
__DBGLINE;
;
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
sexp_list_car(sx_iter, &sx_in);
@ -772,27 +798,27 @@ static int __default_ch_close_ret(void *cctx, sexp_t *sx)
}
} else continue; /* ignore */
}
__DBGLINE;
;
/* try to find desired channel to intercept message */
pthread_rwlock_rdlock(&(co->chnl_lock));
node = usrtc_lookup(co->chnl_tree, (void *)&id);
//printf("channels (%d)\n", usrtc_count(co->chnl_tree));
pthread_rwlock_unlock(&(co->chnl_lock));
if(node) {
printf("found channel!\n");
chan = (chnl_t *)usrtc_node_getdata(node);
sms = chan->sysmsg;
}
__DBGLINE;
;
__mark_msg:
__DBGLINE;
;
if(!sms) return r;
sms->flags &= ~ESXMSG_PENDING; /* the message is done */
sms->opcode = err;
destroy_sexp(sx);
/* unlock mutex to wake up the waiting thread */
pthread_mutex_unlock(&(sms->wait));
__DBGLINE;
return 0;
}
@ -847,7 +873,7 @@ static int __default_msg(void *cctx, sexp_t *sx)
/* get parameters from the message */
if(sexp_list_cdr(sx, &lsx)) return EINVAL;
if(!SEXP_IS_LIST(lsx)) return EINVAL;
/* TODO: optimize it, i.e. do it with one pass iteraction !! */
/* find channel id */
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
@ -890,11 +916,12 @@ static int __default_msg(void *cctx, sexp_t *sx)
return EINVAL;
}
/* find channel */
printf("chnl_id = %ld\n", chnl_id);
if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT;
else chan = (chnl_t *)usrtc_node_getdata(node);
/* lookup for the message */
pthread_rwlock_rdlock(&(chan->msglock));
if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) {
pthread_rwlock_unlock(&(chan->msglock));
/* here, rpc lookup has no sense, just put this job to the queue */
/* btw, we're need to create a message first */
r = __create_reg_msg_mould(&smsg, chan, msg_id);
@ -912,7 +939,8 @@ static int __default_msg(void *cctx, sexp_t *sx)
usrtc_insert(chan->msgs_tree, &(smsg->pendingq_node), &(smsg->mid));
pthread_rwlock_unlock(&(chan->msglock));
} else {
// TODO is it correct?
//printf(">>>>>>>>>>>>>>>>>>>msg_id = %lu\n", msg_id);
pthread_rwlock_unlock(&(chan->msglock));
smsg = (sxmsg_t *)usrtc_node_getdata(node);
msg_return(smsg, EEXIST);
return EEXIST;
@ -948,11 +976,11 @@ static int __default_msg_return(void *cctx, sexp_t *sx)
sexp_t *msg = NULL;
sxmsg_t *smsg = NULL;
int idx, opcode;
__DBGLINE;
/* get parameters from the message */
if(sexp_list_cdr(sx, &lsx)) return EINVAL;
if(!SEXP_IS_LIST(lsx)) return EINVAL;
__DBGLINE;
/* get parameters */
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
@ -997,16 +1025,19 @@ static int __default_msg_return(void *cctx, sexp_t *sx)
if(msg_id < 0 || chnl_id < 0) {
return EINVAL;
}
printf("chnl_id = %ld\n", chnl_id);
//printf("chnl_id = %ld\n", chnl_id);
if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT;
else chan = (chnl_t *)usrtc_node_getdata(node);
/* lookup for the message */
pthread_rwlock_rdlock(&(chan->msglock));
if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) {
pthread_rwlock_unlock(&(chan->msglock));
/* TODO: return gently an opcode about absent message */
} else {
pthread_rwlock_unlock(&(chan->msglock));
smsg = (sxmsg_t *)usrtc_node_getdata(node);
smsg->opcode = opcode;
//destroy_sexp((sexp_t *)smsg->payload);
destroy_sexp(sx);
smsg->payload = NULL;
pthread_mutex_unlock(&(smsg->wait));
}
@ -1078,9 +1109,12 @@ static int __default_msg_reply(void *cctx, sexp_t *sx)
if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT;
else chan = (chnl_t *)usrtc_node_getdata(node);
/* lookup for the message */
pthread_rwlock_rdlock(&(chan->msglock));
if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) {
pthread_rwlock_unlock(&(chan->msglock));
/* TODO: return gently an opcode about absent message */
} else {
pthread_rwlock_unlock(&(chan->msglock));
smsg = (sxmsg_t *)usrtc_node_getdata(node);
smsg->opcode = SXOREPLYREQ;
//destroy_sexp((sexp_t *)smsg->payload);
@ -1121,6 +1155,7 @@ static int __init_systemrpc_tree(usrtc_t *rtree)
static int __eval_cstr(char *cstr, cx_rpc_list_t *rpc_list, void *ctx)
{
;
int r = ENOENT;
sexp_t *sx;
usrtc_node_t *node;
@ -1133,15 +1168,13 @@ static int __eval_cstr(char *cstr, cx_rpc_list_t *rpc_list, void *ctx)
else rpcf = sx->val;
/* find an appropriate function */
printf("rpcf = %s\n", rpcf);
//printf("rpcf = %s (sx = %p)\n", rpcf, sx);
node = usrtc_lookup(rpc_list->rpc_tree, rpcf);
if(!node) return ENOENT;
else rentry = (cx_rpc_t *)usrtc_node_getdata(node);
/* call it */
printf("rentry->rpcf = %p\n", rentry->rpcf);
//printf("rentry->rpcf = %p\n", rentry->rpcf);
r = rentry->rpcf(ctx, sx);
/* free s-expression */
//destroy_sexp(sx); FIXME: i guess rpc call should take care of sx
return r;
}
@ -1208,7 +1241,7 @@ static void *__rmsg_queue_thread(void *ctx)
if(!(msg->flags & ESXMSG_USR)) { /* not a regular message */
msg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */
msg->flags &= ~ESXMSG_PENDING;
__DBGLINE;
;
pthread_mutex_unlock(&(msg->wait)); /* wake up the waitee */
continue;
} else {
@ -1218,7 +1251,7 @@ static void *__rmsg_queue_thread(void *ctx)
/* get the function name */
if(sx->ty == SEXP_LIST) rpcf = sx->list->val;
else rpcf = sx->val;
printf("Inbound queue RPC call = '%s'\n", rpcf);
//printf("Inbound queue RPC call = '%s'\n", rpcf);
node = usrtc_lookup(ch->rpc_list->rpc_tree, rpcf);
if(!node) {
@ -1250,7 +1283,7 @@ static void *__msg_queue_thread(void *ctx)
if(buf) free(buf);
return NULL;
}
__DBGLINE;
;
while(1) {
r = pth_queue_get(co->mqueue, NULL, tmp);
if(r) {
@ -1258,7 +1291,7 @@ static void *__msg_queue_thread(void *ctx)
free(tmp);
return NULL;
}
__DBGLINE;
;
/* message workout */
msg = tmp->data;
@ -1267,7 +1300,7 @@ static void *__msg_queue_thread(void *ctx)
if(!(msg->flags & ESXMSG_USR)) { /* not a regular message */
msg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */
msg->flags &= ~ESXMSG_PENDING;
__DBGLINE;
;
pthread_mutex_unlock(&(msg->wait)); /* wake up the waitee */
continue;
} else {
@ -1285,12 +1318,12 @@ static void *__msg_queue_thread(void *ctx)
/* mark it to close */
msg->flags |= ESXMSG_CLOSURE;
/* ok, here we will write it and wait, destroying dialog while reply */
__DBGLINE;
;
goto __ssl_write;
} else snprintf(buf, 4096, "(ch-msg-repl (:chid %lu (:msgid %lu ", ch->cid,
msg->mid);
}
__DBGLINE;
;
len = strlen(buf);
tb += len*sizeof(char);
if(print_sexp(tb, 4096 - (len + 4*sizeof(char)), sx) == -1) {
@ -1302,15 +1335,20 @@ static void *__msg_queue_thread(void *ctx)
* 2. destroy message itself
*/
/* TODO: destroy the message */
// destroy_sexp(msg->initial_sx);
// msg->initial_sx = NULL;
// msg->payload = NULL;
// destroy_sexp(msg->payload);
msg->payload = NULL;
} else {
__DBGLINE;
;
pthread_mutex_unlock(&(msg->wait));
}
}
len = strlen(tb);
tb += len*sizeof(char);
strcat(tb, ")))");
__DBGLINE;
;
} else { /* pulse messages */
/* here we're shouldn't process reply procedure */
snprintf(buf, 4096, "(ch-msg-pulse (:chid %lu (:msgid %lu ", ch->cid,
@ -1326,6 +1364,11 @@ static void *__msg_queue_thread(void *ctx)
* 2. destroy message itself
*/
/* TODO: destroy the message */
// destroy_sexp(msg->initial_sx);
// msg->initial_sx = NULL;
// msg->payload = NULL;
// destroy_sexp(msg->payload);
msg->payload = NULL;
} else
pthread_mutex_unlock(&(msg->wait));
}
@ -1335,17 +1378,24 @@ static void *__msg_queue_thread(void *ctx)
}
__ssl_write:
__DBGLINE;
if(msg->flags & ESXMSG_CLOSURE) {
/* first remove the message from tree */
pthread_rwlock_wrlock(&(ch->msglock));
usrtc_delete(ch->msgs_tree, &(msg->pendingq_node));
pthread_rwlock_unlock(&(ch->msglock));
/* destroy */
destroy_sexp(msg->initial_sx);
__destroy_msg(msg);
}
/* write it */
pthread_mutex_lock(&(co->oplock)); /* exclusive write */
SSL_write(co->ssl, (void *)buf, strlen(buf) + sizeof(char)); /* TODO: SSL*/
pthread_mutex_unlock(&(co->oplock));
__conn_write(co, (void *)buf, strlen(buf) + sizeof(char)); /* TODO: SSL*/
fprintf(stderr, "\t-->%s wrote %s\n", __FUNCTION__, buf);
}
len = 0;
}
free(msg);
free(buf);
return NULL;
@ -1458,11 +1508,8 @@ void *__system_queue_listener(void *data)
chan = sysmsg->pch;
co = chan->connection;
payload = (sxpayload_t *)sysmsg->payload;
/* lock the connection for ops */
pthread_mutex_lock(&(co->oplock)); /* exclusive write */
/* write the buf */
SSL_write(co->ssl, (void *)payload->cstr, strlen(payload->cstr) + 1); /* TODO: SSL*/
pthread_mutex_unlock(&(co->oplock));
__conn_write(co, (void *)payload->cstr, strlen(payload->cstr) + 1); /* TODO: SSL*/
}
}
@ -2047,7 +2094,7 @@ int channel_open(conn_t *co, chnl_t **ch, int type)
} else {
/* put the channel to the channels search tree */
pthread_rwlock_wrlock(&(co->chnl_lock));
printf("inserting cid = %d\n", nch->cid);
//printf("inserting cid = %d\n", nch->cid);
usrtc_insert(co->chnl_tree, &nch->node, &nch->cid);
pthread_rwlock_unlock(&(co->chnl_lock));
@ -2076,6 +2123,9 @@ int channel_open(conn_t *co, chnl_t **ch, int type)
} else r = 0;
nch->flags &= ~ESXCHAN_PENDING; /* mark it as established */
/* TODO: destroy system message in the channel */
free(pl->cstr);
free(pl);
__destroy_msg(nch->sysmsg);
}
__fini_op:
@ -2098,48 +2148,56 @@ int channel_close(chnl_t *chnl)
char *uuid_ = __generate_uuid();
usrtc_node_t *node = NULL;
int r;
/* check unprocessed messages */
if(!usrtc_isempty(chnl->msgs_tree)) {
fprintf(stderr, "Unable to close channel");
return EINVAL;
}
conn_t *co = chnl->connection;
sxmsg_t *sms;
sxpayload_t *pl;
node = usrtc_lookup(chnl->connection->chnl_tree, &chnl->cid);
pthread_rwlock_rdlock(&(co->chnl_lock));
node = usrtc_lookup(co->chnl_tree, &chnl->cid);
pthread_rwlock_unlock(&(co->chnl_lock));
if(!node) {
fprintf(stderr, "No such channel\n");
return ENOENT;
}
sxpayload_t *pl = malloc(sizeof(sxpayload_t));
pthread_rwlock_wrlock(&(chnl->msglock));
/* check unprocessed messages */
if(!usrtc_isempty(chnl->msgs_tree)) {
pthread_rwlock_unlock(&(chnl->msglock));
fprintf(stderr, "Unable to close channel\n");
return EBUSY;
}
pl = malloc(sizeof(sxpayload_t));
if(!pl) return ENOMEM;
sxmsg_t *sms;
if(__create_sys_msg(&sms, uuid_, chnl, pl)) {
/* TODO: destroy the channel*/
return ENOMEM;
}
pl->sx = NULL;
if(!(pl->cstr = malloc(sizeof(char) * ESX_SYSMSG_SIZE))) {
pthread_rwlock_unlock(&(chnl->msglock));
free(pl);
return ENOMEM;
}
}
memset(pl->cstr, 0, sizeof(char) * ESX_SYSMSG_SIZE);
/* put system message to the run queue */
/* first form the message */
snprintf(pl->cstr, sizeof(char) * ESX_SYSMSG_SIZE,
snprintf(pl->cstr, sizeof(char) * ESX_SYSMSG_SIZE,
"(ch-close (:id %ld))", chnl->cid);
chnl->sysmsg = sms; /* assign system message to the channel */
/* put it */
if((r = pth_queue_add(conn_sys->ioqueue, (void *)sms, SYS_MSG))) {
pthread_rwlock_unlock(&(chnl->msglock));
return r;
}
if(!(sms->flags & ESXMSG_PENDING)) {
/* was processed too fast */
goto __process_smsg;
} else pthread_mutex_lock(&(sms->wait)); /* will sleep until got a reply */
__process_smsg:
if(sms->opcode) {
r = sms->opcode;
@ -2147,23 +2205,29 @@ __process_smsg:
} else r = 0;
chnl->flags &= ~ESXCHAN_PENDING; /* mark it as established */
/* TODO: destroy system message in the channel */
__DBGLINE;
;
/* remove channel from the search tree */
pthread_rwlock_wrlock(&(chnl->connection->chnl_lock));
usrtc_delete(chnl->connection->chnl_tree, &chnl->node);
/* free index */
idx_free(co->idx_ch, chnl->cid);
pthread_rwlock_unlock(&(chnl->connection->chnl_lock));
pthread_rwlock_unlock(&(chnl->msglock));
__destroy_msg(chnl->sysmsg);
free(pl->cstr);
free(pl);
// free(chnl->uuid);
// pthread_rwlock_wrlock(&(chnl->connection->chnl_lock));
// idx_free(chnl->connection->idx_ch, chnl->cid);
// pthread_rwlock_unlock(&(chnl->connection->chnl_lock));
// idx_free(chnl->connection->idx_ch, chnl->cid);
// free(chnl);
free(chnl->uuid);
idx_allocator_destroy(chnl->idx_msg);
free(chnl->msgs_tree);
pthread_mutex_destroy(&(chnl->oplock));
pthread_rwlock_destroy(&(chnl->msglock));
free(chnl);
return 0;
}
/* message passing */
@ -2183,11 +2247,9 @@ static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec
*msg = NULL;
__DBGLINE;
r = __create_reg_msg(&m, ch);
if(r) return r;
else {
__DBGLINE;
/* put the message to the search tree */
pthread_rwlock_wrlock(&(ch->msglock));
usrtc_insert(ch->msgs_tree, &(m->pendingq_node), &(m->mid));
@ -2201,22 +2263,24 @@ static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec
/* put the message to the run queue */
r = pth_queue_add(co->mqueue, (void *)m, USR_MSG);
__DBGLINE;
if(r) return r; /* FIXME: better give up */
if(m->flags & ESXMSG_PENDING) {
__DBGLINE;
if(!tio) pthread_mutex_lock(&(m->wait));
else pthread_mutex_timedlock(&(m->wait), tio);
__DBGLINE;
}
__DBGLINE;
if(tio && (m->flags & ESXMSG_PENDING))
return SXOTIMEDOUT;
__DBGLINE;
if(!m->payload) {
/* TODO: destroy the message */
r = m->opcode;
/* first remove the message from tree */
pthread_rwlock_wrlock(&(ch->msglock));
usrtc_delete(ch->msgs_tree, &(m->pendingq_node));
pthread_rwlock_unlock(&(ch->msglock));
/* destroy s expression */
destroy_sexp(m->initial_sx);
/* destroy */
__destroy_msg(m);
} else {
*msg = m;
r = SXOREPLYREQ;
@ -2247,11 +2311,12 @@ static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcod
msg->flags |= ESXMSG_PENDING; /* pending */
msg->flags |= ESXMSG_ISREPLY; /* this is a reply */
if(!sx) msg->flags &= ~ESXMSG_PENDING;
/* put the message to the queue */
r = pth_queue_add(co->mqueue, (void *)msg, USR_MSG);
if(r) return r; /* FIXME: better give up */
if(!sx) return 0; /* TODO: destroy a message */
if(!sx) return 0;
if(msg->flags & ESXMSG_PENDING) {
if(!tio) pthread_mutex_lock(&(msg->wait));
@ -2263,9 +2328,17 @@ static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcod
r = msg->opcode;
#if 0
if(msg->flags & ESXMSG_CLOSURE) {
/* TODO: destroy message */
/* first remove the message from tree */
pthread_rwlock_wrlock(&(ch->msglock));
usrtc_delete(ch->msgs_tree, &(msg->pendingq_node));
pthread_rwlock_unlock(&(ch->msglock));
/* destroy */
destroy_sexp(msg->initial_sx);
__destroy_msg(msg);
}
#endif
return r;
}

Loading…
Cancel
Save