You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
libsxmp/lib/connection.c

2478 lines
66 KiB
C

/*
* Secure Network Transport Layer Library v2 implementation.
* (sntllv2) it supersedes all previous versions due to:
* - memory consumption
* - new features such as pulse emitting
* - performance optimization
*
* This is a proprietary software. See COPYING for further details.
*
* (c) Askele Group 2013-2015 <http://askele.com>
*
*/
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <fcntl.h>
#ifdef WIN32
#include <Winsock2.h>
#define EBADE 1
#define NETDB_SUCCESS 0
#else
#include <sys/select.h>
#include <netdb.h>
#include <unistd.h>
#include <uuid/uuid.h>
#endif
#include <openssl/ssl.h>
#include <openssl/err.h>
#include <tdata/usrtc.h>
#include <sexpr/sexp.h>
#include <sntl/connection.h>
/* Deferred RPC invocation: a handler together with the two arguments
 * that __rpc_callback() passes to it. */
struct __rpc_job
{
sxmsg_t *msg; /* handed to rpcf as its first (void *) argument */
sexp_t *sx; /* handed to rpcf as its second argument */
int (*rpcf) (void *, sexp_t *); /* handler to call */
};
/* Global pointer to the connection subsystem; starts out as NULL. */
conn_sys_t *conn_sys = NULL;
static int ex_ssldata_index; /**< SSL ex_data index assigned in sntl_init();
* used to reach the additional data provided to the special call made
* during the SSL handshake */
/*
 * Initialise the OpenSSL library and reserve the SSL ex_data index that
 * is stored in ex_ssldata_index.
 *
 * Returns 0 on success, ENOMEM if OpenSSL could not hand out an index.
 */
int sntl_init(void)
{
  /* init SSL library */
  SSL_library_init();
  OpenSSL_add_all_algorithms();
  SSL_load_error_strings();

  ex_ssldata_index = SSL_get_ex_new_index(0, "__ssldata index", NULL, NULL, NULL);
  /* -1 means no index was allocated; using it later would be invalid */
  if(ex_ssldata_index < 0) return ENOMEM;

  return 0;
}
static long __cmp_ulong(const void *a, const void *b);
/* message alloc and destroy */
extern sxmsg_t *__allocate_msg(int *res);
extern void __destroy_msg(sxmsg_t *msg);
/* connections */
static void __connections_subsystem_connection_remove(conn_t *);
static void __connection_free(conn_t *co);
/* examination */
/*
 * Tell whether the connection is marked as broken.
 *
 * co->flags is read under co->oplock.
 * Returns 1 when CXCONN_BROKEN is set, 0 otherwise.
 */
static inline int __exam_connection(conn_t *co)
{
  int broken = 0;

  pthread_mutex_lock(&co->oplock);
  /* Test the bit with '&'. The previous '|' yielded a non-zero value for
   * every flags value (given a non-zero CXCONN_BROKEN), so each connection
   * was reported as broken. */
  if(co->flags & CXCONN_BROKEN) {
    /* TODO (from the original code): wake up all waiters, destroy the
     * thread pool, free all memory and sync primitives */
    broken = 1;
  }
  pthread_mutex_unlock(&co->oplock);

  return broken;
}
/*
 * Run a queued RPC job: call the stored handler with the stored message
 * and S-expression, release the job descriptor, and pass the handler's
 * result back to the caller.
 */
static int __rpc_callback(void *data)
{
  struct __rpc_job *task = (struct __rpc_job *)data;
  int status = task->rpcf((void *)(task->msg), task->sx);

  free(task);

  return status;
}
extern int __alloc_channel(ulong_t cid, conn_t *co, rpc_typed_list_t *rlist,
chnl_t **channel);
/*
 * Read up to buf_len bytes from the connection's SSL stream into buf.
 *
 * The underlying descriptor is switched to non-blocking mode (non-WIN32
 * builds) and the function waits in select() whenever OpenSSL reports
 * that it needs the socket to become readable or writable.
 *
 * Returns the SSL_read() result when SSL_get_error() reports
 * SSL_ERROR_NONE, -1 on an SSL/syscall/select failure, and 0 when
 * select() returns 0 or no retry condition is met after waiting.
 */
static int __conn_read(conn_t *co, void *buf, size_t buf_len)
{
int rfd = SSL_get_fd(co->ssl), r;
fd_set readset, writeset;
int ofcmode, read_blocked = 0, read_blocked_on_write = 0;
/* First we make the socket nonblocking */
#ifndef WIN32
/* NOTE(review): the F_GETFL result is not checked for -1 before it is
 * OR-ed with O_NDELAY */
ofcmode = fcntl(rfd, F_GETFL,0);
ofcmode |= O_NDELAY;
if(fcntl(rfd, F_SETFL, ofcmode))
fprintf(stderr, "Couldn't make socket nonblocking");
#endif
__retry:
do {
__try_again:
r = SSL_read(co->ssl, buf, (int)buf_len);
switch(SSL_get_error (co->ssl, r)) {
case SSL_ERROR_NONE:
return r;
break;
case SSL_ERROR_WANT_READ:
/* prepare to wait in select() for readability */
read_blocked = 1;
break;
case SSL_ERROR_WANT_WRITE: /* the read is blocked on a pending write */
read_blocked_on_write = 1;
break;
case SSL_ERROR_SYSCALL:
/* retries immediately, without waiting, on EAGAIN/EINTR */
if(errno == EAGAIN || errno == EINTR) goto __try_again;
else {
fprintf(stderr, "SSL syscall error.\n");
goto __close_conn;
}
break;
case SSL_ERROR_WANT_CONNECT:
case SSL_ERROR_WANT_ACCEPT:
fprintf(stderr, "SSL negotiation required. Trying again.\n");
goto __try_again;
break;
case SSL_ERROR_SSL:
fprintf(stderr, "SSL error occured. Connection will be closed.\n");
goto __close_conn;
break;
case SSL_ERROR_ZERO_RETURN:
fprintf(stderr, "SSL connection is cleary closed.\n");
/* no break: falls through to the common error exit below */
default:
__close_conn:
fprintf(stderr, "(RD)Unknown error on %s (errno = %d)\n", co->uuid, errno);
return -1;
}
/* loop again only while OpenSSL still holds buffered data and the last
 * attempt did not ask to wait for readability */
} while(SSL_pending(co->ssl) && !read_blocked);
__select_retry:
if(read_blocked) {
FD_ZERO(&readset);
FD_SET(rfd, &readset);
/* wait (no timeout) until something is ready to read */
r = select(rfd + 1, &readset, NULL, NULL, NULL);
if(r < 0) {
if(errno == EINTR || errno == EAGAIN) goto __select_retry;
printf("(RD) select (%d) on %s\n", errno, co->uuid);
return -1;
}
if(!r) {
printf("Nothing to wait for\n");
return 0;
}
read_blocked = 0;
if(r && FD_ISSET(rfd, &readset)) goto __retry; /* try to read again */
}
if(read_blocked_on_write) { /* we were blocked on write */
FD_ZERO(&readset);
FD_ZERO(&writeset);
FD_SET(rfd, &readset);
FD_SET(rfd, &writeset);
/* NOTE(review): a negative select() result is not handled separately here */
r = select(rfd + 1, &readset, &writeset, NULL, NULL);
read_blocked_on_write = 0;
/* only writability triggers a retry; otherwise 0 is returned below */
if(r && FD_ISSET(rfd, &writeset)) goto __retry;
}
return 0;
}
/*
 * Write buf_len bytes from buf to the connection's SSL stream.
 *
 * The whole operation runs under co->oplock, which is released on every
 * exit path (the previous version returned with the mutex still held
 * after a non-retryable SSL_ERROR_SYSCALL).
 *
 * Returns 0 when SSL_write() succeeded and -1 on any failure, including
 * a select() failure and a connection that was closed by the peer.
 */
static int __conn_write(conn_t *co, void *buf, size_t buf_len)
{
  int r, rc = 0, rfd = SSL_get_fd(co->ssl);
  fd_set writeset;

  pthread_mutex_lock(&(co->oplock));

  for(;;) {
    r = SSL_write(co->ssl, buf, (int)buf_len);

    switch(SSL_get_error(co->ssl, r)) {
    case SSL_ERROR_NONE:
      goto __out;
    case SSL_ERROR_WANT_READ:
    case SSL_ERROR_WANT_WRITE:
      /* here we should block until the socket is writable again */
      FD_ZERO(&writeset);
      FD_SET(rfd, &writeset);
      r = select(rfd + 1, NULL, &writeset, NULL, NULL);
      if(r < 0 && errno != EINTR && errno != EAGAIN) {
        /* a failed wait must not be reported as a successful write */
        fprintf(stderr, "(WR) select (%d) on %s\n", errno, co->uuid);
        rc = -1;
        goto __out;
      }
      continue; /* retry the write */
    case SSL_ERROR_SYSCALL:
      if(errno == EAGAIN || errno == EINTR) continue;
      /* fallthrough */
    default:
      /* every remaining SSL_get_error() code means nothing was written */
      fprintf(stderr, "(WR)Unknown error on %s (%d)\n", co->uuid, r);
      rc = -1;
      goto __out;
    }
  }

__out:
  pthread_mutex_unlock(&(co->oplock));
  return rc;
}
/* Tree comparator for NUL-terminated string keys; yields the strcmp()
 * ordering of the two keys. */
static long __cmp_cstr(const void *a, const void *b)
{
  const char *lhs = (const char *)a;
  const char *rhs = (const char *)b;

  return strcmp(lhs, rhs);
}
/*
 * Tree comparator for int keys.
 *
 * Returns a negative, zero or positive value when *a is less than, equal
 * to or greater than *b. A three-way comparison is used instead of a
 * subtraction, because the int subtraction overflows (undefined
 * behaviour) for operands that are far apart, e.g. INT_MAX and -1.
 */
static long __cmp_int(const void *a, const void *b)
{
  int lhs = *(const int *)a;
  int rhs = *(const int *)b;

  return (lhs > rhs) - (lhs < rhs);
}
static long __cmp_ulong(const void *a, const void *b)
{
return *(ulong_t *)a - *(ulong_t *)b;
}
extern int __resolvehost(const char *hostname, char *buf, int buf_len,
struct hostent **rhp);
/*
 * Empty an RPC function tree: repeatedly take the first node, unlink it
 * from the tree and release the entry's name and the entry itself.
 */
static void __destroy_rpc_list_tree(usrtc_t *tree)
{
  usrtc_node_t *cur;

  while((cur = usrtc_first(tree)) != NULL) {
    cx_rpc_t *entry = (cx_rpc_t *)usrtc_node_getdata(cur);

    usrtc_delete(tree, cur);
    free(entry->name);
    free(entry);
  }
}
/*
 * Register handler rpcf under a private copy of name in the RPC tree.
 *
 * Returns 0 on success, ENOMEM when the entry or the name copy cannot be
 * allocated (nothing is inserted in that case).
 */
static int __insert_rpc_function(usrtc_t *tree, const char *name, int (*rpcf)(void *, sexp_t *))
{
  cx_rpc_t *entry = malloc(sizeof(cx_rpc_t));

  if(entry == NULL) return ENOMEM;

  entry->name = strdup(name);
  if(entry->name == NULL) {
    free(entry);
    return ENOMEM;
  }
  entry->rpcf = rpcf;

  /* the duplicated name doubles as the lookup key */
  usrtc_node_init(&entry->node, entry);
  usrtc_insert(tree, &entry->node, entry->name);

  return 0;
}
/*
 * Stamp every message of every channel of the connection with the given
 * opcode and unlock its wait mutex so that the waiting thread wakes up.
 * The channel tree is held write-locked for the whole walk; each
 * channel's message tree is read-locked while it is visited.
 */
static void __wake_up_waiters(conn_t *co, int opcode)
{
  usrtc_node_t *chn_it = NULL, *chn_end = NULL;
  usrtc_node_t *msg_it = NULL, *msg_end = NULL;
  chnl_t *chan;
  sxmsg_t *pending = NULL;

  pthread_rwlock_wrlock(&(co->chnl_lock));

  if(co->chnl_tree) {
    chn_end = usrtc_last(co->chnl_tree);

    /* walk the channels from the first node up to and including the last */
    for(chn_it = usrtc_first(co->chnl_tree); !usrtc_isempty(co->chnl_tree);
        chn_it = usrtc_next(co->chnl_tree, chn_it)) {
      chan = (chnl_t *)usrtc_node_getdata(chn_it);

      pthread_rwlock_rdlock(&(chan->msglock));
      msg_end = usrtc_last(chan->msgs_tree);

      /* walk this channel's messages the same way */
      for(msg_it = usrtc_first(chan->msgs_tree); !usrtc_isempty(chan->msgs_tree);
          msg_it = usrtc_next(chan->msgs_tree, msg_it)) {
        pending = (sxmsg_t *)usrtc_node_getdata(msg_it);
        pending->opcode = opcode;
        /* wake up waiting thread */
        pthread_mutex_unlock(&(pending->wait));

        if(msg_it == msg_end) break;
      }
      pthread_rwlock_unlock(&(chan->msglock));

      if(chn_it == chn_end) break;
    }
  }

  pthread_rwlock_unlock(&(co->chnl_lock));
}
/*
 * (!) NOTE: call this only after all threads are dead!
 *
 * Tear down every channel of the connection: release the channel's uuid,
 * message index allocator and message tree, destroy its locks, unlink it
 * from the channel tree and free the channel itself.
 */
static void __destroy_all_channels(conn_t *co)
{
  usrtc_node_t *cur;

  while((cur = usrtc_first(co->chnl_tree)) != NULL) {
    chnl_t *chan = (chnl_t *)usrtc_node_getdata(cur);

    /* owned resources; free() accepts NULL, so no guard is needed */
    free(chan->uuid);
    idx_allocator_destroy(chan->idx_msg);
    free(chan->idx_msg);
    free(chan->msgs_tree);

    /* synchronisation primitives */
    pthread_mutex_destroy(&(chan->oplock));
    pthread_rwlock_destroy(&(chan->msglock));

    /* unlink first, then release the channel */
    usrtc_delete(co->chnl_tree, cur);
    free(chan);
  }
}
/*
 * System RPC "auth-set-context": take the peer's credentials, run the
 * subsystem's secure_check hook (if any) and answer with either
 * "(auth-set-attr ...)" or "(auth-set-error (N))".
 *
 * cctx is the connection (conn_t *); sx is the parsed request and is
 * destroyed here on every path.
 *
 * Returns 0 on success (also for an empty attribute list, which gets no
 * reply), ESXRCBADPROT on a malformed request, or the secure_check result.
 * A failed reply write marks the connection broken and wakes all waiters.
 */
static int __default_auth_set_context(void *cctx, sexp_t *sx)
{
  conn_t *co = (conn_t *)cctx;
  conn_sys_t *ssys = co->ssys;
  char *val, *var;
  /* The reply is built on the stack. The former heap buffer was still
   * NULL when the early error paths jumped to __reply. */
  char tbuf[2048];
  sexp_t *lsx, *sx_iter, *sx_in;
  int idx, err = 0;

  /* skip keyword itself */
  lsx = sx->list->next;
  /* now we expect a list of lists */
  if(!lsx || lsx->ty != SEXP_LIST) {
    err = ESXRCBADPROT;
    goto __reply;
  }

  if(!sexp_list_length(lsx)) {
    /* other side will not set any security attributes */
    destroy_sexp(sx);
    return 0;
  }

  SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
    if(!SEXP_IS_LIST(sx_iter)) continue; /* ignore */

    sexp_list_car(sx_iter, &sx_in);
    if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) {
      err = ESXRCBADPROT;
      goto __reply;
    }
    val = sx_in->val;

    if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */

    sexp_list_cdr(sx_iter, &sx_in);
    if(!SEXP_IS_TYPE(sx_in, SEXP_DQUOTE)) {
      err = ESXRCBADPROT;
      goto __reply;
    }
    var = sx_in->val;

    /* unknown keys are ignored in the default implementation */
    /* NOTE(review): a previously stored login/passwd is overwritten without
     * being freed, and the strdup() result is not checked - confirm the
     * ownership rules for co->pctx */
    if(!strcmp(val, ":user") && var) {
      co->pctx->login = strdup(var);
    } else if(!strcmp(val, ":passwd") && var) {
      co->pctx->passwd = strdup(var);
    }
  }

  /* ok, now we need to fill security context */
  if(ssys->secure_check)
    err = ssys->secure_check(co);

 __reply:
  if(err) {
    snprintf(tbuf, sizeof(tbuf), "(auth-set-error (%d))", err);
  } else {
    snprintf(tbuf, sizeof(tbuf), "(auth-set-attr (:attr %d)(:uid %ld)(:gid %ld))",
             co->pctx->p_attr, co->pctx->uid, co->pctx->gid);
  }

  /* we will send it */
  if(__conn_write(co, tbuf, strlen(tbuf)) < 0) {
    co->flags &= ~CXCONN_ESTABL;
    co->flags |= CXCONN_BROKEN;
    __wake_up_waiters(co, ESXNOCONNECT);
  }

  destroy_sexp(sx);

  return err;
}
/*
 * System RPC "auth-set-attr": store the :attr, :uid and :gid values sent
 * by the peer in co->pctx. Other keys are ignored.
 *
 * cctx is the connection (conn_t *); sx is the parsed message and is
 * destroyed here on every path.
 *
 * Returns 0 on success, ESXRCBADPROT on a malformed message.
 */
static int __default_auth_set_attr(void *cctx, sexp_t *sx)
{
  conn_t *co = (conn_t *)cctx;
  char *val, *var;
  sexp_t *lsx, *sx_iter, *sx_in;
  int idx, r = 0;

  /* skip keyword itself */
  lsx = sx->list->next;
  /* now we expect a list of lists */
  if(!lsx || lsx->ty != SEXP_LIST) {
    r = ESXRCBADPROT;
    goto __finish;
  }

  /* an empty list means the other side sets no attributes; fall through
   * to the common exit so that sx is released */
  if(!sexp_list_length(lsx)) goto __finish;

  SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
    if(!SEXP_IS_LIST(sx_iter)) continue; /* ignore */

    sexp_list_car(sx_iter, &sx_in);
    if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) {
      r = ESXRCBADPROT;
      goto __finish;
    }
    val = sx_in->val;

    if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */

    sexp_list_cdr(sx_iter, &sx_in);
    if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) {
      r = ESXRCBADPROT;
      goto __finish;
    }
    var = sx_in->val;
    if(!var) continue; /* nothing to convert */

    if(!strcmp(val, ":attr")) {
      co->pctx->p_attr = atoi(var);
    } else if(!strcmp(val, ":uid")) {
      co->pctx->uid = (ulong_t)atoll(var);
    } else if(!strcmp(val, ":gid")) {
      co->pctx->gid = (ulong_t)atoll(var);
    }
  }

 __finish:
  destroy_sexp(sx);
  return r;
}
/*
 * System RPC "auth-set-error": extract the numeric error code from a
 * message of the form "(auth-set-error (N))".
 *
 * cctx is unused. sx is destroyed here on every path.
 *
 * Returns the decoded code, or ESXRCBADPROT when the argument is not a
 * non-empty list.
 *
 * The previous version rewired sx->list, checked the type of the root
 * instead of the argument, and destroyed only the inner list, so the
 * root expression leaked.
 */
static int __default_auth_set_error(void *cctx, sexp_t *sx)
{
  sexp_t *arg;
  int r;

  (void)cctx;

  /* skip keyword itself */
  arg = sx->list->next;

  /* be sure - this is a list holding a value */
  if(!arg || arg->ty != SEXP_LIST || !arg->list || !arg->list->val)
    r = ESXRCBADPROT;
  else
    r = atoi(arg->list->val);

  destroy_sexp(sx);

  return r;
}
/*
 * System RPC "ch-get-types": fetch the typed RPC list for the connection
 * through ssys->get_rpc_typed_list_tree() and send it to the peer as
 * "(ch-set-types ((:ID "description")...))", or send
 * "(ch-gl-error (N))" when there is nothing to offer.
 *
 * cctx is the connection (conn_t *); sx is destroyed here on every path.
 *
 * Returns 0 on success, ENXIO when the list is empty, ENOMEM when the
 * reply buffer cannot be allocated or the list does not fit into it.
 * A failed reply write marks the connection broken and wakes all waiters.
 */
static int __default_ch_get_types(void *cctx, sexp_t *sx)
{
  conn_t *co = (conn_t *)cctx;
  conn_sys_t *ssys = co->ssys;
  usrtc_node_t *node;
  rpc_typed_list_t *list_ent;
  const size_t cap = 4096;
  char *tbuf = malloc(cap);
  size_t used = 0;
  int err = 0, n;

  /* if we cannot allocate anything ... */
  if(!tbuf) {
    destroy_sexp(sx);
    return ENOMEM;
  }

  /* ok here we go */
  co->rpc_list = ssys->get_rpc_typed_list_tree(co);

  /* there is nothing to parse in the request itself */
  if(!usrtc_count(co->rpc_list)) {
    err = ENXIO;
  } else {
    n = snprintf(tbuf, cap, "(ch-set-types (");
    used = (size_t)n;

    for(node = usrtc_first(co->rpc_list); node != NULL;
        node = usrtc_next(co->rpc_list, node)) {
      list_ent = (rpc_typed_list_t *)usrtc_node_getdata(node);
      /* Bound each append by the space that is really left. The former
       * code always passed the full buffer size and could overrun it. */
      n = snprintf(tbuf + used, cap - used, "(:%d \"%s\")",
                   list_ent->type_id, list_ent->description);
      if(n < 0 || (size_t)n >= cap - used) {
        err = ENOMEM;
        break;
      }
      used += (size_t)n;
    }

    if(!err) {
      n = snprintf(tbuf + used, cap - used, "))");
      if(n < 0 || (size_t)n >= cap - used) err = ENOMEM;
    }
  }

  if(err) snprintf(tbuf, cap, "(ch-gl-error (%d))", err);

  /* reply to this rpc */
  if(__conn_write(co, tbuf, strlen(tbuf)) < 0) {
    co->flags &= ~CXCONN_ESTABL;
    co->flags |= CXCONN_BROKEN;
    __wake_up_waiters(co, ESXNOCONNECT);
  }

  free(tbuf);
  destroy_sexp(sx);

  return err;
}
/*
 * System RPC "ch-set-types": hand every (:ID "description") pair sent by
 * the peer to ssys->set_typed_list_callback() (when installed) and
 * acknowledge with "(ch-gl-error (N))", where N is 0 on success.
 *
 * cctx is the connection (conn_t *); sx is destroyed here on every path.
 *
 * Returns 0 on success (also for an empty list, which gets no reply),
 * ESXRCBADPROT on a malformed message, ENXIO when the callback rejects
 * an entry (no reply is sent in that case either).
 */
static int __default_ch_set_types(void *cctx, sexp_t *sx)
{
  conn_t *co = (conn_t *)cctx;
  conn_sys_t *ssys = co->ssys;
  char buf[1024], *val, *var;
  int r = 0, typeid, idx;
  sexp_t *lsx, *sx_iter, *sx_in;

  /* skip keyword itself */
  lsx = sx->list->next;
  /* now we expect a list of lists */
  if(!lsx || lsx->ty != SEXP_LIST) {
    r = ESXRCBADPROT;
    goto __send_reply;
  }

  if(!sexp_list_length(lsx)) {
    /* nothing was sent: release the request and stay silent */
    destroy_sexp(sx);
    return 0;
  }

  SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
    if(!SEXP_IS_LIST(sx_iter)) continue; /* ignore */

    sexp_list_car(sx_iter, &sx_in);
    if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) {
      r = ESXRCBADPROT;
      goto __send_reply;
    }
    val = sx_in->val;

    if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */

    sexp_list_cdr(sx_iter, &sx_in);
    if(!SEXP_IS_TYPE(sx_in, SEXP_DQUOTE)) {
      r = ESXRCBADPROT;
      goto __send_reply;
    }
    var = sx_in->val;

    /* the key must look like ":<type id>" */
    if(*val != ':') {
      r = ESXRCBADPROT;
      goto __send_reply;
    }

    if(ssys->set_typed_list_callback) {
      typeid = atoi(val + 1);
      if(ssys->set_typed_list_callback(co, typeid, var)) {
        destroy_sexp(sx);
        return ENXIO;
      }
    } /* FIXME: if no function, accept or decline ? */
  }

 __send_reply:
  snprintf(buf, sizeof(buf), "(ch-gl-error (%d))", r);
  if(__conn_write(co, buf, strlen(buf)) < 0) {
    co->flags &= ~CXCONN_ESTABL;
    co->flags |= CXCONN_BROKEN;
    __wake_up_waiters(co, ESXNOCONNECT);
  }

  destroy_sexp(sx);

  return r;
}
/*
 * System RPC "ch-gl-error": read the status code from "(ch-gl-error (N))".
 * A zero code marks the connection as established (CXCONN_ESTABL).
 *
 * cctx is the connection (conn_t *); sx is destroyed here on every path.
 * The previous version never released it and also rewired sx->list while
 * validating the root instead of the argument.
 *
 * Returns the decoded code, EINVAL when the connection is already
 * established, ESXRCBADPROT when the argument is not a non-empty list.
 */
static int __default_ch_gl_error(void *cctx, sexp_t *sx)
{
  conn_t *co = (conn_t *)cctx;
  sexp_t *arg;
  int r;

  if(co->flags & CXCONN_ESTABL) {
    /* error, we already have the channels list */
    r = EINVAL;
    goto __finish;
  }

  /* skip keyword itself */
  arg = sx->list->next;

  /* be sure - this is a list holding a value */
  if(!arg || arg->ty != SEXP_LIST || !arg->list || !arg->list->val) {
    r = ESXRCBADPROT;
    goto __finish;
  }

  r = atoi(arg->list->val);
  if(!r) co->flags |= CXCONN_ESTABL;

 __finish:
  destroy_sexp(sx);
  return r;
}
/*
 * System RPC "ch-open": open a channel requested by the peer.
 *
 * The request carries (:type N), (:id N) and optionally (:uuid X). The
 * type must be present in co->rpc_list and the id must not be in use.
 * The outcome is always reported back as
 * "(ch-open-ret ((:error N)(:uuid X)(:id N)))".
 *
 * cctx is the connection (conn_t *); sx is destroyed here on every path.
 *
 * Returns 0 on success (also for an empty list, which gets no reply),
 * ESXRCBADPROT on a malformed request or a missing :id, ESXNOCHANSUP for
 * an unsupported type, EEXIST for a duplicate id, or the
 * __alloc_channel() error. A failed reply write marks the connection
 * broken and wakes all waiters.
 */
static int __default_ch_open(void *cctx, sexp_t *sx)
{
  conn_t *co = (conn_t *)cctx;
  usrtc_node_t *node;
  char *val, *var, *uuid = NULL;
  char buf[2048]; /* reply buffer; replaces an unchecked malloc() */
  int typ = -1, idx, r = 0, have_cid = 0;
  ulong_t cid = (ulong_t)-1; /* was read uninitialised on the error paths */
  sexp_t *lsx, *sx_iter, *sx_in;
  rpc_typed_list_t *rlist;
  chnl_t *channel;

  /* skip keyword itself */
  lsx = sx->list->next;
  /* now we expect a list of lists */
  if(!lsx || lsx->ty != SEXP_LIST) {
    printf("%s:%d\n", __FUNCTION__, __LINE__);
    r = ESXRCBADPROT;
    goto __send_repl;
  }

  if(!sexp_list_length(lsx)) {
    /* nothing was requested: release the request and stay silent */
    destroy_sexp(sx);
    return 0;
  }

  SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
    if(!SEXP_IS_LIST(sx_iter)) continue; /* ignore */

    sexp_list_car(sx_iter, &sx_in);
    if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) {
      printf("%s:%d\n", __FUNCTION__, __LINE__);
      r = ESXRCBADPROT;
      goto __send_repl;
    }
    val = sx_in->val;

    if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */

    sexp_list_cdr(sx_iter, &sx_in);
    if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) {
      printf("%s:%d\n", __FUNCTION__, __LINE__);
      r = ESXRCBADPROT;
      goto __send_repl;
    }
    var = sx_in->val;

    /* ok, now we need to analyze parameters */
    if(*val != ':') {
      r = ESXRCBADPROT;
      goto __send_repl;
    }
    if(!strcmp(val + 1, "type")) {
      typ = atoi(var);
    } else if(!strcmp(val + 1, "id")) {
      cid = atoll(var);
      have_cid = 1;
    } else if(!strcmp(val + 1, "uuid")) {
      uuid = var;
    }
  }

  /* a request without a channel id cannot be served */
  if(!have_cid) {
    r = ESXRCBADPROT;
    goto __send_repl;
  }

  /* additional check for type of the channel */
  node = co->rpc_list ? usrtc_lookup(co->rpc_list, &typ) : NULL;
  if(!node) {
    r = ESXNOCHANSUP;
    /* diagnostic only; the list may be missing or empty */
    node = co->rpc_list ? usrtc_first(co->rpc_list) : NULL;
    if(node) {
      rlist = (rpc_typed_list_t *)usrtc_node_getdata(node);
      printf("---- rlist->type_id = %d\n", rlist->type_id);
    }
    goto __send_repl;
  }
  rlist = (rpc_typed_list_t *)usrtc_node_getdata(node);

  /* Check the channel id and reserve it. idx_reserve() is assumed to
   * change the allocator (idx_free() below already runs under the write
   * lock), so the write lock is taken here as well. */
  pthread_rwlock_wrlock(&(co->chnl_lock));
  node = usrtc_lookup(co->chnl_tree, &cid);
  if(node) {
    pthread_rwlock_unlock(&(co->chnl_lock));
    r = EEXIST;
    goto __send_repl;
  }
  idx_reserve(co->idx_ch, cid);
  pthread_rwlock_unlock(&(co->chnl_lock));

  /* now we should alloc channel */
  if((r = __alloc_channel(cid, co, rlist, &channel))) {
    pthread_rwlock_wrlock(&(co->chnl_lock));
    idx_free(co->idx_ch, cid);
    pthread_rwlock_unlock(&(co->chnl_lock));
    goto __send_repl;
  }

  /* now we ready to confirm channel creation */
  pthread_rwlock_wrlock(&(co->chnl_lock));
  usrtc_insert(co->chnl_tree, &(channel->node), &(channel->cid));
  pthread_rwlock_unlock(&(co->chnl_lock));
  r = 0;

 __send_repl:
  /* A NULL uuid must not reach %s. "(null)" is the text glibc printed for
   * it, and the id keeps its signed rendering, so the bytes on the wire
   * stay as they were. */
  snprintf(buf, sizeof(buf), "(ch-open-ret ((:error %d)(:uuid %s)(:id %ld)))", r,
           uuid ? uuid : "(null)", (long)cid);
  if(__conn_write(co, buf, strlen(buf)) < 0) {
    co->flags &= ~CXCONN_ESTABL;
    co->flags |= CXCONN_BROKEN;
    __wake_up_waiters(co, ESXNOCONNECT);
  }

  destroy_sexp(sx);

  return r;
}
/*
 * System RPC "ch-open-ret": deliver the peer's answer to a channel-open
 * request. The channel is looked up by (:id N); its system message gets
 * the (:error N) value as opcode, loses ESXMSG_PENDING and has its wait
 * mutex unlocked to wake the requesting thread.
 *
 * cctx is the connection (conn_t *); sx is destroyed here on every path.
 *
 * Returns 0 when a waiter was woken, EINVAL for an empty list,
 * ESXRCBADPROT for a malformed message or a missing :id, ENOENT when no
 * channel or no system message matches the id.
 */
static int __default_ch_open_ret(void *cctx, sexp_t *sx)
{
  conn_t *co = (conn_t *)cctx;
  chnl_t *chan;
  usrtc_node_t *node;
  int err = 0, r = 0, idx, have_id = 0; /* r used to be returned uninitialised */
  ulong_t id = (ulong_t)-1;              /* and so was the lookup key */
  char *val, *var;
  sexp_t *lsx, *sx_iter, *sx_in;
  sxmsg_t *sms = NULL;

  /* skip keyword itself */
  lsx = sx->list->next;
  /* now we expect a list of lists */
  if(!lsx || lsx->ty != SEXP_LIST) {
    r = ESXRCBADPROT;
    goto __mark_msg;
  }

  if(!sexp_list_length(lsx)) {
    r = EINVAL;
    goto __mark_msg;
  }

  SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
    if(!SEXP_IS_LIST(sx_iter)) continue; /* ignore */

    sexp_list_car(sx_iter, &sx_in);
    if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) {
      r = ESXRCBADPROT;
      goto __mark_msg;
    }
    val = sx_in->val;

    if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */

    sexp_list_cdr(sx_iter, &sx_in);
    if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) {
      r = ESXRCBADPROT;
      goto __mark_msg;
    }
    var = sx_in->val;

    /* ok, now we need to analyze parameters */
    if(*val != ':') {
      r = ESXRCBADPROT;
      goto __mark_msg;
    }
    if(!strcmp(val + 1, "error")) {
      err = atoi(var);
    } else if(!strcmp(val + 1, "id")) {
      id = atoll(var);
      have_id = 1;
    }
  }

  if(!have_id) {
    r = ESXRCBADPROT;
    goto __mark_msg;
  }

  /* try to find desired channel to intercept message */
  pthread_rwlock_rdlock(&(co->chnl_lock));
  node = usrtc_lookup(co->chnl_tree, (void *)&id);
  pthread_rwlock_unlock(&(co->chnl_lock));

  if(node) {
    chan = (chnl_t *)usrtc_node_getdata(node);
    sms = chan->sysmsg;
  }
  if(!sms) r = ENOENT;

 __mark_msg:
  /* val/var are not needed any more, so the request can go now */
  destroy_sexp(sx);
  if(!sms) return r;

  sms->flags &= ~ESXMSG_PENDING; /* the message is done */
  sms->opcode = err;

  /* unlock mutex to wake up the waiting thread */
  pthread_mutex_unlock(&(sms->wait));

  return 0;
}
/*
 * System RPC "ch-close": close the channel named by ":id N" and answer
 * with "(ch-close-ret ((:id N) (:error E)))".
 *
 * A channel that still has queued messages is refused with EBUSY; an
 * unknown id yields ENOENT; a malformed request yields ESXRCBADPROT.
 * These codes travel in the reply only.
 *
 * cctx is the connection (conn_t *); sx is destroyed here on every path.
 *
 * Always returns 0. An empty list gets no reply. A failed reply write
 * marks the connection broken and wakes all waiters.
 */
static int __default_ch_close(void *cctx, sexp_t *sx)
{
  conn_t *co = (conn_t *)cctx;
  usrtc_node_t *node;
  char *val, *var;
  char buf[2048]; /* reply buffer; replaces an unchecked malloc() */
  int idx, r = 0;
  ulong_t cid = -1;
  sexp_t *lsx, *sx_iter, *sx_in;
  chnl_t *channel = NULL;

  /* skip keyword itself */
  lsx = sx->list->next;
  /* now we expect a list */
  if(!lsx || lsx->ty != SEXP_LIST) {
    printf("%s:%d\n", __FUNCTION__, __LINE__);
    r = ESXRCBADPROT;
    goto __send_repl;
  }

  if(!sexp_list_length(lsx)) {
    /* nothing was requested: release the request and stay silent */
    destroy_sexp(sx);
    return 0;
  }

  SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
    if(SEXP_IS_LIST(sx_iter)) continue;

    if(!SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) {
      printf("%s:%d\n", __FUNCTION__, __LINE__);
      r = ESXRCBADPROT;
      goto __send_repl;
    }
    val = sx_iter->val;

    /* the value follows its key; a trailing key has none */
    sx_in = sx_iter->next;
    if(!sx_in || !SEXP_IS_TYPE(sx_in, SEXP_BASIC)) {
      printf("%s:%d\n", __FUNCTION__, __LINE__);
      r = ESXRCBADPROT;
      goto __send_repl;
    }
    var = sx_in->val;

    /* ok, now we need to analyze parameters */
    if(*val != ':') {
      r = ESXRCBADPROT;
      goto __send_repl;
    }
    if(!strcmp(val + 1, "id")) {
      cid = atoll(var);
      break;
    }
  }

  /* find the channel */
  pthread_rwlock_rdlock(&(co->chnl_lock));
  node = usrtc_lookup(co->chnl_tree, &cid);
  pthread_rwlock_unlock(&(co->chnl_lock));
  if(!node) {
    /* no such channel exists */
    r = ENOENT;
    goto __send_repl;
  }
  channel = (chnl_t *)usrtc_node_getdata(node);

  /* check up the message queue */
  pthread_rwlock_rdlock(&(channel->msglock));
  if(usrtc_count(channel->msgs_tree)) {
    /* Undelivered messages are still queued. The lock has to be dropped
     * before leaving; it used to stay held on this path. */
    pthread_rwlock_unlock(&(channel->msglock));
    r = EBUSY;
    goto __send_repl;
  }
  pthread_rwlock_unlock(&(channel->msglock));

  /* remove channel from the search tree and give its index back */
  pthread_rwlock_wrlock(&(co->chnl_lock));
  usrtc_delete(co->chnl_tree, &(channel->node));
  idx_free(co->idx_ch, channel->cid);
  pthread_rwlock_unlock(&(co->chnl_lock));

  /* release everything __destroy_all_channels() releases for a channel */
  free(channel->uuid);
  idx_allocator_destroy(channel->idx_msg);
  free(channel->idx_msg);
  free(channel->msgs_tree);
  pthread_mutex_destroy(&(channel->oplock));
  pthread_rwlock_destroy(&(channel->msglock));
  free(channel);

 __send_repl:
  /* the id keeps its signed rendering so the wire format is unchanged */
  snprintf(buf, sizeof(buf), "(ch-close-ret ((:id %ld) (:error %d)))", (long)cid, r);
  if(__conn_write(co, buf, strlen(buf)) < 0) {
    co->flags &= ~CXCONN_ESTABL;
    co->flags |= CXCONN_BROKEN;
    __wake_up_waiters(co, ESXNOCONNECT);
  }

  /* single release point; the protocol-error paths used to leak sx */
  destroy_sexp(sx);

  return 0;
}
/*
 * System RPC "ch-close-ret": deliver the peer's answer to a channel-close
 * request. The channel is looked up by (:id N); its system message gets
 * the (:error N) value as opcode, loses ESXMSG_PENDING and has its wait
 * mutex unlocked to wake the requesting thread.
 *
 * cctx is the connection (conn_t *); sx is destroyed here on every path.
 *
 * Returns 0 when a waiter was woken, EINVAL for an empty list,
 * ESXRCBADPROT for a malformed message or a missing :id, ENOENT when no
 * channel or no system message matches the id.
 */
static int __default_ch_close_ret(void *cctx, sexp_t *sx)
{
  conn_t *co = (conn_t *)cctx;
  chnl_t *chan;
  usrtc_node_t *node;
  int err = 0, r = 0, idx, have_id = 0; /* r used to be returned uninitialised */
  ulong_t id = (ulong_t)-1;              /* and so was the lookup key */
  char *val, *var;
  sexp_t *lsx, *sx_iter, *sx_in;
  sxmsg_t *sms = NULL;

  /* skip keyword itself */
  lsx = sx->list->next;
  /* now we expect a list of lists */
  if(!lsx || lsx->ty != SEXP_LIST) {
    r = ESXRCBADPROT;
    goto __mark_msg;
  }

  if(!sexp_list_length(lsx)) {
    r = EINVAL;
    goto __mark_msg;
  }

  SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
    if(!SEXP_IS_LIST(sx_iter)) continue; /* ignore */

    sexp_list_car(sx_iter, &sx_in);
    if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) {
      r = ESXRCBADPROT;
      goto __mark_msg;
    }
    val = sx_in->val;

    if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */

    sexp_list_cdr(sx_iter, &sx_in);
    if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) {
      r = ESXRCBADPROT;
      goto __mark_msg;
    }
    var = sx_in->val;

    /* ok, now we need to analyze parameters */
    if(*val != ':') {
      r = ESXRCBADPROT;
      goto __mark_msg;
    }
    if(!strcmp(val + 1, "error")) {
      err = atoi(var);
    } else if(!strcmp(val + 1, "id")) {
      id = atoll(var);
      have_id = 1;
    }
  }

  if(!have_id) {
    r = ESXRCBADPROT;
    goto __mark_msg;
  }

  /* try to find desired channel to intercept message */
  pthread_rwlock_rdlock(&(co->chnl_lock));
  node = usrtc_lookup(co->chnl_tree, (void *)&id);
  pthread_rwlock_unlock(&(co->chnl_lock));

  if(node) {
    chan = (chnl_t *)usrtc_node_getdata(node);
    sms = chan->sysmsg;
  }
  if(!sms) r = ENOENT;

 __mark_msg:
  /* val/var are not needed any more, so the request can go now */
  destroy_sexp(sx);
  if(!sms) return r;

  sms->flags &= ~ESXMSG_PENDING; /* the message is done */
  sms->opcode = err;

  /* unlock mutex to wake up the waiting thread */
  pthread_mutex_unlock(&(sms->wait));

  return 0;
}
/*
 * Create a mould of a regular (user) message.
 *
 * A message is allocated, bound to channel ch with id mid and flagged
 * ESXMSG_USR | ESXMSG_PENDING. The id is reserved in the channel's
 * message index under ch->oplock and the message's wait mutex is locked
 * before the message is handed back through *msg.
 *
 * Returns 0 on success or the __allocate_msg() error (then *msg is left
 * untouched).
 */
static int __create_reg_msg_mould(sxmsg_t **msg, chnl_t *ch, ulong_t mid)
{
  int rc = 0;
  sxmsg_t *mould = __allocate_msg(&rc);

  if(rc) return rc;

  mould->pch = ch;
  mould->flags = (ESXMSG_USR | ESXMSG_PENDING);
  mould->mid = mid;

  /* ok reserve message ID */
  pthread_mutex_lock(&(ch->oplock));
  idx_reserve(ch->idx_msg, mid);
  pthread_mutex_unlock(&(ch->oplock));

  pthread_mutex_lock(&(mould->wait));
  *msg = mould;

  return 0;
}
/*
 * System RPC "ch-msg": accept a new user message from the peer.
 *
 * The channel id (:chid) and message id (:msgid) are extracted, a
 * message mould is created and inserted into the channel's message
 * tree, and the job is put on the connection's request queue.
 *
 * cctx is the connection (conn_t *). On success ownership of sx moves
 * to the new message (initial_sx). On the error paths taken before that
 * hand-over sx is destroyed here.
 *
 * Returns 0 on success, ESXRCBADPROT for a malformed message or missing
 * ids, ENOENT for an unknown channel, EEXIST when the message id is
 * already known, or the error of __create_reg_msg_mould() /
 * pth_queue_add().
 */
static int __default_msg(void *cctx, sexp_t *sx)
{
  conn_t *co = (conn_t *)cctx;
  usrtc_node_t *node = NULL;
  chnl_t *chan = NULL;
  int r = 0;
  sexp_t *lsx = NULL, *sx_iter = NULL;
  sexp_t *sx_sublist = NULL, *sx_value = NULL;
  /* Ids are unsigned, so "not supplied" is this sentinel. The former
   * "< 0" tests could never be true. */
  const ulong_t unset = (ulong_t)-1;
  ulong_t chnl_id = unset;
  ulong_t msg_id = unset;
  sexp_t *msg = NULL;
  sxmsg_t *smsg = NULL;
  int idx;

  /* get parameters from the message */
  if(sexp_list_cdr(sx, &lsx) || !SEXP_IS_LIST(lsx)) {
    r = ESXRCBADPROT;
    goto __drop;
  }

  /* find channel id */
  SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
    if(SEXP_IS_LIST(sx_iter)) {
      sx_sublist = sx_iter;
      continue;
    }
    if(!SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) continue; /* ignore it */
    if(strcmp(sx_iter->val, ":chid")) continue;     /* ignore it */
    sx_value = sx_iter->next;
    if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) continue;
    chnl_id = atol(sx_value->val);
  }

  /* without the nested list there is no message id to look for */
  if(!sx_sublist) {
    r = ESXRCBADPROT;
    goto __drop;
  }
  lsx = sx_sublist;

  /* find message id */
  SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
    if(SEXP_IS_LIST(sx_iter)) {
      msg = sx_iter;
      continue;
    }
    if(!SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) continue; /* ignore it */
    if(strcmp(sx_iter->val, ":msgid")) continue;    /* ignore */
    sx_value = sx_iter->next;
    if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) continue;
    msg_id = atol(sx_value->val);
  }

  if(msg_id == unset || chnl_id == unset) {
    r = ESXRCBADPROT;
    goto __drop;
  }

  /* find channel */
  if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) {
    r = ENOENT;
    goto __drop;
  }
  chan = (chnl_t *)usrtc_node_getdata(node);

  /* lookup for the message */
  pthread_rwlock_rdlock(&(chan->msglock));
  node = usrtc_lookup(chan->msgs_tree, &msg_id);
  pthread_rwlock_unlock(&(chan->msglock));

  if(node) {
    /* the id is already in use; this request is not kept */
    smsg = (sxmsg_t *)usrtc_node_getdata(node);
    msg_return(smsg, EEXIST);
    r = EEXIST;
    goto __drop;
  }

  /* here, rpc lookup has no sense, just put this job to the queue */
  /* btw, we need to create a message first */
  r = __create_reg_msg_mould(&smsg, chan, msg_id);
  if(r) goto __drop;

  /* assign the message */
  smsg->opcode = 0;
  smsg->payload = (void *)msg;
  /* assign initial S-expression structure: sx now belongs to smsg */
  smsg->initial_sx = sx;

  /* put the message to the search tree */
  pthread_rwlock_wrlock(&(chan->msglock));
  usrtc_insert(chan->msgs_tree, &(smsg->pendingq_node), &(smsg->mid));
  pthread_rwlock_unlock(&(chan->msglock));

  /* put job to the queue and give up */
  r = pth_queue_add(co->rqueue, (void *)smsg, USR_MSG);
  if(r) { /* cannot put job to the queue */
    msg_return(smsg, r);
    pthread_rwlock_wrlock(&(chan->msglock));
    usrtc_delete(chan->msgs_tree, &(smsg->pendingq_node));
    pthread_rwlock_unlock(&(chan->msglock));
    /* NOTE(review): initial_sx is not released on this path - confirm
     * whether msg_return()/__destroy_msg() take care of it */
    __destroy_msg(smsg);
    return r;
  }

  /* put to the IN queue */
  return r;

 __drop:
  destroy_sexp(sx);
  return r;
}
/* TODO: optimize amount of code: (must be first)
* __default_msg_return
* __default_msg_reply
* there are many copy-n-paste code!!!!!
*/
/*
 * System RPC "ch-msg-rete": the peer closes a message with a return code.
 *
 * The channel id (:chid), message id (:msgid) and opcode are extracted;
 * the matching message gets the opcode, loses its payload and is flagged
 * ESXMSG_CLOSURE. A pending message has its waiter woken; a message
 * nobody waits for is destroyed here.
 *
 * cctx is the connection (conn_t *); sx is destroyed here on every path.
 *
 * Returns 0 on success, ESXRCBADPROT for a malformed message or missing
 * ids/opcode, ENOENT for an unknown channel or message.
 */
static int __default_msg_return(void *cctx, sexp_t *sx)
{
  conn_t *co = (conn_t *)cctx;
  usrtc_node_t *node = NULL;
  chnl_t *chan = NULL;
  int r = 0;
  sexp_t *lsx = NULL, *sx_iter = NULL;
  sexp_t *sx_sublist = NULL, *sx_value = NULL;
  /* Ids are unsigned, so "not supplied" is this sentinel. The former
   * "< 0" tests could never be true. */
  const ulong_t unset = (ulong_t)-1;
  ulong_t chnl_id = unset;
  ulong_t msg_id = unset;
  sexp_t *msg = NULL;
  sxmsg_t *smsg = NULL;
  int idx, opcode;

  /* get parameters from the message */
  if(sexp_list_cdr(sx, &lsx) || !SEXP_IS_LIST(lsx)) {
    r = ESXRCBADPROT;
    goto __finish;
  }

  /* find channel id */
  SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
    if(SEXP_IS_LIST(sx_iter)) {
      sx_sublist = sx_iter;
      continue;
    }
    if(!SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) continue; /* ignore it */
    if(strcmp(sx_iter->val, ":chid")) continue;     /* ignore it */
    sx_value = sx_iter->next;
    if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) continue;
    chnl_id = atol(sx_value->val);
  }

  /* without the nested list there is no message id to look for */
  if(!sx_sublist) {
    r = ESXRCBADPROT;
    goto __finish;
  }
  lsx = sx_sublist;

  /* find message id */
  SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
    if(SEXP_IS_LIST(sx_iter)) {
      msg = sx_iter;
      continue;
    }
    if(!SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) continue; /* ignore it */
    if(strcmp(sx_iter->val, ":msgid")) continue;    /* ignore */
    sx_value = sx_iter->next;
    if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) continue;
    msg_id = atol(sx_value->val);
  }

  if(msg_id == unset || chnl_id == unset || !msg) {
    r = ESXRCBADPROT;
    goto __finish;
  }

  /* get opcode: only after the message body is known to exist */
  lsx = NULL;
  sexp_list_car(msg, &lsx);
  if(!lsx || !lsx->val) {
    r = ESXRCBADPROT;
    goto __finish;
  }
  opcode = atoi(lsx->val);

  if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) {
    /* the request used to leak on this path */
    r = ENOENT;
    goto __finish;
  }
  chan = (chnl_t *)usrtc_node_getdata(node);

  /* lookup for the message */
  pthread_rwlock_rdlock(&(chan->msglock));
  node = usrtc_lookup(chan->msgs_tree, &msg_id);
  pthread_rwlock_unlock(&(chan->msglock));
  if(!node) {
    r = ENOENT;
    goto __finish;
  }

  smsg = (sxmsg_t *)usrtc_node_getdata(node);
  smsg->opcode = opcode;
  if(smsg->flags & ESXMSG_ISREPLY)
    destroy_sexp((sexp_t *)smsg->payload);
  smsg->payload = NULL;
  smsg->flags |= ESXMSG_CLOSURE;

  /* Q: can we remove the message from the tree there??? */
  /* A: actually no */
  /* first remove the message from tree */
  if(smsg->flags & ESXMSG_RMONRETR) {
    pthread_rwlock_wrlock(&(chan->msglock));
    usrtc_delete(chan->msgs_tree, &(smsg->pendingq_node));
    pthread_rwlock_unlock(&(chan->msglock));
  }

  if(smsg->flags & ESXMSG_PENDING) {
    /* somebody waits for it: release the request, then wake the waiter */
    destroy_sexp(sx);
    pthread_mutex_unlock(&(smsg->wait));
    return r;
  }

  /* nobody want it */
  destroy_sexp(smsg->initial_sx);
  __destroy_msg(smsg);

 __finish:
  destroy_sexp(sx);
  return r;
}
/*
 * System RPC "ch-msg-rapid": the peer answers a message with a rapid
 * reply, i.e. a reply that also closes the message.
 *
 * The channel id (:chid) and message id (:msgid) are extracted; the
 * matching message gets opcode ESXRAPIDREPLY, a copy of the reply body
 * as payload and the ESXMSG_ISREPLY flag. A pending message has its
 * waiter woken; a message nobody waits for is destroyed here.
 *
 * cctx is the connection (conn_t *); sx is destroyed here on every path.
 *
 * Returns 0 on success, ESXRCBADPROT for a malformed message or missing
 * ids, ENOENT for an unknown channel or message.
 */
static int __default_msg_rapid(void *cctx, sexp_t *sx)
{
  conn_t *co = (conn_t *)cctx;
  usrtc_node_t *node = NULL;
  chnl_t *chan = NULL;
  int r = 0;
  sexp_t *lsx = NULL, *sx_iter = NULL;
  sexp_t *sx_sublist = NULL, *sx_value = NULL;
  /* Ids are unsigned, so "not supplied" is this sentinel. The former
   * "< 0" tests could never be true. */
  const ulong_t unset = (ulong_t)-1;
  ulong_t chnl_id = unset;
  ulong_t msg_id = unset;
  sexp_t *msg = NULL;
  sxmsg_t *smsg = NULL;
  int idx;

  /* get parameters from the message */
  if(sexp_list_cdr(sx, &lsx) || !SEXP_IS_LIST(lsx)) {
    r = ESXRCBADPROT;
    goto __finish;
  }

  /* find channel id */
  SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
    if(SEXP_IS_LIST(sx_iter)) {
      sx_sublist = sx_iter;
      continue;
    }
    if(!SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) continue; /* ignore it */
    if(strcmp(sx_iter->val, ":chid")) continue;     /* ignore it */
    sx_value = sx_iter->next;
    if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) continue;
    chnl_id = atol(sx_value->val);
  }

  /* without the nested list there is no message id to look for */
  if(!sx_sublist) {
    r = ESXRCBADPROT;
    goto __finish;
  }
  lsx = sx_sublist;

  /* find message id */
  SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
    if(SEXP_IS_LIST(sx_iter)) {
      msg = sx_iter;
      continue;
    }
    if(!SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) continue; /* ignore it */
    if(strcmp(sx_iter->val, ":msgid")) continue;    /* ignore */
    sx_value = sx_iter->next;
    if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) continue;
    msg_id = atol(sx_value->val);
  }

  if(msg_id == unset || chnl_id == unset) {
    r = ESXRCBADPROT;
    goto __finish;
  }

  if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) {
    /* the request used to leak on this path */
    r = ENOENT;
    goto __finish;
  }
  chan = (chnl_t *)usrtc_node_getdata(node);

  /* lookup for the message */
  pthread_rwlock_rdlock(&(chan->msglock));
  node = usrtc_lookup(chan->msgs_tree, &msg_id);
  pthread_rwlock_unlock(&(chan->msglock));
  if(!node) {
    r = ENOENT;
    goto __finish;
  }

  smsg = (sxmsg_t *)usrtc_node_getdata(node);
  smsg->opcode = ESXRAPIDREPLY;
  /* the reply body is copied because sx is destroyed below */
  smsg->payload = copy_sexp(msg);
  smsg->flags |= ESXMSG_ISREPLY;

  /* are we ready to remove this ? */
  if(smsg->flags & ESXMSG_RMONRETR) {
    pthread_rwlock_wrlock(&(chan->msglock));
    usrtc_delete(chan->msgs_tree, &(smsg->pendingq_node));
    pthread_rwlock_unlock(&(chan->msglock));
  }

  if(smsg->flags & ESXMSG_PENDING) {
    /* somebody waits for it: release the request, then wake the waiter */
    destroy_sexp(sx);
    pthread_mutex_unlock(&(smsg->wait));
    return r;
  }

  /* nobody want it */
  destroy_sexp(smsg->initial_sx);
  __destroy_msg(smsg);

 __finish:
  destroy_sexp(sx);
  return r;
}
/*
 * System RPC "ch-msg-repl": the peer answers a message with a reply that
 * keeps the message open.
 *
 * The channel id (:chid) and message id (:msgid) are extracted; the
 * matching message gets opcode ESXOREPLYREQ, a copy of the reply body as
 * payload and the ESXMSG_ISREPLY flag, and its wait mutex is unlocked to
 * wake the waiting thread.
 *
 * cctx is the connection (conn_t *); sx is destroyed here on every path.
 *
 * Returns 0 on success, ESXRCBADPROT for a malformed message or missing
 * ids, ENOENT for an unknown channel or message.
 */
static int __default_msg_reply(void *cctx, sexp_t *sx)
{
  conn_t *co = (conn_t *)cctx;
  usrtc_node_t *node = NULL;
  chnl_t *chan = NULL;
  int r = 0;
  sexp_t *lsx = NULL, *sx_iter = NULL;
  sexp_t *sx_sublist = NULL, *sx_value = NULL;
  /* Ids are unsigned, so "not supplied" is this sentinel. The former
   * "< 0" tests could never be true. */
  const ulong_t unset = (ulong_t)-1;
  ulong_t chnl_id = unset;
  ulong_t msg_id = unset;
  sexp_t *msg = NULL;
  sxmsg_t *smsg = NULL;
  int idx;

  /* get parameters from the message */
  if(sexp_list_cdr(sx, &lsx) || !SEXP_IS_LIST(lsx)) {
    r = ESXRCBADPROT;
    goto __finish;
  }

  /* find channel id */
  SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
    if(SEXP_IS_LIST(sx_iter)) {
      sx_sublist = sx_iter;
      continue;
    }
    if(!SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) continue; /* ignore it */
    if(strcmp(sx_iter->val, ":chid")) continue;     /* ignore it */
    sx_value = sx_iter->next;
    if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) continue;
    chnl_id = atol(sx_value->val);
  }

  /* without the nested list there is no message id to look for */
  if(!sx_sublist) {
    r = ESXRCBADPROT;
    goto __finish;
  }
  lsx = sx_sublist;

  /* find message id */
  SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
    if(SEXP_IS_LIST(sx_iter)) {
      msg = sx_iter;
      continue;
    }
    if(!SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) continue; /* ignore it */
    if(strcmp(sx_iter->val, ":msgid")) continue;    /* ignore */
    sx_value = sx_iter->next;
    if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) continue;
    msg_id = atol(sx_value->val);
  }

  if(msg_id == unset || chnl_id == unset) {
    r = ESXRCBADPROT;
    goto __finish;
  }

  if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) {
    /* the request used to leak on this path */
    r = ENOENT;
    goto __finish;
  }
  chan = (chnl_t *)usrtc_node_getdata(node);

  /* lookup for the message */
  pthread_rwlock_rdlock(&(chan->msglock));
  node = usrtc_lookup(chan->msgs_tree, &msg_id);
  pthread_rwlock_unlock(&(chan->msglock));
  if(!node) {
    r = ENOENT;
    goto __finish;
  }

  smsg = (sxmsg_t *)usrtc_node_getdata(node);
  smsg->opcode = ESXOREPLYREQ;
  /* the reply body is copied because sx is destroyed below */
  smsg->payload = copy_sexp(msg);
  smsg->flags |= ESXMSG_ISREPLY;
  /* wake up the waiting thread */
  pthread_mutex_unlock(&(smsg->wait));

 __finish:
  destroy_sexp(sx);
  return r;
}
/*
 * Register the built-in protocol handlers in the system RPC tree.
 *
 * The handlers are inserted in the order of the table below. If any
 * insertion fails the tree is destroyed and ENOMEM is returned; otherwise
 * the function returns 0.
 */
static int __init_systemrpc_tree(usrtc_t *rtree)
{
  static const struct {
    const char *name;
    int (*handler)(void *, sexp_t *);
  } builtin[] = {
    /* security context functions */
    { "auth-set-context", __default_auth_set_context },
    { "auth-set-attr",    __default_auth_set_attr },
    { "auth-set-error",   __default_auth_set_error },
    /* channels negotiation ops */
    { "ch-get-types",     __default_ch_get_types },
    { "ch-gl-error",      __default_ch_gl_error },
    { "ch-set-types",     __default_ch_set_types },
    { "ch-open",          __default_ch_open },
    { "ch-open-ret",      __default_ch_open_ret },
    { "ch-close",         __default_ch_close },
    { "ch-close-ret",     __default_ch_close_ret },
    /* messaging functions */
    { "ch-msg",           __default_msg },
    { "ch-msg-rete",      __default_msg_return },
    { "ch-msg-rapid",     __default_msg_rapid },
    { "ch-msg-repl",      __default_msg_reply },
  };
  size_t i;

  for(i = 0; i < sizeof(builtin)/sizeof(builtin[0]); i++) {
    if(__insert_rpc_function(rtree, builtin[i].name, builtin[i].handler)) {
      __destroy_rpc_list_tree(rtree);
      return ENOMEM;
    }
  }

  return 0;
}
/*
 * Parse a C string as an S-expression and dispatch it to the RPC handler
 * registered under the value of its first element.
 *
 * cstr     - NUL terminated text of the expression
 * rpc_list - list whose rpc_tree is searched for the handler
 * ctx      - opaque context passed through to the handler
 *
 * Returns EBADE if the text cannot be parsed, ENOENT if the expression is
 * not a list with a named first element or no handler is registered for
 * that name, otherwise whatever the handler returns.
 *
 * The parsed expression is destroyed here on the ENOENT path; once a
 * handler is called the expression is handed over to it (the handlers
 * visible in this file destroy it themselves).
 */
static int __eval_cstr(char *cstr, cx_rpc_list_t *rpc_list, void *ctx)
{
  sexp_t *sx;
  usrtc_node_t *node = NULL;
  cx_rpc_t *rentry;
  char *rpcf = NULL;

  if(!(sx = parse_sexp(cstr, strlen(cstr)))) return EBADE;

  /* a list without elements has no head to take the name from */
  if(sx->ty == SEXP_LIST && sx->list != NULL)
    rpcf = sx->list->val;

  /* find an appropriate function; a head without a value is never looked up */
  if(rpcf) node = usrtc_lookup(rpc_list->rpc_tree, rpcf);

  if(!node) {
    fprintf(stderr, "Invalid S-expression catched.\n");
    destroy_sexp(sx);
    return ENOENT;
  }

  rentry = (cx_rpc_t *)usrtc_node_getdata(node);

  /* call it */
  return rentry->rpcf(ctx, sx);
}
/*
 * Listener thread of a slave connection (started by connection_initiate_m).
 *
 * Reads incoming text from the connection and evaluates each chunk against
 * the system RPC list until __conn_read() reports -1. After that the
 * connection is marked broken, the waiters are woken up with ESXNOCONNECT,
 * the thread poll is destroyed and the connection resources are released.
 *
 * wctx - the connection (conn_t *)
 * Always returns NULL.
 */
static void *__cxslave_thread_listener(void *wctx)
{
  enum { RDBUF_LEN = 4096 };
  conn_t *co = (conn_t *)wctx;
  conn_sys_t *ssys = co->ssys;
  char *buf = malloc(RDBUF_LEN);
  int r;

  /* without a buffer nothing can be read: go straight to the teardown */
  if(buf) {
    /* ask for one byte less than the buffer holds, so that the terminator
     * written below always stays inside the buffer */
    while((r = __conn_read(co, buf, RDBUF_LEN - 1)) != -1) {
      buf[r] = '\0';
      r = __eval_cstr(buf, ssys->system_rpc, co);
    }
  }

  co->flags &= ~CXCONN_ESTABL;
  co->flags |= CXCONN_BROKEN;

  /* ok the first of all we're need to wake up all */
  __wake_up_waiters(co, ESXNOCONNECT);

  /* now we need to end the poll */
  pth_dqtpoll_destroy(co->tpoll, 1); /* force */

  __connection_free(co);

  free(buf);

  return NULL;
}
/*
 * Listener thread of a master connection (started by
 * connection_create_fapi_m).
 *
 * Reads incoming text from the connection and evaluates each chunk against
 * the system RPC list until __conn_read() reports -1. After that the
 * connection is marked broken, the waiters are woken up with ESXNOCONNECT,
 * the thread poll is destroyed and the connection resources are released.
 *
 * wctx - the connection (conn_t *)
 * Always returns NULL.
 */
static void *__cxmaster_thread_listener(void *wctx)
{
  enum { RDBUF_LEN = 4096 };
  conn_t *co = (conn_t *)wctx;
  conn_sys_t *ssys = co->ssys;
  char *buf = malloc(RDBUF_LEN);
  int r;

  /* without a buffer nothing can be read: go straight to the teardown */
  if(buf) {
    /* ask for one byte less than the buffer holds, so that the terminator
     * written below always stays inside the buffer */
    while((r = __conn_read(co, buf, RDBUF_LEN - 1)) != -1) {
      buf[r] = '\0';
      r = __eval_cstr(buf, ssys->system_rpc, co);
    }
  }

  co->flags &= ~CXCONN_ESTABL;
  co->flags |= CXCONN_BROKEN;

  /* ok the first of all we're need to wake up all */
  __wake_up_waiters(co, ESXNOCONNECT);

  /* now we need to end the poll */
  pth_dqtpoll_destroy(co->tpoll, 1); /* force */

  __connection_free(co);

  free(buf);

  return NULL;
}
/*
 * Thread dispatching incoming user messages of one connection.
 *
 * Takes messages from co->rqueue, resolves the function named by the first
 * element of the message payload in the RPC tree of the message's channel
 * and hands a job (message, expression, function) over to the connection's
 * thread poll. Messages that cannot be dispatched are answered with
 * msg_return() and an error code: ESXRCBADPROT for a malformed payload,
 * ENOENT for an unknown function, ENOMEM if the job cannot be allocated.
 *
 * The thread ends when the queue reports an error or an END_MSG arrives.
 *
 * ctx - the connection (conn_t *)
 * Always returns NULL.
 */
static void *__rmsg_queue_thread(void *ctx)
{
  conn_t *co = (conn_t *)ctx;
  pth_msg_t *tmp = malloc(sizeof(pth_msg_t));
  sxmsg_t *msg;
  chnl_t *ch;
  int r = 0;
  char *rpcf;
  sexp_t *sx;
  usrtc_node_t *node = NULL;
  cx_rpc_t *rpccall;
  struct __rpc_job *rjob = NULL;

  if(!tmp) return NULL;

  while(1) {
    r = pth_queue_get(co->rqueue, NULL, tmp);
    if(r || tmp->msgtype == END_MSG) break;

    msg = tmp->data;
    if(!msg) continue; /* spurious !! */

    /* check to right job */
    if(!(msg->flags & ESXMSG_USR)) { /* not a regular message */
      msg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */
      msg->flags &= ~ESXMSG_PENDING;
      pthread_mutex_unlock(&(msg->wait)); /* wake up the waitee */
      continue;
    }

    /* now we're need to have a deal with the rpc calling, other - we don't care */
    ch = msg->pch;
    sx = (sexp_t *)msg->payload;

    /* the payload must be a non-empty list whose head carries the name */
    if(!sx || sx->ty != SEXP_LIST || sx->list == NULL || !sx->list->val) {
      msg_return(msg, ESXRCBADPROT);
      continue;
    }
    rpcf = sx->list->val;

    node = usrtc_lookup(ch->rpc_list->rpc_tree, rpcf);
    if(!node) {
      msg_return(msg, ENOENT);
      continue;
    }
    rpccall = (cx_rpc_t *)usrtc_node_getdata(node);

    /* call this ! */
    rjob = malloc(sizeof(*rjob));
    if(!rjob) {
      /* tell the sender instead of dereferencing a NULL job */
      msg_return(msg, ENOMEM);
      continue;
    }
    rjob->msg = msg;
    rjob->sx = sx;
    rjob->rpcf = rpccall->rpcf;

    pth_dqtpoll_add(co->tpoll, (void *)rjob, USR_MSG); // TODO: check it
  }

  free(tmp);
  return NULL;
}
static void *__msg_queue_thread(void *ctx)
{
conn_t *co = (conn_t *)ctx;
pth_msg_t *tmp = malloc(sizeof(pth_msg_t));
sxmsg_t *msg;
chnl_t *ch;
int r = 0, len;
char *buf = malloc(4096), *tb;
sexp_t *sx;
if(!tmp || !buf) {
if(tmp) free(tmp);
if(buf) free(buf);
return NULL;
}
while(1) {
r = pth_queue_get(co->mqueue, NULL, tmp);
if(r) {
__fini:
free(buf);
free(tmp);
return NULL;
}
/* message workout */
if(tmp->msgtype == END_MSG) goto __fini;
msg = tmp->data;
if(!msg) continue; /* spurious message */
if(!(msg->flags & ESXMSG_USR)) { /* not a regular message */
msg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */
msg->flags &= ~ESXMSG_PENDING;
pthread_mutex_unlock(&(msg->wait)); /* wake up the waitee */
continue;
} else {
ch = msg->pch;
/* now we need to complete the request */
sx = (sexp_t *)msg->payload; tb = buf;
if(!(msg->flags & ESXMSG_PULSE)) { /* %s))) */
if(!(msg->flags & ESXMSG_ISREPLY))
snprintf(buf, 4096, "(ch-msg (:chid %lu (:msgid %lu ", ch->cid,
msg->mid);
else {
if(!sx) {
snprintf(buf, 4096, "(ch-msg-rete (:chid %lu (:msgid %lu (%d))))", ch->cid,
msg->mid, msg->opcode);
/* mark it to close */
msg->flags |= ESXMSG_CLOSURE;
/* ok, here we will write it and wait, destroying dialog while reply */
goto __ssl_write;
} else {
if(msg->flags & ESXMSG_ISRAPID) {
msg->flags |= ESXMSG_CLOSURE;
pthread_mutex_unlock(&(msg->wait)); /* wake it up */
snprintf(buf, 4096, "(ch-msg-rapid (:chid %lu (:msgid %lu ", ch->cid,
msg->mid);
} else
snprintf(buf, 4096, "(ch-msg-repl (:chid %lu (:msgid %lu ", ch->cid,
msg->mid);
}
}
len = strlen(buf);
tb += len*sizeof(char);
if(print_sexp(tb, 4096 - (len + 4*sizeof(char)), sx) == -1) {
msg->opcode = ENOMEM;
/* we don't need to wake up anybody */
if(msg->flags & ESXMSG_TIMEDOUT) {
/* clean up all the shit:
* 1. remove from message tree
* 2. destroy message itself
*/
destroy_sexp(msg->initial_sx);
msg->initial_sx = NULL;
msg->payload = NULL;
if(msg->flags & ESXMSG_ISREPLY)
destroy_sexp(msg->payload);
} else {
pthread_mutex_unlock(&(msg->wait));
}
}
}
len = strlen(tb);
tb += len*sizeof(char);
strcat(tb, ")))");
__ssl_write:
if(msg->flags & ESXMSG_CLOSURE) {
/* wake up it */
pthread_mutex_unlock(&(msg->wait));
/* first remove the message from tree */
pthread_rwlock_wrlock(&(ch->msglock));
usrtc_delete(ch->msgs_tree, &(msg->pendingq_node));
pthread_rwlock_unlock(&(ch->msglock));
/* destroy */
destroy_sexp(msg->initial_sx);
if(msg->flags & ESXMSG_ISREPLY && msg->payload)
destroy_sexp((sexp_t *)msg->payload);
__destroy_msg(msg);
}
/* write it */
if(__conn_write(co, (void *)buf, strlen(buf) + sizeof(char)) < 0) {
co->flags &= ~CXCONN_ESTABL;
co->flags |= CXCONN_BROKEN;
__wake_up_waiters(co, ESXNOCONNECT);
}
}
len = 0;
}
free(buf);
return NULL;
}
/* this function is an ugly implementation to get C string with uuid */
extern char *__generate_uuid(void);
/*
 * Custom verification callback for the peer certificate chain, installed
 * with SSL_CTX_set_verify() on the master side.
 *
 * OpenSSL calls it once per certificate in the chain. The return value
 * follows the OpenSSL convention: 0 means "verification failed", 1 means
 * "continue / accepted".
 *
 * Chains deeper than VERIFY_DEPTH are rejected with
 * X509_V_ERR_CERT_CHAIN_TOO_LONG. For intermediate certificates the result
 * of OpenSSL's own check (preverify_ok) is passed through. For the peer
 * certificate itself (depth 0) the serial number is stored in the
 * permission context of the connection (co->pctx->certid) and the decision
 * is delegated to ssys->validate_sslpem(co): a zero return accepts the
 * certificate, non-zero rejects it. Without such a hook the certificate is
 * rejected.
 *
 * The connection is found through the SSL ex-data slot ex_ssldata_index;
 * when it (or its permission context) is missing the certificate is
 * rejected rather than dereferencing NULL.
 *
 * NOTE(review): ASN1_INTEGER_get() returns a long, so serial numbers wider
 * than a long cannot be represented in certid.
 */
static int __verify_certcall(int preverify_ok, X509_STORE_CTX *ctx)
{
  int err = X509_STORE_CTX_get_error(ctx), depth = X509_STORE_CTX_get_error_depth(ctx);
  SSL *ssl = X509_STORE_CTX_get_ex_data(ctx, SSL_get_ex_data_X509_STORE_CTX_idx());
  conn_t *co = ssl ? SSL_get_ex_data(ssl, ex_ssldata_index) : NULL; /* this is a custom data we're set before */
  conn_sys_t *ssys;
  X509 *cert;

  if(!co) return 0;
  ssys = co->ssys;

  /* now we need to check for certificates with a long chain,
   * so since we have a short one, reject long ones */
  if(depth > VERIFY_DEPTH) { /* longer than we expect */
    preverify_ok = 0; /* 0 means error for this kind of callback */
    err = X509_V_ERR_CERT_CHAIN_TOO_LONG;
    X509_STORE_CTX_set_error(ctx, err);
  }

  if(!preverify_ok) return 0;

  /* ok, now we're on top of SSL (depth == 0) certs chain,
   * and we can validate client certificate */
  if(!depth) {
    /* use the accessor: X509_STORE_CTX is opaque in current OpenSSL */
    cert = X509_STORE_CTX_get_current_cert(ctx);
    if(!cert || !co->pctx || !ssys) return 0;

    co->pctx->certid = ASN1_INTEGER_get(X509_get_serialNumber(cert));

    /* now we're need to check the ssl cert */
    if(!ssys->validate_sslpem) return 0;
    return ssys->validate_sslpem(co) ? 0 : 1;
  }

  return preverify_ok;
}
/*
 * Pass-through verification callback: the outcome of OpenSSL's own check
 * is returned unchanged and the store context is not looked at.
 */
static int __verify_certcall_dummy(int preverify_ok, X509_STORE_CTX *ctx)
{
  (void)ctx; /* intentionally unused */

  return preverify_ok;
}
/*
 * Thread serving the system message queue (ssys->ioqueue) of the whole
 * connections subsystem.
 *
 * Every system message taken from the queue is written, terminator
 * included, to the connection its channel belongs to. Messages without the
 * ESXMSG_SYS flag are marked undeliverable and their waiter is released.
 * A failed write marks the connection broken and wakes up its waiters with
 * ESXNOCONNECT.
 *
 * The thread ends when the queue reports an error. Always returns NULL.
 */
static void *__system_queue_listener(void *data)
{
  conn_sys_t *sys = (conn_sys_t *)data;
  pth_msg_t *qmsg = malloc(sizeof(pth_msg_t));
  sxmsg_t *sysmsg;
  sxpayload_t *payload;
  conn_t *co;

  if(!qmsg) return NULL;

  while(!pth_queue_get(sys->ioqueue, NULL, qmsg)) {
    /* ok message is delivered */
    sysmsg = qmsg->data;
    if(!sysmsg) continue; /* ignore dummy messages */

    if(!(sysmsg->flags & ESXMSG_SYS)) { /* not a system message */
      sysmsg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */
      sysmsg->flags &= ~ESXMSG_PENDING;
      pthread_mutex_unlock(&(sysmsg->wait)); /* wake up the waitee */
      continue;
    }

    co = sysmsg->pch->connection;
    payload = (sxpayload_t *)sysmsg->payload;

    /* write the buf */
    if(__conn_write(co, (void *)payload->cstr, strlen(payload->cstr) + 1) < 0) {
      co->flags &= ~CXCONN_ESTABL;
      co->flags |= CXCONN_BROKEN;
      __wake_up_waiters(co, ESXNOCONNECT);
    }
  }

  free(qmsg);
  return NULL;
}
/*
 * General initialisation of the connections subsystem; must be called by
 * an application before it uses the connection layer.
 *
 * Allocates and initialises the connections tree, the subsystem lock, both
 * io queues and the system RPC list, initialises the SSL library and
 * starts the system queue listener thread.
 *
 * Returns 0 on success, EINVAL if ssys is NULL, ENOMEM if an allocation
 * fails, or the error code of the initialisation step that failed. On
 * failure everything allocated here is released again.
 *
 * NOTE(review): queues that were already initialised are only free()d on
 * the failure paths, as before; whether they need a destroy call first is
 * not visible from here.
 */
int connections_init(conn_sys_t *ssys)
{
  int r = 0;

  if(!ssys) return EINVAL;

  if(!(ssys->connections = malloc(sizeof(usrtc_t)))) return ENOMEM;

  /* zeroing */
  ssys->rootca = ssys->certkey = ssys->certpem = NULL;
  ssys->validate_sslpem = NULL;
  ssys->secure_check = NULL;
  ssys->on_destroy = NULL;

  /* init connections list */
  usrtc_init(ssys->connections, USRTC_REDBLACK, MAX_CONNECTIONS,
             __cmp_cstr);
  if((r = pthread_rwlock_init(&(ssys->rwlock), NULL)))
    goto __fail_connections;

  /* init queues */
  if(!(ssys->ioq = malloc(sizeof(pth_queue_t)))) { /* general io queue */
    r = ENOMEM;
    goto __fail_rwlock;
  }
  if((r = pth_queue_init(ssys->ioq))) goto __fail_ioq;

  if(!(ssys->ioqueue = malloc(sizeof(pth_queue_t)))) { /* system io queue */
    r = ENOMEM;
    goto __fail_ioq; /* ioq is already allocated at this point */
  }
  if((r = pth_queue_init(ssys->ioqueue))) goto __fail_ioqueue;

  /* init RPC list related functions */
  if(!(ssys->system_rpc = malloc(sizeof(cx_rpc_list_t)))) {
    r = ENOMEM;
    goto __fail_ioqueue; /* both queues are allocated at this point */
  }
  if(!(ssys->system_rpc->rpc_tree = malloc(sizeof(usrtc_t)))) {
    r = ENOMEM;
    goto __fail_rpc;
  }
  usrtc_init(ssys->system_rpc->rpc_tree, USRTC_SPLAY, 256, __cmp_cstr);

  /* on failure the tree content has already been destroyed by the callee */
  if((r = __init_systemrpc_tree(ssys->system_rpc->rpc_tree)))
    goto __fail_rpc_tree;

  /* init SSL library */
  SSL_library_init();
  OpenSSL_add_all_algorithms();
  SSL_load_error_strings();

  /* create threads for queue */
  if((r = pthread_create(&ssys->ios_thread, NULL, __system_queue_listener, (void *)ssys))) {
    /* the tree is filled by now, drop its entries before the tree itself */
    __destroy_rpc_list_tree(ssys->system_rpc->rpc_tree);
    goto __fail_rpc_tree;
  }

  return 0;

  /* cleanup ladder: each label releases one step and falls through */
 __fail_rpc_tree:
  free(ssys->system_rpc->rpc_tree);
 __fail_rpc:
  free(ssys->system_rpc);
 __fail_ioqueue:
  free(ssys->ioqueue);
 __fail_ioq:
  free(ssys->ioq);
 __fail_rwlock:
  pthread_rwlock_destroy(&(ssys->rwlock));
 __fail_connections:
  free(ssys->connections);

  return r;
}
/*
 * Store the paths of the root CA, the certificate and the private key in
 * the subsystem. The strings are copied; the caller keeps its own.
 *
 * Returns 0 on success, EINVAL if any argument is NULL, ENOMEM if a copy
 * cannot be allocated. On failure the subsystem is left untouched, so no
 * member ever points to freed memory.
 *
 * NOTE(review): paths stored by an earlier call are overwritten without
 * being freed, as before; who owns them at that point is not visible here.
 */
int connections_setsslserts(conn_sys_t *ssys, const char *rootca,
                            const char *certpem, const char *certkey)
{
  char *ca, *key, *pem;

  if(!ssys || !rootca || !certpem || !certkey) return EINVAL;

  /* simply copying, but into temporaries first */
  ca = strdup(rootca);
  key = strdup(certkey);
  pem = strdup(certpem);

  if(!ca || !key || !pem) {
    free(ca);
    free(key);
    free(pem);
    return ENOMEM;
  }

  ssys->rootca = ca;
  ssys->certkey = key;
  ssys->certpem = pem;

  return 0;
}
/*
 * Install the callback the subsystem uses to obtain the typed RPC list
 * tree for a connection.
 *
 * Returns EINVAL if ssys is NULL, 0 otherwise.
 */
int connections_setrpclist_function(conn_sys_t *ssys,
                                    usrtc_t* (*get_rpc_typed_list_tree)
                                    (conn_t *))
{
  if(ssys == NULL) {
    return EINVAL;
  }

  ssys->get_rpc_typed_list_tree = get_rpc_typed_list_tree;

  return 0;
}
/*
 * Remove the connection from the connections tree of its subsystem; the
 * tree is modified under the subsystem write lock.
 */
static void __connections_subsystem_connection_remove(conn_t *co)
{
  conn_sys_t *sys = co->ssys;

  pthread_rwlock_wrlock(&sys->rwlock);
  usrtc_delete(sys->connections, &co->csnode);
  pthread_rwlock_unlock(&sys->rwlock);
}
#define __TMPBUFLEN 2048
/*
 * connection_initiate_m: connect through a TCP socket to a host holding the
 * master certificate and establish the slave (client side) end of a
 * connection on top of TLS.
 *
 * ssys     - connections subsystem (root CA path, system RPC list)
 * co       - caller provided connection structure; zeroed and filled here
 * host     - host name to resolve and connect to
 * port     - TCP port, host byte order
 * SSL_cert - PEM file used both as local certificate and as private key
 * pctx     - permission context; login and passwd are sent to the peer and
 *            the pointer itself is stored in the connection
 *
 * After the TLS handshake "auth-set-context" and "ch-get-types" are sent
 * and each reply is evaluated against the system RPC list. On success the
 * listener, sender and dispatcher threads are started, the connection is
 * added to the subsystem and 0 is returned.
 *
 * Errors: EINVAL (bad argument, SSL context or certificate/key problem),
 * ENOMEM, ENOENT (host cannot be resolved or connected), EBADE (TLS
 * handshake failed), ESXNOCONNECT (connection lost or a thread could not
 * be started), or the code returned by a failed initialisation step or by
 * the handler of a peer reply.
 *
 * NOTE(review): the late failure ladder (__fail_3 and below) still does not
 * release the queues, the thread poll, the index allocator, the rwlock,
 * the SSL objects or the socket in every case; what may safely be
 * destroyed there depends on helpers not visible from here.
 * NOTE(review): login and passwd are embedded in the request without any
 * escaping.
 */
int connection_initiate_m(conn_sys_t *ssys, conn_t *co, const char *host,
                          int port, const char *SSL_cert, perm_ctx_t *pctx)
{
  int r = 0, sd;
  int bytes = 0;
  char *uuid;
  char *buf = NULL;
  struct hostent *host_ = NULL;
  struct sockaddr_in addr;
#ifdef WIN32
  WSADATA wsaData;
#endif
  usrtc_t *ch_tree, *rpc_tree;
  pth_queue_t *mqueue = malloc(sizeof(pth_queue_t));
  pth_queue_t *rqueue = malloc(sizeof(pth_queue_t));
  pth_dqtpoll_t *tpoll = malloc(sizeof(pth_dqtpoll_t));
  idx_allocator_t *idx_ch = malloc(sizeof(idx_allocator_t));

  if(!mqueue) {
  __fallenomem:
    r = ENOMEM;
  __fall0:
    /* early exit: only the four plain allocations exist so far */
    if(rqueue) free(rqueue);
    if(mqueue) free(mqueue);
    if(tpoll) free(tpoll);
    if(idx_ch) free(idx_ch);
    return r;
  }
  if(!rqueue) goto __fallenomem;

#ifdef WIN32
  WSAStartup(MAKEWORD(2, 2), &wsaData);
#endif

  if(!tpoll) goto __fallenomem;
  if(!idx_ch) goto __fallenomem;

  if(!co) {
  __falleinval:
    r = EINVAL;
    goto __fall0;
  } else if(!ssys) goto __falleinval;
  if(!host) goto __falleinval;
  if(!SSL_cert) goto __falleinval;
  if(!pctx) goto __falleinval;

  memset(co, 0, sizeof(conn_t));

  /* setup connections set */
  co->ssys = ssys;

  /* The uuid and the index allocator are set up before the thread poll is
   * initialised, so that a failure here can still leave through __fall0
   * with nothing but plain memory to release. */
  if(!(uuid = __generate_uuid())) goto __fallenomem;

  r = idx_allocator_init(idx_ch, MAX_CHANNELS*MAX_MULTI, 0);
  if(r) {
    free(uuid);
    goto __fall0;
  }

  pth_dqtpoll_init(tpoll, __rpc_callback);

  if(!(ch_tree = malloc(sizeof(usrtc_t)))) {
    r = ENOMEM;
    goto __fail;
  }
  if(!(rpc_tree = malloc(sizeof(usrtc_t)))) {
    r = ENOMEM;
    goto __fail_1;
  }
  if((r = pthread_mutex_init(&co->oplock, NULL))) goto __fail_2;
  if((r = pthread_rwlock_init(&co->chnl_lock, NULL))) goto __fail_3;

  usrtc_init(rpc_tree, USRTC_REDBLACK, MAX_RPC_LIST, __cmp_int);
  usrtc_init(ch_tree, USRTC_REDBLACK, MAX_CHANNELS, __cmp_ulong);

  co->idx_ch = idx_ch;

  /* assign message queue */
  if((r = pth_queue_init(mqueue))) goto __fail_3;
  co->mqueue = mqueue;

  /* assign repl message queue */
  if((r = pth_queue_init(rqueue))) goto __fail_3;
  co->rqueue = rqueue;

  /* init SSL certificates and context */
  co->ctx = SSL_CTX_new(TLSv1_2_client_method());
  if(!co->ctx) {
    ERR_print_errors_fp(stderr);
    r = EINVAL;
    goto __fail_3;
  } else {
    SSL_CTX_set_verify(co->ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT,
                       __verify_certcall_dummy);
    SSL_CTX_set_verify_depth(co->ctx, 1); /* FIXME: use configuration */
  }

  /* load certificates */
  SSL_CTX_load_verify_locations(co->ctx, ssys->rootca, NULL);

  /* set the local certificate from CertFile */
  if(SSL_CTX_use_certificate_file(co->ctx, SSL_cert,
                                  SSL_FILETYPE_PEM)<=0) {
    r = EINVAL;
    goto __fail_3;
  }
  /* set the private key from KeyFile (may be the same as CertFile) */
  if(SSL_CTX_use_PrivateKey_file(co->ctx, SSL_cert,
                                 SSL_FILETYPE_PEM)<=0) {
    r = EINVAL;
    goto __fail_3;
  }
  /* verify private key */
  if (!SSL_CTX_check_private_key(co->ctx)) {
    r = EINVAL;
    goto __fail_3;
  }

  /* assign allocated memory */
  co->rpc_list = rpc_tree;
  co->chnl_tree = ch_tree;
  co->uuid = uuid;

  /* connect to the pointed server */
  /* resolve host */
  if(!(buf = malloc(__TMPBUFLEN))) {
    r = ENOMEM;
    goto __fail_3;
  }
#ifdef WIN32
  host_ = gethostbyname(host);
#else
  r = __resolvehost(host, buf, __TMPBUFLEN, &host_);
#endif
  /* host_ is checked as well: the WIN32 branch never sets r */
  if(r || !host_) {
    r = ENOENT;
    free(buf);
    goto __fail_3;
  }

  /* create a socket */
  sd = socket(PF_INET, SOCK_STREAM, 0);
  memset(&addr, 0, sizeof(addr));

  /* try to connect it */
  addr.sin_family = AF_INET;
  addr.sin_port = htons(port);
  addr.sin_addr.s_addr = *(uint32_t*)(host_->h_addr);
  r=connect(sd, (struct sockaddr*)&addr, sizeof(addr));
  if ( r != 0) {
    printf("connect error %d %s \n",r, strerror(errno));
    close(sd);
    free(buf);
    r = ENOENT; /* couldn't connect to the desired host */
    goto __fail_3;
  }

  /* now we will create an SSL connection */
  co->ssl = SSL_new(co->ctx);
  if(!co->ssl) {
    close(sd);
    free(buf);
    r = ENOMEM;
    goto __fail_3;
  }
  SSL_set_fd(co->ssl, sd); /* attach connected socket */
  SSL_set_connect_state(co->ssl);
  /* only 1 means the handshake completed; 0 is a failure as well */
  if(SSL_connect(co->ssl) != 1) {
    r = EBADE;
    free(buf);
    /* shutdown connection */
    goto __fail_3;
  } /* if success we're ready to use established SSL channel */

  /* auth and RPC contexts sync */
  co->pctx = pctx;
  snprintf(buf, __TMPBUFLEN, "(auth-set-context ((:user \"%s\")(:passwd \"%s\")))",
           pctx->login, pctx->passwd);
  /* send an auth request */
  if(__conn_write(co, buf, strlen(buf) + sizeof(char))) {
  __finalize:
    co->flags &= ~CXCONN_ESTABL;
    co->flags |= CXCONN_BROKEN;
    r = ESXNOCONNECT;
    free(buf); /* buf is NULL here once it has been released below */
  __retry_shut:
    if(!SSL_shutdown(co->ssl)) {
      usleep(100);
      goto __retry_shut;
    }
    /* shutdown connection */
    goto __fail_3;
  }

  /* read the message reply */
  bytes = __conn_read(co, buf, __TMPBUFLEN);
  if(bytes == -1) {
    // we've lost the connection
    co->flags &= ~CXCONN_ESTABL;
    r = ESXNOCONNECT;
    co->flags |= CXCONN_BROKEN;
    free(buf);
    /* shutdown connection */
    goto __fail_3;
  }
  buf[bytes] = 0;

  /* perform an rpc call */
  r = __eval_cstr(buf, ssys->system_rpc, (void *)co);
  if(!r) { /* all is fine security context is good */
    snprintf(buf, __TMPBUFLEN, "(ch-get-types)"); /* now we should receive possible channel types */
    if(__conn_write(co, buf, strlen(buf) + sizeof(char))) {
      goto __finalize;
    }
    /* read the message reply */
    bytes = __conn_read(co, buf, __TMPBUFLEN);
    if(bytes == -1) {
      goto __finalize;
    }
    buf[bytes] = 0;
    /* perform an rpc call */
    r = __eval_cstr(buf, ssys->system_rpc, (void *)co);
  }

  free(buf); /* now we can free the temporary buffer */
  buf = NULL; /* __finalize frees buf again: make that a no-op from here on */

  /* a listening thread creation (incoming messages) */
  if(!r) { /* success let's start a listening thread */
    r = pthread_create(&co->cthread, NULL, __cxslave_thread_listener, (void *)co);
    /* without a listener the connection is unusable: do not go on */
    if(r) goto __finalize;

    /* add connection to the list */
    usrtc_node_init(&co->csnode, co);
    co->flags = (CXCONN_SLAVE | CXCONN_ESTABL); /* set the right flags */
    pthread_rwlock_wrlock(&ssys->rwlock);
    usrtc_insert(ssys->connections, &co->csnode, (void *)co->uuid);
    pthread_rwlock_unlock(&ssys->rwlock);

    r = pthread_create(&co->msgthread, NULL, __msg_queue_thread, (void *)co);
    if(r) goto __finalize;
    r = pthread_create(&co->rmsgthread, NULL, __rmsg_queue_thread, (void *)co);
    if(r) goto __finalize;

    pth_dqtpoll_run(tpoll);
    co->tpoll = tpoll;

    return 0;
  }

 __fail_3:
  pthread_mutex_destroy(&co->oplock);
 __fail_2:
  free(rpc_tree);
 __fail_1:
  free(ch_tree);
 __fail:
  free(uuid);
  return r;
}
/*
 * connection_create_fapi_m: build the master (server side) end of a
 * connection on an already connected socket.
 *
 * ssys - connections subsystem (certificate paths, system RPC list, hooks)
 * co   - caller provided connection structure; zeroed and filled here
 * sck  - connected socket the TLS session is attached to
 * addr - optional peer address; a copy is stored in the permission context
 *
 * The TLS handshake is accepted with the peer certificate checked through
 * __verify_certcall(), then incoming requests are evaluated against the
 * system RPC list until the connection carries CXCONN_ESTABL. After that
 * the listener, sender and dispatcher threads are started and the
 * connection is added to the subsystem.
 *
 * Returns 0 on success; EINVAL (bad argument, SSL context or
 * certificate/key problem), ENOMEM, EBADE (TLS handshake failed),
 * ESXNOCONNECT (peer lost during initiation), or the code returned by a
 * failed initialisation step, request handler or pthread_create().
 *
 * NOTE(review): the late failure ladder (__fail_3 and below) still does not
 * release the queues, the thread poll, the index allocator, the rwlock,
 * the permission context or the SSL objects, and it is also taken after
 * the listener thread has been started.
 * NOTE(review): a zero return of __conn_read() makes the initiation loop
 * read again immediately - confirm it cannot spin.
 */
int connection_create_fapi_m(conn_sys_t *ssys, conn_t *co, int sck,
                             struct in_addr *addr)
{
  int r = 0;
  int bytes = 0;
  char *uuid;
  char *buf = NULL;
  usrtc_t *ch_tree, *rpc_tree;
  pth_queue_t *rqueue = malloc(sizeof(pth_queue_t));
  pth_queue_t *mqueue = malloc(sizeof(pth_queue_t));
  pth_dqtpoll_t *tpoll = malloc(sizeof(pth_dqtpoll_t));
  idx_allocator_t *idx_ch = malloc(sizeof(idx_allocator_t));

  if(!rqueue || !mqueue || !tpoll || !idx_ch) {
    r = ENOMEM;
    goto __fail_alloc;
  }
  if(!co || !ssys) {
    r = EINVAL;
    goto __fail_alloc;
  }
  memset(co, 0, sizeof(conn_t));

  /* The uuid and the index allocator are set up before the thread poll is
   * initialised, so that a failure here only has plain memory to release. */
  if(!(uuid = __generate_uuid())) {
    r = ENOMEM;
    goto __fail_alloc;
  }
  if((r = idx_allocator_init(idx_ch, MAX_CHANNELS*MAX_MULTI, 0))) {
    free(uuid);
    goto __fail_alloc;
  }

  pth_dqtpoll_init(tpoll, __rpc_callback); // TODO: check it

  if(!(ch_tree = malloc(sizeof(usrtc_t)))) {
    r = ENOMEM;
    goto __fail;
  }
  if(!(rpc_tree = malloc(sizeof(usrtc_t)))) {
    r = ENOMEM;
    goto __fail_1;
  }
  if((r = pthread_mutex_init(&co->oplock, NULL))) goto __fail_2;
  if((r = pthread_rwlock_init(&co->chnl_lock, NULL))) goto __fail_3;

  usrtc_init(rpc_tree, USRTC_REDBLACK, MAX_RPC_LIST, __cmp_int);
  usrtc_init(ch_tree, USRTC_REDBLACK, MAX_CHANNELS, __cmp_ulong);

  co->idx_ch = idx_ch;

  /* setup connections set */
  co->ssys = ssys;

  /* assign message queue */
  r = pth_queue_init(rqueue);
  if(r) goto __fail_3;
  co->rqueue = rqueue;

  /* assigned outbone message queue master also has this one */
  r = pth_queue_init(mqueue);
  if(r) goto __fail_3;
  co->mqueue = mqueue;

  /* init SSL certificates and context */
  co->ctx = SSL_CTX_new(TLSv1_2_server_method());
  if(!co->ctx) { r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__);goto __fail_3; }
  else {
    /* set verify context */
    SSL_CTX_set_verify(co->ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT,
                       __verify_certcall);
    /* set verify depth */
    SSL_CTX_set_verify_depth(co->ctx, VERIFY_DEPTH);
  }

  /* load certificates */
  SSL_CTX_load_verify_locations(co->ctx, ssys->rootca, NULL);

  /* set the local certificate from CertFile */
  if(SSL_CTX_use_certificate_file(co->ctx, ssys->certpem,
                                  SSL_FILETYPE_PEM)<=0) {
    printf("certpem1 = %s\n", ssys->certpem);
    ERR_print_errors_fp(stderr);
    r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__);
    goto __fail_3;
  }
  /* set the private key from KeyFile (may be the same as CertFile) */
  if(SSL_CTX_use_PrivateKey_file(co->ctx, ssys->certkey,
                                 SSL_FILETYPE_PEM)<=0) {
    r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__);
    goto __fail_3;
  }
  /* verify private key */
  if (!SSL_CTX_check_private_key(co->ctx)) {
    r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__);
    goto __fail_3;
  }

  /* assign allocated memory */
  co->rpc_list = rpc_tree;
  co->chnl_tree = ch_tree;
  co->uuid = uuid;

  if(!(buf = malloc(__TMPBUFLEN))) {
    r = ENOMEM;
    goto __fail_3;
  }

  /* now we will create an SSL connection */
  co->ssl = SSL_new(co->ctx);
  /* zero-filled, so that login/passwd/addr are NULL until someone sets
   * them: the teardown code tests these members before freeing them */
  co->pctx = calloc(1, sizeof(perm_ctx_t));
  if(!co->ssl || !co->pctx) {
    r = ENOMEM;
    free(buf);
    goto __fail_3;
  }
  SSL_set_fd(co->ssl, sck); /* attach connected socket */

  /* ok now we need to initialize address */
  if(addr) {
    co->pctx->addr = malloc(sizeof(struct in_addr));
    if(!co->pctx->addr) {
      r = ENOMEM;
      free(buf);
      goto __fail_3;
    }
    memcpy(co->pctx->addr, addr, sizeof(struct in_addr));
  } else co->pctx->addr = NULL;

  /* set the context to verify ssl connection */
  SSL_set_ex_data(co->ssl, ex_ssldata_index, (void *)co);
  SSL_set_accept_state(co->ssl);
  /* only 1 means the handshake completed; 0 is a failure as well */
  if(SSL_accept(co->ssl) != 1) {
    r = EBADE;
    free(buf);
    /* shutdown connection */
    goto __fail_3;
  } /* if success we're ready to use established SSL channel */

  /*******************************************/
  /*-=Protocol part of connection establish=-*/
  /*******************************************/
  while(!(co->flags & CXCONN_ESTABL)) { /* read the initiation stage connections */
    bytes = __conn_read(co, buf, __TMPBUFLEN);
    if(bytes > 0) {
      buf[bytes] = 0;
      r = __eval_cstr(buf, ssys->system_rpc, (void *)co);
      if(r) {
        fprintf(stderr, "Initiation func return %d\n", r);
        free(buf);
        SSL_shutdown(co->ssl);
        goto __fail_3;
      }
    } else {
      if(bytes < 0) {
        printf("Terminate SSL connection, the other end is lost.\n");
        co->flags &= ~CXCONN_ESTABL;
        co->flags |= CXCONN_BROKEN;
        free(buf);
        if(ssys->on_destroy) ssys->on_destroy(co);
        SSL_shutdown(co->ssl);
        r = ESXNOCONNECT;
        goto __fail_3;
      }
    }
  }

  /* before it will be done assign rpc list */
  if(ssys->get_rpc_typed_list_tree)
    co->rpc_list = ssys->get_rpc_typed_list_tree(co);

  free(buf);

  r = pthread_create(&co->cthread, NULL, __cxmaster_thread_listener, (void *)co);
  if(!r) {
    /* add connection to the list */
    usrtc_node_init(&co->csnode, co);
    co->flags |= CXCONN_MASTER; /* set the right flags */
    pthread_rwlock_wrlock(&ssys->rwlock);
    usrtc_insert(ssys->connections, &co->csnode, (void *)co->uuid);
    pthread_rwlock_unlock(&ssys->rwlock);

    /* threads poll --- */
    r = pthread_create(&co->msgthread, NULL, __msg_queue_thread, (void *)co);
    if(r) goto __fail_3;
    r = pthread_create(&co->rmsgthread, NULL, __rmsg_queue_thread, (void *)co);
    if(r) goto __fail_3;

    pth_dqtpoll_run(tpoll);
    co->tpoll = tpoll;
  }

  return r;

 __fail_3:
  pthread_mutex_destroy(&co->oplock);
 __fail_2:
  free(rpc_tree);
 __fail_1:
  free(ch_tree);
 __fail:
  free(uuid);
  return r;

  /* early exit: nothing but the four plain allocations exists yet */
 __fail_alloc:
  free(idx_ch);
  free(tpoll);
  free(mqueue);
  free(rqueue);
  return r;
}
/*
 * Close a connection: cancel and join its listener thread, wake up all
 * waiters with ESXNOCONNECT, destroy the thread poll and release the
 * connection resources.
 *
 * Returns EINVAL if co is NULL, 0 otherwise.
 *
 * NOTE(review): the listener threads run the same teardown on their own
 * when the peer goes away; nothing here detects that it already happened.
 */
int connection_close(conn_t *co)
{
  if(!co) return EINVAL;

  pthread_cancel(co->cthread);

  /* wait for the main listener; its exit value is of no interest */
  pthread_join(co->cthread, NULL);

  /* ok the first of all we're need to wake up all */
  __wake_up_waiters(co, ESXNOCONNECT);

  /* now we need to end the poll */
  pth_dqtpoll_destroy(co->tpoll, 1); /* force */

  __connection_free(co);

  return 0;
}
extern int __create_reg_msg(sxmsg_t **msg, chnl_t *ch);
extern int __create_sys_msg(sxmsg_t **msg, char *uuid, chnl_t *ch, sxpayload_t *data);
/*
 * Release everything a connection owns once its listener is done with it.
 *
 * Order of work: stop and join the dispatcher and sender threads by
 * queueing END_MSG, destroy all channels, run the subsystem's on_destroy
 * hook (or, without one, free login and passwd of the permission context),
 * remove the connection from the subsystem, then free the uuid, the index
 * allocator, the channel tree, the SSL session with its socket and
 * context, both queues, the locks and finally the permission context.
 *
 * The conn_t structure itself is not freed here.
 *
 * NOTE(review): the permission context is freed here although a slave
 * connection stores the pointer passed in by its caller; pctx->addr, the
 * rpc list tree, the thread poll and the queue structures themselves are
 * not freed here - confirm ownership.
 */
static void __connection_free(conn_t *co)
{
  pth_msg_t msg;
  void *nil;

  /* since we doesn't works with IN queue and job dispatching
   * close the thread
   */
  msg.msgtype = END_MSG;
  msg.data = NULL;
  pth_queue_add(co->rqueue, &msg, END_MSG); /* it will drop the thread */
  pthread_join(co->rmsgthread, &nil); /* wait for it */

  /* message sending dispatch thread must be finished too */
  pth_queue_add(co->mqueue, &msg, END_MSG); /* it will drop the thread */
  pthread_join(co->msgthread, &nil); /* wait for it */

  /* since we don't have any threads working with channels destroy them */
  __destroy_all_channels(co);

  /* permission context and callback of exists */
  if(co->ssys->on_destroy) co->ssys->on_destroy(co);
  else if(co->pctx) { /* we don't have a handler */
    /* pctx is checked first: it is treated as optional further below */
    if(co->pctx->login) free(co->pctx->login);
    if(co->pctx->passwd) free(co->pctx->passwd);
  }

  __connections_subsystem_connection_remove(co);

  /* now we're ready to free other resources */
  if(co->uuid) free(co->uuid);

  /* idx allocator */
  idx_allocator_destroy(co->idx_ch);
  free(co->idx_ch);
  free(co->chnl_tree);

  /* kill SSL context */
  SSL_shutdown(co->ssl);
  close(SSL_get_fd(co->ssl));
  SSL_free(co->ssl);
  SSL_CTX_free(co->ctx);

  /* destroy queues */
  pth_queue_destroy(co->mqueue, 0, NULL);
  pth_queue_destroy(co->rqueue, 0, NULL);

  /* locks */
  pthread_rwlock_destroy(&(co->chnl_lock));
  pthread_mutex_destroy(&(co->oplock));

  /* kill permission context */
  if(co->pctx) free(co->pctx);

  return;
}