You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
libsxmp/lib/connection.c

2814 lines
75 KiB
C

/*
* Secure Network Transport Layer Library implementation.
* This is a proprietary software. See COPYING for further details.
*
* (c) 2013-2014 Copyright Askele, inc. <http://askele.com>
* (c) 2013-2014 Copyright Askele Ingria, inc. <http://askele-ingria.com>
*/
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/select.h>
#include <unistd.h>
#include <fcntl.h>
#include <netdb.h>
#include <uuid/uuid.h>
#include <openssl/ssl.h>
#include <openssl/err.h>
#include <tdata/usrtc.h>
#include <sexpr/sexp.h>
#include <sntl/connection.h>
struct __rpc_job
{
sxmsg_t *msg;
sexp_t *sx;
int (*rpcf) (void *, sexp_t *);
};
conn_sys_t *conn_sys = NULL;
static long __cmp_ulong(const void *a, const void *b);
/* message alloc and destroy */
static sxmsg_t *__allocate_msg(int *res);
static void __destroy_msg(sxmsg_t *msg);
/* examination */
static inline int __exam_connection(conn_t *co)
{
int r = 0;
pthread_mutex_lock(&co->oplock);
if(co->flags | CXCONN_BROKEN) {
/* wake up all */
/* destroy thread poll */
/* free all memory and sync primitives */
r = 1;
}
pthread_mutex_unlock(&co->oplock);
return r;
}
static int __rpc_callback(void *data)
{
struct __rpc_job *job = (struct __rpc_job *)data;
int rc = 0;
rc = job->rpcf((void *)(job->msg), job->sx);
free(job);
return rc;
}
int __alloc_channel(ulong_t cid, conn_t *co, rpc_typed_list_t *rlist, chnl_t **channel)
{
int r = 0;
chnl_t *ch = malloc(sizeof(chnl_t));
usrtc_t *msg_tree = malloc(sizeof(usrtc_t));
idx_allocator_t *idx_msg = malloc(sizeof(idx_allocator_t));
if(!idx_msg) goto __fin_enomem;
else if(idx_allocator_init(idx_msg, MAX_MSGINDEX, 0)) goto __fin_enomem;
if(!ch || !msg_tree) {
__fin_enomem:
r = ENOMEM;
goto __fin_up;
} else {
usrtc_init(msg_tree, USRTC_REDBLACK, MAX_PENDINGMSG, __cmp_ulong);
ch->cid = cid;
ch->flags = ch->use_count = 0;
usrtc_node_init(&ch->node, ch);
if(rlist) ch->rpc_list = rlist->rpc_list;
/* init locks */
if(pthread_rwlock_init(&(ch->msglock), NULL)) {
r = ENOMEM;
goto __fin_up;
}
if(pthread_mutex_init(&(ch->oplock), NULL)) {
pthread_rwlock_destroy(&(ch->msglock));
r = ENOMEM;
goto __fin_up;
}
/* assign all the stuff */
ch->idx_msg = idx_msg;
ch->msgs_tree = msg_tree;
ch->connection = co;
}
__fin_up:
if(r) {
if(idx_msg) free(idx_msg);
if(ch) free(ch);
if(msg_tree) free(msg_tree);
return r;
} else {
*channel = ch;
return 0;
}
}
static int __conn_read(conn_t *co, void *buf, size_t buf_len)
{
int rfd = SSL_get_fd(co->ssl), r;
fd_set readset;
fprintf(stderr, "\tListening ... on %s\n", co->uuid);
/* get prepare to select */
FD_ZERO(&readset);
FD_SET(rfd, &readset);
/* waits until something will be ready to read */
r = select(FD_SETSIZE, &readset, NULL, NULL, NULL);
if(r < 0) {
printf("select (%d)\n", errno);
return -1;
}
if(!r) {
printf("Nothing to wait for\n");
return 0;
}
if(r && FD_ISSET(rfd, &readset)) {
do {
//pthread_mutex_lock(&(co->oplock));
/* ok, now we're ready to perform SSL_read */
r = SSL_read(co->ssl, buf, (int)buf_len);
switch(SSL_get_error(co->ssl, r)) {
case SSL_ERROR_NONE:
//printf("Read done (f:%d)\n", rfd);
/* this is means we're get ridden it all */
return r; break;
case SSL_ERROR_ZERO_RETURN:
printf("No data to read\n");
/* no data to read ... */
return 0; break;
case SSL_ERROR_WANT_READ:
case SSL_ERROR_WANT_WRITE:
printf("Bypass until SSL buffer not ready.\n");
return 0;
default: /* seems the connection lost */
fprintf(stderr, "(RD)Unknown error on %s\n", co->uuid);
return -1;
}
//pthread_mutex_unlock(&(co->oplock));
} while(SSL_pending(co->ssl));
}
return 0;
}
static int __conn_write(conn_t *co, void *buf, size_t buf_len)
{
int r;
pthread_mutex_lock(&(co->oplock));
__retry:
r = SSL_write(co->ssl, buf, (int)buf_len);
switch(SSL_get_error(co->ssl, r)) {
case SSL_ERROR_WANT_READ:
case SSL_ERROR_WANT_WRITE:
goto __retry;
break;
default:
pthread_mutex_unlock(&(co->oplock));
if(r < 0) {
fprintf(stderr, "(WR)Unknown error on %s (%d)\n", co->uuid, r);
return -1;
} else return 0;
}
pthread_mutex_unlock(&(co->oplock));
return 0;
}
static long __cmp_cstr(const void *a, const void *b)
{
return strcmp((char *)a, (char *)b);
}
static long __cmp_int(const void *a, const void *b)
{
return *(int *)a - *(int *)b;
}
static long __cmp_ulong(const void *a, const void *b)
{
return *(ulong_t *)a - *(ulong_t *)b;
}
static int __resolvehost(const char *hostname, char *buf, int buf_len,
struct hostent **rhp)
{
struct hostent *hostbuf = malloc(sizeof(struct hostent));
struct hostent *hp = *rhp = NULL;
int herr = 0, hres = 0;
if(!hostbuf) return NO_ADDRESS;
hres = gethostbyname_r(hostname, hostbuf,
buf, buf_len, &hp, &herr);
if (!hp) return NO_ADDRESS;
if(hres) return NO_ADDRESS;
*rhp = hp;
return NETDB_SUCCESS;
}
static void __destroy_rpc_list_tree(usrtc_t *tree)
{
usrtc_node_t *node;
cx_rpc_t *ent;
for(node = usrtc_first(tree); node != NULL; node = usrtc_first(tree)) {
ent = (cx_rpc_t *)usrtc_node_getdata(node);
usrtc_delete(tree, node);
free(ent->name);
free(ent);
}
return;
}
static int __insert_rpc_function(usrtc_t *tree, const char *name, int (*rpcf)(void *, sexp_t *))
{
cx_rpc_t *ent = malloc(sizeof(cx_rpc_t));
usrtc_node_t *node;
if(!ent) return ENOMEM;
else node = &ent->node;
if(!(ent->name = strdup(name))) {
free(ent);
return ENOMEM;
} else ent->rpcf = rpcf;
usrtc_node_init(node, ent);
usrtc_insert(tree, node, ent->name);
return 0;
}
/* wake up all waiters on messages with given opcode */
static void __wake_up_waiters(conn_t *co, int opcode)
{
usrtc_node_t *node = NULL, *last_node = NULL;
usrtc_node_t *msg_node = NULL, *last_msg_node = NULL;
chnl_t *ch;
sxmsg_t *smsg = NULL;
pthread_rwlock_wrlock(&(co->chnl_lock));
node = usrtc_first(co->chnl_tree);
last_node = usrtc_last(co->chnl_tree);
/* going through channels tree */
while(!usrtc_isempty(co->chnl_tree)) {
ch = (chnl_t *)usrtc_node_getdata(node);
pthread_rwlock_rdlock(&(ch->msglock));
msg_node = usrtc_first(ch->msgs_tree);
last_msg_node = usrtc_last(ch->msgs_tree);
while(!usrtc_isempty(ch->msgs_tree)) { /* messages bypassing */
smsg = (sxmsg_t *)usrtc_node_getdata(msg_node);
smsg->opcode = opcode;
/* wake up waiting thread */
pthread_mutex_unlock(&(smsg->wait));
if(msg_node == last_msg_node) break;
msg_node = usrtc_next(ch->msgs_tree, msg_node);
}
pthread_rwlock_unlock(&(ch->msglock));
if(node == last_node) break;
node = usrtc_next(co->chnl_tree, node);
}
pthread_rwlock_unlock(&(co->chnl_lock));
return;
}
static int __default_auth_set_context(void *cctx, sexp_t *sx)
{
conn_t *co = (conn_t *)cctx;
char *val, *var, *tbuf = NULL;
sexp_t *lsx, *sx_iter, *sx_in;
int llen, idx, err = 0;
//co->pctx = malloc(sizeof(perm_ctx_t));
/* skip keyword itself */
lsx = sx->list->next;
/* now we expect a list of lists */
if(lsx->ty != SEXP_LIST) {
err = ESXRCBADPROT;
goto __reply;
}
/* take length of the list */
llen = sexp_list_length(lsx);
if(!llen) return 0; /* other side will not set any security attributes */
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
sexp_list_car(sx_iter, &sx_in);
if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) {
err = ESXRCBADPROT;
goto __reply;
} else val = sx_in->val;
if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */
sexp_list_cdr(sx_iter, &sx_in);
if(!SEXP_IS_TYPE(sx_in, SEXP_DQUOTE)) {
err = ESXRCBADPROT;
goto __reply;
} else var = sx_in->val;
/* ok, now we need to analyze parameters */
if(!strcmp(val, ":user") && var) {
co->pctx->login = strdup(var);
} else if(!strcmp(val, ":passwd") && var) {
co->pctx->passwd = strdup(var);
} else {
/* just ignore in default implementation */
}
} else continue; /* ignore */
}
/* ok, now we need to fill security context */
tbuf = malloc(2048);
if(!tbuf) {
err = ENOMEM;
goto __reply;
}
if(conn_sys->secure_check)
err = conn_sys->secure_check(co);
__reply:
if(err) {
snprintf(tbuf, 2048, "(auth-set-error (%d))", err);
} else {
snprintf(tbuf, 2048, "(auth-set-attr (:attr %d)(:uid %ld)(:gid %ld))",
co->pctx->p_attr, co->pctx->uid, co->pctx->gid);
}
/* we will send it */
if(__conn_write(co, tbuf, strlen(tbuf)) < 0) {
co->flags &= ~CXCONN_ESTABL;
co->flags |= CXCONN_BROKEN;
__wake_up_waiters(co, ESXNOCONNECT);
}
destroy_sexp(sx);
free(tbuf);
return err;
}
static int __default_auth_set_attr(void *cctx, sexp_t *sx)
{
conn_t *co = (conn_t *)cctx;
char *val, *var;
sexp_t *lsx, *sx_iter, *sx_in;
int llen, idx, r = 0;
/* skip keyword itself */
lsx = sx->list->next;
/* now we expect a list of lists */
if(lsx->ty != SEXP_LIST) {
// printf("%s:%d\n", __FUNCTION__, __LINE__);
r = ESXRCBADPROT;
goto __finish;
}
/* take length of the list */
llen = sexp_list_length(lsx);
if(!llen) return 0; /* other side will not set any security attributes */
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
sexp_list_car(sx_iter, &sx_in);
if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) {
r = ESXRCBADPROT;
goto __finish;
} else val = sx_in->val;
if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */
sexp_list_cdr(sx_iter, &sx_in);
if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) {
r = ESXRCBADPROT;
goto __finish;
} else var = sx_in->val;
/* ok, now we need to analyze parameters */
if(!strcmp(val, ":attr")) {
co->pctx->p_attr = atoi(var);
} else if(!strcmp(val, ":uid")) {
co->pctx->uid = (ulong_t)atoll(var);
} else if(!strcmp(val, ":gid")) {
co->pctx->gid = (ulong_t)atoll(var);
} else {
/* just ignore in default implementation */
}
} else continue; /* ignore */
}
__finish:
destroy_sexp(sx);
return r;
}
static int __default_auth_set_error(void *cctx, sexp_t *sx)
{
char *errstr = NULL;
int r;
/* skip keyword itself */
sx->list = sx->list->next;
/* be sure - this is a list */
if(sx->ty != SEXP_LIST) return ESXRCBADPROT;
else sx = sx->list; /* get it */
errstr = sx->list->val;
r = atoi(errstr);
destroy_sexp(sx);
return r;
}
static int __default_ch_get_types(void *cctx, sexp_t *sx)
{
conn_t *co = (conn_t *)cctx;
usrtc_node_t *node;
rpc_typed_list_t *list_ent;
char *tbuf = malloc(4096), *tt;
int err = 0;
/* if we cannot allocate anything ... */
if(!tbuf) return ENOMEM;
/* ok here we go */
co->rpc_list = conn_sys->get_rpc_typed_list_tree(co);
/* ok, here we're don't need to parse anything */
if(!usrtc_count(co->rpc_list)) {
err = ENXIO;
snprintf(tbuf, 4096, "(ch-gl-error (%d))", err);
} else {
tt = tbuf;
snprintf(tt, 4096, "(ch-set-types (");
tt += strlen(tt);
for(node = usrtc_first(co->rpc_list); node != NULL;
node = usrtc_next(co->rpc_list, node), tt += strlen(tt)) {
list_ent = (rpc_typed_list_t *)usrtc_node_getdata(node);
snprintf(tt, 4096, "(:%d \"%s\")", list_ent->type_id, list_ent->description);
}
snprintf(tt, 4096, "))");
}
/* reply to this rpc */
if(__conn_write(co, tbuf, strlen(tbuf)) < 0) {
co->flags &= ~CXCONN_ESTABL;
co->flags |= CXCONN_BROKEN;
__wake_up_waiters(co, ESXNOCONNECT);
}
free(tbuf);
destroy_sexp(sx);
return err;
}
static int __default_ch_set_types(void *cctx, sexp_t *sx)
{
conn_t *co = (conn_t *)cctx;
char buf[1024], *val, *var;
int r = 0, llen, typeid, idx;
sexp_t *lsx, *sx_iter, *sx_in;
/* skip keyword itself */
lsx = sx->list->next;
/* now we expect a list of lists */
if(lsx->ty != SEXP_LIST) {
r = ESXRCBADPROT;
goto __send_reply;
}
/* take length of the list */
llen = sexp_list_length(lsx);
if(!llen) return 0; /* other side will not set any security attributes */
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
sexp_list_car(sx_iter, &sx_in);
if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) {
r = ESXRCBADPROT;
goto __send_reply;
} else val = sx_in->val;
10 years ago
if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */
10 years ago
sexp_list_cdr(sx_iter, &sx_in);
10 years ago
if(!SEXP_IS_TYPE(sx_in, SEXP_DQUOTE)) {
r = ESXRCBADPROT;
goto __send_reply;
} else var = sx_in->val;
10 years ago
/* ok, now we need to analyze parameters */
if(*val != ':') {
r = ESXRCBADPROT;
goto __send_reply;
} else {
if(conn_sys->set_typed_list_callback) {
typeid = atoi((char *)(val + sizeof(char)));
if(conn_sys->set_typed_list_callback(co, typeid, var)) {
destroy_sexp(sx);
return ENXIO;
}
} /* FIXME: if no function, accept or decline ? */
}
} else continue; /* ignore */
}
__send_reply:
snprintf(buf, 1024, "(ch-gl-error (%d))", r);
if(__conn_write(co, buf, strlen(buf)) < 0) {
co->flags &= ~CXCONN_ESTABL;
co->flags |= CXCONN_BROKEN;
__wake_up_waiters(co, ESXNOCONNECT);
}
destroy_sexp(sx);
return r;
}
static int __default_ch_gl_error(void *cctx, sexp_t *sx)
{
int r;
char *errstr;
conn_t *co = (conn_t *)cctx;
if(co->flags & CXCONN_ESTABL) return EINVAL; /* error, we're already have channels list */
/* skip keyword itself */
sx->list = sx->list->next;
/* be sure - this is a list */
if(sx->ty != SEXP_LIST) return ESXRCBADPROT;
else sx = sx->list; /* get it */
errstr = sx->list->val;
r = atoi(errstr);
if(!r) co->flags |= CXCONN_ESTABL;
return r;
}
static int __default_ch_open(void *cctx, sexp_t *sx)
{
conn_t *co = (conn_t *)cctx;
usrtc_node_t *node;
char *val, *var, *uuid = NULL, *buf;
int typ = -1, idx, llen, r;
ulong_t cid;
sexp_t *lsx, *sx_iter, *sx_in;
rpc_typed_list_t *rlist;
chnl_t *channel;
/* skip keyword itself */
lsx = sx->list->next;
/* now we expect a list of lists */
if(lsx->ty != SEXP_LIST) {
printf("%s:%d\n", __FUNCTION__, __LINE__);
r = ESXRCBADPROT;
goto __send_repl;
}
/* take length of the list */
llen = sexp_list_length(lsx);
if(!llen) return 0; /* other side will not set any security attributes */
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
sexp_list_car(sx_iter, &sx_in);
if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) {
printf("%s:%d\n", __FUNCTION__, __LINE__);
r = ESXRCBADPROT;
goto __send_repl;
} else val = sx_in->val;
if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */
sexp_list_cdr(sx_iter, &sx_in);
if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) {
r = ESXRCBADPROT;
printf("%s:%d\n", __FUNCTION__, __LINE__);
goto __send_repl;
} else var = sx_in->val;
/* ok, now we need to analyze parameters */
if(*val != ':') {
r = ESXRCBADPROT;
goto __send_repl;
} else {
if(!strcmp((char *)(val + sizeof(char)), "type"))
typ = atoi(var);
else if(!strcmp((char *)(val + sizeof(char)), "id"))
cid = atoll(var);
else if(!strcmp((char *)(val + sizeof(char)), "uuid"))
uuid = var;
}
} else continue; /* ignore */
}
/* additional check for type of the channel */
node = usrtc_lookup(co->rpc_list, &typ);
if(!node) {
r = ESXNOCHANSUP;
/* printf("%s:%d (usrtc count: %d) (typ %d)\n", __FUNCTION__, __LINE__,
usrtc_count(co->rpc_list), typ);*/
node = usrtc_first(co->rpc_list);
rlist = (rpc_typed_list_t *)usrtc_node_getdata(node);
printf("---- rlist->type_id = %d\n", rlist->type_id);
goto __send_repl;
} else rlist = (rpc_typed_list_t *)usrtc_node_getdata(node);
/* now we need to check up the channel */
pthread_rwlock_rdlock(&(co->chnl_lock));
node = usrtc_lookup(co->chnl_tree, &cid);
if(node) {
pthread_rwlock_unlock(&(co->chnl_lock));
r = EEXIST;
goto __send_repl;
} else {
idx_reserve(co->idx_ch, cid);
pthread_rwlock_unlock(&(co->chnl_lock)); /* now we should alloc channel */
if((r = __alloc_channel(cid, co, rlist, &channel))) {
pthread_rwlock_wrlock(&(co->chnl_lock));
idx_free(co->idx_ch, cid);
pthread_rwlock_unlock(&(co->chnl_lock));
goto __send_repl;
} else {
/* now we ready to confirm channel creation */
pthread_rwlock_wrlock(&(co->chnl_lock));
usrtc_insert(co->chnl_tree, &(channel->node), &(channel->cid));
pthread_rwlock_unlock(&(co->chnl_lock));
r = 0;
}
}
__send_repl:
buf = malloc(2048);
snprintf(buf, 2048, "(ch-open-ret ((:error %d)(:uuid %s)(:id %ld)))", r,
uuid, cid);
if(__conn_write(co, buf, strlen(buf)) < 0) {
co->flags &= ~CXCONN_ESTABL;
co->flags |= CXCONN_BROKEN;
__wake_up_waiters(co, ESXNOCONNECT);
}
destroy_sexp(sx);
free(buf);
return r;
}
static int __default_ch_open_ret(void *cctx, sexp_t *sx)
{
conn_t *co = (conn_t *)cctx;
chnl_t *chan;
usrtc_node_t *node;
int err = 0, r, llen, idx;
ulong_t id;
char *val, *var;
sexp_t *lsx, *sx_iter, *sx_in;
sxmsg_t *sms = NULL;
/* skip keyword itself */
lsx = sx->list->next;
/* now we expect a list of lists */
if(lsx->ty != SEXP_LIST) {
10 years ago
//printf("%s:%d\n", __FUNCTION__, __LINE__);
r = ESXRCBADPROT;
goto __mark_msg;
}
/* take length of the list */
llen = sexp_list_length(lsx);
if(!llen) return EINVAL; /* !! other side will not set any security attributes */
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
sexp_list_car(sx_iter, &sx_in);
if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) {
r = ESXRCBADPROT;
goto __mark_msg;
} else val = sx_in->val;
if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */
sexp_list_cdr(sx_iter, &sx_in);
if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) {
r = ESXRCBADPROT;
goto __mark_msg;
} else var = sx_in->val;
/* ok, now we need to analyze parameters */
if(*val != ':') {
r = ESXRCBADPROT;
goto __mark_msg;
} else {
if(!strcmp((char *)(val + sizeof(char)), "error"))
err = atoi(var);
else if(!strcmp((char *)(val + sizeof(char)), "id"))
id = atoll(var);
}
} else continue; /* ignore */
}
/* try to find desired channel to intercept message */
pthread_rwlock_rdlock(&(co->chnl_lock));
node = usrtc_lookup(co->chnl_tree, (void *)&id);
//printf("channels (%d)\n", usrtc_count(co->chnl_tree));
pthread_rwlock_unlock(&(co->chnl_lock));
if(node) {
chan = (chnl_t *)usrtc_node_getdata(node);
sms = chan->sysmsg;
}
__mark_msg:
if(!sms) return r;
sms->flags &= ~ESXMSG_PENDING; /* the message is done */
sms->opcode = err;
destroy_sexp(sx);
/* unlock mutex to wake up the waiting thread */
pthread_mutex_unlock(&(sms->wait));
return 0;
}
static int __default_ch_close(void *cctx, sexp_t *sx)
{
conn_t *co = (conn_t *)cctx;
usrtc_node_t *node;
char *val, *var, *buf;
int idx, llen, r;
ulong_t cid = -1;
sexp_t *lsx, *sx_iter, *sx_in;
chnl_t *channel = NULL;
r = 0;
/* skip keyword itself */
lsx = sx->list->next;
/* now we expect a list of lists */
if(lsx->ty != SEXP_LIST) {
printf("%s:%d\n", __FUNCTION__, __LINE__);
r = ESXRCBADPROT;
goto __send_repl;
}
/* take length of the list */
llen = sexp_list_length(lsx);
if(!llen) return 0; /* other side will not set any security attributes */
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
continue;
}
if(!SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) {
printf("%s:%d\n", __FUNCTION__, __LINE__);
r = ESXRCBADPROT;
goto __send_repl;
} else val = sx_iter->val;
sx_in = sx_iter->next;
if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) {
r = ESXRCBADPROT;
printf("%s:%d\n", __FUNCTION__, __LINE__);
goto __send_repl;
} else var = sx_in->val;
/* ok, now we need to analyze parameters */
if(*val != ':') {
r = ESXRCBADPROT;
goto __send_repl;
} else {
if(!strcmp((char *)(val + sizeof(char)), "id")) {
cid = atoll(var);
break;
}
}
}
//printf("%s(%ld)\n", __FUNCTION__, cid);
/* additional check for type of the channel */
pthread_rwlock_rdlock(&(co->chnl_lock));
node = usrtc_lookup(co->chnl_tree, &cid);
pthread_rwlock_unlock(&(co->chnl_lock));
if(!node) {
r = ENOENT;
printf("there is no channel with id=%ld\n", cid);
goto __send_repl;
}
channel = (chnl_t *)usrtc_node_getdata(node);
__send_repl:
buf = malloc(2048);
snprintf(buf, 2048, "(ch-close-ret ((:id %ld) (:error %d)))",
channel->cid, r);
/* check up the message queue */
pthread_rwlock_rdlock(&(channel->msglock));
if(usrtc_count(channel->msgs_tree)) {
fprintf(stderr, "Operation is not permitted. There are some "
"undelivered messages in the message tree");
free(buf);
destroy_sexp(sx);
return EPERM;
}
pthread_rwlock_unlock(&(channel->msglock));
/* remove channel from the search tree */
pthread_rwlock_wrlock(&(co->chnl_lock));
usrtc_delete(co->chnl_tree, &(channel->node));
/* free index */
idx_free(co->idx_ch, channel->cid);
pthread_rwlock_unlock(&(co->chnl_lock));
idx_allocator_destroy(channel->idx_msg);
free(channel->idx_msg);
free(channel->msgs_tree);
pthread_mutex_destroy(&(channel->oplock));
pthread_rwlock_destroy(&(channel->msglock));
free(channel);
destroy_sexp(sx);
if(__conn_write(co, buf, strlen(buf)) < 0) {
co->flags &= ~CXCONN_ESTABL;
co->flags |= CXCONN_BROKEN;
__wake_up_waiters(co, ESXNOCONNECT);
}
free(buf);
return 0;
}
static int __default_ch_close_ret(void *cctx, sexp_t *sx)
{
;
conn_t *co = (conn_t *)cctx;
chnl_t *chan;
usrtc_node_t *node;
int err = 0, r, llen, idx;
ulong_t id;
char *val, *var;
sexp_t *lsx, *sx_iter, *sx_in;
sxmsg_t *sms = NULL;
/* skip keyword itself */
lsx = sx->list->next;
/* now we expect a list of lists */
if(lsx->ty != SEXP_LIST) {
//printf("%s:%d\n", __FUNCTION__, __LINE__);
r = ESXRCBADPROT;
goto __mark_msg;
}
/* take length of the list */
llen = sexp_list_length(lsx);
if(!llen) return EINVAL; /* !! other side will not set any security attributes */
;
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
sexp_list_car(sx_iter, &sx_in);
if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) {
r = ESXRCBADPROT;
goto __mark_msg;
} else val = sx_in->val;
if(sexp_list_length(sx_iter) < 2) continue; /* we will ignore it */
sexp_list_cdr(sx_iter, &sx_in);
if(!SEXP_IS_TYPE(sx_in, SEXP_BASIC)) {
r = ESXRCBADPROT;
goto __mark_msg;
} else var = sx_in->val;
/* ok, now we need to analyze parameters */
if(*val != ':') {
r = ESXRCBADPROT;
goto __mark_msg;
} else {
if(!strcmp((char *)(val + sizeof(char)), "error"))
err = atoi(var);
else if(!strcmp((char *)(val + sizeof(char)), "id"))
id = atoll(var);
}
} else continue; /* ignore */
}
;
/* try to find desired channel to intercept message */
pthread_rwlock_rdlock(&(co->chnl_lock));
node = usrtc_lookup(co->chnl_tree, (void *)&id);
//printf("channels (%d)\n", usrtc_count(co->chnl_tree));
pthread_rwlock_unlock(&(co->chnl_lock));
if(node) {
chan = (chnl_t *)usrtc_node_getdata(node);
sms = chan->sysmsg;
}
;
__mark_msg:
;
if(!sms) return r;
sms->flags &= ~ESXMSG_PENDING; /* the message is done */
sms->opcode = err;
destroy_sexp(sx);
/* unlock mutex to wake up the waiting thread */
pthread_mutex_unlock(&(sms->wait));
return 0;
}
/* create a nould of the message */
static int __create_reg_msg_mould(sxmsg_t **msg, chnl_t *ch, ulong_t mid)
{
int r = 0;
sxmsg_t *sm = __allocate_msg(&r);
if(r) return r;
else {
sm->pch = ch;
sm->flags = (ESXMSG_USR | ESXMSG_PENDING);
sm->mid = mid;
/* ok reserve message ID */
pthread_mutex_lock(&(ch->oplock));
idx_reserve(ch->idx_msg, mid);
pthread_mutex_unlock(&(ch->oplock));
pthread_mutex_lock(&(sm->wait));
*msg = sm;
}
return 0;
}
// TODO: check and continue
static int __default_msg_pulse(void *cctx, sexp_t *sx)
{
conn_t *co = (conn_t *)cctx;
usrtc_node_t *node = NULL;
chnl_t *chan = NULL;
int r = 0;
sexp_t *lsx = NULL, *sx_iter = NULL;
sexp_t *sx_sublist = NULL, *sx_value = NULL;
ulong_t chnl_id = -1;
ulong_t msg_id = -1;
sexp_t *msg = NULL;
sxmsg_t *smsg = NULL;
int idx;
/* get parameters from the message */
if(sexp_list_cdr(sx, &lsx)) return ESXRCBADPROT;
if(!SEXP_IS_LIST(lsx)) return ESXRCBADPROT;
/* find channel id */
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
sx_sublist = sx_iter;
continue;
} else {
if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) {
if(strcmp(sx_iter->val, ":chid")) {
continue; // ignore it
}
sx_value = sx_iter->next;
if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) {
continue;
}
chnl_id = atol(sx_value->val);
} else continue; // ignore it
}
}
lsx = sx_sublist;
/* find message id */
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
msg = sx_iter;
continue;
} else {
if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) {
if(strcmp(sx_iter->val, ":msgid")) {
continue; // ignore
}
sx_value = sx_iter->next;
if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) {
continue;
}
msg_id = atol(sx_value->val);
} else continue; // ignore it
}
}
if(msg_id < 0 || chnl_id < 0) {
return ESXRCBADPROT;
}
/* find channel */
if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT;
else chan = (chnl_t *)usrtc_node_getdata(node);
/* lookup for the message */
pthread_rwlock_rdlock(&(chan->msglock));
if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) {
pthread_rwlock_unlock(&(chan->msglock));
/* here, rpc lookup has no sense, just put this job to the queue */
/* btw, we're need to create a message first */
r = __create_reg_msg_mould(&smsg, chan, msg_id);
if(r) return r;
/* assign the message */
smsg->opcode = 0;
smsg->payload = (void *)msg;
/* assign initial S-expression structure */
smsg->initial_sx = sx;
/* put the message to the search tree */
pthread_rwlock_wrlock(&(chan->msglock));
usrtc_insert(chan->msgs_tree, &(smsg->pendingq_node), &(smsg->mid));
pthread_rwlock_unlock(&(chan->msglock));
} else {
//printf(">>>>>>>>>>>>>>>>>>>msg_id = %lu\n", msg_id);
pthread_rwlock_unlock(&(chan->msglock));
smsg = (sxmsg_t *)usrtc_node_getdata(node);
msg_return(smsg, EEXIST);
return EEXIST;
}
/* put job to the queue and give up */
r = pth_queue_add(co->rqueue, (void *)smsg, USR_MSG);
if(r) { /* cannot put job to the queue */
pthread_rwlock_wrlock(&(chan->msglock));
usrtc_delete(chan->msgs_tree, &(smsg->pendingq_node));
pthread_rwlock_unlock(&(chan->msglock));
__destroy_msg(smsg);
return r;
}
//msg_return(smsg, r);
/* put to the IN queue */
return r;
}
static int __default_msg_pulse_ret(void *cctx, sexp_t *sx)
{
return 0;
}
static int __default_msg(void *cctx, sexp_t *sx)
{
conn_t *co = (conn_t *)cctx;
usrtc_node_t *node = NULL;
chnl_t *chan = NULL;
int r = 0;
sexp_t *lsx = NULL, *sx_iter = NULL;
sexp_t *sx_sublist = NULL, *sx_value = NULL;
ulong_t chnl_id = -1;
ulong_t msg_id = -1;
sexp_t *msg = NULL;
sxmsg_t *smsg = NULL;
int idx;
/* get parameters from the message */
if(sexp_list_cdr(sx, &lsx)) return ESXRCBADPROT;
if(!SEXP_IS_LIST(lsx)) return ESXRCBADPROT;
/* find channel id */
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
sx_sublist = sx_iter;
continue;
} else {
if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) {
if(strcmp(sx_iter->val, ":chid")) {
continue; // ignore it
}
sx_value = sx_iter->next;
if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) {
continue;
}
chnl_id = atol(sx_value->val);
} else continue; // ignore it
}
}
lsx = sx_sublist;
/* find message id */
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
msg = sx_iter;
continue;
} else {
if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) {
if(strcmp(sx_iter->val, ":msgid")) {
continue; // ignore
}
sx_value = sx_iter->next;
if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) {
continue;
}
msg_id = atol(sx_value->val);
} else continue; // ignore it
}
}
if(msg_id < 0 || chnl_id < 0) {
return ESXRCBADPROT;
}
/* find channel */
if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT;
else chan = (chnl_t *)usrtc_node_getdata(node);
/* lookup for the message */
pthread_rwlock_rdlock(&(chan->msglock));
if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) {
pthread_rwlock_unlock(&(chan->msglock));
/* here, rpc lookup has no sense, just put this job to the queue */
/* btw, we're need to create a message first */
r = __create_reg_msg_mould(&smsg, chan, msg_id);
if(r) return r;
/* assign the message */
smsg->opcode = 0;
smsg->payload = (void *)msg;
/* assign initial S-expression structure */
smsg->initial_sx = sx;
/* put the message to the search tree */
pthread_rwlock_wrlock(&(chan->msglock));
usrtc_insert(chan->msgs_tree, &(smsg->pendingq_node), &(smsg->mid));
pthread_rwlock_unlock(&(chan->msglock));
} else {
//printf(">>>>>>>>>>>>>>>>>>>msg_id = %lu\n", msg_id);
pthread_rwlock_unlock(&(chan->msglock));
smsg = (sxmsg_t *)usrtc_node_getdata(node);
msg_return(smsg, EEXIST);
return EEXIST;
}
/* put job to the queue and give up */
r = pth_queue_add(co->rqueue, (void *)smsg, USR_MSG);
if(r) { /* cannot put job to the queue */
msg_return(smsg, r);
pthread_rwlock_wrlock(&(chan->msglock));
usrtc_delete(chan->msgs_tree, &(smsg->pendingq_node));
pthread_rwlock_unlock(&(chan->msglock));
__destroy_msg(smsg);
return r;
}
/* put to the IN queue */
return r;
}
/* TODO: optimize amount of code: (must be first)
* __default_msg_return
* __default_msg_reply
* there are many copy-n-paste code!!!!!
*/
static int __default_msg_return(void *cctx, sexp_t *sx)
{
conn_t *co = (conn_t *)cctx;
usrtc_node_t *node = NULL;
chnl_t *chan = NULL;
int r = 0;
sexp_t *lsx = NULL, *sx_iter = NULL;
sexp_t *sx_sublist = NULL, *sx_value = NULL;
ulong_t chnl_id = -1;
ulong_t msg_id = -1;
sexp_t *msg = NULL;
sxmsg_t *smsg = NULL;
int idx, opcode;
/* get parameters from the message */
if(sexp_list_cdr(sx, &lsx)) {
r = ESXRCBADPROT;
goto __finish;
}
if(!SEXP_IS_LIST(lsx)) {
r = ESXRCBADPROT;
goto __finish;
}
/* get parameters */
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
sx_sublist = sx_iter;
continue;
} else {
if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) {
if(strcmp(sx_iter->val, ":chid")) {
continue; // ignore it
}
sx_value = sx_iter->next;
if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) {
continue;
}
chnl_id = atol(sx_value->val);
} else continue; // ignore it
}
}
lsx = sx_sublist;
/* find message id */
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
msg = sx_iter;
continue;
} else {
if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) {
if(strcmp(sx_iter->val, ":msgid")) {
continue; // ignore
}
sx_value = sx_iter->next;
if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) {
continue;
}
msg_id = atol(sx_value->val);
} else continue; // ignore it
}
}
/* get opcode */
sexp_list_car(msg, &lsx);
opcode = atoi(lsx->val);
if(msg_id < 0 || chnl_id < 0) {
r = ESXRCBADPROT;
goto __finish;
}
//printf("chnl_id = %ld\n", chnl_id);
if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT;
else chan = (chnl_t *)usrtc_node_getdata(node);
/* lookup for the message */
pthread_rwlock_rdlock(&(chan->msglock));
if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) {
pthread_rwlock_unlock(&(chan->msglock));
r = ENOENT;
goto __finish;
} else {
pthread_rwlock_unlock(&(chan->msglock));
smsg = (sxmsg_t *)usrtc_node_getdata(node);
smsg->opcode = opcode;
if(smsg->flags & ESXMSG_ISREPLY)
destroy_sexp((sexp_t *)smsg->payload);
smsg->payload = NULL;
smsg->flags |= ESXMSG_CLOSURE;
10 years ago
/* Q: can we remove the message from the tree there??? */
/* A: actually no */
/* first remove the message from tree */
if(smsg->flags & ESXMSG_RMONRETR) {
pthread_rwlock_wrlock(&(chan->msglock));
usrtc_delete(chan->msgs_tree, &(smsg->pendingq_node));
pthread_rwlock_unlock(&(chan->msglock));
}
pthread_mutex_unlock(&(smsg->wait));
}
10 years ago
__finish:
destroy_sexp(sx);
return r;
}
static int __default_msg_reply(void *cctx, sexp_t *sx)
{
conn_t *co = (conn_t *)cctx;
usrtc_node_t *node = NULL;
chnl_t *chan = NULL;
int r = 0;
sexp_t *lsx = NULL, *sx_iter = NULL;
sexp_t *sx_sublist = NULL, *sx_value = NULL;
ulong_t chnl_id = -1;
ulong_t msg_id = -1;
sexp_t *msg = NULL;
sxmsg_t *smsg = NULL;
int idx;
/* get parameters from the message */
if(sexp_list_cdr(sx, &lsx)) {
r = ESXRCBADPROT;
goto __finish;
}
if(!SEXP_IS_LIST(lsx)) {
r = ESXRCBADPROT;
goto __finish;
}
/* get parameters */
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
sx_sublist = sx_iter;
continue;
} else {
if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) {
if(strcmp(sx_iter->val, ":chid")) {
continue; // ignore it
}
sx_value = sx_iter->next;
if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) {
continue;
}
chnl_id = atol(sx_value->val);
} else continue; // ignore it
}
}
lsx = sx_sublist;
/* find message id */
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
msg = sx_iter;
continue;
} else {
if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) {
if(strcmp(sx_iter->val, ":msgid")) {
continue; // ignore
}
sx_value = sx_iter->next;
if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) {
continue;
}
msg_id = atol(sx_value->val);
} else continue; // ignore it
}
}
if(msg_id < 0 || chnl_id < 0) {
r = ESXRCBADPROT;
goto __finish;
}
if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT;
else chan = (chnl_t *)usrtc_node_getdata(node);
/* lookup for the message */
pthread_rwlock_rdlock(&(chan->msglock));
if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) {
pthread_rwlock_unlock(&(chan->msglock));
r = ENOENT;
goto __finish;
} else {
pthread_rwlock_unlock(&(chan->msglock));
smsg = (sxmsg_t *)usrtc_node_getdata(node);
smsg->opcode = SXOREPLYREQ;
smsg->payload = copy_sexp(msg);
smsg->flags |= ESXMSG_ISREPLY;
pthread_mutex_unlock(&(smsg->wait));
}
__finish:
destroy_sexp(sx);
return r;
}
static int __init_systemrpc_tree(usrtc_t *rtree)
{
/* security context functions */
if(__insert_rpc_function(rtree, "auth-set-context", __default_auth_set_context)) goto __fail;
if(__insert_rpc_function(rtree, "auth-set-attr", __default_auth_set_attr)) goto __fail;
if(__insert_rpc_function(rtree, "auth-set-error", __default_auth_set_error)) goto __fail;
/* channels negotiation ops */
if(__insert_rpc_function(rtree, "ch-get-types", __default_ch_get_types)) goto __fail;
if(__insert_rpc_function(rtree, "ch-gl-error", __default_ch_gl_error)) goto __fail;
if(__insert_rpc_function(rtree, "ch-set-types", __default_ch_set_types)) goto __fail;
if(__insert_rpc_function(rtree, "ch-open", __default_ch_open)) goto __fail;
if(__insert_rpc_function(rtree, "ch-open-ret", __default_ch_open_ret)) goto __fail;
if(__insert_rpc_function(rtree, "ch-close", __default_ch_close)) goto __fail;
if(__insert_rpc_function(rtree, "ch-close-ret", __default_ch_close_ret)) goto __fail;
/* messaging functions */
if(__insert_rpc_function(rtree, "ch-msg-pulse", __default_msg_pulse)) goto __fail;
if(__insert_rpc_function(rtree, "ch-msg-pulse-ret", __default_msg_pulse_ret)) goto __fail;
if(__insert_rpc_function(rtree, "ch-msg", __default_msg)) goto __fail;
if(__insert_rpc_function(rtree, "ch-msg-rete", __default_msg_return)) goto __fail;
if(__insert_rpc_function(rtree, "ch-msg-repl", __default_msg_reply)) goto __fail;
return 0;
__fail:
__destroy_rpc_list_tree(rtree);
return ENOMEM;
}
static int __eval_cstr(char *cstr, cx_rpc_list_t *rpc_list, void *ctx)
{
int r = ENOENT;
sexp_t *sx;
usrtc_node_t *node;
cx_rpc_t *rentry;
char *rpcf;
if(!(sx = parse_sexp(cstr, strlen(cstr)))) return EBADE;
if(sx->ty == SEXP_LIST)
rpcf = sx->list->val;
else rpcf = sx->val;
/* find an appropriate function */
node = usrtc_lookup(rpc_list->rpc_tree, rpcf);
if(!node) return ENOENT;
else rentry = (cx_rpc_t *)usrtc_node_getdata(node);
/* call it */
r = rentry->rpcf(ctx, sx);
return r;
}
static void *__cxslave_thread_listener(void *wctx)
{
conn_t *co = (conn_t *)wctx;
char *buf = malloc(4096);
int r;
while((r = __conn_read(co, buf, 4096)) != -1) {
buf[r] = '\0';
r = __eval_cstr(buf, conn_sys->system_rpc, co);
}
co->flags &= ~CXCONN_ESTABL;
co->flags |= CXCONN_BROKEN;
__wake_up_waiters(co, ESXNOCONNECT);
free(buf);
return NULL;
}
static void *__cxmaster_thread_listener(void *wctx)
{
conn_t *co = (conn_t *)wctx;
char *buf = malloc(4096);
int r;
while((r = __conn_read(co, buf, 4096)) != -1) {
buf[r] = '\0';
10 years ago
// if(r) printf("Got the message %s (%d bytes)\n", buf, r);
r = __eval_cstr(buf, conn_sys->system_rpc, co);
}
co->flags &= ~CXCONN_ESTABL;
co->flags |= CXCONN_BROKEN;
__wake_up_waiters(co, ESXNOCONNECT);
free(buf);
return NULL;
}
static void *__rmsg_queue_thread(void *ctx)
{
conn_t *co = (conn_t *)ctx;
pth_msg_t *tmp = malloc(sizeof(pth_msg_t));
sxmsg_t *msg;
chnl_t *ch;
int r = 0;
char *rpcf;
sexp_t *sx;
usrtc_node_t *node = NULL;
cx_rpc_t *rpccall;
struct __rpc_job *rjob = NULL;
if(!tmp) return NULL;
while(1) {
r = pth_queue_get(co->rqueue, NULL, tmp);
if(r) {
free(tmp);
return NULL;
}
msg = tmp->data;
if(!msg) continue; /* spurious !! */
/* check to right job */
if(!(msg->flags & ESXMSG_USR)) { /* not a regular message */
msg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */
msg->flags &= ~ESXMSG_PENDING;
10 years ago
pthread_mutex_unlock(&(msg->wait)); /* wake up the waitee */
continue;
} else {
/* now we're need to have a deal with the rpc calling, other - we don't care */
ch = msg->pch;
sx = (sexp_t *)msg->payload;
/* get the function name */
if(sx->ty == SEXP_LIST) rpcf = sx->list->val;
else rpcf = sx->val;
node = usrtc_lookup(ch->rpc_list->rpc_tree, rpcf);
if(!node) {
r = ENOENT;
msg_return(msg, r);
} else {
rpccall = (cx_rpc_t *)usrtc_node_getdata(node);
/* call this ! */
rjob = malloc(sizeof(struct __rpc_job)); // TODO: check it
rjob->msg = msg;
rjob->sx = sx;
rjob->rpcf = rpccall->rpcf;
pth_dqtpoll_add(co->tpoll, (void *)rjob, USR_MSG); // TODO: check it
}
}
}
return NULL;
}
10 years ago
static void *__msg_queue_thread(void *ctx)
{
conn_t *co = (conn_t *)ctx;
pth_msg_t *tmp = malloc(sizeof(pth_msg_t));
sxmsg_t *msg;
chnl_t *ch;
int r = 0, len;
char *buf = malloc(4096), *tb;
sexp_t *sx;
if(!tmp || !buf) {
if(tmp) free(tmp);
if(buf) free(buf);
return NULL;
}
10 years ago
10 years ago
while(1) {
r = pth_queue_get(co->mqueue, NULL, tmp);
if(r) {
free(buf);
free(tmp);
return NULL;
}
;
10 years ago
/* message workout */
msg = tmp->data;
if(!msg) continue; /* spurious message */
10 years ago
if(!(msg->flags & ESXMSG_USR)) { /* not a regular message */
10 years ago
msg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */
msg->flags &= ~ESXMSG_PENDING;
;
10 years ago
pthread_mutex_unlock(&(msg->wait)); /* wake up the waitee */
continue;
} else {
ch = msg->pch;
/* now we need to complete the request */
sx = (sexp_t *)msg->payload; tb = buf;
if(!(msg->flags & ESXMSG_PULSE)) { /* %s))) */
10 years ago
if(!(msg->flags & ESXMSG_ISREPLY))
snprintf(buf, 4096, "(ch-msg (:chid %lu (:msgid %lu ", ch->cid,
msg->mid);
else {
if(!sx) {
snprintf(buf, 4096, "(ch-msg-rete (:chid %lu (:msgid %lu (%d))))", ch->cid,
msg->mid, msg->opcode);
/* mark it to close */
msg->flags |= ESXMSG_CLOSURE;
/* ok, here we will write it and wait, destroying dialog while reply */
;
10 years ago
goto __ssl_write;
} else snprintf(buf, 4096, "(ch-msg-repl (:chid %lu (:msgid %lu ", ch->cid,
msg->mid);
}
;
10 years ago
len = strlen(buf);
tb += len*sizeof(char);
if(print_sexp(tb, 4096 - (len + 4*sizeof(char)), sx) == -1) {
10 years ago
msg->opcode = ENOMEM;
10 years ago
/* we don't need to wake up anybody */
10 years ago
if(msg->flags & ESXMSG_TIMEDOUT) {
10 years ago
/* clean up all the shit:
* 1. remove from message tree
* 2. destroy message itself
*/
destroy_sexp(msg->initial_sx);
msg->initial_sx = NULL;
msg->payload = NULL;
if(msg->flags & ESXMSG_ISREPLY)
destroy_sexp(msg->payload);
} else {
;
10 years ago
pthread_mutex_unlock(&(msg->wait));
}
10 years ago
}
} else { /* pulse messages */
10 years ago
/* here we're shouldn't process reply procedure */
snprintf(buf, 4096, "(ch-msg-pulse (:chid %lu (:msgid %lu ", ch->cid,
msg->mid);
len = strlen(buf); /* FIXME: code double shit ! */
tb += len*sizeof(char);
if(print_sexp(tb, 4096 - (len + 4*sizeof(char)), sx) == -1) {
10 years ago
msg->opcode = ENOMEM;
/* we don't need to wake up anybody */
if(msg->flags & ESXMSG_TIMEDOUT) {
/* clean up all the shit:
* 1. remove from message tree
* 2. destroy message itself
*/
destroy_sexp(msg->initial_sx);
msg->initial_sx = NULL;
msg->payload = NULL;
__destroy_msg(msg);
10 years ago
} else
pthread_mutex_unlock(&(msg->wait));
}
10 years ago
}
10 years ago
len = strlen(tb);
tb += len*sizeof(char);
strcat(tb, ")))");
10 years ago
10 years ago
__ssl_write:
/* write it */
if(__conn_write(co, (void *)buf, strlen(buf) + sizeof(char)) < 0) {
co->flags &= ~CXCONN_ESTABL;
co->flags |= CXCONN_BROKEN;
__wake_up_waiters(co, ESXNOCONNECT);
}
10 years ago
if(msg->flags & ESXMSG_CLOSURE) {
/* first remove the message from tree */
pthread_rwlock_wrlock(&(ch->msglock));
usrtc_delete(ch->msgs_tree, &(msg->pendingq_node));
pthread_rwlock_unlock(&(ch->msglock));
/* destroy */
destroy_sexp(msg->initial_sx);
if(msg->flags & ESXMSG_ISREPLY && msg->payload)
destroy_sexp((sexp_t *)msg->payload);
__destroy_msg(msg);
}
10 years ago
}
len = 0;
}
free(buf);
return NULL;
}
/* this function is an ugly implementation to get C string with uuid */
static char *__generate_uuid(void)
{
char *uuidc = NULL;
uuid_t uuid;
int len, i = 0;
len = sizeof(char)*(sizeof(uuid_t)*2) + sizeof(char);
if(!(uuidc = malloc(len))) return NULL;
uuid_generate_time_safe(uuid);
for(i = 0; i < sizeof(uuid_t); i++)
snprintf(uuidc+(2*i*sizeof(char)), len, "%02x", uuid[i]);
return uuidc;
}
/* this is a callback to perform a custom SSL certs chain validation,
* as I promised here the comments, a lot of ...
* The first shit: 0 means validation failed, 1 otherwise
* The second shit: X509 API, I guess u will love it ;-)
* openssl calls this function for each certificate in chain,
* since our case is a simple (depth of chain is one, since we're
* don't care for public certificates lists or I cannot find any reasons to
* do it ...), amount of calls reduced, and in this case we're interested
* only in top of chain i.e. actual certificate used on client side,
* the validity of signing for other certificates within chain is
* guaranteed by the ssl itself.
* u know, we need to lookup in database, or elsewhere... some information
* about client certificate, and decide - is it valid, or not?, if so
* yep I mean it's valid, we can assign it's long fucking number to
* security context, to use in ongoing full scaled connection handshaking.
*/
static int __verify_certcall(int preverify_ok, X509_STORE_CTX *ctx)
{
// X509 *cert = X509_STORE_CTX_get_current_cert(ctx);
int err = X509_STORE_CTX_get_error(ctx), depth = X509_STORE_CTX_get_error_depth(ctx);
SSL *ssl = X509_STORE_CTX_get_ex_data(ctx, SSL_get_ex_data_X509_STORE_CTX_idx());
conn_t *co = SSL_get_ex_data(ssl, conn_sys->ex_ssldata_index); /* this is a custom data we're set before */
/* now we need to check for certificates with a long chain,
* so since we have a short one, reject long ones */
if(depth > VERIFY_DEPTH) { /* longer than we expect */
preverify_ok = 0; /* yep, 0 means error for those function callback in openssl, fucking set */
err = X509_V_ERR_CERT_CHAIN_TOO_LONG;
X509_STORE_CTX_set_error(ctx, err);
}
if(!preverify_ok) return 0;
/* ok, now we're on top of SSL (depth == 0) certs chain,
* and we can validate client certificate */
if(!depth) {
co->pctx = malloc(sizeof(perm_ctx_t));
co->pctx->certid =
ASN1_INTEGER_get((const ASN1_INTEGER *)X509_get_serialNumber(ctx->current_cert));
printf("Certificate ID: %lu\n", co->pctx->certid);
/* now we're need to check the ssl cert */
if(conn_sys->validate_sslpem) {
if(conn_sys->validate_sslpem(co)) return 0;
else return 1;
} else return 0;
}
return preverify_ok;
}
/* dummy just to check the server side */
static int __verify_certcall_dummy(int preverify_ok, X509_STORE_CTX *ctx)
{
return preverify_ok;
}
/* subsystem: here u can told me about how it's ugly to use global pointers,
* yep, it's a business of fucking morons, btw it works (heh, openssl uses this
* ancient shit method too, many many and many others too, trust me ...).
* subsystem required to define varios RPC lists, control list for connections,
* general queues, certificates (all connections uses the same set of certificates
* within application), general calls such as ... calls to get info about client
* cert and ... many other things.
*/
void *__system_queue_listener(void *data)
{
int r;
pth_msg_t *tmp = malloc(sizeof(pth_msg_t));
sxmsg_t *sysmsg;
sxpayload_t *payload;
chnl_t *chan;
conn_t *co;
if(!tmp) return NULL;
while(1) {
r = pth_queue_get(conn_sys->ioqueue, NULL, tmp);
if(r) {
free(tmp);
return NULL;
}
/* ok message is delivered */
sysmsg = tmp->data;
if(!sysmsg) continue; /* ignore dummy messages */
if(!(sysmsg->flags & ESXMSG_SYS)) { /* not a system message */
sysmsg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */
10 years ago
sysmsg->flags &= ~ESXMSG_PENDING;
pthread_mutex_unlock(&(sysmsg->wait)); /* wake up the waitee */
continue;
} else {
chan = sysmsg->pch;
co = chan->connection;
payload = (sxpayload_t *)sysmsg->payload;
/* write the buf */
if(__conn_write(co, (void *)payload->cstr, strlen(payload->cstr) + 1) < 0) {
co->flags &= ~CXCONN_ESTABL;
co->flags |= CXCONN_BROKEN;
__wake_up_waiters(co, ESXNOCONNECT);
}
}
}
return NULL;
}
/* general initialization must be called within app uses connection layer */
int connections_subsystem_init(void)
{
int r = 0;
if(!(conn_sys = malloc(sizeof(conn_sys_t)))) return ENOMEM;
else if(!(conn_sys->connections = malloc(sizeof(usrtc_t)))) {
r = ENOMEM;
goto __fail;
}
/* zeroing */
conn_sys->rootca = conn_sys->certkey = conn_sys->certpem = NULL;
conn_sys->validate_sslpem = NULL;
conn_sys->secure_check = NULL;
/* init connections list */
usrtc_init(conn_sys->connections, USRTC_REDBLACK, MAX_CONNECTIONS,
__cmp_cstr);
if((r = pthread_rwlock_init(&(conn_sys->rwlock), NULL)))
goto __fail_1;
/* init queues */
if(!(conn_sys->ioq = malloc(sizeof(pth_queue_t)))) { /* general io queue */
r = ENOMEM;
goto __fail_2;
}
if((r = pth_queue_init(conn_sys->ioq))) goto __fail_3;
if(!(conn_sys->ioqueue = malloc(sizeof(pth_queue_t)))) { /* system io queue */
r = ENOMEM;
goto __fail_2;
}
if((r = pth_queue_init(conn_sys->ioqueue))) goto __fail_3_1;
/* init SSL certificates checking functions */
/* init RPC list related functions */
if(!(conn_sys->system_rpc = malloc(sizeof(cx_rpc_list_t)))) {
r = ENOMEM;
goto __fail_3;
} else {
if(!(conn_sys->system_rpc->rpc_tree = malloc(sizeof(usrtc_t)))) {
r = ENOMEM;
__fail_rpc:
free(conn_sys->system_rpc);
goto __fail_3_1;
}
usrtc_init(conn_sys->system_rpc->rpc_tree, USRTC_SPLAY, 256, __cmp_cstr);
r = __init_systemrpc_tree(conn_sys->system_rpc->rpc_tree);
if(r) {
free(conn_sys->system_rpc->rpc_tree);
goto __fail_rpc;
}
}
/* init SSL library */
SSL_library_init();
OpenSSL_add_all_algorithms();
SSL_load_error_strings();
conn_sys->ex_ssldata_index = SSL_get_ex_new_index(0, "__ssldata index", NULL, NULL, NULL);
/* create threads for queue */
if((r = pthread_create(&conn_sys->ios_thread, NULL, __system_queue_listener, NULL))) {
goto __fail_rpc;
}
return 0;
__fail_3_1:
free(conn_sys->ioqueue);
__fail_3:
free(conn_sys->ioq);
__fail_2:
pthread_rwlock_destroy(&(conn_sys->rwlock));
__fail_1:
free(conn_sys->connections);
__fail:
free(conn_sys);
return r;
}
/* load certificates */
int connections_subsystem_setsslserts(const char *rootca, const char *certpem,
const char *certkey)
{
int r = ENOMEM;
if(!conn_sys) return EINVAL;
/* simply copying */
if(!(conn_sys->rootca = strdup(rootca))) return ENOMEM;
if(!(conn_sys->certkey = strdup(certkey))) goto __fail;
if(!(conn_sys->certpem = strdup(certpem))) goto __fail;
r = 0;
return 0;
__fail:
if(conn_sys->rootca) free(conn_sys->rootca);
if(conn_sys->certkey) free(conn_sys->certkey);
if(conn_sys->certpem) free(conn_sys->certpem);
return r;
}
int connections_subsystem_setrpclist_function(usrtc_t* (*get_rpc_typed_list_tree)(conn_t *))
{
conn_sys->get_rpc_typed_list_tree = get_rpc_typed_list_tree;
return 0;
}
#define __TMPBUFLEN 2048
/* connection_initiate: perform a connection thru the socket to the
* host with master certificate, i.e. it's a slave one for client.
*/
int connection_initiate(conn_t *co, const char *host, int port,
const char *SSL_cert, perm_ctx_t *pctx)
{
int r = 0, sd;
int bytes = 0;
char *uuid;
char *buf = NULL;
struct hostent *host_;
struct sockaddr_in addr;
usrtc_t *ch_tree, *rpc_tree;
pth_queue_t *mqueue = malloc(sizeof(pth_queue_t));
pth_dqtpoll_t *tpoll = malloc(sizeof(pth_dqtpoll_t));
idx_allocator_t *idx_ch = malloc(sizeof(idx_allocator_t));
if(!mqueue) {
__fallenomem:
r = ENOMEM;
__fall0:
if(mqueue) free(mqueue);
if(tpoll) free(tpoll);
if(idx_ch) free(idx_ch);
return r;
}
if(!tpoll) goto __fallenomem;
if(!idx_ch) goto __fallenomem;
if(!co) {
__falleinval:
r = EINVAL;
goto __fall0;
}
if(!host) goto __falleinval;
if(!SSL_cert) goto __falleinval;
if(!pctx) goto __falleinval;
memset(co, 0, sizeof(conn_t));
pth_dqtpoll_init(tpoll, __rpc_callback);
if(!idx_ch) return ENOMEM;
else r = idx_allocator_init(idx_ch, MAX_CHANNELS*MAX_MULTI, 0);
if(r) return r;
if(!(uuid = __generate_uuid())) return ENOMEM;
if(!(ch_tree = malloc(sizeof(usrtc_t)))) {
r = ENOMEM;
goto __fail;
}
if(!(rpc_tree = malloc(sizeof(usrtc_t)))) {
r = ENOMEM;
goto __fail_1;
}
if((r = pthread_mutex_init(&co->oplock, NULL))) goto __fail_2;
if((r = pthread_rwlock_init(&co->chnl_lock, NULL))) goto __fail_3;
usrtc_init(rpc_tree, USRTC_REDBLACK, MAX_RPC_LIST, __cmp_int);
usrtc_init(ch_tree, USRTC_REDBLACK, MAX_CHANNELS, __cmp_ulong);
co->idx_ch = idx_ch;
/* assign message queue */
if((r = pth_queue_init(mqueue))) goto __fail_3;
co->mqueue = mqueue;
/* init SSL certificates and context */
co->ctx = SSL_CTX_new(TLSv1_2_client_method());
if(!co->ctx) { ERR_print_errors_fp(stderr);
r = EINVAL; goto __fail_3; }
else {
SSL_CTX_set_verify(co->ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT,
__verify_certcall_dummy);
SSL_CTX_set_verify_depth(co->ctx, 1); /* FIXME: use configuration */
}
/* load certificates */
SSL_CTX_load_verify_locations(co->ctx, conn_sys->rootca, NULL);
/* set the local certificate from CertFile */
if(SSL_CTX_use_certificate_file(co->ctx, SSL_cert,
SSL_FILETYPE_PEM)<=0) {
r = EINVAL;
goto __fail_3;
}
/* set the private key from KeyFile (may be the same as CertFile) */
if(SSL_CTX_use_PrivateKey_file(co->ctx, SSL_cert,
SSL_FILETYPE_PEM)<=0) {
r = EINVAL;
goto __fail_3;
}
/* verify private key */
if (!SSL_CTX_check_private_key(co->ctx)) {
r = EINVAL;
goto __fail_3;
}
/* assign allocated memory */
co->rpc_list = rpc_tree;
co->chnl_tree = ch_tree;
co->uuid = uuid;
/* connect to the pointed server */
/* resolve host */
if(!(buf = malloc(__TMPBUFLEN))) {
r = ENOMEM;
goto __fail_3;
}
if(__resolvehost(host, buf, __TMPBUFLEN, &host_) != NETDB_SUCCESS) {
r = ENOENT;
free(buf);
goto __fail_3;
}
/* create a socket */
sd = socket(PF_INET, SOCK_STREAM, 0);
bzero(&addr, sizeof(addr));
/* try to connect it */
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = *(uint32_t*)(host_->h_addr);
free(host_);
if (connect(sd, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
close(sd);
free(buf);
r = ENOENT; /* couldn't connect to the desired host */
goto __fail_3;
}
/* now we will create an SSL connection */
co->ssl = SSL_new(co->ctx);
SSL_set_fd(co->ssl, sd); /* attach connected socket */
if(SSL_connect(co->ssl) == -1) {
r = EBADE;
free(buf);
/* shutdown connection */
goto __fail_3;
} /* if success we're ready to use established SSL channel */
/* auth and RPC contexts sync */
co->pctx = pctx;
snprintf(buf, __TMPBUFLEN, "(auth-set-context ((:user \"%s\")(:passwd \"%s\")))",
pctx->login, pctx->passwd);
/* send an auth request */
SSL_write(co->ssl, buf, strlen(buf) + sizeof(char));
/* read the message reply */
bytes = __conn_read(co, buf, __TMPBUFLEN);
if(bytes == -1) {
// we've lost the connection
co->flags &= ~CXCONN_ESTABL;
r = ESXNOCONNECT;
co->flags |= CXCONN_BROKEN;
//__wake_up_waiters(co, ESXNOCONNECT);
free(buf);
/* shutdown connection */
goto __fail_3;
}
buf[bytes] = 0;
/* perform an rpc call */
r = __eval_cstr(buf, conn_sys->system_rpc, (void *)co);
if(!r) { /* all is fine security context is good */
snprintf(buf, __TMPBUFLEN, "(ch-get-types)"); /* now we should receive possible channel types */
SSL_write(co->ssl, buf, strlen(buf) + sizeof(char));
/* read the message reply */
bytes = __conn_read(co, buf, __TMPBUFLEN);
if(bytes == -1) {
// we've lost the connection
co->flags &= ~CXCONN_ESTABL;
co->flags |= CXCONN_BROKEN;
r = ESXNOCONNECT;
//__wake_up_waiters(co, ESXNOCONNECT);
free(buf);
/* shutdown connection */
goto __fail_3;
}
buf[bytes] = 0;
/* perform an rpc call */
r = __eval_cstr(buf, conn_sys->system_rpc, (void *)co);
}
free(buf); /* now we can free the temporary buffer */
/* a listening thread creation (incoming messages) */
if(!r) { /* success let's start a listening thread */
r = pthread_create(&co->cthread, NULL, __cxslave_thread_listener, (void *)co);
if(!r) {
/* add connection to the list */
usrtc_node_init(&co->csnode, co);
co->flags = (CXCONN_SLAVE | CXCONN_ESTABL); /* set the right flags */
pthread_rwlock_wrlock(&conn_sys->rwlock);
usrtc_insert(conn_sys->connections, &co->csnode, (void *)co->uuid);
pthread_rwlock_unlock(&conn_sys->rwlock);
}
r = pthread_create(&co->msgthread, NULL, __msg_queue_thread, (void *)co);
if(r) goto __fail_3;
pth_dqtpoll_run(tpoll);
co->tpoll = tpoll;
return 0;
}
__fail_3:
pthread_mutex_destroy(&co->oplock);
__fail_2:
free(rpc_tree);
__fail_1:
free(ch_tree);
__fail:
free(uuid);
return r;
}
int connection_create(conn_t *co, int sck)
{
int r = 0;
int bytes = 0;
char *uuid;
char *buf = NULL;
usrtc_t *ch_tree, *rpc_tree;
pth_queue_t *rqueue = malloc(sizeof(pth_queue_t));
pth_queue_t *mqueue = malloc(sizeof(pth_queue_t));
pth_dqtpoll_t *tpoll = malloc(sizeof(pth_dqtpoll_t));
if(!tpoll) return ENOMEM; // TODO: fallback
idx_allocator_t *idx_ch = malloc(sizeof(idx_allocator_t));
if(!co) return EINVAL;
else memset(co, 0, sizeof(conn_t));
pth_dqtpoll_init(tpoll, __rpc_callback); // TODO: check it
if(!idx_ch) return ENOMEM;
else r = idx_allocator_init(idx_ch, MAX_CHANNELS*MAX_MULTI, 0);
if(r) return r;
if(!(uuid = __generate_uuid())) return ENOMEM;
if(!(ch_tree = malloc(sizeof(usrtc_t)))) {
r = ENOMEM;
goto __fail;
}
if(!(rpc_tree = malloc(sizeof(usrtc_t)))) {
r = ENOMEM;
goto __fail_1;
}
if((r = pthread_mutex_init(&co->oplock, NULL))) goto __fail_2;
if((r = pthread_rwlock_init(&co->chnl_lock, NULL))) goto __fail_3;
usrtc_init(rpc_tree, USRTC_REDBLACK, MAX_RPC_LIST, __cmp_int);
usrtc_init(ch_tree, USRTC_REDBLACK, MAX_CHANNELS, __cmp_ulong);
co->idx_ch = idx_ch;
/* assign message queue */
r = pth_queue_init(rqueue);
if(r) goto __fail_3;
co->rqueue = rqueue;
/* assigned outbone message queue master also has this one */
r = pth_queue_init(mqueue);
if(r) goto __fail_3;
co->mqueue = mqueue;
/* init SSL certificates and context */
co->ctx = SSL_CTX_new(TLSv1_2_server_method());
if(!co->ctx) { r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__);goto __fail_3; }
else {
/* set verify context */
SSL_CTX_set_verify(co->ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT,
__verify_certcall);
/* set verify depth */
SSL_CTX_set_verify_depth(co->ctx, VERIFY_DEPTH);
}
/* load certificates */
SSL_CTX_load_verify_locations(co->ctx, conn_sys->rootca, NULL);
/* set the local certificate from CertFile */
if(SSL_CTX_use_certificate_file(co->ctx, conn_sys->certpem,
SSL_FILETYPE_PEM)<=0) {
printf("certpem1 = %s\n", conn_sys->certpem);
ERR_print_errors_fp(stderr);
r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__);
goto __fail_3;
}
/* set the private key from KeyFile (may be the same as CertFile) */
if(SSL_CTX_use_PrivateKey_file(co->ctx, conn_sys->certkey,
SSL_FILETYPE_PEM)<=0) {
r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__);
goto __fail_3;
}
/* verify private key */
if (!SSL_CTX_check_private_key(co->ctx)) {
r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__);
goto __fail_3;
}
/* assign allocated memory */
co->rpc_list = rpc_tree;
co->chnl_tree = ch_tree;
co->uuid = uuid;
if(!(buf = malloc(__TMPBUFLEN))) {
r = ENOMEM;
goto __fail_3;
}
/* now we will create an SSL connection */
co->ssl = SSL_new(co->ctx);
SSL_set_fd(co->ssl, sck); /* attach connected socket */
/* set the context to verify ssl connection */
SSL_set_ex_data(co->ssl, conn_sys->ex_ssldata_index, (void *)co);
if(SSL_accept(co->ssl) == -1) {
r = EBADE;
free(buf);
/* shutdown connection */
goto __fail_3;
} /* if success we're ready to use established SSL channel */
printf("%s:%d\n", __FUNCTION__, __LINE__);
BIO_set_nbio(SSL_get_rbio(co->ssl), 1);
/*******************************************/
/*-=Protocol part of connection establish=-*/
/*******************************************/
while(!(co->flags & CXCONN_ESTABL)) { /* read the initiation stage connections */
bytes = __conn_read(co, buf, __TMPBUFLEN);
if(bytes > 0) {
buf[bytes] = 0;
r = __eval_cstr(buf, conn_sys->system_rpc, (void *)co);
printf("%s return %d (bytes %d)\n", buf, r, bytes);
if(r) goto __fail_3;
} else {
printf("bytes = %d\n", bytes);
if(bytes < 0) {
printf("Terminate SSL connection, the other end is lost.\n");
co->flags &= ~CXCONN_ESTABL;
co->flags |= CXCONN_BROKEN;
//__wake_up_waiters(co, ESXNOCONNECT);
r = ESXNOCONNECT;
goto __fail_3;
}
}
}
/* before it will be done assign rpc list */
if(conn_sys->get_rpc_typed_list_tree)
co->rpc_list = conn_sys->get_rpc_typed_list_tree(co);
free(buf);
r = pthread_create(&co->cthread, NULL, __cxmaster_thread_listener, (void *)co);
if(!r) {
/* add connection to the list */
usrtc_node_init(&co->csnode, co);
co->flags |= CXCONN_MASTER; /* set the right flags */
pthread_rwlock_wrlock(&conn_sys->rwlock);
usrtc_insert(conn_sys->connections, &co->csnode, (void *)co->uuid);
pthread_rwlock_unlock(&conn_sys->rwlock);
/* threads poll --- */
r = pthread_create(&co->msgthread, NULL, __msg_queue_thread, (void *)co);
if(r) goto __fail_3;
r = pthread_create(&co->rmsgthread, NULL, __rmsg_queue_thread, (void *)co);
if(r) goto __fail_3;
pth_dqtpoll_run(tpoll);
co->tpoll = tpoll;
}
return r;
__fail_3:
pthread_mutex_destroy(&co->oplock);
__fail_2:
free(rpc_tree);
__fail_1:
free(ch_tree);
__fail:
free(uuid);
return r;
}
int connection_close(conn_t *co) /* TODO: */
{
return 0;
}
int connection_reinit(conn_t *co) /* TODO: */
{
return 0;
}
static sxmsg_t *__allocate_msg(int *res)
{
sxmsg_t *msg = malloc(sizeof(sxmsg_t));
int r = 0;
if(!msg) {
*res = ENOMEM;
return NULL;
} else {
memset(msg, 0, sizeof(sxmsg_t));
if((r = pthread_mutex_init(&(msg->wait), NULL))) {
free(msg);
*res = r;
return NULL;
}
usrtc_node_init(&(msg->chnl_node), msg);
usrtc_node_init(&(msg->poll_node), msg);
usrtc_node_init(&(msg->pendingq_node), msg);
}
*res = 0;
return msg;
}
10 years ago
static void __destroy_msg(sxmsg_t *msg)
{
10 years ago
chnl_t *ch = msg->pch;
10 years ago
10 years ago
if(msg->flags & ESXMSG_USR) {
10 years ago
pthread_mutex_lock(&(ch->oplock));
10 years ago
idx_free(ch->idx_msg, msg->mid);
10 years ago
pthread_mutex_unlock(&(ch->oplock));
} else if(msg->flags & ESXMSG_SYS) {
//if(msg->uuid) free(msg->uuid);
10 years ago
}
pthread_mutex_unlock(&(msg->wait));
pthread_mutex_destroy(&(msg->wait));
free(msg);
return;
}
static int __create_reg_msg(sxmsg_t **msg, chnl_t *ch)
{
int r = 0;
sxmsg_t *sm = __allocate_msg(&r);
if(r) return r;
else {
sm->pch = ch;
sm->flags = (ESXMSG_USR | ESXMSG_PENDING);
/* ok allocate message ID */
pthread_mutex_lock(&(ch->oplock));
sm->mid = idx_allocate(ch->idx_msg);
pthread_mutex_unlock(&(ch->oplock));
pthread_mutex_lock(&(sm->wait));
*msg = sm;
}
return 0;
}
static int __create_sys_msg(sxmsg_t **msg, char *uuid, chnl_t *ch, sxpayload_t *data)
{
int r = 0;
sxmsg_t *m = __allocate_msg(&r);
if(r) return r;
else {
/* fill values */
m->pch = ch;
m->uuid = uuid;
m->payload = data;
/* set the right flags */
m->flags = (ESXMSG_SYS | ESXMSG_PENDING);
/* we need to lock the wait mutex */
pthread_mutex_lock(&(m->wait));
*msg = m;
}
return 0;
}
/* channels */
int channel_open(conn_t *co, chnl_t **ch, int type)
{
chnl_t *nch = NULL;
int r = 0;
char *uuid_;
sxpayload_t *pl;
ulong_t cid;
rpc_typed_list_t *rpclist = NULL;
usrtc_node_t *node = NULL;
sxmsg_t *sms;
if(!(co->flags & CXCONN_ESTABL)) {
return ESXNOCONNECT;
}
uuid_ = __generate_uuid();
pl = malloc(sizeof(sxpayload_t));
node = usrtc_lookup(co->rpc_list, &type);
if(node) rpclist = (rpc_typed_list_t *)usrtc_node_getdata(node);
if(!uuid_) {
if(pl) free(pl);
return ENOMEM;
}
if(!pl) {
__ffail:
if(uuid_) free(uuid_);
return ENOMEM;
} else {
pl->sx = NULL;
if(!(pl->cstr = malloc(sizeof(char)*ESX_SYSMSG_SIZE))) {
free(pl); goto __ffail;
} else memset(pl->cstr, 0, sizeof(char)*ESX_SYSMSG_SIZE);
}
pthread_rwlock_wrlock(&(co->chnl_lock));
cid = idx_allocate(co->idx_ch);
pthread_rwlock_unlock(&(co->chnl_lock));
if(cid == IDX_INVAL) {
r = ENOMEM;
goto __fini_op;
}
if((r = __alloc_channel(cid, co, rpclist, &nch))) {
goto __fini_op;
} else nch->flags |= ESXCHAN_PENDING;
nch->uuid = uuid_;
/* ok now we're ready to create a message and push channel to the list */
if((r = __create_sys_msg(&sms, uuid_, nch, pl))) {
__fail_chan:
/* destroy the channel*/
goto __fini_op;
} else {
/* put the channel to the channels search tree */
pthread_rwlock_wrlock(&(co->chnl_lock));
//printf("inserting cid = %d\n", nch->cid);
usrtc_insert(co->chnl_tree, &nch->node, &nch->cid);
pthread_rwlock_unlock(&(co->chnl_lock));
/* put system message to the run queue */
/* first form the message */
snprintf(pl->cstr, sizeof(char)*ESX_SYSMSG_SIZE,
"(ch-open ((:id %ld)(:uuid %s)(:type %d)))", nch->cid, nch->uuid, type);
nch->sysmsg = sms; /* assign system message to the channel */
/* put it */
if((r = pth_queue_add(conn_sys->ioqueue, (void *)sms, SYS_MSG))) {
__fail_chan_r:
/* remove it from the search tree */
pthread_rwlock_wrlock(&(co->chnl_lock));
usrtc_delete(co->chnl_tree, &nch->node);
pthread_rwlock_unlock(&(co->chnl_lock));
goto __fail_chan;
}
if(!(sms->flags & ESXMSG_PENDING)) {
/* was processed too fast */
goto __process_smsg;
} else pthread_mutex_lock(&(sms->wait)); /* will sleep until got a reply */
__process_smsg:
if(sms->opcode) {
r = sms->opcode;
goto __fail_chan_r;
} else r = 0;
nch->flags &= ~ESXCHAN_PENDING; /* mark it as established */
free(pl->cstr);
free(pl);
__destroy_msg(nch->sysmsg);
}
__fini_op:
if(r) {
if(uuid_) free(uuid_);
if(pl) {
if(pl->cstr) free(pl->cstr);
free(pl);
}
pthread_rwlock_wrlock(&(co->chnl_lock));
idx_free(co->idx_ch, nch->cid);
pthread_rwlock_unlock(&(co->chnl_lock));
idx_allocator_destroy(nch->idx_msg);
free(nch->idx_msg);
free(nch->msgs_tree);
pthread_mutex_destroy(&(nch->oplock));
pthread_rwlock_destroy(&(nch->msglock));
free(nch);
} else *ch = nch;
return r;
}
int channel_close(chnl_t *chnl)
{
char *uuid_;
usrtc_node_t *node = NULL;
int r;
conn_t *co = chnl->connection;
sxmsg_t *sms;
sxpayload_t *pl;
if(!(co->flags & CXCONN_ESTABL)) {
return ESXNOCONNECT;
}
uuid_ = __generate_uuid();
pthread_rwlock_rdlock(&(co->chnl_lock));
node = usrtc_lookup(co->chnl_tree, &chnl->cid);
pthread_rwlock_unlock(&(co->chnl_lock));
if(!node) {
fprintf(stderr, "No such channel\n");
return ENOENT;
}
pthread_rwlock_wrlock(&(chnl->msglock));
/* check unprocessed messages */
if(!usrtc_isempty(chnl->msgs_tree)) {
pthread_rwlock_unlock(&(chnl->msglock));
fprintf(stderr, "Unable to close channel\n");
return EBUSY;
}
pl = malloc(sizeof(sxpayload_t));
if(!pl) return ENOMEM;
if(__create_sys_msg(&sms, uuid_, chnl, pl)) {
if(chnl->idx_msg) free(chnl->idx_msg);
if(chnl->msgs_tree) free(chnl->msgs_tree);
free(chnl);
return ENOMEM;
}
pl->sx = NULL;
if(!(pl->cstr = malloc(sizeof(char) * ESX_SYSMSG_SIZE))) {
pthread_rwlock_unlock(&(chnl->msglock));
free(pl);
return ENOMEM;
}
memset(pl->cstr, 0, sizeof(char) * ESX_SYSMSG_SIZE);
/* put system message to the run queue */
/* first form the message */
snprintf(pl->cstr, sizeof(char) * ESX_SYSMSG_SIZE,
"(ch-close (:id %ld))", chnl->cid);
chnl->sysmsg = sms; /* assign system message to the channel */
/* put it */
if((r = pth_queue_add(conn_sys->ioqueue, (void *)sms, SYS_MSG))) {
pthread_rwlock_unlock(&(chnl->msglock));
return r;
}
if(!(sms->flags & ESXMSG_PENDING)) {
/* was processed too fast */
goto __process_smsg;
} else pthread_mutex_lock(&(sms->wait)); /* will sleep until got a reply */
__process_smsg:
if(sms->opcode) {
r = sms->opcode;
return r;
} else r = 0;
chnl->flags &= ~ESXCHAN_PENDING; /* mark it as established */
/* remove channel from the search tree */
pthread_rwlock_wrlock(&(chnl->connection->chnl_lock));
usrtc_delete(chnl->connection->chnl_tree, &chnl->node);
/* free index */
idx_free(co->idx_ch, chnl->cid);
pthread_rwlock_unlock(&(chnl->connection->chnl_lock));
pthread_rwlock_unlock(&(chnl->msglock));
__destroy_msg(chnl->sysmsg);
free(uuid_);
free(pl->cstr);
free(pl);
free(chnl->uuid);
idx_allocator_destroy(chnl->idx_msg);
free(chnl->idx_msg);
free(chnl->msgs_tree);
pthread_mutex_destroy(&(chnl->oplock));
pthread_rwlock_destroy(&(chnl->msglock));
free(chnl);
return 0;
}
/* message passing */
10 years ago
/*
* How message sending works:
* 1. Create a message structure assigned to the channel,
* 2. Put S-expression context to it
* 3. Put the message to the queue
* 4. expect the result waiting on the lock mutex
*/
static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec *tio)
{
int r = 0;
sxmsg_t *m = NULL;
10 years ago
conn_t *co = ch->connection;
if(!(co->flags & CXCONN_ESTABL)) {
destroy_sexp(sx);
return ESXNOCONNECT;
}
10 years ago
*msg = NULL;
10 years ago
r = __create_reg_msg(&m, ch);
if(r) return r;
else {
/* put the message to the search tree */
pthread_rwlock_wrlock(&(ch->msglock));
10 years ago
usrtc_insert(ch->msgs_tree, &(m->pendingq_node), &(m->mid));
10 years ago
pthread_rwlock_unlock(&(ch->msglock));
10 years ago
/* message assign */
m->opcode = 0;
m->payload = (void *)sx;
/* assign initial sx */
m->initial_sx = sx;
10 years ago
10 years ago
/* put the message to the run queue */
10 years ago
r = pth_queue_add(co->mqueue, (void *)m, USR_MSG);
if(r) return r; /* FIXME: better give up */
if(m->flags & ESXMSG_PENDING) {
if(!tio) pthread_mutex_lock(&(m->wait));
else pthread_mutex_timedlock(&(m->wait), tio);
}
if(tio && (m->flags & ESXMSG_PENDING))
return SXOTIMEDOUT;
if(!m->payload) {
r = m->opcode;
/* first remove the message from tree */
pthread_rwlock_wrlock(&(ch->msglock));
usrtc_delete(ch->msgs_tree, &(m->pendingq_node));
pthread_rwlock_unlock(&(ch->msglock));
/* destroy s expression */
destroy_sexp(m->initial_sx);
/* destroy */
__destroy_msg(m);
10 years ago
} else {
*msg = m;
r = SXOREPLYREQ;
}
10 years ago
}
return r;
}
int msg_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg)
{
return __message_send(ch, sx, msg, NULL);
}
int msg_send_timed(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec *tio)
{
return __message_send(ch, sx, msg, tio);
}
10 years ago
static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcode)
{
int r = 0;
chnl_t *ch = msg->pch;
conn_t *co = ch->connection;
if(!(co->flags & CXCONN_ESTABL)) {
destroy_sexp(sx);
return ESXNOCONNECT;
}
10 years ago
if(msg->flags & ESXMSG_ISREPLY)
destroy_sexp((sexp_t *)msg->payload);
10 years ago
msg->payload = sx;
msg->opcode = opcode;
msg->flags |= ESXMSG_PENDING; /* pending */
msg->flags |= ESXMSG_ISREPLY; /* this is a reply */
if(!sx) msg->flags &= ~ESXMSG_PENDING;
else msg->flags |= ESXMSG_RMONRETR;
10 years ago
/* put the message to the queue */
r = pth_queue_add(co->mqueue, (void *)msg, USR_MSG);
if(r) return r; /* FIXME: better give up */
if(!sx) return 0;
10 years ago
if(msg->flags & ESXMSG_PENDING) {
if(!tio) pthread_mutex_lock(&(msg->wait));
else pthread_mutex_timedlock(&(msg->wait), tio);
}
if(tio && (msg->flags & ESXMSG_PENDING))
return SXOTIMEDOUT;
r = msg->opcode;
if(msg->flags & ESXMSG_CLOSURE) {
/* destroy */
destroy_sexp(msg->initial_sx);
__destroy_msg(msg);
10 years ago
}
return r;
}
int msg_return(sxmsg_t *msg, int opcode)
{
10 years ago
return __msg_reply(msg, NULL, NULL, opcode);
}
int msg_reply(sxmsg_t *msg, sexp_t *sx)
{
10 years ago
return __msg_reply(msg, sx, NULL, 0);
}
int msg_reply_timed(sxmsg_t *msg, sexp_t *sx, struct timespec *tio)
{
10 years ago
return __msg_reply(msg, sx, tio, 0);
}
//TODO: continue. Implement wait for delivery and queue addition
/*
* How message sending works:
* 1. Create a message structure assigned to the channel,
* 2. Put S-expression context to it
* 3. Put the message to the queue
* 4. Wait for job execution
*/
static int __message_send_pulse(chnl_t *ch, sexp_t *sx, struct timespec *tio)
{
int r = 0;
sxmsg_t *m = NULL;
conn_t *co = ch->connection;
if(!(co->flags & CXCONN_ESTABL)) {
destroy_sexp(sx);
return ESXNOCONNECT;
}
r = __create_reg_msg(&m, ch);
if(r) return r;
else {
/* put the message to the search tree */
pthread_rwlock_wrlock(&(ch->msglock));
usrtc_insert(ch->msgs_tree, &(m->pendingq_node), &(m->mid));
pthread_rwlock_unlock(&(ch->msglock));
/* message assign */
m->opcode = 0;
m->payload = (void *)sx;
/* assign initial sx */
m->initial_sx = sx;
m->flags |= ESXMSG_PULSE;
/* put the message to the run queue */
r = pth_queue_add(co->mqueue, (void *)m, USR_MSG);
if(r) return r; /* FIXME: better give up */
if(m->flags & ESXMSG_PENDING) {
if(!tio) pthread_mutex_lock(&(m->wait));
else pthread_mutex_timedlock(&(m->wait), tio);
}
if(tio && (m->flags & ESXMSG_PENDING))
return SXOTIMEDOUT;
if(!m->payload) {
r = m->opcode;
/* first remove the message from tree */
pthread_rwlock_wrlock(&(ch->msglock));
usrtc_delete(ch->msgs_tree, &(m->pendingq_node));
pthread_rwlock_unlock(&(ch->msglock));
/* destroy s expression */
destroy_sexp(m->initial_sx);
/* destroy */
__destroy_msg(m);
} else {
r = SXOREPLYREQ;
}
}
return r;
}
int msg_send_pulse(chnl_t *ch, sexp_t *sx)
{
return __message_send_pulse(ch, sx, NULL);
}
int msg_send_pulse_timed(chnl_t *ch, sexp_t *sx, struct timespec *tio)
{
return 0;
}
int msg_send_pulse_nowait(chnl_t *ch, sexp_t *sx)
{
return 0;
}
/* sexp helpers */
int sexp_list_car(sexp_t *expr, sexp_t **sx)
{
if (!SEXP_IS_LIST(expr) || expr->list->ty != SEXP_VALUE) return 1;
*sx = expr->list;
return 0;
}
int sexp_list_cdr(sexp_t *expr, sexp_t **sx)
{
/* Dummy function. Can we do cdr properly? */
if (!SEXP_IS_LIST(expr) || expr->list->ty != SEXP_VALUE) return 1;
if (!expr->list->next) *sx = NULL;
else *sx = expr->list->next;
return 0;
}