/* * Secure Network Transport Layer Library implementation. * This is a proprietary software. See COPYING for further details. * * (c) 2013-2014 Copyright Askele, inc. * (c) 2013-2014 Copyright Askele Ingria, inc. */ #include #include #include #include #include #include #include #include #include #ifdef WIN32 #include #else #include #include #include #include #endif #include #include #include #include #include void __destroy_msg(sxmsg_t *msg) { chnl_t *ch = msg->pch; if(msg->flags & ESXMSG_USR) { pthread_mutex_lock(&(ch->oplock)); idx_free(ch->idx_msg, msg->mid); pthread_mutex_unlock(&(ch->oplock)); } else if(msg->flags & ESXMSG_SYS) { //if(msg->uuid) free(msg->uuid); } pthread_mutex_unlock(&(msg->wait)); pthread_mutex_destroy(&(msg->wait)); free(msg); return; } sxmsg_t *__allocate_msg(int *res) { sxmsg_t *msg = malloc(sizeof(sxmsg_t)); int r = 0; if(!msg) { *res = ENOMEM; return NULL; } else { memset(msg, 0, sizeof(sxmsg_t)); if((r = pthread_mutex_init(&(msg->wait), NULL))) { free(msg); *res = r; return NULL; } usrtc_node_init(&(msg->chnl_node), msg); usrtc_node_init(&(msg->poll_node), msg); usrtc_node_init(&(msg->pendingq_node), msg); } *res = 0; return msg; } int __create_reg_msg(sxmsg_t **msg, chnl_t *ch) { int r = 0; sxmsg_t *sm = __allocate_msg(&r); if(r) return r; else { sm->pch = ch; sm->flags = (ESXMSG_USR | ESXMSG_PENDING); /* ok allocate message ID */ pthread_mutex_lock(&(ch->oplock)); sm->mid = idx_allocate(ch->idx_msg); pthread_mutex_unlock(&(ch->oplock)); pthread_mutex_lock(&(sm->wait)); *msg = sm; } return 0; } int __create_sys_msg(sxmsg_t **msg, char *uuid, chnl_t *ch, sxpayload_t *data) { int r = 0; sxmsg_t *m = __allocate_msg(&r); if(r) return r; else { /* fill values */ m->pch = ch; m->uuid = uuid; m->payload = data; /* set the right flags */ m->flags = (ESXMSG_SYS | ESXMSG_PENDING); /* we need to lock the wait mutex */ pthread_mutex_lock(&(m->wait)); *msg = m; } return 0; } /* message passing */ /* * How message sending works: * 1. 
Create a message structure assigned to the channel, * 2. Put S-expression context to it * 3. Put the message to the queue * 4. expect the result waiting on the lock mutex */ static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec *tio) { int r = 0; sxmsg_t *m = NULL; conn_t *co = ch->connection; if(!(co->flags & CXCONN_ESTABL)) { destroy_sexp(sx); return ESXNOCONNECT; } *msg = NULL; r = __create_reg_msg(&m, ch); if(r) return r; else { /* put the message to the search tree */ pthread_rwlock_wrlock(&(ch->msglock)); usrtc_insert(ch->msgs_tree, &(m->pendingq_node), &(m->mid)); pthread_rwlock_unlock(&(ch->msglock)); /* message assign */ m->opcode = 0; m->payload = (void *)sx; /* assign initial sx */ m->initial_sx = sx; /* put the message to the run queue */ r = pth_queue_add(co->mqueue, (void *)m, USR_MSG); if(r) return r; /* FIXME: better give up */ if(m->flags & ESXMSG_PENDING) { if(!tio) pthread_mutex_lock(&(m->wait)); else pthread_mutex_timedlock(&(m->wait), tio); } if(tio && (m->flags & ESXMSG_PENDING)) return SXOTIMEDOUT; if(!m->payload) { r = m->opcode; /* first remove the message from tree */ pthread_rwlock_wrlock(&(ch->msglock)); usrtc_delete(ch->msgs_tree, &(m->pendingq_node)); pthread_rwlock_unlock(&(ch->msglock)); /* destroy s expression */ destroy_sexp(m->initial_sx); /* destroy */ __destroy_msg(m); } else { *msg = m; if(m->opcode == ESXNOCONNECT) r = m->opcode; else r = SXOREPLYREQ; } } return r; } int msg_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg) { return __message_send(ch, sx, msg, NULL); } int msg_send_timed(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec *tio) { return __message_send(ch, sx, msg, tio); } static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcode) { int r = 0; chnl_t *ch = msg->pch; conn_t *co = ch->connection; if(!(co->flags & CXCONN_ESTABL)) { destroy_sexp(sx); return ESXNOCONNECT; } if(msg->flags & ESXMSG_ISREPLY) destroy_sexp((sexp_t *)msg->payload); msg->payload = sx; 
msg->opcode = opcode; msg->flags |= ESXMSG_PENDING; /* pending */ msg->flags |= ESXMSG_ISREPLY; /* this is a reply */ if(!sx) msg->flags &= ~ESXMSG_PENDING; else msg->flags |= ESXMSG_RMONRETR; /* put the message to the queue */ r = pth_queue_add(co->mqueue, (void *)msg, USR_MSG); if(r) return r; /* FIXME: better give up */ if(!sx) return 0; if(msg->flags & ESXMSG_PENDING) { if(!tio) pthread_mutex_lock(&(msg->wait)); else pthread_mutex_timedlock(&(msg->wait), tio); } if(tio && (msg->flags & ESXMSG_PENDING)) { msg->flags &= ~ESXMSG_PENDING; /* we will not wait for it */ return SXOTIMEDOUT; } r = msg->opcode; if(msg->flags & ESXMSG_CLOSURE) { /* destroy */ destroy_sexp(msg->initial_sx); __destroy_msg(msg); } return r; } int msg_return(sxmsg_t *msg, int opcode) { return __msg_reply(msg, NULL, NULL, opcode); } int msg_reply(sxmsg_t *msg, sexp_t *sx) { return __msg_reply(msg, sx, NULL, 0); } int msg_reply_timed(sxmsg_t *msg, sexp_t *sx, struct timespec *tio) { return __msg_reply(msg, sx, tio, 0); } /* * How message sending works: * 1. Create a message structure assigned to the channel, * 2. Put S-expression context to it * 3. Put the message to the queue * 4. 
Wait for job execution
 */

/*
 * Send sx over channel ch as a pulse message (ESXMSG_PULSE).
 *
 * ch     - channel to send on; its connection must be established.
 * sx     - S-expression to send.
 * tio    - timeout passed to pthread_mutex_timedlock(), or NULL to wait
 *          without a timeout; ignored when nowait is set.
 * nowait - non-zero: do not wait on the message mutex after queueing.
 *
 * Returns:
 *   ESXNOCONNECT - connection is not established (sx is destroyed here);
 *   SXOTIMEDOUT  - waited with tio and the message is still pending;
 *   SXOREPLYREQ  - the message still holds a payload;
 *   otherwise the opcode stored in the message (message and sx are
 *   destroyed), or the error of __create_reg_msg() / pth_queue_add().
 *
 * sx is NOT destroyed when __create_reg_msg() or pth_queue_add() fails.
 */
static int __message_send_pulse(chnl_t *ch, sexp_t *sx, struct timespec *tio,
                                int nowait)
{
  int r = 0;
  sxmsg_t *m = NULL;
  conn_t *co = ch->connection;

  if(!(co->flags & CXCONN_ESTABL)) {
    destroy_sexp(sx);
    return ESXNOCONNECT;
  }

  r = __create_reg_msg(&m, ch);
  if(r) return r;

  /* put the message to the search tree */
  pthread_rwlock_wrlock(&(ch->msglock));
  usrtc_insert(ch->msgs_tree, &(m->pendingq_node), &(m->mid));
  pthread_rwlock_unlock(&(ch->msglock));

  /* message assign */
  m->opcode = 0;
  m->payload = (void *)sx;
  /* assign initial sx */
  m->initial_sx = sx;
  m->flags |= ESXMSG_PULSE;

  /* put the message to the run queue */
  r = pth_queue_add(co->mqueue, (void *)m, USR_MSG);
  if(r) {
    /*
     * Give up: the message was not queued (assumes a failed
     * pth_queue_add() keeps no reference to it), so unlink it from the
     * tree and release it instead of leaking it with a locked mutex and
     * an allocated message id. sx stays with the caller.
     */
    pthread_rwlock_wrlock(&(ch->msglock));
    usrtc_delete(ch->msgs_tree, &(m->pendingq_node));
    pthread_rwlock_unlock(&(ch->msglock));
    __destroy_msg(m);
    return r;
  }

  if(!nowait) {
    /*
     * The wait mutex was locked at creation time, so locking it again
     * blocks until it is unlocked elsewhere or the timeout expires.
     */
    if(m->flags & ESXMSG_PENDING) {
      if(!tio) pthread_mutex_lock(&(m->wait));
      else pthread_mutex_timedlock(&(m->wait), tio);
    }

    /*
     * NOTE(review): on timeout the message stays in the tree and nobody
     * here keeps a handle to it - confirm that the queue consumer
     * releases it.
     */
    if(tio && (m->flags & ESXMSG_PENDING)) return SXOTIMEDOUT;
  }

  /*
   * NOTE(review): with nowait set the payload is inspected right after
   * queueing, without any synchronization with the queue consumer -
   * confirm this cannot destroy a message that is still being processed.
   */
  if(!m->payload) {
    /* no payload left: report the opcode and drop the message */
    r = m->opcode;

    /* first remove the message from tree */
    pthread_rwlock_wrlock(&(ch->msglock));
    usrtc_delete(ch->msgs_tree, &(m->pendingq_node));
    pthread_rwlock_unlock(&(ch->msglock));

    /* destroy s expression */
    destroy_sexp(m->initial_sx);

    /* destroy */
    __destroy_msg(m);
  } else {
    r = SXOREPLYREQ;
  }

  return r;
}

/* Send a pulse and wait without a timeout; see __message_send_pulse(). */
int msg_send_pulse(chnl_t *ch, sexp_t *sx)
{
  return __message_send_pulse(ch, sx, NULL, 0);
}

/* Send a pulse and wait at most until tio; see __message_send_pulse(). */
int msg_send_pulse_timed(chnl_t *ch, sexp_t *sx, struct timespec *tio)
{
  return __message_send_pulse(ch, sx, tio, 0);
}

/* Send a pulse without waiting on it; see __message_send_pulse(). */
int msg_send_pulse_nowait(chnl_t *ch, sexp_t *sx)
{
  return __message_send_pulse(ch, sx, NULL, 1);
}