|
|
|
/*
|
|
|
|
* Secure Network Transport Layer Library v2 implementation.
|
|
|
|
* (sntllv2) It supersedes all previous versions due to:
|
|
|
|
* - memory consumption
|
|
|
|
* - new features such as pulse emitting
|
|
|
|
* - performance optimization
|
|
|
|
*
|
|
|
|
* This is a proprietary software. See COPYING for further details.
|
|
|
|
*
|
|
|
|
* (c) Askele Group 2013-2015 <http://askele.com>
|
|
|
|
*
|
|
|
|
*/
|
|
|
|
|
|
|
|
#include <stdlib.h>
|
|
|
|
#include <stdio.h>
|
|
|
|
#include <errno.h>
|
|
|
|
#include <string.h>
|
|
|
|
#include <pthread.h>
|
|
|
|
#include <sys/stat.h>
|
|
|
|
#include <sys/time.h>
|
|
|
|
#include <sys/types.h>
|
|
|
|
#include <fcntl.h>
|
|
|
|
|
|
|
|
#include <tdata/usrtc.h>
|
|
|
|
#include <tdata/list.h>
|
|
|
|
#include <sexpr/sexp.h>
|
|
|
|
|
|
|
|
#include <sntl/sntllv2.h>
|
|
|
|
|
|
|
|
#include "internal.h"
|
|
|
|
|
|
|
|
/*
 * Dispatch an incoming message to the RPC handler named in its payload.
 *
 * The payload is parsed as an S-expression. The first element of the parsed
 * list must be a non-list, basic atom; its value is used as the lookup key in
 * the channel's RPC tree. When a handler is found it is called with the
 * message and the parsed expression (the handler is expected to destroy the
 * expression). On any failure the message is answered through sxmsg_return()
 * with SNE_BADPROTO (malformed payload) or SNE_ENORPC (unknown function).
 */
void _message_process(sxmsg_t *msg)
{
  chnl_t *chan = msg->pch;
  sexp_t *sx, *isx = NULL; /* isx stays NULL if sexp_list_car() sets nothing */
  usrtc_t *listrpc = chan->rpc_list->rpc_tree;
  usrtc_node_t *node;
  cx_rpc_t *rpcc;
  int r;

  sx = parse_sexp(msg->payload, msg->mhead.payload_length);
  if(!sx) {
    /* nothing was parsed: report and stop, there is no expression to inspect */
    sxmsg_return(msg, SNE_BADPROTO);
    return;
  }

  sexp_list_car(sx, &isx);
  if(!isx) { r = SNE_BADPROTO; goto __return_err; }
  if(isx->ty == SEXP_LIST) { r = SNE_BADPROTO; goto __return_err; }
  if(isx->aty != SEXP_BASIC) { r = SNE_BADPROTO; goto __return_err; }

  /* the atom value is the key of the RPC function */
  node = usrtc_lookup(listrpc, (void *)isx->val);
  if(!node) { r = SNE_ENORPC; goto __return_err; }

  rpcc = (cx_rpc_t *)usrtc_node_getdata(node);
  rpcc->rpcf((void *)msg, sx); /* sx *MUST* be destroyed asap by the handler */

  return;

 __return_err:
  destroy_sexp(sx);
  sxmsg_return(msg, r);
  return;
}
|
|
|
|
|
|
|
|
static inline int __sxmsg_send(chnl_t *channel, const char *data, size_t datalen,
|
|
|
|
sxmsg_t **omsg, int pp)
|
|
|
|
{
|
|
|
|
conn_t *co;
|
|
|
|
sxmsg_t *msg;
|
|
|
|
sntllv2_head_t *head;
|
|
|
|
ppmsg_t *ppm;
|
|
|
|
int msgidx, r;
|
|
|
|
|
|
|
|
if(!channel) return SNE_FAILED;
|
|
|
|
if(!data || !datalen) return SNE_FAILED;
|
|
|
|
|
|
|
|
if(!(msg = malloc(sizeof(sxmsg_t)))) return SNE_ENOMEM;
|
|
|
|
else memset(msg, 0, sizeof(sxmsg_t));
|
|
|
|
|
|
|
|
co = channel->connection;
|
|
|
|
head = &msg->mhead;
|
|
|
|
/* form a head */
|
|
|
|
head->attr = SXMSG_OPEN | SXMSG_REPLYREQ;
|
|
|
|
head->reserve = channel->cid;
|
|
|
|
head->payload_length = datalen;
|
|
|
|
/* init message itself */
|
|
|
|
pthread_mutex_init(&msg->wait, NULL);
|
|
|
|
pthread_mutex_lock(&msg->wait);
|
|
|
|
msg->pch = channel;
|
|
|
|
msg->payload = (void *)data;
|
|
|
|
|
|
|
|
pthread_mutex_lock(&co->idx_msg_lock);
|
|
|
|
msgidx = idx_allocate(&co->idx_msg);
|
|
|
|
if(msgidx != IDX_INVAL) co->messages[msgidx] = msg;
|
|
|
|
pthread_mutex_unlock(&co->idx_msg_lock);
|
|
|
|
|
|
|
|
if(msgidx == IDX_INVAL) { r = SNE_MMESSAGES; goto __freemsg; }
|
|
|
|
else head->msgid = (uint16_t)msgidx;
|
|
|
|
|
|
|
|
/* ready to send it */
|
|
|
|
if(!pp) {
|
|
|
|
r = _sntll_writemsg(co, msg);
|
|
|
|
if(r != SNE_SUCCESS) goto __closemsg;
|
|
|
|
} else { /* postponed */
|
|
|
|
if(!(ppm = malloc(sizeof(ppmsg_t)))) { r = SNE_ENOMEM; goto __closemsg; }
|
|
|
|
list_init_node(&ppm->node);
|
|
|
|
ppm->msg = msg;
|
|
|
|
|
|
|
|
/* under locking here */
|
|
|
|
pthread_mutex_lock(&co->write_pending_lock);
|
|
|
|
list_add2tail(&co->write_pending, &ppm->node); /* push it to the FIFO */
|
|
|
|
pthread_mutex_unlock(&co->write_pending_lock);
|
|
|
|
}
|
|
|
|
|
|
|
|
pthread_mutex_lock(&msg->wait); /* we will sleep here */
|
|
|
|
|
|
|
|
if(head->payload_length) {
|
|
|
|
*omsg = msg;
|
|
|
|
return head->opcode;
|
|
|
|
} else r = head->opcode;
|
|
|
|
|
|
|
|
__closemsg:
|
|
|
|
pthread_mutex_lock(&co->idx_msg_lock);
|
|
|
|
idx_free(&co->idx_msg, msgidx);
|
|
|
|
co->messages[msgidx] = NULL;
|
|
|
|
pthread_mutex_unlock(&co->idx_msg_lock);
|
|
|
|
__freemsg:
|
|
|
|
/* free resources for message */
|
|
|
|
pthread_mutex_unlock(&msg->wait);
|
|
|
|
pthread_mutex_destroy(&msg->wait);
|
|
|
|
free(msg);
|
|
|
|
|
|
|
|
return r;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* Send a message on the channel, writing it directly (not postponed). */
int sxmsg_send(chnl_t *channel, const char *data, size_t datalen, sxmsg_t **omsg)
{
  const int postponed = 0;

  return __sxmsg_send(channel, data, datalen, omsg, postponed);
}
|
|
|
|
|
|
|
|
/* the same - postponed message i.e. will be written to the queue - not to write immendatly */
|
|
|
|
int sxmsg_send_pp(chnl_t *channel, const char *data, size_t datalen, sxmsg_t **omsg)
|
|
|
|
{
|
|
|
|
return __sxmsg_send(channel, data, datalen, omsg, 1);
|
|
|
|
}
|
|
|
|
|
|
|
|
/* send a pulse message */
|
|
|
|
int sxmsg_pulse(conn_t *co, const char *data, size_t datalen)
|
|
|
|
{
|
|
|
|
sxmsg_t *msg = malloc(sizeof(sxmsg_t));
|
|
|
|
sntllv2_head_t *head;
|
|
|
|
int r;
|
|
|
|
|
|
|
|
/* a little bit of paranoid tests */
|
|
|
|
if(!msg) return SNE_ENOMEM;
|
|
|
|
else memset(msg, 0, sizeof(sxmsg_t));
|
|
|
|
|
|
|
|
/* prepare it */
|
|
|
|
head = &msg->mhead;
|
|
|
|
head->attr = 0;
|
|
|
|
head->attr = SXMSG_PULSE | SXMSG_LINK;
|
|
|
|
head->opcode = SNE_RAPIDMSG;
|
|
|
|
head->payload_length = datalen;
|
|
|
|
msg->payload = (void *)data;
|
|
|
|
|
|
|
|
r = _sntll_writemsg(co, msg);
|
|
|
|
|
|
|
|
free(msg);
|
|
|
|
|
|
|
|
return r;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
 * Common implementation of sxmsg_reply() and sxmsg_reply_pp().
 *
 * Re-uses the given message to send a reply that itself requests a reply:
 * the head gets the SXMSG_REPLYREQ attribute and SNE_REPLYREQ opcode, the
 * payload pointer is stored (not copied), and the message is either written
 * directly (pp == 0) or appended to the connection's write_pending list.
 * The caller then blocks on the message mutex.
 *
 * Returns SNE_FAILED on a NULL message/channel/connection, SNE_WOULDBLOCK
 * when called from one of the connection's poll threads, SNE_ENOMEM or the
 * _sntll_writemsg() error on send failure, otherwise the opcode read from
 * the head after wake-up. If by then the head is marked SXMSG_CLOSED and
 * has no payload, the message is destroyed and freed here.
 */
static inline int __sxmsg_reply(sxmsg_t *msg, const char *data,
                                size_t datalen, int pp)
{
  chnl_t *channel;
  conn_t *conn;
  sntllv2_head_t *hdr;
  pthread_t caller;
  int rc, slot;

  /* argument sanity */
  if(msg == NULL) return SNE_FAILED;
  channel = msg->pch;
  if(channel == NULL) return SNE_FAILED;
  conn = channel->connection;
  if(conn == NULL) return SNE_FAILED;

  /* refuse to block when running on one of the polling threads */
  caller = pthread_self();
  slot = 0;
  while(slot < 8) {
    if(pthread_equal(caller, conn->thrd_poll[slot])) return SNE_WOULDBLOCK;
    slot++;
  }

  /* fill in the head and attach the payload */
  hdr = &msg->mhead;
  hdr->attr = SXMSG_REPLYREQ;
  hdr->opcode = SNE_REPLYREQ;
  hdr->payload_length = datalen;
  msg->payload = (void *)data;

  if(pp) {
    /* postponed: queue the message at the tail of the pending FIFO */
    ppmsg_t *pending = malloc(sizeof(ppmsg_t));

    if(pending == NULL) return SNE_ENOMEM;
    list_init_node(&pending->node);
    pending->msg = msg;

    pthread_mutex_lock(&conn->write_pending_lock);
    list_add2tail(&conn->write_pending, &pending->node);
    pthread_mutex_unlock(&conn->write_pending_lock);
  } else {
    rc = _sntll_writemsg(conn, msg);
    if(rc != SNE_SUCCESS) return rc;
  }

  pthread_mutex_lock(&msg->wait); /* sleep until the message is released */

  rc = hdr->opcode;

  /* dialog still open, or data present: the message stays alive */
  if(!(hdr->attr & SXMSG_CLOSED) || hdr->payload_length) return rc;

  /* dialog closed and nothing to read: the message can be invalidated */
  pthread_mutex_unlock(&msg->wait);
  pthread_mutex_destroy(&msg->wait);
  free(msg);

  return rc;
}
|
|
|
|
|
|
|
|
/* Reply to a message and wait for the answer; the reply is written directly. */
int sxmsg_reply(sxmsg_t *msg, const char *data, size_t datalen)
{
  const int postponed = 0;

  return __sxmsg_reply(msg, data, datalen, postponed);
}
|
|
|
|
|
|
|
|
/* Reply to a message and wait for the answer; the reply goes through the pending write queue. */
int sxmsg_reply_pp(sxmsg_t *msg, const char *data, size_t datalen)
{
  const int postponed = 1;

  return __sxmsg_reply(msg, data, datalen, postponed);
}
|
|
|
|
|
|
|
|
/*
 * Send a closing "rapid" reply and dispose of the message.
 *
 * The head is marked SXMSG_CLOSED with opcode SNE_RAPIDMSG and the given
 * payload length; the payload pointer of the message is left untouched.
 * The message index is released, the message is written, and then the
 * message is destroyed and freed whatever the write result was.
 *
 * Returns SNE_FAILED on a NULL message/channel/connection, otherwise the
 * result of _sntll_writemsg().
 */
int sxmsg_rreply(sxmsg_t *msg, size_t datalen)
{
  chnl_t *channel;
  conn_t *conn;
  sntllv2_head_t *hdr;
  int rc;

  /* argument sanity */
  if(msg == NULL) return SNE_FAILED;
  channel = msg->pch;
  if(channel == NULL) return SNE_FAILED;
  conn = channel->connection;
  if(conn == NULL) return SNE_FAILED;

  /* closing head */
  hdr = &msg->mhead;
  hdr->attr = SXMSG_CLOSED;
  hdr->opcode = SNE_RAPIDMSG;
  hdr->payload_length = datalen;

  /* give the message slot back to the connection */
  pthread_mutex_lock(&conn->idx_msg_lock);
  idx_free(&conn->idx_msg, hdr->msgid);
  conn->messages[hdr->msgid] = NULL;
  pthread_mutex_unlock(&conn->idx_msg_lock);

  rc = _sntll_writemsg(conn, msg);

  /* the message is no longer needed: invalidate it */
  pthread_mutex_unlock(&msg->wait);
  pthread_mutex_destroy(&msg->wait);
  free(msg);

  return rc;
}
|
|
|
|
|
|
|
|
/*
 * Common implementation of sxmsg_return() and sxmsg_return_pp().
 *
 * Closes the dialog with a bare opcode: the head is marked SXMSG_CLOSED,
 * gets the given opcode and a zero payload length, and the message index
 * is released. The message is then written directly (pp == 0) or appended
 * to the connection's write_pending list (pp != 0).
 *
 * Returns SNE_FAILED on a NULL message/channel/connection, SNE_ENOMEM when
 * the queue entry cannot be allocated (the index is then left allocated),
 * the result of _sntll_writemsg() for a direct write, or SNE_SUCCESS once
 * the message has been queued.
 *
 * NOTE(review): the message itself is not freed in this function - confirm
 * who releases it after the write.
 */
static inline int __sxmsg_return(sxmsg_t *msg, int opcode, int pp)
{
  chnl_t *channel;
  conn_t *conn;
  sntllv2_head_t *hdr;
  ppmsg_t *pending = NULL;

  /* argument sanity */
  if(msg == NULL) return SNE_FAILED;
  channel = msg->pch;
  if(channel == NULL) return SNE_FAILED;
  conn = channel->connection;
  if(conn == NULL) return SNE_FAILED;

  /* closing head without payload */
  hdr = &msg->mhead;
  hdr->attr = SXMSG_CLOSED;
  hdr->opcode = opcode;
  hdr->payload_length = 0;

  /* the queue entry is obtained first, so a failure leaves the index alone */
  if(pp) {
    pending = malloc(sizeof(ppmsg_t));
    if(pending == NULL) return SNE_ENOMEM;
  }

  /* free index */
  pthread_mutex_lock(&conn->idx_msg_lock);
  idx_free(&conn->idx_msg, hdr->msgid);
  conn->messages[hdr->msgid] = NULL;
  pthread_mutex_unlock(&conn->idx_msg_lock);

  if(!pp) return _sntll_writemsg(conn, msg);

  /* postponed: push the message to the tail of the pending FIFO */
  list_init_node(&pending->node);
  pending->msg = msg;

  pthread_mutex_lock(&conn->write_pending_lock);
  list_add2tail(&conn->write_pending, &pending->node);
  pthread_mutex_unlock(&conn->write_pending_lock);

  return SNE_SUCCESS;
}
|
|
|
|
|
|
|
|
/* Close the dialog with the given opcode; the answer is written directly. */
int sxmsg_return(sxmsg_t *msg, int opcode)
{
  const int postponed = 0;

  return __sxmsg_return(msg, opcode, postponed);
}
|
|
|
|
|
|
|
|
/* Close the dialog with the given opcode; the answer goes through the pending write queue. */
int sxmsg_return_pp(sxmsg_t *msg, int opcode)
{
  const int postponed = 1;

  return __sxmsg_return(msg, opcode, postponed);
}
|
|
|
|
|