You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
libsxmp/lib/channel.c

313 lines
7.6 KiB
C

/*
* Secure Network Transport Layer Library implementation.
* This is a proprietary software. See COPYING for further details.
*
* (c) Askele Group 2013-2015 <http://askele.com>
*
*/
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <fcntl.h>
#ifdef WIN32
#include <Winsock2.h>
#else
#include <sys/select.h>
#include <netdb.h>
#include <unistd.h>
#include <uuid/uuid.h>
#endif
#include <openssl/ssl.h>
#include <openssl/err.h>
#include <tdata/usrtc.h>
#include <sexpr/sexp.h>
#include <sntl/connection.h>
extern char *__generate_uuid(void);
extern void __destroy_msg(sxmsg_t *msg);
extern int __create_sys_msg(sxmsg_t **msg, char *uuid, chnl_t *ch,
sxpayload_t *data);
static long __cmp_ulong(const void *a, const void *b)
{
return *(ulong_t *)a - *(ulong_t *)b;
}
int __alloc_channel(ulong_t cid, conn_t *co, rpc_typed_list_t *rlist, chnl_t **channel)
{
int r = 0;
chnl_t *ch = malloc(sizeof(chnl_t));
usrtc_t *msg_tree = malloc(sizeof(usrtc_t));
idx_allocator_t *idx_msg = malloc(sizeof(idx_allocator_t));
if(!idx_msg) goto __fin_enomem;
else if(idx_allocator_init(idx_msg, MAX_MSGINDEX, 0)) goto __fin_enomem;
if(!ch || !msg_tree) {
__fin_enomem:
r = ENOMEM;
goto __fin_up;
} else {
usrtc_init(msg_tree, USRTC_REDBLACK, MAX_PENDINGMSG, __cmp_ulong);
ch->cid = cid;
ch->flags = ch->use_count = 0;
ch->uuid = NULL;
usrtc_node_init(&ch->node, ch);
if(rlist) ch->rpc_list = rlist->rpc_list;
/* init locks */
if(pthread_rwlock_init(&(ch->msglock), NULL)) {
r = ENOMEM;
goto __fin_up;
}
if(pthread_mutex_init(&(ch->oplock), NULL)) {
pthread_rwlock_destroy(&(ch->msglock));
r = ENOMEM;
goto __fin_up;
}
/* assign all the stuff */
ch->idx_msg = idx_msg;
ch->msgs_tree = msg_tree;
ch->connection = co;
}
__fin_up:
if(r) {
if(idx_msg) free(idx_msg);
if(ch) free(ch);
if(msg_tree) free(msg_tree);
return r;
} else {
*channel = ch;
return 0;
}
}
/* channels */
int channel_open(conn_t *co, chnl_t **ch, int type)
{
chnl_t *nch = NULL;
conn_sys_t *ssys = co->ssys;
int r = 0;
char *uuid_;
sxpayload_t *pl;
ulong_t cid;
rpc_typed_list_t *rpclist = NULL;
usrtc_node_t *node = NULL;
sxmsg_t *sms;
if(!(co->flags & CXCONN_ESTABL)) {
return ESXNOCONNECT;
}
uuid_ = __generate_uuid();
pl = malloc(sizeof(sxpayload_t));
node = usrtc_lookup(co->rpc_list, &type);
if(node) rpclist = (rpc_typed_list_t *)usrtc_node_getdata(node);
if(!uuid_) {
if(pl) free(pl);
return ENOMEM;
}
if(!pl) {
__ffail:
if(uuid_) free(uuid_);
return ENOMEM;
} else {
pl->sx = NULL;
if(!(pl->cstr = malloc(sizeof(char)*ESX_SYSMSG_SIZE))) {
free(pl); goto __ffail;
} else memset(pl->cstr, 0, sizeof(char)*ESX_SYSMSG_SIZE);
}
pthread_rwlock_wrlock(&(co->chnl_lock));
cid = idx_allocate(co->idx_ch);
pthread_rwlock_unlock(&(co->chnl_lock));
if(cid == IDX_INVAL) {
r = ENOMEM;
goto __fini_op;
}
if((r = __alloc_channel(cid, co, rpclist, &nch))) {
goto __fini_op;
} else nch->flags |= ESXCHAN_PENDING;
nch->uuid = uuid_;
/* ok now we're ready to create a message and push channel to the list */
if((r = __create_sys_msg(&sms, uuid_, nch, pl))) {
__fail_chan:
/* destroy the channel*/
goto __fini_op;
} else {
/* put the channel to the channels search tree */
pthread_rwlock_wrlock(&(co->chnl_lock));
//printf("inserting cid = %d\n", nch->cid);
usrtc_insert(co->chnl_tree, &nch->node, &nch->cid);
pthread_rwlock_unlock(&(co->chnl_lock));
/* put system message to the run queue */
/* first form the message */
snprintf(pl->cstr, sizeof(char)*ESX_SYSMSG_SIZE,
"(ch-open ((:id %ld)(:uuid %s)(:type %d)))", nch->cid, nch->uuid, type);
nch->sysmsg = sms; /* assign system message to the channel */
/* put it */
if((r = pth_queue_add(ssys->ioqueue, (void *)sms, SYS_MSG))) {
__fail_chan_r:
/* remove it from the search tree */
pthread_rwlock_wrlock(&(co->chnl_lock));
usrtc_delete(co->chnl_tree, &nch->node);
pthread_rwlock_unlock(&(co->chnl_lock));
goto __fail_chan;
}
if(!(sms->flags & ESXMSG_PENDING)) {
/* was processed too fast */
goto __process_smsg;
} else pthread_mutex_lock(&(sms->wait)); /* will sleep until got a reply */
__process_smsg:
if(sms->opcode) {
r = sms->opcode;
goto __fail_chan_r;
} else r = 0;
nch->flags &= ~ESXCHAN_PENDING; /* mark it as established */
free(pl->cstr);
free(pl);
__destroy_msg(nch->sysmsg);
}
__fini_op:
if(r) {
if(uuid_) free(uuid_);
if(pl) {
if(pl->cstr) free(pl->cstr);
free(pl);
}
pthread_rwlock_wrlock(&(co->chnl_lock));
idx_free(co->idx_ch, nch->cid);
pthread_rwlock_unlock(&(co->chnl_lock));
idx_allocator_destroy(nch->idx_msg);
free(nch->idx_msg);
free(nch->msgs_tree);
pthread_mutex_destroy(&(nch->oplock));
pthread_rwlock_destroy(&(nch->msglock));
free(nch);
} else *ch = nch;
return r;
}
int channel_close(chnl_t *chnl)
{
char *uuid_;
usrtc_node_t *node = NULL;
int r;
conn_t *co = chnl->connection;
conn_sys_t *ssys = co->ssys;
sxmsg_t *sms;
sxpayload_t *pl;
if(!(co->flags & CXCONN_ESTABL)) {
return ESXNOCONNECT;
}
pthread_rwlock_rdlock(&(co->chnl_lock));
node = usrtc_lookup(co->chnl_tree, &chnl->cid);
pthread_rwlock_unlock(&(co->chnl_lock));
if(!node) {
return ENOENT;
}
pthread_rwlock_wrlock(&(chnl->msglock));
/* check unprocessed messages */
if(!usrtc_isempty(chnl->msgs_tree)) { /* messages on the queue */
pthread_rwlock_unlock(&(chnl->msglock));
return EBUSY;
}
uuid_ = __generate_uuid();
pl = malloc(sizeof(sxpayload_t));
if(!pl) {
pthread_rwlock_unlock(&(chnl->msglock));
free(uuid_);
return ENOMEM;
}
if(__create_sys_msg(&sms, uuid_, chnl, pl)) {
pthread_rwlock_unlock(&(chnl->msglock));
free(pl);
free(uuid_);
return ENOMEM;
}
pl->sx = NULL;
if(!(pl->cstr = malloc(sizeof(char) * ESX_SYSMSG_SIZE))) {
pthread_rwlock_unlock(&(chnl->msglock));
free(pl);
free(uuid_);
return ENOMEM;
}
memset(pl->cstr, 0, sizeof(char) * ESX_SYSMSG_SIZE);
/* put system message to the run queue */
/* first form the message */
snprintf(pl->cstr, sizeof(char) * ESX_SYSMSG_SIZE,
"(ch-close (:id %ld))", chnl->cid);
chnl->sysmsg = sms; /* assign system message to the channel */
/* put it */
if((r = pth_queue_add(ssys->ioqueue, (void *)sms, SYS_MSG))) {
pthread_rwlock_unlock(&(chnl->msglock));
return r;
}
if(!(sms->flags & ESXMSG_PENDING)) {
/* was processed too fast */
goto __process_smsg;
} else pthread_mutex_lock(&(sms->wait)); /* will sleep until got a reply */
__process_smsg:
if(sms->opcode) {
pthread_rwlock_unlock(&(chnl->msglock));
r = sms->opcode;
return r;
} else r = 0;
chnl->flags &= ~ESXCHAN_PENDING; /* mark it as established */
/* remove channel from the search tree */
pthread_rwlock_wrlock(&(chnl->connection->chnl_lock));
usrtc_delete(chnl->connection->chnl_tree, &chnl->node);
/* free index */
idx_free(co->idx_ch, chnl->cid);
pthread_rwlock_unlock(&(chnl->connection->chnl_lock));
pthread_rwlock_unlock(&(chnl->msglock));
__destroy_msg(chnl->sysmsg);
free(uuid_);
free(pl->cstr);
free(pl);
free(chnl->uuid);
idx_allocator_destroy(chnl->idx_msg);
free(chnl->idx_msg);
free(chnl->msgs_tree);
pthread_mutex_destroy(&(chnl->oplock));
pthread_rwlock_destroy(&(chnl->msglock));
free(chnl);
return 0;
}