subsystem became multiplied one;

This commit is contained in:
Alexander Vdolainen 2015-05-19 01:42:17 +03:00
parent 87c09c307e
commit 800437bec7
3 changed files with 144 additions and 117 deletions

View File

@ -77,12 +77,15 @@ typedef struct __perm_context_type {
#define CXCONN_ESTABL (1 << 3) #define CXCONN_ESTABL (1 << 3)
#define CXCONN_BROKEN (1 << 4) #define CXCONN_BROKEN (1 << 4)
struct __connections_subsys_type;
/* /*
* ä jätä kommentteja omalla kielellä! yksinkertaisia englanti sijaan! * ä jätä kommentteja omalla kielellä! yksinkertaisia englanti sijaan!
* i found somebody who write comments and messages in non-english, * i found somebody who write comments and messages in non-english,
* itäs a fucking practice - forget it. * itäs a fucking practice - forget it.
*/ */
typedef struct __connection_t { typedef struct __connection_t {
struct __connections_subsys_type *ssys; /* < connections subsystem */
char *uuid; /** < uuid of the connection */ char *uuid; /** < uuid of the connection */
idx_allocator_t *idx_ch; /** < index allocation for channels */ idx_allocator_t *idx_ch; /** < index allocation for channels */
usrtc_t *chnl_tree; /** < search tree of all channels */ usrtc_t *chnl_tree; /** < search tree of all channels */
@ -184,8 +187,6 @@ typedef struct __connection_rpc_list_type {
* *
*/ */
typedef struct __connections_subsys_type { typedef struct __connections_subsys_type {
int ex_ssldata_index; /** < index used to work with additional data
* provided to the special call during SSL handshake */
usrtc_t *connections; usrtc_t *connections;
pth_queue_t *ioq; /** < general messages queue */ pth_queue_t *ioq; /** < general messages queue */
pth_queue_t *ioqueue; /** < system messages queue */ pth_queue_t *ioqueue; /** < system messages queue */
@ -213,22 +214,33 @@ typedef struct __rpc_typed_list_type {
usrtc_node_t lnode; usrtc_node_t lnode;
} rpc_typed_list_t; } rpc_typed_list_t;
extern conn_sys_t *conn_sys;
/* General API */ /* General API */
/* subsystem */ /* subsystem */
extern conn_sys_t *conn_sys; extern conn_sys_t *conn_sys; /* an old obsolete method */
/* old API from 0.1.xx */
#define connections_subsystem_init() { conn_sys = malloc(sizeof(conn_sys_t)); connections_init(conn_sys); }
#define connections_subsystem_setsslserts(a, b, c) connections_setsslserts(conn_sys, a, b, c)
#define connections_subsystem_setrpclist_function(a) connections_setrpclist_function(conn_sys, a)
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
int connections_subsystem_init(void); /** call this function before use sntl related functions */
int sntl_init(void);
int connections_subsystem_setsslserts(const char *rootca, const char *certpem, /* new */
const char *certkey); int connections_init(conn_sys_t *ssys);
int connections_setsslserts(conn_sys_t *ssys, const char *rootca,
const char *certpem, const char *certkey);
int connections_setrpclist_function(conn_sys_t *ssys,
usrtc_t* (*get_rpc_typed_list_tree)(conn_t *));
int connections_subsystem_setrpclist_function(usrtc_t* (*get_rpc_typed_list_tree)(conn_t *));
#ifdef __cplusplus #ifdef __cplusplus
} }
@ -239,16 +251,20 @@ int connections_subsystem_setrpclist_function(usrtc_t* (*get_rpc_typed_list_tree
#define connections_subsystem_set_rpctlist_call(c, fuu) (c)->set_typed_list_callback = fuu #define connections_subsystem_set_rpctlist_call(c, fuu) (c)->set_typed_list_callback = fuu
#define connections_subsystem_set_on_destroy(c, fuu) (c)->on_destroy = fuu #define connections_subsystem_set_on_destroy(c, fuu) (c)->on_destroy = fuu
/* connection */ /* connection - compatibility (old versions) macros */
#define connection_create(c, s) connection_create_fapi((c), (s), NULL) #define connection_create(c, s) connection_create_fapi((c), (s), NULL)
#define connection_initiate(c, h, p, s, p1) connection_inititate_m(conn_sys, c, h, p, s, p1)
#define connection_create_fapi(c, s, a) connection_create_fapi_m(conn_sys, c, s, a)
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
int connection_initiate (conn_t *co, const char *host, int port, /* new */
const char *SSL_cert, perm_ctx_t *pctx); int connection_initiate_m(conn_sys_t *ssys, conn_t *co, const char *host,
int port, const char *SSL_cert, perm_ctx_t *pctx);
int connection_create_fapi(conn_t *co, int sck, struct in_addr *addr); int connection_create_fapi_m(conn_sys_t *ssys, conn_t *co, int sck,
struct in_addr *addr);
int connection_close(conn_t *co); int connection_close(conn_t *co);

View File

@ -99,6 +99,7 @@ int __alloc_channel(ulong_t cid, conn_t *co, rpc_typed_list_t *rlist, chnl_t **c
int channel_open(conn_t *co, chnl_t **ch, int type) int channel_open(conn_t *co, chnl_t **ch, int type)
{ {
chnl_t *nch = NULL; chnl_t *nch = NULL;
conn_sys_t *ssys = co->ssys;
int r = 0; int r = 0;
char *uuid_; char *uuid_;
sxpayload_t *pl; sxpayload_t *pl;
@ -165,7 +166,7 @@ int channel_open(conn_t *co, chnl_t **ch, int type)
"(ch-open ((:id %ld)(:uuid %s)(:type %d)))", nch->cid, nch->uuid, type); "(ch-open ((:id %ld)(:uuid %s)(:type %d)))", nch->cid, nch->uuid, type);
nch->sysmsg = sms; /* assign system message to the channel */ nch->sysmsg = sms; /* assign system message to the channel */
/* put it */ /* put it */
if((r = pth_queue_add(conn_sys->ioqueue, (void *)sms, SYS_MSG))) { if((r = pth_queue_add(ssys->ioqueue, (void *)sms, SYS_MSG))) {
__fail_chan_r: __fail_chan_r:
/* remove it from the search tree */ /* remove it from the search tree */
pthread_rwlock_wrlock(&(co->chnl_lock)); pthread_rwlock_wrlock(&(co->chnl_lock));
@ -216,6 +217,7 @@ int channel_close(chnl_t *chnl)
usrtc_node_t *node = NULL; usrtc_node_t *node = NULL;
int r; int r;
conn_t *co = chnl->connection; conn_t *co = chnl->connection;
conn_sys_t *ssys = co->ssys;
sxmsg_t *sms; sxmsg_t *sms;
sxpayload_t *pl; sxpayload_t *pl;
@ -266,7 +268,7 @@ int channel_close(chnl_t *chnl)
"(ch-close (:id %ld))", chnl->cid); "(ch-close (:id %ld))", chnl->cid);
chnl->sysmsg = sms; /* assign system message to the channel */ chnl->sysmsg = sms; /* assign system message to the channel */
/* put it */ /* put it */
if((r = pth_queue_add(conn_sys->ioqueue, (void *)sms, SYS_MSG))) { if((r = pth_queue_add(ssys->ioqueue, (void *)sms, SYS_MSG))) {
pthread_rwlock_unlock(&(chnl->msglock)); pthread_rwlock_unlock(&(chnl->msglock));
return r; return r;
} }

View File

@ -44,6 +44,16 @@ struct __rpc_job
conn_sys_t *conn_sys = NULL; conn_sys_t *conn_sys = NULL;
static int ex_ssldata_index; /** < index used to work with additional data
* provided to the special call during SSL handshake */
int sntl_init(void)
{
ex_ssldata_index = SSL_get_ex_new_index(0, "__ssldata index", NULL, NULL, NULL);
return 0;
}
static long __cmp_ulong(const void *a, const void *b); static long __cmp_ulong(const void *a, const void *b);
/* message alloc and destroy */ /* message alloc and destroy */
@ -51,7 +61,7 @@ extern sxmsg_t *__allocate_msg(int *res);
extern void __destroy_msg(sxmsg_t *msg); extern void __destroy_msg(sxmsg_t *msg);
/* connections */ /* connections */
void __connections_subsystem_connection_remove(conn_t *); static void __connections_subsystem_connection_remove(conn_t *);
static void __connection_free(conn_t *co); static void __connection_free(conn_t *co);
/* examination */ /* examination */
@ -337,6 +347,7 @@ static void __destroy_all_channels(conn_t *co)
static int __default_auth_set_context(void *cctx, sexp_t *sx) static int __default_auth_set_context(void *cctx, sexp_t *sx)
{ {
conn_t *co = (conn_t *)cctx; conn_t *co = (conn_t *)cctx;
conn_sys_t *ssys = co->ssys;
char *val, *var, *tbuf = NULL; char *val, *var, *tbuf = NULL;
sexp_t *lsx, *sx_iter, *sx_in; sexp_t *lsx, *sx_iter, *sx_in;
int llen, idx, err = 0; int llen, idx, err = 0;
@ -387,8 +398,8 @@ static int __default_auth_set_context(void *cctx, sexp_t *sx)
err = ENOMEM; err = ENOMEM;
goto __reply; goto __reply;
} }
if(conn_sys->secure_check) if(ssys->secure_check)
err = conn_sys->secure_check(co); err = ssys->secure_check(co);
__reply: __reply:
if(err) { if(err) {
@ -420,7 +431,6 @@ static int __default_auth_set_attr(void *cctx, sexp_t *sx)
lsx = sx->list->next; lsx = sx->list->next;
/* now we expect a list of lists */ /* now we expect a list of lists */
if(lsx->ty != SEXP_LIST) { if(lsx->ty != SEXP_LIST) {
// printf("%s:%d\n", __FUNCTION__, __LINE__);
r = ESXRCBADPROT; r = ESXRCBADPROT;
goto __finish; goto __finish;
} }
@ -483,6 +493,7 @@ static int __default_auth_set_error(void *cctx, sexp_t *sx)
static int __default_ch_get_types(void *cctx, sexp_t *sx) static int __default_ch_get_types(void *cctx, sexp_t *sx)
{ {
conn_t *co = (conn_t *)cctx; conn_t *co = (conn_t *)cctx;
conn_sys_t *ssys = co->ssys;
usrtc_node_t *node; usrtc_node_t *node;
rpc_typed_list_t *list_ent; rpc_typed_list_t *list_ent;
char *tbuf = malloc(4096), *tt; char *tbuf = malloc(4096), *tt;
@ -491,7 +502,7 @@ static int __default_ch_get_types(void *cctx, sexp_t *sx)
/* if we cannot allocate anything ... */ /* if we cannot allocate anything ... */
if(!tbuf) return ENOMEM; if(!tbuf) return ENOMEM;
/* ok here we go */ /* ok here we go */
co->rpc_list = conn_sys->get_rpc_typed_list_tree(co); co->rpc_list = ssys->get_rpc_typed_list_tree(co);
/* ok, here we're don't need to parse anything */ /* ok, here we're don't need to parse anything */
if(!usrtc_count(co->rpc_list)) { if(!usrtc_count(co->rpc_list)) {
err = ENXIO; err = ENXIO;
@ -523,6 +534,7 @@ static int __default_ch_get_types(void *cctx, sexp_t *sx)
static int __default_ch_set_types(void *cctx, sexp_t *sx) static int __default_ch_set_types(void *cctx, sexp_t *sx)
{ {
conn_t *co = (conn_t *)cctx; conn_t *co = (conn_t *)cctx;
conn_sys_t *ssys = co->ssys;
char buf[1024], *val, *var; char buf[1024], *val, *var;
int r = 0, llen, typeid, idx; int r = 0, llen, typeid, idx;
sexp_t *lsx, *sx_iter, *sx_in; sexp_t *lsx, *sx_iter, *sx_in;
@ -562,9 +574,9 @@ static int __default_ch_set_types(void *cctx, sexp_t *sx)
r = ESXRCBADPROT; r = ESXRCBADPROT;
goto __send_reply; goto __send_reply;
} else { } else {
if(conn_sys->set_typed_list_callback) { if(ssys->set_typed_list_callback) {
typeid = atoi((char *)(val + sizeof(char))); typeid = atoi((char *)(val + sizeof(char)));
if(conn_sys->set_typed_list_callback(co, typeid, var)) { if(ssys->set_typed_list_callback(co, typeid, var)) {
destroy_sexp(sx); destroy_sexp(sx);
return ENXIO; return ENXIO;
} }
@ -1471,12 +1483,13 @@ static int __eval_cstr(char *cstr, cx_rpc_list_t *rpc_list, void *ctx)
static void *__cxslave_thread_listener(void *wctx) static void *__cxslave_thread_listener(void *wctx)
{ {
conn_t *co = (conn_t *)wctx; conn_t *co = (conn_t *)wctx;
conn_sys_t *ssys = co->ssys;
char *buf = malloc(4096); char *buf = malloc(4096);
int r; int r;
while((r = __conn_read(co, buf, 4096)) != -1) { while((r = __conn_read(co, buf, 4096)) != -1) {
buf[r] = '\0'; buf[r] = '\0';
r = __eval_cstr(buf, conn_sys->system_rpc, co); r = __eval_cstr(buf, ssys->system_rpc, co);
} }
co->flags &= ~CXCONN_ESTABL; co->flags &= ~CXCONN_ESTABL;
co->flags |= CXCONN_BROKEN; co->flags |= CXCONN_BROKEN;
@ -1496,12 +1509,13 @@ static void *__cxslave_thread_listener(void *wctx)
static void *__cxmaster_thread_listener(void *wctx) static void *__cxmaster_thread_listener(void *wctx)
{ {
conn_t *co = (conn_t *)wctx; conn_t *co = (conn_t *)wctx;
conn_sys_t *ssys = co->ssys;
char *buf = malloc(4096); char *buf = malloc(4096);
int r; int r;
while((r = __conn_read(co, buf, 4096)) != -1) { while((r = __conn_read(co, buf, 4096)) != -1) {
buf[r] = '\0'; buf[r] = '\0';
r = __eval_cstr(buf, conn_sys->system_rpc, co); r = __eval_cstr(buf, ssys->system_rpc, co);
} }
co->flags &= ~CXCONN_ESTABL; co->flags &= ~CXCONN_ESTABL;
co->flags |= CXCONN_BROKEN; co->flags |= CXCONN_BROKEN;
@ -1734,7 +1748,8 @@ static int __verify_certcall(int preverify_ok, X509_STORE_CTX *ctx)
// X509 *cert = X509_STORE_CTX_get_current_cert(ctx); // X509 *cert = X509_STORE_CTX_get_current_cert(ctx);
int err = X509_STORE_CTX_get_error(ctx), depth = X509_STORE_CTX_get_error_depth(ctx); int err = X509_STORE_CTX_get_error(ctx), depth = X509_STORE_CTX_get_error_depth(ctx);
SSL *ssl = X509_STORE_CTX_get_ex_data(ctx, SSL_get_ex_data_X509_STORE_CTX_idx()); SSL *ssl = X509_STORE_CTX_get_ex_data(ctx, SSL_get_ex_data_X509_STORE_CTX_idx());
conn_t *co = SSL_get_ex_data(ssl, conn_sys->ex_ssldata_index); /* this is a custom data we're set before */ conn_t *co = SSL_get_ex_data(ssl, ex_ssldata_index); /* this is a custom data we're set before */
conn_sys_t *ssys = co->ssys;
/* now we need to check for certificates with a long chain, /* now we need to check for certificates with a long chain,
* so since we have a short one, reject long ones */ * so since we have a short one, reject long ones */
@ -1749,13 +1764,11 @@ static int __verify_certcall(int preverify_ok, X509_STORE_CTX *ctx)
/* ok, now we're on top of SSL (depth == 0) certs chain, /* ok, now we're on top of SSL (depth == 0) certs chain,
* and we can validate client certificate */ * and we can validate client certificate */
if(!depth) { if(!depth) {
//co->pctx = malloc(sizeof(perm_ctx_t));
co->pctx->certid = co->pctx->certid =
ASN1_INTEGER_get((const ASN1_INTEGER *)X509_get_serialNumber(ctx->current_cert)); ASN1_INTEGER_get((const ASN1_INTEGER *)X509_get_serialNumber(ctx->current_cert));
//printf("Certificate ID: %lu\n", co->pctx->certid);
/* now we're need to check the ssl cert */ /* now we're need to check the ssl cert */
if(conn_sys->validate_sslpem) { if(ssys->validate_sslpem) {
if(conn_sys->validate_sslpem(co)) return 0; if(ssys->validate_sslpem(co)) return 0;
else return 1; else return 1;
} else return 0; } else return 0;
} }
@ -1769,17 +1782,12 @@ static int __verify_certcall_dummy(int preverify_ok, X509_STORE_CTX *ctx)
return preverify_ok; return preverify_ok;
} }
/* subsystem: here u can told me about how it's ugly to use global pointers, /* thread serve the whole connections set i.e. subsystem
* yep, it's a business of fucking morons, btw it works (heh, openssl uses this * actually it works with system messages
* ancient shit method too, many many and many others too, trust me ...).
* subsystem required to define varios RPC lists, control list for connections,
* general queues, certificates (all connections uses the same set of certificates
* within application), general calls such as ... calls to get info about client
* cert and ... many other things.
*/ */
static void *__system_queue_listener(void *data)
void *__system_queue_listener(void *data)
{ {
conn_sys_t *ssys = (conn_sys_t *)data;
int r; int r;
pth_msg_t *tmp = malloc(sizeof(pth_msg_t)); pth_msg_t *tmp = malloc(sizeof(pth_msg_t));
sxmsg_t *sysmsg; sxmsg_t *sysmsg;
@ -1790,7 +1798,7 @@ void *__system_queue_listener(void *data)
if(!tmp) return NULL; if(!tmp) return NULL;
while(1) { while(1) {
r = pth_queue_get(conn_sys->ioqueue, NULL, tmp); r = pth_queue_get(ssys->ioqueue, NULL, tmp);
if(r) { if(r) {
free(tmp); free(tmp);
return NULL; return NULL;
@ -1822,55 +1830,55 @@ void *__system_queue_listener(void *data)
} }
/* general initialization must be called within app uses connection layer */ /* general initialization must be called within app uses connection layer */
int connections_subsystem_init(void) int connections_init(conn_sys_t *ssys)
{ {
int r = 0; int r = 0;
if(!(conn_sys = malloc(sizeof(conn_sys_t)))) return ENOMEM; if(!ssys) return EINVAL;
else if(!(conn_sys->connections = malloc(sizeof(usrtc_t)))) { else if(!(ssys->connections = malloc(sizeof(usrtc_t)))) {
r = ENOMEM; r = ENOMEM;
goto __fail; goto __fail;
} }
/* zeroing */ /* zeroing */
conn_sys->rootca = conn_sys->certkey = conn_sys->certpem = NULL; ssys->rootca = ssys->certkey = ssys->certpem = NULL;
conn_sys->validate_sslpem = NULL; ssys->validate_sslpem = NULL;
conn_sys->secure_check = NULL; ssys->secure_check = NULL;
conn_sys->on_destroy = NULL; ssys->on_destroy = NULL;
/* init connections list */ /* init connections list */
usrtc_init(conn_sys->connections, USRTC_REDBLACK, MAX_CONNECTIONS, usrtc_init(ssys->connections, USRTC_REDBLACK, MAX_CONNECTIONS,
__cmp_cstr); __cmp_cstr);
if((r = pthread_rwlock_init(&(conn_sys->rwlock), NULL))) if((r = pthread_rwlock_init(&(ssys->rwlock), NULL)))
goto __fail_1; goto __fail_1;
/* init queues */ /* init queues */
if(!(conn_sys->ioq = malloc(sizeof(pth_queue_t)))) { /* general io queue */ if(!(ssys->ioq = malloc(sizeof(pth_queue_t)))) { /* general io queue */
r = ENOMEM; r = ENOMEM;
goto __fail_2; goto __fail_2;
} }
if((r = pth_queue_init(conn_sys->ioq))) goto __fail_3; if((r = pth_queue_init(ssys->ioq))) goto __fail_3;
if(!(conn_sys->ioqueue = malloc(sizeof(pth_queue_t)))) { /* system io queue */ if(!(ssys->ioqueue = malloc(sizeof(pth_queue_t)))) { /* system io queue */
r = ENOMEM; r = ENOMEM;
goto __fail_2; goto __fail_2;
} }
if((r = pth_queue_init(conn_sys->ioqueue))) goto __fail_3_1; if((r = pth_queue_init(ssys->ioqueue))) goto __fail_3_1;
/* init SSL certificates checking functions */ /* init SSL certificates checking functions */
/* init RPC list related functions */ /* init RPC list related functions */
if(!(conn_sys->system_rpc = malloc(sizeof(cx_rpc_list_t)))) { if(!(ssys->system_rpc = malloc(sizeof(cx_rpc_list_t)))) {
r = ENOMEM; r = ENOMEM;
goto __fail_3; goto __fail_3;
} else { } else {
if(!(conn_sys->system_rpc->rpc_tree = malloc(sizeof(usrtc_t)))) { if(!(ssys->system_rpc->rpc_tree = malloc(sizeof(usrtc_t)))) {
r = ENOMEM; r = ENOMEM;
__fail_rpc: __fail_rpc:
free(conn_sys->system_rpc); free(ssys->system_rpc);
goto __fail_3_1; goto __fail_3_1;
} }
usrtc_init(conn_sys->system_rpc->rpc_tree, USRTC_SPLAY, 256, __cmp_cstr); usrtc_init(ssys->system_rpc->rpc_tree, USRTC_SPLAY, 256, __cmp_cstr);
r = __init_systemrpc_tree(conn_sys->system_rpc->rpc_tree); r = __init_systemrpc_tree(ssys->system_rpc->rpc_tree);
if(r) { if(r) {
free(conn_sys->system_rpc->rpc_tree); free(ssys->system_rpc->rpc_tree);
goto __fail_rpc; goto __fail_rpc;
} }
} }
@ -1881,62 +1889,63 @@ int connections_subsystem_init(void)
OpenSSL_add_all_algorithms(); OpenSSL_add_all_algorithms();
SSL_load_error_strings(); SSL_load_error_strings();
conn_sys->ex_ssldata_index = SSL_get_ex_new_index(0, "__ssldata index", NULL, NULL, NULL);
/* create threads for queue */ /* create threads for queue */
if((r = pthread_create(&conn_sys->ios_thread, NULL, __system_queue_listener, NULL))) { if((r = pthread_create(&ssys->ios_thread, NULL, __system_queue_listener, (void *)ssys))) {
goto __fail_rpc; goto __fail_rpc;
} }
return 0; return 0;
__fail_3_1: __fail_3_1:
free(conn_sys->ioqueue); free(ssys->ioqueue);
__fail_3: __fail_3:
free(conn_sys->ioq); free(ssys->ioq);
__fail_2: __fail_2:
pthread_rwlock_destroy(&(conn_sys->rwlock)); pthread_rwlock_destroy(&(ssys->rwlock));
__fail_1: __fail_1:
free(conn_sys->connections); free(ssys->connections);
__fail: __fail:
free(conn_sys);
return r; return r;
} }
/* load certificates */ /* load certificates */
int connections_subsystem_setsslserts(const char *rootca, const char *certpem, int connections_setsslserts(conn_sys_t *ssys, const char *rootca,
const char *certkey) const char *certpem, const char *certkey)
{ {
int r = ENOMEM; int r = ENOMEM;
if(!conn_sys) return EINVAL; if(!ssys) return EINVAL;
/* simply copying */ /* simply copying */
if(!(conn_sys->rootca = strdup(rootca))) return ENOMEM; if(!(ssys->rootca = strdup(rootca))) return ENOMEM;
if(!(conn_sys->certkey = strdup(certkey))) goto __fail; if(!(ssys->certkey = strdup(certkey))) goto __fail;
if(!(conn_sys->certpem = strdup(certpem))) goto __fail; if(!(ssys->certpem = strdup(certpem))) goto __fail;
r = 0; r = 0;
return 0; return 0;
__fail: __fail:
if(conn_sys->rootca) free(conn_sys->rootca); if(ssys->rootca) free(ssys->rootca);
if(conn_sys->certkey) free(conn_sys->certkey); if(ssys->certkey) free(ssys->certkey);
if(conn_sys->certpem) free(conn_sys->certpem); if(ssys->certpem) free(ssys->certpem);
return r; return r;
} }
int connections_subsystem_setrpclist_function(usrtc_t* (*get_rpc_typed_list_tree)(conn_t *)) int connections_setrpclist_function(conn_sys_t *ssys,
usrtc_t* (*get_rpc_typed_list_tree)
(conn_t *))
{ {
conn_sys->get_rpc_typed_list_tree = get_rpc_typed_list_tree; if(!ssys) return EINVAL;
ssys->get_rpc_typed_list_tree = get_rpc_typed_list_tree;
return 0; return 0;
} }
void __connections_subsystem_connection_remove(conn_t *co) static void __connections_subsystem_connection_remove(conn_t *co)
{ {
pthread_rwlock_wrlock(&conn_sys->rwlock); pthread_rwlock_wrlock(&(co->ssys->rwlock));
usrtc_delete(conn_sys->connections, &(co->csnode)); usrtc_delete(co->ssys->connections, &(co->csnode));
pthread_rwlock_unlock(&conn_sys->rwlock); pthread_rwlock_unlock(&(co->ssys->rwlock));
return; return;
} }
@ -1946,8 +1955,8 @@ void __connections_subsystem_connection_remove(conn_t *co)
/* connection_initiate: perform a connection thru the socket to the /* connection_initiate: perform a connection thru the socket to the
* host with master certificate, i.e. it's a slave one for client. * host with master certificate, i.e. it's a slave one for client.
*/ */
int connection_initiate(conn_t *co, const char *host, int port, int connection_initiate_m(conn_sys_t *ssys, conn_t *co, const char *host,
const char *SSL_cert, perm_ctx_t *pctx) int port, const char *SSL_cert, perm_ctx_t *pctx)
{ {
int r = 0, sd; int r = 0, sd;
int bytes = 0; int bytes = 0;
@ -1955,10 +1964,9 @@ int connection_initiate(conn_t *co, const char *host, int port,
char *buf = NULL; char *buf = NULL;
struct hostent *host_; struct hostent *host_;
struct sockaddr_in addr; struct sockaddr_in addr;
#ifdef WIN32 #ifdef WIN32
WSADATA wsaData; WSADATA wsaData;
#endif #endif
usrtc_t *ch_tree, *rpc_tree; usrtc_t *ch_tree, *rpc_tree;
pth_queue_t *mqueue = malloc(sizeof(pth_queue_t)); pth_queue_t *mqueue = malloc(sizeof(pth_queue_t));
pth_queue_t *rqueue = malloc(sizeof(pth_queue_t)); pth_queue_t *rqueue = malloc(sizeof(pth_queue_t));
@ -1976,9 +1984,9 @@ int connection_initiate(conn_t *co, const char *host, int port,
return r; return r;
} }
#ifdef WIN32 #ifdef WIN32
WSAStartup(MAKEWORD(2, 2), &wsaData); WSAStartup(MAKEWORD(2, 2), &wsaData);
#endif #endif
if(!tpoll) goto __fallenomem; if(!tpoll) goto __fallenomem;
if(!idx_ch) goto __fallenomem; if(!idx_ch) goto __fallenomem;
@ -1987,7 +1995,11 @@ int connection_initiate(conn_t *co, const char *host, int port,
__falleinval: __falleinval:
r = EINVAL; r = EINVAL;
goto __fall0; goto __fall0;
} } else if(!ssys) goto __falleinval;
/* setup connections set */
co->ssys = ssys;
if(!host) goto __falleinval; if(!host) goto __falleinval;
if(!SSL_cert) goto __falleinval; if(!SSL_cert) goto __falleinval;
if(!pctx) goto __falleinval; if(!pctx) goto __falleinval;
@ -2040,7 +2052,7 @@ int connection_initiate(conn_t *co, const char *host, int port,
} }
/* load certificates */ /* load certificates */
SSL_CTX_load_verify_locations(co->ctx, conn_sys->rootca, NULL); SSL_CTX_load_verify_locations(co->ctx, ssys->rootca, NULL);
/* set the local certificate from CertFile */ /* set the local certificate from CertFile */
if(SSL_CTX_use_certificate_file(co->ctx, SSL_cert, if(SSL_CTX_use_certificate_file(co->ctx, SSL_cert,
SSL_FILETYPE_PEM)<=0) { SSL_FILETYPE_PEM)<=0) {
@ -2115,7 +2127,7 @@ int connection_initiate(conn_t *co, const char *host, int port,
/* shutdown connection */ /* shutdown connection */
goto __fail_3; goto __fail_3;
} /* if success we're ready to use established SSL channel */ } /* if success we're ready to use established SSL channel */
// BIO_set_nbio(SSL_get_rbio(co->ssl), 1);
/* auth and RPC contexts sync */ /* auth and RPC contexts sync */
co->pctx = pctx; co->pctx = pctx;
snprintf(buf, __TMPBUFLEN, "(auth-set-context ((:user \"%s\")(:passwd \"%s\")))", snprintf(buf, __TMPBUFLEN, "(auth-set-context ((:user \"%s\")(:passwd \"%s\")))",
@ -2151,7 +2163,7 @@ int connection_initiate(conn_t *co, const char *host, int port,
buf[bytes] = 0; buf[bytes] = 0;
/* perform an rpc call */ /* perform an rpc call */
r = __eval_cstr(buf, conn_sys->system_rpc, (void *)co); r = __eval_cstr(buf, ssys->system_rpc, (void *)co);
if(!r) { /* all is fine security context is good */ if(!r) { /* all is fine security context is good */
snprintf(buf, __TMPBUFLEN, "(ch-get-types)"); /* now we should receive possible channel types */ snprintf(buf, __TMPBUFLEN, "(ch-get-types)"); /* now we should receive possible channel types */
@ -2167,7 +2179,7 @@ int connection_initiate(conn_t *co, const char *host, int port,
buf[bytes] = 0; buf[bytes] = 0;
/* perform an rpc call */ /* perform an rpc call */
r = __eval_cstr(buf, conn_sys->system_rpc, (void *)co); r = __eval_cstr(buf, ssys->system_rpc, (void *)co);
} }
free(buf); /* now we can free the temporary buffer */ free(buf); /* now we can free the temporary buffer */
@ -2178,9 +2190,9 @@ int connection_initiate(conn_t *co, const char *host, int port,
/* add connection to the list */ /* add connection to the list */
usrtc_node_init(&co->csnode, co); usrtc_node_init(&co->csnode, co);
co->flags = (CXCONN_SLAVE | CXCONN_ESTABL); /* set the right flags */ co->flags = (CXCONN_SLAVE | CXCONN_ESTABL); /* set the right flags */
pthread_rwlock_wrlock(&conn_sys->rwlock); pthread_rwlock_wrlock(&ssys->rwlock);
usrtc_insert(conn_sys->connections, &co->csnode, (void *)co->uuid); usrtc_insert(ssys->connections, &co->csnode, (void *)co->uuid);
pthread_rwlock_unlock(&conn_sys->rwlock); pthread_rwlock_unlock(&ssys->rwlock);
} }
r = pthread_create(&co->msgthread, NULL, __msg_queue_thread, (void *)co); r = pthread_create(&co->msgthread, NULL, __msg_queue_thread, (void *)co);
if(r) goto __finalize; if(r) goto __finalize;
@ -2204,7 +2216,8 @@ int connection_initiate(conn_t *co, const char *host, int port,
return r; return r;
} }
int connection_create_fapi(conn_t *co, int sck, struct in_addr *addr) int connection_create_fapi_m(conn_sys_t *ssys, conn_t *co, int sck,
struct in_addr *addr)
{ {
int r = 0; int r = 0;
int bytes = 0; int bytes = 0;
@ -2243,6 +2256,9 @@ int connection_create_fapi(conn_t *co, int sck, struct in_addr *addr)
co->idx_ch = idx_ch; co->idx_ch = idx_ch;
/* setup connections set */
co->ssys = ssys;
/* assign message queue */ /* assign message queue */
r = pth_queue_init(rqueue); r = pth_queue_init(rqueue);
if(r) goto __fail_3; if(r) goto __fail_3;
@ -2264,17 +2280,17 @@ int connection_create_fapi(conn_t *co, int sck, struct in_addr *addr)
} }
/* load certificates */ /* load certificates */
SSL_CTX_load_verify_locations(co->ctx, conn_sys->rootca, NULL); SSL_CTX_load_verify_locations(co->ctx, ssys->rootca, NULL);
/* set the local certificate from CertFile */ /* set the local certificate from CertFile */
if(SSL_CTX_use_certificate_file(co->ctx, conn_sys->certpem, if(SSL_CTX_use_certificate_file(co->ctx, ssys->certpem,
SSL_FILETYPE_PEM)<=0) { SSL_FILETYPE_PEM)<=0) {
printf("certpem1 = %s\n", conn_sys->certpem); printf("certpem1 = %s\n", ssys->certpem);
ERR_print_errors_fp(stderr); ERR_print_errors_fp(stderr);
r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__); r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__);
goto __fail_3; goto __fail_3;
} }
/* set the private key from KeyFile (may be the same as CertFile) */ /* set the private key from KeyFile (may be the same as CertFile) */
if(SSL_CTX_use_PrivateKey_file(co->ctx, conn_sys->certkey, if(SSL_CTX_use_PrivateKey_file(co->ctx, ssys->certkey,
SSL_FILETYPE_PEM)<=0) { SSL_FILETYPE_PEM)<=0) {
r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__); r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__);
goto __fail_3; goto __fail_3;
@ -2307,7 +2323,7 @@ int connection_create_fapi(conn_t *co, int sck, struct in_addr *addr)
SSL_set_accept_state(co->ssl); SSL_set_accept_state(co->ssl);
/* set the context to verify ssl connection */ /* set the context to verify ssl connection */
SSL_set_ex_data(co->ssl, conn_sys->ex_ssldata_index, (void *)co); SSL_set_ex_data(co->ssl, ex_ssldata_index, (void *)co);
//BIO_set_nbio(SSL_get_rbio(co->ssl), 1); //BIO_set_nbio(SSL_get_rbio(co->ssl), 1);
SSL_set_accept_state(co->ssl); SSL_set_accept_state(co->ssl);
if(SSL_accept(co->ssl) == -1) { if(SSL_accept(co->ssl) == -1) {
@ -2317,8 +2333,6 @@ int connection_create_fapi(conn_t *co, int sck, struct in_addr *addr)
goto __fail_3; goto __fail_3;
} /* if success we're ready to use established SSL channel */ } /* if success we're ready to use established SSL channel */
//BIO_set_nbio(SSL_get_rbio(co->ssl), 1);
/*******************************************/ /*******************************************/
/*-=Protocol part of connection establish=-*/ /*-=Protocol part of connection establish=-*/
/*******************************************/ /*******************************************/
@ -2326,7 +2340,7 @@ int connection_create_fapi(conn_t *co, int sck, struct in_addr *addr)
bytes = __conn_read(co, buf, __TMPBUFLEN); bytes = __conn_read(co, buf, __TMPBUFLEN);
if(bytes > 0) { if(bytes > 0) {
buf[bytes] = 0; buf[bytes] = 0;
r = __eval_cstr(buf, conn_sys->system_rpc, (void *)co); r = __eval_cstr(buf, ssys->system_rpc, (void *)co);
if(r) { if(r) {
fprintf(stderr, "Initiation func return %d\n", r); fprintf(stderr, "Initiation func return %d\n", r);
free(buf); free(buf);
@ -2339,7 +2353,7 @@ int connection_create_fapi(conn_t *co, int sck, struct in_addr *addr)
co->flags &= ~CXCONN_ESTABL; co->flags &= ~CXCONN_ESTABL;
co->flags |= CXCONN_BROKEN; co->flags |= CXCONN_BROKEN;
free(buf); free(buf);
if(conn_sys->on_destroy) conn_sys->on_destroy(co); if(ssys->on_destroy) ssys->on_destroy(co);
SSL_shutdown(co->ssl); SSL_shutdown(co->ssl);
r = ESXNOCONNECT; r = ESXNOCONNECT;
goto __fail_3; goto __fail_3;
@ -2348,8 +2362,8 @@ int connection_create_fapi(conn_t *co, int sck, struct in_addr *addr)
} }
/* before it will be done assign rpc list */ /* before it will be done assign rpc list */
if(conn_sys->get_rpc_typed_list_tree) if(ssys->get_rpc_typed_list_tree)
co->rpc_list = conn_sys->get_rpc_typed_list_tree(co); co->rpc_list = ssys->get_rpc_typed_list_tree(co);
free(buf); free(buf);
r = pthread_create(&co->cthread, NULL, __cxmaster_thread_listener, (void *)co); r = pthread_create(&co->cthread, NULL, __cxmaster_thread_listener, (void *)co);
@ -2357,9 +2371,9 @@ int connection_create_fapi(conn_t *co, int sck, struct in_addr *addr)
/* add connection to the list */ /* add connection to the list */
usrtc_node_init(&co->csnode, co); usrtc_node_init(&co->csnode, co);
co->flags |= CXCONN_MASTER; /* set the right flags */ co->flags |= CXCONN_MASTER; /* set the right flags */
pthread_rwlock_wrlock(&conn_sys->rwlock); pthread_rwlock_wrlock(&ssys->rwlock);
usrtc_insert(conn_sys->connections, &co->csnode, (void *)co->uuid); usrtc_insert(ssys->connections, &co->csnode, (void *)co->uuid);
pthread_rwlock_unlock(&conn_sys->rwlock); pthread_rwlock_unlock(&ssys->rwlock);
/* threads poll --- */ /* threads poll --- */
r = pthread_create(&co->msgthread, NULL, __msg_queue_thread, (void *)co); r = pthread_create(&co->msgthread, NULL, __msg_queue_thread, (void *)co);
if(r) goto __fail_3; if(r) goto __fail_3;
@ -2400,11 +2414,6 @@ int connection_close(conn_t *co)
return 0; return 0;
} }
int connection_reinit(conn_t *co) /* TODO: the next version */
{
return ENOSYS;
}
extern int __create_reg_msg(sxmsg_t **msg, chnl_t *ch); extern int __create_reg_msg(sxmsg_t **msg, chnl_t *ch);
extern int __create_sys_msg(sxmsg_t **msg, char *uuid, chnl_t *ch, sxpayload_t *data); extern int __create_sys_msg(sxmsg_t **msg, char *uuid, chnl_t *ch, sxpayload_t *data);
@ -2427,7 +2436,7 @@ static void __connection_free(conn_t *co)
/* since we don't have any threads working with channels destroy them */ /* since we don't have any threads working with channels destroy them */
__destroy_all_channels(co); __destroy_all_channels(co);
/* permission context and callback of exists */ /* permission context and callback of exists */
if(conn_sys->on_destroy) conn_sys->on_destroy(co); if(co->ssys->on_destroy) co->ssys->on_destroy(co);
else { /* we don't have a handler */ else { /* we don't have a handler */
if(co->pctx->login) free(co->pctx->login); if(co->pctx->login) free(co->pctx->login);
if(co->pctx->passwd) free(co->pctx->passwd); if(co->pctx->passwd) free(co->pctx->passwd);