/* * Secure X Message Passing Library v2 implementation. * (sxmplv2) it superseed all versions before due to the: * - memory consumption * - new features such as pulse emitting * - performance optimization * * (c) Askele Group 2013-2015 * (c) Alexander Vdolainen 2013-2015 * * libsxmp is free software: you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as published * by the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libsxmp is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * See the GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with this program. If not, see ."; * */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "internal.h" void _message_process(sxmsg_t *msg) { sxchnl_t *chan = msg->pch; sxhub_t *hub = chan->link->hub; sexp_t *sx, *isx; usrtc_t *listrpc; usrtc_node_t *node; sxl_rpc_t *rpcc; int r; sx = parse_sexp(msg->payload, msg->mhead.payload_length); if(!sx) sxmsg_return(msg, SXE_BADPROTO); sexp_list_car(sx, &isx); if(!isx) { r = SXE_BADPROTO; goto __return_err; } if(isx->ty == SEXP_LIST) { r = SXE_BADPROTO; goto __return_err; } if(isx->aty != SEXP_BASIC) { r = SXE_BADPROTO; goto __return_err; } /* in case of builtin RPC change rpclist */ if(*(isx->val) == '!') listrpc = hub->stream_rpc->rpc_tree; else listrpc = chan->rpc_list->rpc_tree; /* non builtin */ node = usrtc_lookup(listrpc, (void *)isx->val); if(!node) { r = SXE_ENORPC; goto __return_err; } else rpcc = (sxl_rpc_t *)usrtc_node_getdata(node); rpcc->rpcf((void *)msg, sx); destroy_sexp(sx); return; __return_err: destroy_sexp(sx); sxmsg_return(msg, r); return; } static inline int __sxmsg_send(sxchnl_t *channel, const char *data, size_t datalen, sxmsg_t **omsg, int pp) { sxlink_t *co; sxmsg_t *msg; sxmplv2_head_t *head; sxppmsg_t *ppm; int msgidx, r; if(!channel) return SXE_FAILED; if(!data || !datalen) return SXE_FAILED; if(!(msg = malloc(sizeof(sxmsg_t)))) return SXE_ENOMEM; else memset(msg, 0, sizeof(sxmsg_t)); co = channel->link; head = &msg->mhead; /* form a head */ head->attr = SXMSG_OPEN | SXMSG_REPLYREQ; head->reserve = channel->cid; head->payload_length = datalen; /* init message itself */ pthread_mutex_init(&msg->wait, NULL); pthread_mutex_lock(&msg->wait); msg->pch = channel; msg->payload = (void *)data; pthread_mutex_lock(&co->idx_msg_lock); msgidx = idx_allocate(&co->idx_msg); if(msgidx != IDX_INVAL) co->messages[msgidx] = msg; pthread_mutex_unlock(&co->idx_msg_lock); if(msgidx == IDX_INVAL) { r = SXE_MMESSAGES; goto __freemsg; } else head->msgid = (uint16_t)msgidx; /* ready to send it */ if(!pp) { r = _sxmpl_writemsg(co, msg); if(r != SXE_SUCCESS) { __unpinmsg: pthread_mutex_lock(&co->idx_msg_lock); idx_free(&co->idx_msg, msgidx); co->messages[msgidx] = NULL; pthread_mutex_unlock(&co->idx_msg_lock); goto __freemsg; } } else { /* postponed */ if(!(ppm = malloc(sizeof(sxppmsg_t)))) { r = SXE_ENOMEM; goto __unpinmsg; } list_init_node(&ppm->node); ppm->msg = msg; /* under locking here */ pthread_mutex_lock(&co->write_pending_lock); co->pending_messages++; list_add2tail(&co->write_pending, &ppm->node); /* push it to the FIFO */ pthread_mutex_unlock(&co->write_pending_lock); } pthread_mutex_lock(&msg->wait); /* we will sleep here */ #if 0 while(pthread_mutex_trylock(&msg->wait)) { //printf("here opcode = %d\n", head->opcode); } #endif if(head->payload_length) { *omsg = msg; return head->opcode; } else r = head->opcode; __freemsg: /* free resources for message */ pthread_mutex_unlock(&msg->wait); pthread_mutex_destroy(&msg->wait); free(msg); return r; } int sxmsg_send(sxchnl_t *channel, const char *data, size_t datalen, sxmsg_t **omsg) { return __sxmsg_send(channel, data, datalen, omsg, 0); } /* the same - postponed message i.e. will be written to the queue - not to write immendatly */ int sxmsg_send_pp(sxchnl_t *channel, const char *data, size_t datalen, sxmsg_t **omsg) { return __sxmsg_send(channel, data, datalen, omsg, 1); } /* send a pulse message */ int sxmsg_pulse(sxlink_t *co, const char *data, size_t datalen) { sxmsg_t *msg = malloc(sizeof(sxmsg_t)); sxmplv2_head_t *head; int r; /* a little bit of paranoid tests */ if(!msg) return SXE_ENOMEM; else memset(msg, 0, sizeof(sxmsg_t)); /* prepare it */ head = &msg->mhead; head->attr = 0; head->attr = SXMSG_PULSE | SXMSG_LINK; head->opcode = SXE_RAPIDMSG; head->payload_length = datalen; msg->payload = (void *)data; r = _sxmpl_writemsg(co, msg); free(msg); return r; } static inline int __sxmsg_reply(sxmsg_t *msg, const char *data, size_t datalen, int pp) { sxchnl_t *ch; sxlink_t *co; sxmplv2_head_t *head; sxppmsg_t *ppm; int r, i; pthread_t self = pthread_self(); /* a little bit of paranoid tests */ if(!msg) return SXE_FAILED; if(!(ch = msg->pch)) return SXE_FAILED; if(!(co = ch->link)) return SXE_FAILED; /* test for blocking */ for(i = 0; i < MAX_SXMPLTHREADS; i++) if(pthread_equal(self, co->thrd_poll[i])) return SXE_WOULDBLOCK; /* prepare it */ head = &msg->mhead; head->attr = 0; head->attr |= SXMSG_REPLYREQ; head->opcode = SXE_REPLYREQ; head->payload_length = datalen; msg->payload = (void *)data; if(!pp) { r = _sxmpl_writemsg(co, msg); if(r != SXE_SUCCESS) return r; } else { if(!(ppm = malloc(sizeof(sxppmsg_t)))) return SXE_ENOMEM; list_init_node(&ppm->node); ppm->msg = msg; /* under locking here */ pthread_mutex_lock(&co->write_pending_lock); co->pending_messages++; list_add2tail(&co->write_pending, &ppm->node); /* push it to the FIFO */ pthread_mutex_unlock(&co->write_pending_lock); } pthread_mutex_lock(&msg->wait); /* wait */ r = head->opcode; if((head->attr & SXMSG_CLOSED) && !head->payload_length) { /* dialog closed and no data exists */ pthread_mutex_unlock(&msg->wait); /* we able to invalidate it */ pthread_mutex_destroy(&msg->wait); free(msg); } return r; } int sxmsg_reply(sxmsg_t *msg, const char *data, size_t datalen) { return __sxmsg_reply(msg, data, datalen, 0); } int sxmsg_reply_pp(sxmsg_t *msg, const char *data, size_t datalen) { return __sxmsg_reply(msg, data, datalen, 1); } int sxmsg_rreply(sxmsg_t *msg, size_t datalen) { sxchnl_t *ch; sxlink_t *co; sxmplv2_head_t *head; int r, mid; /* a little bit of paranoid tests */ if(!msg) return SXE_FAILED; if(!(ch = msg->pch)) return SXE_FAILED; if(!(co = ch->link)) return SXE_FAILED; /* prepare it */ head = &msg->mhead; head->attr = 0; head->attr |= SXMSG_CLOSED; head->opcode = SXE_RAPIDMSG; head->payload_length = (uint16_t)datalen; mid = head->msgid; pthread_mutex_lock(&co->idx_msg_lock); idx_free(&co->idx_msg, mid); co->messages[mid] = NULL; pthread_mutex_unlock(&co->idx_msg_lock); r = _sxmpl_rapidwrite(co, msg); pthread_mutex_unlock(&msg->wait); /* we able to invalidate it */ pthread_mutex_destroy(&msg->wait); free(msg); return r; } static inline int __sxmsg_return(sxmsg_t *msg, int opcode, int pp) { sxchnl_t *ch; sxlink_t *co; sxmplv2_head_t *head; sxppmsg_t *ppm; int r, mid; /* a little bit of paranoid tests */ if(!msg) return SXE_FAILED; if(!(ch = msg->pch)) return SXE_FAILED; if(!(co = ch->link)) return SXE_FAILED; head = &msg->mhead; head->attr = 0; head->attr |= SXMSG_CLOSED; head->opcode = opcode; head->payload_length = 0; mid = head->msgid; if(!pp) { /* free index */ pthread_mutex_lock(&co->idx_msg_lock); idx_free(&co->idx_msg, mid); co->messages[mid] = NULL; pthread_mutex_unlock(&co->idx_msg_lock); r = _sxmpl_writemsg(co, msg); free(msg); } else { if(!(ppm = malloc(sizeof(sxppmsg_t)))) return SXE_ENOMEM; else { /* remove it */ pthread_mutex_lock(&co->idx_msg_lock); idx_free(&co->idx_msg, mid); co->messages[mid] = NULL; pthread_mutex_unlock(&co->idx_msg_lock); } list_init_node(&ppm->node); ppm->msg = msg; /* under locking here */ pthread_mutex_lock(&co->write_pending_lock); co->pending_messages++; list_add2tail(&co->write_pending, &ppm->node); /* push it to the FIFO */ pthread_mutex_unlock(&co->write_pending_lock); r = SXE_SUCCESS; } return r; } int sxmsg_return(sxmsg_t *msg, int opcode) { return __sxmsg_return(msg, opcode, 0); } int sxmsg_return_pp(sxmsg_t *msg, int opcode) { return __sxmsg_return(msg, opcode, 1); } void sxmsg_clean(sxmsg_t *msg) { free(msg->payload); free(msg); return; }