|
|
@ -770,7 +770,7 @@ static void *__msg_queue_thread(void *ctx)
|
|
|
|
msg = tmp->data;
|
|
|
|
msg = tmp->data;
|
|
|
|
if(!msg) continue; /* spurious message */
|
|
|
|
if(!msg) continue; /* spurious message */
|
|
|
|
|
|
|
|
|
|
|
|
if(!(sysmsg->flags & ESXMSG_USR)) { /* not a regular message */
|
|
|
|
if(!(msg->flags & ESXMSG_USR)) { /* not a regular message */
|
|
|
|
msg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */
|
|
|
|
msg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */
|
|
|
|
msg->flags &= ~ESXMSG_PENDING;
|
|
|
|
msg->flags &= ~ESXMSG_PENDING;
|
|
|
|
pthread_mutex_unlock(&(msg->wait)); /* wake up the waitee */
|
|
|
|
pthread_mutex_unlock(&(msg->wait)); /* wake up the waitee */
|
|
|
@ -785,9 +785,9 @@ static void *__msg_queue_thread(void *ctx)
|
|
|
|
len = strlen(buf);
|
|
|
|
len = strlen(buf);
|
|
|
|
tb += len*sizeof(char);
|
|
|
|
tb += len*sizeof(char);
|
|
|
|
if(print_sexp(tb, 4096 - (len + 4*sizeof(char)), sx)) {
|
|
|
|
if(print_sexp(tb, 4096 - (len + 4*sizeof(char)), sx)) {
|
|
|
|
sx->opcode = ENOMEM;
|
|
|
|
msg->opcode = ENOMEM;
|
|
|
|
/* we don't need to wake up anybody */
|
|
|
|
/* we don't need to wake up anybody */
|
|
|
|
if(sx->flags & ESXMSG_TIMEDOUT) {
|
|
|
|
if(msg->flags & ESXMSG_TIMEDOUT) {
|
|
|
|
/* clean up all the shit:
|
|
|
|
/* clean up all the shit:
|
|
|
|
* 1. remove from message tree
|
|
|
|
* 1. remove from message tree
|
|
|
|
* 2. destroy message itself
|
|
|
|
* 2. destroy message itself
|
|
|
@ -800,11 +800,12 @@ static void *__msg_queue_thread(void *ctx)
|
|
|
|
tb += len*sizeof(char);
|
|
|
|
tb += len*sizeof(char);
|
|
|
|
strcat(tb, ")))\0");
|
|
|
|
strcat(tb, ")))\0");
|
|
|
|
} else { /* pulse messages */
|
|
|
|
} else { /* pulse messages */
|
|
|
|
|
|
|
|
tb = tb;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/* write it */
|
|
|
|
/* write it */
|
|
|
|
pthread_mutex_lock(&(co->oplock)); /* exclusive write */
|
|
|
|
pthread_mutex_lock(&(co->oplock)); /* exclusive write */
|
|
|
|
SSL_write(co->ssl, (void *)buf, strlen(buf) + sizeof(char); /* TODO: SSL*/
|
|
|
|
SSL_write(co->ssl, (void *)buf, strlen(buf) + sizeof(char)); /* TODO: SSL*/
|
|
|
|
pthread_mutex_unlock(&(co->oplock));
|
|
|
|
pthread_mutex_unlock(&(co->oplock));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -1379,11 +1380,11 @@ static sxmsg_t *__allocate_msg(int *res)
|
|
|
|
|
|
|
|
|
|
|
|
static void __destroy_msg(sxmsg_t *msg)
|
|
|
|
static void __destroy_msg(sxmsg_t *msg)
|
|
|
|
{
|
|
|
|
{
|
|
|
|
chnl_t *ch = msg->nch;
|
|
|
|
chnl_t *ch = msg->pch;
|
|
|
|
|
|
|
|
|
|
|
|
if(sm->flags & ESXMSG_USR) {
|
|
|
|
if(msg->flags & ESXMSG_USR) {
|
|
|
|
pthread_mutex_lock(&(ch->oplock));
|
|
|
|
pthread_mutex_lock(&(ch->oplock));
|
|
|
|
idx_free(ch->idx_msg, sm->mid);
|
|
|
|
idx_free(ch->idx_msg, msg->mid);
|
|
|
|
pthread_mutex_unlock(&(ch->oplock));
|
|
|
|
pthread_mutex_unlock(&(ch->oplock));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -1562,7 +1563,7 @@ static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec
|
|
|
|
else {
|
|
|
|
else {
|
|
|
|
/* put the message to the search tree */
|
|
|
|
/* put the message to the search tree */
|
|
|
|
pthread_rwlock_wrlock(&(ch->msglock));
|
|
|
|
pthread_rwlock_wrlock(&(ch->msglock));
|
|
|
|
usrtc_insert(ch->msgs_tree, &(m->pendinq_node), &(m->mid));
|
|
|
|
usrtc_insert(ch->msgs_tree, &(m->pendingq_node), &(m->mid));
|
|
|
|
pthread_rwlock_unlock(&(ch->msglock));
|
|
|
|
pthread_rwlock_unlock(&(ch->msglock));
|
|
|
|
/* put the message to the run queue */
|
|
|
|
/* put the message to the run queue */
|
|
|
|
}
|
|
|
|
}
|
|
|
|