connection.c split;
parent
34a6b55f6b
commit
47394febc8
@ -0,0 +1,300 @@
|
||||
/*
|
||||
* Secure Network Transport Layer Library implementation.
|
||||
* This is a proprietary software. See COPYING for further details.
|
||||
*
|
||||
* (c) 2013-2014 Copyright Askele, inc. <http://askele.com>
|
||||
* (c) 2013-2014 Copyright Askele Ingria, inc. <http://askele-ingria.com>
|
||||
*/
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
#include <errno.h>
|
||||
#include <string.h>
|
||||
#include <pthread.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/time.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/select.h>
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
#include <netdb.h>
|
||||
|
||||
#include <uuid/uuid.h>
|
||||
#include <openssl/ssl.h>
|
||||
#include <openssl/err.h>
|
||||
|
||||
#include <tdata/usrtc.h>
|
||||
#include <sexpr/sexp.h>
|
||||
|
||||
#include <sntl/connection.h>
|
||||
|
||||
extern char *__generate_uuid(void);
|
||||
|
||||
extern void __destroy_msg(sxmsg_t *msg);
|
||||
|
||||
extern int __create_sys_msg(sxmsg_t **msg, char *uuid, chnl_t *ch,
|
||||
sxpayload_t *data);
|
||||
|
||||
static long __cmp_ulong(const void *a, const void *b)
|
||||
{
|
||||
return *(ulong_t *)a - *(ulong_t *)b;
|
||||
}
|
||||
|
||||
int __alloc_channel(ulong_t cid, conn_t *co, rpc_typed_list_t *rlist, chnl_t **channel)
|
||||
{
|
||||
int r = 0;
|
||||
chnl_t *ch = malloc(sizeof(chnl_t));
|
||||
usrtc_t *msg_tree = malloc(sizeof(usrtc_t));
|
||||
idx_allocator_t *idx_msg = malloc(sizeof(idx_allocator_t));
|
||||
|
||||
if(!idx_msg) goto __fin_enomem;
|
||||
else if(idx_allocator_init(idx_msg, MAX_MSGINDEX, 0)) goto __fin_enomem;
|
||||
|
||||
if(!ch || !msg_tree) {
|
||||
__fin_enomem:
|
||||
r = ENOMEM;
|
||||
goto __fin_up;
|
||||
} else {
|
||||
usrtc_init(msg_tree, USRTC_REDBLACK, MAX_PENDINGMSG, __cmp_ulong);
|
||||
ch->cid = cid;
|
||||
ch->flags = ch->use_count = 0;
|
||||
usrtc_node_init(&ch->node, ch);
|
||||
if(rlist) ch->rpc_list = rlist->rpc_list;
|
||||
/* init locks */
|
||||
if(pthread_rwlock_init(&(ch->msglock), NULL)) {
|
||||
r = ENOMEM;
|
||||
goto __fin_up;
|
||||
}
|
||||
if(pthread_mutex_init(&(ch->oplock), NULL)) {
|
||||
pthread_rwlock_destroy(&(ch->msglock));
|
||||
r = ENOMEM;
|
||||
goto __fin_up;
|
||||
}
|
||||
/* assign all the stuff */
|
||||
ch->idx_msg = idx_msg;
|
||||
ch->msgs_tree = msg_tree;
|
||||
ch->connection = co;
|
||||
}
|
||||
|
||||
__fin_up:
|
||||
if(r) {
|
||||
if(idx_msg) free(idx_msg);
|
||||
if(ch) free(ch);
|
||||
if(msg_tree) free(msg_tree);
|
||||
return r;
|
||||
} else {
|
||||
*channel = ch;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/* channels */
|
||||
int channel_open(conn_t *co, chnl_t **ch, int type)
|
||||
{
|
||||
chnl_t *nch = NULL;
|
||||
int r = 0;
|
||||
char *uuid_;
|
||||
sxpayload_t *pl;
|
||||
ulong_t cid;
|
||||
rpc_typed_list_t *rpclist = NULL;
|
||||
usrtc_node_t *node = NULL;
|
||||
sxmsg_t *sms;
|
||||
|
||||
if(!(co->flags & CXCONN_ESTABL)) {
|
||||
return ESXNOCONNECT;
|
||||
}
|
||||
|
||||
uuid_ = __generate_uuid();
|
||||
pl = malloc(sizeof(sxpayload_t));
|
||||
node = usrtc_lookup(co->rpc_list, &type);
|
||||
|
||||
if(node) rpclist = (rpc_typed_list_t *)usrtc_node_getdata(node);
|
||||
|
||||
if(!uuid_) {
|
||||
if(pl) free(pl);
|
||||
return ENOMEM;
|
||||
}
|
||||
|
||||
if(!pl) {
|
||||
__ffail:
|
||||
if(uuid_) free(uuid_);
|
||||
return ENOMEM;
|
||||
} else {
|
||||
pl->sx = NULL;
|
||||
if(!(pl->cstr = malloc(sizeof(char)*ESX_SYSMSG_SIZE))) {
|
||||
free(pl); goto __ffail;
|
||||
} else memset(pl->cstr, 0, sizeof(char)*ESX_SYSMSG_SIZE);
|
||||
}
|
||||
|
||||
pthread_rwlock_wrlock(&(co->chnl_lock));
|
||||
cid = idx_allocate(co->idx_ch);
|
||||
pthread_rwlock_unlock(&(co->chnl_lock));
|
||||
if(cid == IDX_INVAL) {
|
||||
r = ENOMEM;
|
||||
goto __fini_op;
|
||||
}
|
||||
|
||||
if((r = __alloc_channel(cid, co, rpclist, &nch))) {
|
||||
goto __fini_op;
|
||||
} else nch->flags |= ESXCHAN_PENDING;
|
||||
|
||||
nch->uuid = uuid_;
|
||||
|
||||
/* ok now we're ready to create a message and push channel to the list */
|
||||
if((r = __create_sys_msg(&sms, uuid_, nch, pl))) {
|
||||
__fail_chan:
|
||||
/* destroy the channel*/
|
||||
goto __fini_op;
|
||||
} else {
|
||||
/* put the channel to the channels search tree */
|
||||
pthread_rwlock_wrlock(&(co->chnl_lock));
|
||||
//printf("inserting cid = %d\n", nch->cid);
|
||||
usrtc_insert(co->chnl_tree, &nch->node, &nch->cid);
|
||||
pthread_rwlock_unlock(&(co->chnl_lock));
|
||||
|
||||
/* put system message to the run queue */
|
||||
/* first form the message */
|
||||
snprintf(pl->cstr, sizeof(char)*ESX_SYSMSG_SIZE,
|
||||
"(ch-open ((:id %ld)(:uuid %s)(:type %d)))", nch->cid, nch->uuid, type);
|
||||
nch->sysmsg = sms; /* assign system message to the channel */
|
||||
/* put it */
|
||||
if((r = pth_queue_add(conn_sys->ioqueue, (void *)sms, SYS_MSG))) {
|
||||
__fail_chan_r:
|
||||
/* remove it from the search tree */
|
||||
pthread_rwlock_wrlock(&(co->chnl_lock));
|
||||
usrtc_delete(co->chnl_tree, &nch->node);
|
||||
pthread_rwlock_unlock(&(co->chnl_lock));
|
||||
goto __fail_chan;
|
||||
}
|
||||
if(!(sms->flags & ESXMSG_PENDING)) {
|
||||
/* was processed too fast */
|
||||
goto __process_smsg;
|
||||
} else pthread_mutex_lock(&(sms->wait)); /* will sleep until got a reply */
|
||||
__process_smsg:
|
||||
if(sms->opcode) {
|
||||
r = sms->opcode;
|
||||
goto __fail_chan_r;
|
||||
} else r = 0;
|
||||
nch->flags &= ~ESXCHAN_PENDING; /* mark it as established */
|
||||
free(pl->cstr);
|
||||
free(pl);
|
||||
__destroy_msg(nch->sysmsg);
|
||||
}
|
||||
|
||||
__fini_op:
|
||||
if(r) {
|
||||
if(uuid_) free(uuid_);
|
||||
if(pl) {
|
||||
if(pl->cstr) free(pl->cstr);
|
||||
free(pl);
|
||||
}
|
||||
pthread_rwlock_wrlock(&(co->chnl_lock));
|
||||
idx_free(co->idx_ch, nch->cid);
|
||||
pthread_rwlock_unlock(&(co->chnl_lock));
|
||||
idx_allocator_destroy(nch->idx_msg);
|
||||
free(nch->idx_msg);
|
||||
free(nch->msgs_tree);
|
||||
pthread_mutex_destroy(&(nch->oplock));
|
||||
pthread_rwlock_destroy(&(nch->msglock));
|
||||
|
||||
free(nch);
|
||||
} else *ch = nch;
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
int channel_close(chnl_t *chnl)
|
||||
{
|
||||
char *uuid_;
|
||||
usrtc_node_t *node = NULL;
|
||||
int r;
|
||||
conn_t *co = chnl->connection;
|
||||
sxmsg_t *sms;
|
||||
sxpayload_t *pl;
|
||||
|
||||
if(!(co->flags & CXCONN_ESTABL)) {
|
||||
return ESXNOCONNECT;
|
||||
}
|
||||
|
||||
uuid_ = __generate_uuid();
|
||||
|
||||
pthread_rwlock_rdlock(&(co->chnl_lock));
|
||||
node = usrtc_lookup(co->chnl_tree, &chnl->cid);
|
||||
pthread_rwlock_unlock(&(co->chnl_lock));
|
||||
if(!node) {
|
||||
fprintf(stderr, "No such channel\n");
|
||||
return ENOENT;
|
||||
}
|
||||
|
||||
pthread_rwlock_wrlock(&(chnl->msglock));
|
||||
/* check unprocessed messages */
|
||||
if(!usrtc_isempty(chnl->msgs_tree)) {
|
||||
pthread_rwlock_unlock(&(chnl->msglock));
|
||||
fprintf(stderr, "Unable to close channel\n");
|
||||
return EBUSY;
|
||||
}
|
||||
|
||||
pl = malloc(sizeof(sxpayload_t));
|
||||
if(!pl) return ENOMEM;
|
||||
if(__create_sys_msg(&sms, uuid_, chnl, pl)) {
|
||||
if(chnl->idx_msg) free(chnl->idx_msg);
|
||||
if(chnl->msgs_tree) free(chnl->msgs_tree);
|
||||
free(chnl);
|
||||
return ENOMEM;
|
||||
}
|
||||
|
||||
pl->sx = NULL;
|
||||
if(!(pl->cstr = malloc(sizeof(char) * ESX_SYSMSG_SIZE))) {
|
||||
pthread_rwlock_unlock(&(chnl->msglock));
|
||||
free(pl);
|
||||
return ENOMEM;
|
||||
}
|
||||
memset(pl->cstr, 0, sizeof(char) * ESX_SYSMSG_SIZE);
|
||||
|
||||
/* put system message to the run queue */
|
||||
/* first form the message */
|
||||
snprintf(pl->cstr, sizeof(char) * ESX_SYSMSG_SIZE,
|
||||
"(ch-close (:id %ld))", chnl->cid);
|
||||
chnl->sysmsg = sms; /* assign system message to the channel */
|
||||
/* put it */
|
||||
if((r = pth_queue_add(conn_sys->ioqueue, (void *)sms, SYS_MSG))) {
|
||||
pthread_rwlock_unlock(&(chnl->msglock));
|
||||
return r;
|
||||
}
|
||||
if(!(sms->flags & ESXMSG_PENDING)) {
|
||||
/* was processed too fast */
|
||||
goto __process_smsg;
|
||||
} else pthread_mutex_lock(&(sms->wait)); /* will sleep until got a reply */
|
||||
|
||||
__process_smsg:
|
||||
if(sms->opcode) {
|
||||
r = sms->opcode;
|
||||
return r;
|
||||
} else r = 0;
|
||||
chnl->flags &= ~ESXCHAN_PENDING; /* mark it as established */
|
||||
/* remove channel from the search tree */
|
||||
pthread_rwlock_wrlock(&(chnl->connection->chnl_lock));
|
||||
usrtc_delete(chnl->connection->chnl_tree, &chnl->node);
|
||||
/* free index */
|
||||
idx_free(co->idx_ch, chnl->cid);
|
||||
pthread_rwlock_unlock(&(chnl->connection->chnl_lock));
|
||||
|
||||
pthread_rwlock_unlock(&(chnl->msglock));
|
||||
|
||||
__destroy_msg(chnl->sysmsg);
|
||||
free(uuid_);
|
||||
|
||||
free(pl->cstr);
|
||||
free(pl);
|
||||
free(chnl->uuid);
|
||||
idx_allocator_destroy(chnl->idx_msg);
|
||||
free(chnl->idx_msg);
|
||||
free(chnl->msgs_tree);
|
||||
pthread_mutex_destroy(&(chnl->oplock));
|
||||
pthread_rwlock_destroy(&(chnl->msglock));
|
||||
|
||||
free(chnl);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
Loading…
Reference in New Issue