libsxmp/lib/message.c

368 lines
9.2 KiB
C
Raw Normal View History

2015-07-20 19:15:08 +03:00
/*
2015-10-22 05:15:52 +03:00
* Secure X Message Passing Library v2 implementation.
* (sxmplv2) it supersedes all previous versions due to:
2015-07-20 19:15:08 +03:00
* - memory consumption
* - new features such as pulse emitting
* - performance optimization
*
* (c) Askele Group 2013-2015 <http://askele.com>
* (c) Alexander Vdolainen 2013-2015 <avdolainen@gmail.com>
*
* libsxmp is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* libsxmp is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
2015-07-20 19:15:08 +03:00
*
*/
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <fcntl.h>
2015-07-31 20:02:25 +03:00
#include <sched.h>
2015-07-20 19:15:08 +03:00
#include <tdata/usrtc.h>
#include <tdata/list.h>
#include <sexpr/sexp.h>
2015-10-22 19:05:30 +03:00
#include <sxmp/limits.h>
#include <sxmp/sxmp.h>
2015-07-20 19:15:08 +03:00
2015-07-21 02:24:08 +03:00
#include "internal.h"
2015-07-20 19:15:08 +03:00
/*
 * Parse the S-expression stored in the message payload and dispatch it
 * to the matching RPC handler.
 *
 * The first element of the parsed expression must be a non-list atom of
 * the SEXP_BASIC kind; its value is used as the RPC name. Names starting
 * with '!' are looked up in the hub's stream RPC tree, all other names
 * in the RPC list attached to the message's channel.
 *
 * On failure the message is answered through sxmsg_return() with
 * SXE_BADPROTO (payload could not be parsed or has the wrong shape) or
 * SXE_ENORPC (no RPC with that name was found).
 */
void _message_process(sxmsg_t *msg)
{
  sxchnl_t *chan = msg->pch;
  sxhub_t *hub = chan->link->hub;
  sexp_t *sx, *isx = NULL; /* NULL so a failed sexp_list_car() is detectable
                            * (assumes it may leave the output untouched) */
  usrtc_t *listrpc;
  usrtc_node_t *node;
  sxl_rpc_t *rpcc;
  int r;

  sx = parse_sexp(msg->payload, msg->mhead.payload_length);
  if(!sx) {
    /* Nothing was parsed: report it and stop right here. There is no
     * expression to inspect, and the non-postponed return path frees
     * the message, so it must not be used afterwards. */
    sxmsg_return(msg, SXE_BADPROTO);
    return;
  }

  sexp_list_car(sx, &isx);
  if(!isx) { r = SXE_BADPROTO; goto __return_err; }
  if(isx->ty == SEXP_LIST) { r = SXE_BADPROTO; goto __return_err; }
  if(isx->aty != SEXP_BASIC) { r = SXE_BADPROTO; goto __return_err; }

  /* in case of builtin RPC change rpclist */
  if(*(isx->val) == '!') listrpc = hub->stream_rpc->rpc_tree;
  else listrpc = chan->rpc_list->rpc_tree; /* non builtin */

  node = usrtc_lookup(listrpc, (void *)isx->val);
  if(!node) { r = SXE_ENORPC; goto __return_err; }
  rpcc = (sxl_rpc_t *)usrtc_node_getdata(node);

  /* the handler gets the whole expression, not only its head */
  rpcc->rpcf((void *)msg, sx);
  destroy_sexp(sx);

  return;

 __return_err:
  destroy_sexp(sx);
  sxmsg_return(msg, r);

  return;
}
2015-07-20 20:41:11 +03:00
2015-10-22 19:05:30 +03:00
static inline int __sxmsg_send(sxchnl_t *channel, const char *data, size_t datalen,
2015-07-21 02:24:08 +03:00
sxmsg_t **omsg, int pp)
2015-07-20 20:41:11 +03:00
{
2015-10-22 19:05:30 +03:00
sxlink_t *co;
2015-07-21 02:24:08 +03:00
sxmsg_t *msg;
2015-10-22 19:05:30 +03:00
sxmplv2_head_t *head;
sxppmsg_t *ppm;
2015-07-21 02:24:08 +03:00
int msgidx, r;
2015-10-22 19:05:30 +03:00
if(!channel) return SXE_FAILED;
if(!data || !datalen) return SXE_FAILED;
2015-07-21 02:24:08 +03:00
2015-10-22 19:05:30 +03:00
if(!(msg = malloc(sizeof(sxmsg_t)))) return SXE_ENOMEM;
2015-07-21 02:24:08 +03:00
else memset(msg, 0, sizeof(sxmsg_t));
2015-10-22 19:05:30 +03:00
co = channel->link;
2015-07-21 02:24:08 +03:00
head = &msg->mhead;
/* form a head */
head->attr = SXMSG_OPEN | SXMSG_REPLYREQ;
head->reserve = channel->cid;
head->payload_length = datalen;
/* init message itself */
pthread_mutex_init(&msg->wait, NULL);
pthread_mutex_lock(&msg->wait);
msg->pch = channel;
msg->payload = (void *)data;
pthread_mutex_lock(&co->idx_msg_lock);
msgidx = idx_allocate(&co->idx_msg);
if(msgidx != IDX_INVAL) co->messages[msgidx] = msg;
pthread_mutex_unlock(&co->idx_msg_lock);
2015-10-22 19:05:30 +03:00
if(msgidx == IDX_INVAL) { r = SXE_MMESSAGES; goto __freemsg; }
2015-07-21 02:24:08 +03:00
else head->msgid = (uint16_t)msgidx;
/* ready to send it */
if(!pp) {
2015-10-22 19:05:30 +03:00
r = _sxmpl_writemsg(co, msg);
if(r != SXE_SUCCESS) {
2015-07-31 20:02:25 +03:00
__unpinmsg:
pthread_mutex_lock(&co->idx_msg_lock);
idx_free(&co->idx_msg, msgidx);
co->messages[msgidx] = NULL;
pthread_mutex_unlock(&co->idx_msg_lock);
goto __freemsg;
}
2015-07-21 02:24:08 +03:00
} else { /* postponed */
2015-10-22 19:05:30 +03:00
if(!(ppm = malloc(sizeof(sxppmsg_t)))) { r = SXE_ENOMEM; goto __unpinmsg; }
2015-07-21 02:24:08 +03:00
list_init_node(&ppm->node);
ppm->msg = msg;
/* under locking here */
pthread_mutex_lock(&co->write_pending_lock);
2015-07-25 03:25:35 +03:00
co->pending_messages++;
2015-07-21 02:24:08 +03:00
list_add2tail(&co->write_pending, &ppm->node); /* push it to the FIFO */
pthread_mutex_unlock(&co->write_pending_lock);
}
2015-07-31 21:27:52 +03:00
pthread_mutex_lock(&msg->wait); /* we will sleep here */
#if 0
2015-07-31 20:02:25 +03:00
while(pthread_mutex_trylock(&msg->wait)) {
//printf("here opcode = %d\n", head->opcode);
}
2015-07-31 21:27:52 +03:00
#endif
2015-07-21 02:24:08 +03:00
if(head->payload_length) {
*omsg = msg;
return head->opcode;
} else r = head->opcode;
__freemsg:
/* free resources for message */
pthread_mutex_unlock(&msg->wait);
pthread_mutex_destroy(&msg->wait);
free(msg);
return r;
}
2015-10-22 19:05:30 +03:00
/*
 * Send a message on the channel, writing it out immediately.
 * Thin front end for __sxmsg_send() with the postpone flag cleared;
 * arguments and return value are passed through unchanged.
 */
int sxmsg_send(sxchnl_t *channel, const char *data, size_t datalen, sxmsg_t **omsg)
{
  const int postponed = 0;

  return __sxmsg_send(channel, data, datalen, omsg, postponed);
}
/* the same - postponed message i.e. will be written to the queue - not to write immendatly */
2015-10-22 19:05:30 +03:00
int sxmsg_send_pp(sxchnl_t *channel, const char *data, size_t datalen, sxmsg_t **omsg)
2015-07-20 20:41:11 +03:00
{
2015-07-21 02:24:08 +03:00
return __sxmsg_send(channel, data, datalen, omsg, 1);
2015-07-20 20:41:11 +03:00
}
/* send a pulse message */
2015-10-22 19:05:30 +03:00
int sxmsg_pulse(sxlink_t *co, const char *data, size_t datalen)
2015-07-20 20:41:11 +03:00
{
2015-07-21 04:06:06 +03:00
sxmsg_t *msg = malloc(sizeof(sxmsg_t));
2015-10-22 19:05:30 +03:00
sxmplv2_head_t *head;
2015-07-21 04:06:06 +03:00
int r;
/* a little bit of paranoid tests */
2015-10-22 19:05:30 +03:00
if(!msg) return SXE_ENOMEM;
2015-07-21 04:06:06 +03:00
else memset(msg, 0, sizeof(sxmsg_t));
/* prepare it */
head = &msg->mhead;
head->attr = 0;
head->attr = SXMSG_PULSE | SXMSG_LINK;
2015-10-22 19:05:30 +03:00
head->opcode = SXE_RAPIDMSG;
2015-07-21 04:06:06 +03:00
head->payload_length = datalen;
msg->payload = (void *)data;
2015-10-22 19:05:30 +03:00
r = _sxmpl_writemsg(co, msg);
2015-07-21 04:06:06 +03:00
free(msg);
return r;
2015-07-20 20:41:11 +03:00
}
2015-07-21 04:06:06 +03:00
/*
 * Common implementation behind sxmsg_reply() and sxmsg_reply_pp().
 *
 * Reuses `msg' to carry a reply that itself asks for a reply
 * (SXMSG_REPLYREQ / SXE_REPLYREQ), sends it either directly (pp == 0)
 * or through the link's write_pending list (pp != 0), and then blocks
 * on the message's `wait' mutex.
 *
 * Returns SXE_FAILED if the message, its channel or its link is
 * missing, SXE_WOULDBLOCK when called from one of the link's
 * thrd_poll threads, SXE_ENOMEM or the _sxmpl_writemsg() error if the
 * reply could not be sent, and otherwise the opcode found in the
 * header after wake-up. If by then the dialog is closed and carries no
 * payload, the message is destroyed here.
 */
static inline int __sxmsg_reply(sxmsg_t *msg, const char *data,
                                size_t datalen, int pp)
{
  sxchnl_t *chnl;
  sxlink_t *link;
  sxmplv2_head_t *hdr;
  sxppmsg_t *pending;
  pthread_t me = pthread_self();
  int rc, n;

  /* a little bit of paranoid tests */
  if(msg == NULL) return SXE_FAILED;
  chnl = msg->pch;
  if(chnl == NULL) return SXE_FAILED;
  link = chnl->link;
  if(link == NULL) return SXE_FAILED;

  /* waiting below from one of the link's own threads is refused */
  for(n = 0; n < MAX_SXMPLTHREADS; n++) {
    if(pthread_equal(me, link->thrd_poll[n])) return SXE_WOULDBLOCK;
  }

  /* prepare it */
  hdr = &msg->mhead;
  hdr->attr = SXMSG_REPLYREQ;
  hdr->opcode = SXE_REPLYREQ;
  hdr->payload_length = datalen;
  msg->payload = (void *)data;

  if(pp) {
    pending = malloc(sizeof(sxppmsg_t));
    if(pending == NULL) return SXE_ENOMEM;
    list_init_node(&pending->node);
    pending->msg = msg;

    /* under locking here */
    pthread_mutex_lock(&link->write_pending_lock);
    link->pending_messages++;
    list_add2tail(&link->write_pending, &pending->node); /* push it to the FIFO */
    pthread_mutex_unlock(&link->write_pending_lock);
  } else {
    rc = _sxmpl_writemsg(link, msg);
    if(rc != SXE_SUCCESS) return rc;
  }

  pthread_mutex_lock(&msg->wait); /* wait */
  rc = hdr->opcode;

  /* dialog closed and no data exists: we are able to invalidate it */
  if((hdr->attr & SXMSG_CLOSED) && !hdr->payload_length) {
    pthread_mutex_unlock(&msg->wait);
    pthread_mutex_destroy(&msg->wait);
    free(msg);
  }

  return rc;
}
/*
 * Reply to a message, writing the reply out immediately.
 * Thin front end for __sxmsg_reply() with the postpone flag cleared.
 */
int sxmsg_reply(sxmsg_t *msg, const char *data, size_t datalen)
{
  const int postponed = 0;

  return __sxmsg_reply(msg, data, datalen, postponed);
}
/*
 * Reply to a message through the link's pending write queue.
 * Thin front end for __sxmsg_reply() with the postpone flag set.
 */
int sxmsg_reply_pp(sxmsg_t *msg, const char *data, size_t datalen)
{
  const int postponed = 1;

  return __sxmsg_reply(msg, data, datalen, postponed);
}
2015-07-21 04:06:06 +03:00
/*
 * Rapid reply: close the dialog and answer in one step.
 *
 * The header is rewritten to SXMSG_CLOSED / SXE_RAPIDMSG with
 * `datalen' (narrowed to 16 bits) as payload length; the payload
 * pointer of the message is left as it is. The message index is
 * released first, then the message is written with
 * _sxmpl_rapidwrite(), and finally the message itself is destroyed.
 *
 * Returns SXE_FAILED if the message, its channel or its link is
 * missing, otherwise the result of _sxmpl_rapidwrite(). `msg' must not
 * be used after a call that got past the argument checks.
 */
int sxmsg_rreply(sxmsg_t *msg, size_t datalen)
{
  sxchnl_t *chnl;
  sxlink_t *link;
  sxmplv2_head_t *hdr;
  int rc, slot;

  /* a little bit of paranoid tests */
  if(msg == NULL) return SXE_FAILED;
  chnl = msg->pch;
  if(chnl == NULL) return SXE_FAILED;
  link = chnl->link;
  if(link == NULL) return SXE_FAILED;

  /* prepare it */
  hdr = &msg->mhead;
  hdr->attr = SXMSG_CLOSED;
  hdr->opcode = SXE_RAPIDMSG;
  hdr->payload_length = (uint16_t)datalen;
  slot = hdr->msgid;

  /* give the message index back before writing */
  pthread_mutex_lock(&link->idx_msg_lock);
  idx_free(&link->idx_msg, slot);
  link->messages[slot] = NULL;
  pthread_mutex_unlock(&link->idx_msg_lock);

  rc = _sxmpl_rapidwrite(link, msg);

  /* we are able to invalidate it */
  pthread_mutex_unlock(&msg->wait);
  pthread_mutex_destroy(&msg->wait);
  free(msg);

  return rc;
}
2015-07-21 02:24:08 +03:00
/*
 * Common implementation behind sxmsg_return() and sxmsg_return_pp().
 *
 * Closes the dialog with a bare opcode: the header becomes
 * SXMSG_CLOSED with `opcode' and no payload, and the message index is
 * released.
 *
 *  pp == 0: the message is written with _sxmpl_writemsg() and freed
 *           here; the write result is returned.
 *  pp != 0: the message is appended to the link's write_pending list
 *           and SXE_SUCCESS is returned; the message is not freed
 *           here. If the list entry cannot be allocated, SXE_ENOMEM is
 *           returned and the index is left allocated.
 *
 * Returns SXE_FAILED if the message, its channel or its link is
 * missing.
 */
static inline int __sxmsg_return(sxmsg_t *msg, int opcode, int pp)
{
  sxchnl_t *chnl;
  sxlink_t *link;
  sxmplv2_head_t *hdr;
  sxppmsg_t *pending = NULL;
  int rc, slot;

  /* a little bit of paranoid tests */
  if(msg == NULL) return SXE_FAILED;
  chnl = msg->pch;
  if(chnl == NULL) return SXE_FAILED;
  link = chnl->link;
  if(link == NULL) return SXE_FAILED;

  hdr = &msg->mhead;
  hdr->attr = SXMSG_CLOSED;
  hdr->opcode = opcode;
  hdr->payload_length = 0;
  slot = hdr->msgid;

  /* the postponed variant needs its list entry before anything is
   * released, so an allocation failure leaves the index untouched */
  if(pp) {
    pending = malloc(sizeof(sxppmsg_t));
    if(pending == NULL) return SXE_ENOMEM;
  }

  /* free index */
  pthread_mutex_lock(&link->idx_msg_lock);
  idx_free(&link->idx_msg, slot);
  link->messages[slot] = NULL;
  pthread_mutex_unlock(&link->idx_msg_lock);

  if(!pp) {
    rc = _sxmpl_writemsg(link, msg);
    free(msg);
    return rc;
  }

  list_init_node(&pending->node);
  pending->msg = msg;

  /* under locking here */
  pthread_mutex_lock(&link->write_pending_lock);
  link->pending_messages++;
  list_add2tail(&link->write_pending, &pending->node); /* push it to the FIFO */
  pthread_mutex_unlock(&link->write_pending_lock);

  return SXE_SUCCESS;
}
2015-07-20 20:41:11 +03:00
/*
 * Close the dialog with a bare opcode, writing it out immediately.
 * Thin front end for __sxmsg_return() with the postpone flag cleared.
 */
int sxmsg_return(sxmsg_t *msg, int opcode)
{
  const int postponed = 0;

  return __sxmsg_return(msg, opcode, postponed);
}
/*
 * Close the dialog with a bare opcode through the link's pending
 * write queue.
 * Thin front end for __sxmsg_return() with the postpone flag set.
 */
int sxmsg_return_pp(sxmsg_t *msg, int opcode)
{
  const int postponed = 1;

  return __sxmsg_return(msg, opcode, postponed);
}
2015-07-23 23:50:22 +03:00
/*
 * Release a message together with its payload buffer.
 *
 * Both the payload and the message itself are passed to free(), so the
 * payload must be heap memory owned by the message (or NULL). A NULL
 * `msg' is accepted and ignored, mirroring free(NULL).
 */
void sxmsg_clean(sxmsg_t *msg)
{
  if(!msg) return; /* nothing to release */

  free(msg->payload);
  free(msg);

  return;
}