|
|
|
@ -564,7 +564,7 @@ static int __default_ch_open_ret(void *cctx, sexp_t *sx)
|
|
|
|
|
lsx = sx->list->next;
|
|
|
|
|
/* now we expect a list of lists */
|
|
|
|
|
if(lsx->ty != SEXP_LIST) {
|
|
|
|
|
printf("%s:%d\n", __FUNCTION__, __LINE__);
|
|
|
|
|
//printf("%s:%d\n", __FUNCTION__, __LINE__);
|
|
|
|
|
r = EINVAL; /* TODO: right opcode */
|
|
|
|
|
goto __mark_msg;
|
|
|
|
|
}
|
|
|
|
@ -716,8 +716,6 @@ static void *__cxslave_thread_listener(void *wctx)
|
|
|
|
|
char *buf = malloc(4096);
|
|
|
|
|
int r;
|
|
|
|
|
|
|
|
|
|
printf("Slave listening thread\n");
|
|
|
|
|
|
|
|
|
|
while((r = __conn_read(co, buf, 4096)) != -1) {
|
|
|
|
|
if(r) printf("Got the message %s \n", buf);
|
|
|
|
|
r = __eval_cstr(buf, conn_sys->system_rpc, co);
|
|
|
|
@ -734,8 +732,6 @@ static void *__cxmaster_thread_listener(void *wctx)
|
|
|
|
|
char *buf = malloc(4096);
|
|
|
|
|
int r;
|
|
|
|
|
|
|
|
|
|
printf("Master listening thread\n");
|
|
|
|
|
|
|
|
|
|
while((r = __conn_read(co, buf, 4096)) != -1) {
|
|
|
|
|
if(r) printf("Got the message %s \n", buf);
|
|
|
|
|
r = __eval_cstr(buf, conn_sys->system_rpc, co);
|
|
|
|
@ -746,6 +742,81 @@ static void *__cxmaster_thread_listener(void *wctx)
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void *__msg_queue_thread(void *ctx)
|
|
|
|
|
{
|
|
|
|
|
conn_t *co = (conn_t *)ctx;
|
|
|
|
|
pth_msg_t *tmp = malloc(sizeof(pth_msg_t));
|
|
|
|
|
sxmsg_t *msg;
|
|
|
|
|
chnl_t *ch;
|
|
|
|
|
int r = 0, len;
|
|
|
|
|
char *buf = malloc(4096), *tb;
|
|
|
|
|
sexp_t *sx;
|
|
|
|
|
|
|
|
|
|
if(!tmp || !buf) {
|
|
|
|
|
if(tmp) free(tmp);
|
|
|
|
|
if(buf) free(buf);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
while(1) {
|
|
|
|
|
r = pth_queue_get(co->mqueue, NULL, tmp);
|
|
|
|
|
if(r) {
|
|
|
|
|
free(buf);
|
|
|
|
|
free(tmp);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* message workout */
|
|
|
|
|
msg = tmp->data;
|
|
|
|
|
if(!msg) continue; /* spurious message */
|
|
|
|
|
|
|
|
|
|
if(!(sysmsg->flags & ESXMSG_USR)) { /* not a regular message */
|
|
|
|
|
msg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */
|
|
|
|
|
msg->flags &= ~ESXMSG_PENDING;
|
|
|
|
|
pthread_mutex_unlock(&(msg->wait)); /* wake up the waitee */
|
|
|
|
|
continue;
|
|
|
|
|
} else {
|
|
|
|
|
ch = msg->pch;
|
|
|
|
|
/* now we need to complete the request */
|
|
|
|
|
sx = (sexp_t *)msg->payload; tb = buf;
|
|
|
|
|
if(!(msg->flags & ESXMSG_PULSE)) { /* %s))) */
|
|
|
|
|
snprintf(buf, 4096, "(ch-msg (:chid %lu (:msgid %lu ", ch->cid,
|
|
|
|
|
msg->mid);
|
|
|
|
|
len = strlen(buf);
|
|
|
|
|
tb += len*sizeof(char);
|
|
|
|
|
if(print_sexp(tb, 4096 - (len + 4*sizeof(char)), sx)) {
|
|
|
|
|
sx->opcode = ENOMEM;
|
|
|
|
|
/* we don't need to wake up anybody */
|
|
|
|
|
if(sx->flags & ESXMSG_TIMEDOUT) {
|
|
|
|
|
/* clean up all the shit:
|
|
|
|
|
* 1. remove from message tree
|
|
|
|
|
* 2. destroy message itself
|
|
|
|
|
*/
|
|
|
|
|
/* TODO: destroy the message */
|
|
|
|
|
} else
|
|
|
|
|
pthread_mutex_unlock(&(msg->wait));
|
|
|
|
|
}
|
|
|
|
|
len = strlen(tb);
|
|
|
|
|
tb += len*sizeof(char);
|
|
|
|
|
strcat(tb, ")))\0");
|
|
|
|
|
} else { /* pulse messages */
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* write it */
|
|
|
|
|
pthread_mutex_lock(&(co->oplock)); /* exclusive write */
|
|
|
|
|
SSL_write(co->ssl, (void *)buf, strlen(buf) + sizeof(char); /* TODO: SSL*/
|
|
|
|
|
pthread_mutex_unlock(&(co->oplock));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
len = 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
free(msg);
|
|
|
|
|
free(buf);
|
|
|
|
|
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* this function is an ugly implementation to get C string with uuid */
|
|
|
|
|
static char *__generate_uuid(void)
|
|
|
|
|
{
|
|
|
|
@ -846,7 +917,7 @@ void *__system_queue_listener(void *data)
|
|
|
|
|
|
|
|
|
|
if(!(sysmsg->flags & ESXMSG_SYS)) { /* not a system message */
|
|
|
|
|
sysmsg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */
|
|
|
|
|
sysmsg->flags &= ~ESXMSG_PENDING;
|
|
|
|
|
sysmsg->flags &= ~ESXMSG_PENDING;
|
|
|
|
|
pthread_mutex_unlock(&(sysmsg->wait)); /* wake up the waitee */
|
|
|
|
|
continue;
|
|
|
|
|
} else {
|
|
|
|
@ -919,7 +990,6 @@ int connections_subsystem_init(void)
|
|
|
|
|
|
|
|
|
|
/* init SSL library */
|
|
|
|
|
SSL_library_init();
|
|
|
|
|
printf("here\n");
|
|
|
|
|
|
|
|
|
|
OpenSSL_add_all_algorithms();
|
|
|
|
|
SSL_load_error_strings();
|
|
|
|
@ -1307,6 +1377,44 @@ static sxmsg_t *__allocate_msg(int *res)
|
|
|
|
|
return msg;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void __destroy_msg(sxmsg_t *msg)
|
|
|
|
|
{
|
|
|
|
|
chnl_t *ch = msg->nch;
|
|
|
|
|
|
|
|
|
|
if(sm->flags & ESXMSG_USR) {
|
|
|
|
|
pthread_mutex_lock(&(ch->oplock));
|
|
|
|
|
idx_free(ch->idx_msg, sm->mid);
|
|
|
|
|
pthread_mutex_unlock(&(ch->oplock));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pthread_mutex_unlock(&(msg->wait));
|
|
|
|
|
pthread_mutex_destroy(&(msg->wait));
|
|
|
|
|
free(msg);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int __create_reg_msg(sxmsg_t **msg, chnl_t *ch)
|
|
|
|
|
{
|
|
|
|
|
int r = 0;
|
|
|
|
|
sxmsg_t *sm = __allocate_msg(&r);
|
|
|
|
|
|
|
|
|
|
if(r) return r;
|
|
|
|
|
else {
|
|
|
|
|
sm->pch = ch;
|
|
|
|
|
sm->flags = (ESXMSG_USR | ESXMSG_PENDING);
|
|
|
|
|
|
|
|
|
|
/* ok allocate message ID */
|
|
|
|
|
pthread_mutex_lock(&(ch->oplock));
|
|
|
|
|
sm->mid = idx_allocate(ch->idx_msg);
|
|
|
|
|
pthread_mutex_unlock(&(ch->oplock));
|
|
|
|
|
|
|
|
|
|
pthread_mutex_lock(&(sm->wait));
|
|
|
|
|
*msg = sm;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int __create_sys_msg(sxmsg_t **msg, char *uuid, chnl_t *ch, sxpayload_t *data)
|
|
|
|
|
{
|
|
|
|
|
int r = 0;
|
|
|
|
@ -1436,11 +1544,29 @@ int channel_close(conn_t *co)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* message passing */
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* How message sending works:
|
|
|
|
|
* 1. Create a message structure assigned to the channel,
|
|
|
|
|
* 2. Put S-expression context to it
|
|
|
|
|
* 3. Put the message to the queue
|
|
|
|
|
* 4. expect the result waiting on the lock mutex
|
|
|
|
|
*/
|
|
|
|
|
static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec *tio)
|
|
|
|
|
{
|
|
|
|
|
int r = 0;
|
|
|
|
|
sxmsg_t *m = NULL;
|
|
|
|
|
|
|
|
|
|
r = __create_reg_msg(&m, ch);
|
|
|
|
|
if(r) return r;
|
|
|
|
|
else {
|
|
|
|
|
/* put the message to the search tree */
|
|
|
|
|
pthread_rwlock_wrlock(&(ch->msglock));
|
|
|
|
|
usrtc_insert(ch->msgs_tree, &(m->pendinq_node), &(m->mid));
|
|
|
|
|
pthread_rwlock_unlock(&(ch->msglock));
|
|
|
|
|
/* put the message to the run queue */
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return r;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|