[core] Code cleanup, minor fixes;

v0.5.xx
Alexander Vdolainen 9 years ago
parent 0fde3f330f
commit 9e9ea00df8

@ -79,7 +79,7 @@ struct __sxmsg_t;
*/ */
typedef struct __sxlink_t { typedef struct __sxlink_t {
/* General section */ /* General section */
struct __sxhub_type *ssys; /* < hub subsystem */ struct __sxhub_type *hub; /* < hub subsystem */
char *uuid; /** < uuid of the link */ char *uuid; /** < uuid of the link */
/* Channels section */ /* Channels section */
idx_allocator_t idx_ch; /** < index allocation for channels */ idx_allocator_t idx_ch; /** < index allocation for channels */
@ -354,8 +354,8 @@ int sxhub_setsslserts(sxhub_t *ssys, const char *rootca,
const char *certpem, const char *certkey); const char *certpem, const char *certkey);
/* create links */ /* create links */
sxlink_t *sxlink_master_accept(sxhub_t *ssys, int sck, struct in_addr *addr); sxlink_t *sxlink_master_accept(sxhub_t *hub, int sck, struct in_addr *addr);
sxlink_t *sxlink_connect(sxhub_t *ssys, const char *host, sxlink_t *sxlink_connect(sxhub_t *hub, const char *host,
int port, const char *SSL_cert, const char *login, int port, const char *SSL_cert, const char *login,
const char *passwd); const char *passwd);
int sxlink_close(sxlink_t *co); int sxlink_close(sxlink_t *co);

@ -6,7 +6,7 @@
* - performance optimization * - performance optimization
* *
* (c) Askele Group 2013-2015 <http://askele.com> * (c) Askele Group 2013-2015 <http://askele.com>
* (c) Alexander Vdolainen 2013-2015 <avdolainen@gmail.com> * (c) Alexander Vdolainen 2013-2015, 2016 <avdolainen@zoho.com>
* *
* libsxmp is free software: you can redistribute it and/or modify it * libsxmp is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published * under the terms of the GNU Lesser General Public License as published
@ -98,8 +98,8 @@ static void __destroy_rpc_list_tree(usrtc_t *tree)
static int __set_credentials(void *cctx, sexp_t *sx) static int __set_credentials(void *cctx, sexp_t *sx)
{ {
register int idx; register int idx;
sxlink_t *co = (sxlink_t *)cctx; sxlink_t *link = (sxlink_t *)cctx;
sxhub_t *ssys = co->ssys; sxhub_t *hub = link->hub;
sexp_t *isx; sexp_t *isx;
char *login = NULL; char *login = NULL;
char *passwd = NULL; char *passwd = NULL;
@ -116,22 +116,22 @@ static int __set_credentials(void *cctx, sexp_t *sx)
if(!login || !passwd) return SXE_BADPROTO; if(!login || !passwd) return SXE_BADPROTO;
co->pctx->login = strdup(login); link->pctx->login = strdup(login);
co->pctx->passwd = strdup(passwd); link->pctx->passwd = strdup(passwd);
if(!co->pctx->login || !co->pctx->passwd) { if(!link->pctx->login || !link->pctx->passwd) {
if(co->pctx->login) free(co->pctx->login); if(link->pctx->login) free(link->pctx->login);
if(co->pctx->passwd) free(co->pctx->passwd); if(link->pctx->passwd) free(link->pctx->passwd);
return SXE_ENOMEM; return SXE_ENOMEM;
} }
if(ssys->secure_check) return ssys->secure_check(co); if(hub->secure_check) return hub->secure_check(link);
else return SXE_SUCCESS; else return SXE_SUCCESS;
} }
static int __get_channels_list(void *cctx, sexp_t *sx) static int __get_channels_list(void *cctx, sexp_t *sx)
{ {
sxlink_t *co = (sxlink_t *)cctx; sxlink_t *co = (sxlink_t *)cctx;
sxhub_t *ssys = co->ssys; sxhub_t *ssys = co->hub;
sxmsg_t *msg = co->messages[0]; sxmsg_t *msg = co->messages[0];
char *buf = msg->payload; char *buf = msg->payload;
usrtc_node_t *node; usrtc_node_t *node;
@ -174,7 +174,7 @@ static int __get_channels_list(void *cctx, sexp_t *sx)
static int __get_streams(void *cctx, sexp_t *sx) static int __get_streams(void *cctx, sexp_t *sx)
{ {
sxlink_t *link = (sxlink_t *)cctx; sxlink_t *link = (sxlink_t *)cctx;
sxhub_t *hub = link->ssys; sxhub_t *hub = link->hub;
sxmsg_t *msg = link->messages[0]; sxmsg_t *msg = link->messages[0];
char *buf = msg->payload; char *buf = msg->payload;
usrtc_node_t *node, *rpc_node; usrtc_node_t *node, *rpc_node;
@ -398,7 +398,7 @@ static int __set_channels_list(void *cctx, sexp_t *sx)
{ {
register int idx; register int idx;
sxlink_t *co = (sxlink_t *)cctx; sxlink_t *co = (sxlink_t *)cctx;
sxhub_t *ssys = co->ssys; sxhub_t *ssys = co->hub;
sexp_t *isx, *iisx; sexp_t *isx, *iisx;
int id, r; int id, r;
@ -679,3 +679,114 @@ int sxhub_stream_register(sxhub_t *hub, const struct sxstream_description *s_des
return 0; return 0;
} }
/* internally used functions */
static int __verify_certcall_dummy(int preverify_ok, X509_STORE_CTX *ctx)
{
return preverify_ok;
}
int ex_ssldata_index;
/* this is a callback to perform a custom SSL certs chain validation,
* as I promised here the comments, a lot of ...
* The first shit: 0 means validation failed, 1 otherwise
* The second shit: X509 API, I guess u will love it ;-)
* openssl calls this function for each certificate in chain,
* since our case is a simple (depth of chain is one, since we're
* don't care for public certificates lists or I cannot find any reasons to
* do it ...), amount of calls reduced, and in this case we're interested
* only in top of chain i.e. actual certificate used on client side,
* the validity of signing for other certificates within chain is
* guaranteed by the ssl itself.
* u know, we need to lookup in database, or elsewhere... some information
* about client certificate, and decide - is it valid, or not?, if so
* yep I mean it's valid, we can assign it's long fucking number to
* security context, to use in ongoing full scaled connection handshaking.
*/
static int __verify_certcall(int preverify_ok, X509_STORE_CTX *ctx)
{
// X509 *cert = X509_STORE_CTX_get_current_cert(ctx);
int err = X509_STORE_CTX_get_error(ctx), depth = X509_STORE_CTX_get_error_depth(ctx);
SSL *ssl = X509_STORE_CTX_get_ex_data(ctx, SSL_get_ex_data_X509_STORE_CTX_idx());
sxlink_t *link = SSL_get_ex_data(ssl, ex_ssldata_index); /* this is a custom data we're set before */
sxhub_t *hub = link->hub;
/* now we need to check for certificates with a long chain,
* so since we have a short one, reject long ones */
if(depth > VERIFY_DEPTH) { /* longer than we expect */
preverify_ok = 0; /* yep, 0 means error for those function callback in openssl, fucking set */
err = X509_V_ERR_CERT_CHAIN_TOO_LONG;
X509_STORE_CTX_set_error(ctx, err);
}
if(!preverify_ok) return 0;
/* ok, now we're on top of SSL (depth == 0) certs chain,
* and we can validate client certificate */
if(!depth) {
link->pctx->certid = ASN1_INTEGER_get((const ASN1_INTEGER *)X509_get_serialNumber(ctx->current_cert));
/* now we're need to check the ssl cert */
if(hub->validate_sslpem) {
if(hub->validate_sslpem(link)) return 0;
else return 1;
} else return 0;
}
return preverify_ok;
}
int _sxhub_settls_ctx(sxhub_t *hub, const char *crtfile)
{
if(!hub->ctx) {
/* init SSL certificates and context */
if(!(hub->ctx = SSL_CTX_new(TLSv1_2_client_method()))) return SXE_ENOMEM;
/* set verify context */
SSL_CTX_set_verify(hub->ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT,
__verify_certcall_dummy);
/* set verify depth */
SSL_CTX_set_verify_depth(hub->ctx, VERIFY_DEPTH);
/* load certificates */
SSL_CTX_load_verify_locations(hub->ctx, hub->rootca, NULL);
}
/* set the local certificate from CertFile */
if(SSL_CTX_use_certificate_file(hub->ctx, crtfile, SSL_FILETYPE_PEM)<=0) return SXE_ESSL;
/* set the private key from KeyFile (may be the same as CertFile) */
if(SSL_CTX_use_PrivateKey_file(hub->ctx, crtfile, SSL_FILETYPE_PEM)<=0) return SXE_ESSL;
/* verify private key */
if (!SSL_CTX_check_private_key(hub->ctx)) return SXE_ESSL;
return SXE_SUCCESS;
}
int _sxhub_settls_ctx_s(sxhub_t *hub)
{
if(hub->ctx) return SXE_SUCCESS;
/* init SSL certificates and context */
if(!(hub->ctx = SSL_CTX_new(TLSv1_2_server_method()))) return SXE_ENOMEM;
/* set verify context */
SSL_CTX_set_verify(hub->ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT,
__verify_certcall);
/* set verify depth */
SSL_CTX_set_verify_depth(hub->ctx, VERIFY_DEPTH);
/* set cache policy */
SSL_CTX_set_session_cache_mode(hub->ctx, SSL_SESS_CACHE_OFF);
SSL_CTX_set_mode(hub->ctx, SSL_MODE_RELEASE_BUFFERS);
/* load certificates */
SSL_CTX_load_verify_locations(hub->ctx, hub->rootca, NULL);
/* set the local certificate from CertFile */
if(SSL_CTX_use_certificate_file(hub->ctx, hub->certpem, SSL_FILETYPE_PEM) <= 0) return SXE_ESSL;
/* set the private key from KeyFile (may be the same as CertFile) */
if(SSL_CTX_use_PrivateKey_file(hub->ctx, hub->certpem, SSL_FILETYPE_PEM) <= 0) return SXE_ESSL;
/* verify private key */
if (!SSL_CTX_check_private_key(hub->ctx)) return SXE_ESSL;
return SXE_SUCCESS;
}

@ -39,6 +39,10 @@
#define _SXSTREAMBREAD_CMD "!#brs>" #define _SXSTREAMBREAD_CMD "!#brs>"
#define _SXSTREAMBWRITE_CMD "!#bws>" #define _SXSTREAMBWRITE_CMD "!#bws>"
/* hub related */
int _sxhub_settls_ctx(sxhub_t *hub, const char *crtfile);
int _sxhub_settls_ctx_s(sxhub_t *hub);
/* link related */ /* link related */
int _sxmpl_writemsg(sxlink_t *co, sxmsg_t *msg); int _sxmpl_writemsg(sxlink_t *co, sxmsg_t *msg);
int _sxmpl_rapidwrite(sxlink_t *link, sxmsg_t *msg); int _sxmpl_rapidwrite(sxlink_t *link, sxmsg_t *msg);

@ -46,7 +46,7 @@
void _message_process(sxmsg_t *msg) void _message_process(sxmsg_t *msg)
{ {
sxchnl_t *chan = msg->pch; sxchnl_t *chan = msg->pch;
sxhub_t *hub = chan->link->ssys; sxhub_t *hub = chan->link->hub;
sexp_t *sx, *isx; sexp_t *sx, *isx;
usrtc_t *listrpc; usrtc_t *listrpc;
usrtc_node_t *node; usrtc_node_t *node;

@ -846,7 +846,7 @@ int _builtin_stream_open(void *m, sexp_t *sx)
} }
/* check availability */ /* check availability */
hub = link->ssys; hub = link->hub;
if(!hub->streams) { if(!hub->streams) {
__nostream: __nostream:
r = SXE_NOSUCHSTREAMTYPE; r = SXE_NOSUCHSTREAMTYPE;

@ -301,67 +301,12 @@ static void __sxmpl_bundle_destroy(sxmplv2_bundle_t *n)
return; return;
} }
static int ex_ssldata_index; /** < index used to work with additional data extern int ex_ssldata_index; /** < index used to work with additional data
* provided to the special call during SSL handshake */ * provided to the special call during SSL handshake */
/* this function is an ugly implementation to get C string with uuid */ /* this function is an ugly implementation to get C string with uuid */
extern char *__generate_uuid(void); extern char *__generate_uuid(void);
/* this is a callback to perform a custom SSL certs chain validation,
* as I promised here the comments, a lot of ...
* The first shit: 0 means validation failed, 1 otherwise
* The second shit: X509 API, I guess u will love it ;-)
* openssl calls this function for each certificate in chain,
* since our case is a simple (depth of chain is one, since we're
* don't care for public certificates lists or I cannot find any reasons to
* do it ...), amount of calls reduced, and in this case we're interested
* only in top of chain i.e. actual certificate used on client side,
* the validity of signing for other certificates within chain is
* guaranteed by the ssl itself.
* u know, we need to lookup in database, or elsewhere... some information
* about client certificate, and decide - is it valid, or not?, if so
* yep I mean it's valid, we can assign it's long fucking number to
* security context, to use in ongoing full scaled connection handshaking.
*/
static int __verify_certcall(int preverify_ok, X509_STORE_CTX *ctx)
{
// X509 *cert = X509_STORE_CTX_get_current_cert(ctx);
int err = X509_STORE_CTX_get_error(ctx), depth = X509_STORE_CTX_get_error_depth(ctx);
SSL *ssl = X509_STORE_CTX_get_ex_data(ctx, SSL_get_ex_data_X509_STORE_CTX_idx());
sxlink_t *co = SSL_get_ex_data(ssl, ex_ssldata_index); /* this is a custom data we're set before */
sxhub_t *ssys = co->ssys;
/* now we need to check for certificates with a long chain,
* so since we have a short one, reject long ones */
if(depth > VERIFY_DEPTH) { /* longer than we expect */
preverify_ok = 0; /* yep, 0 means error for those function callback in openssl, fucking set */
err = X509_V_ERR_CERT_CHAIN_TOO_LONG;
X509_STORE_CTX_set_error(ctx, err);
}
if(!preverify_ok) return 0;
/* ok, now we're on top of SSL (depth == 0) certs chain,
* and we can validate client certificate */
if(!depth) {
co->pctx->certid =
ASN1_INTEGER_get((const ASN1_INTEGER *)X509_get_serialNumber(ctx->current_cert));
/* now we're need to check the ssl cert */
if(ssys->validate_sslpem) {
if(ssys->validate_sslpem(co)) return 0;
else return 1;
} else return 0;
}
return preverify_ok;
}
/* dummy just to check the server side */
static int __verify_certcall_dummy(int preverify_ok, X509_STORE_CTX *ctx)
{
return preverify_ok;
}
static pthread_mutex_t *lock_cs; static pthread_mutex_t *lock_cs;
static long *lock_count; static long *lock_count;
@ -540,15 +485,15 @@ static void __link_minimal_free(sxlink_t *co)
return; return;
} }
static int __eval_sysrpc(sxlink_t *co, sexp_t *sx, int builtin) static int __eval_sysrpc(sxlink_t *link, sexp_t *sx, int builtin)
{ {
sxl_rpclist_t *rpc_list; sxl_rpclist_t *rpc_list;
usrtc_node_t *node; usrtc_node_t *node;
sxl_rpc_t *rentry; sxl_rpc_t *rentry;
char *rpcf; char *rpcf;
if(builtin) rpc_list = co->ssys->stream_rpc; if(builtin) rpc_list = link->hub->stream_rpc;
else rpc_list = co->ssys->system_rpc; else rpc_list = link->hub->system_rpc;
if(sx->ty == SEXP_LIST) rpcf = sx->list->val; if(sx->ty == SEXP_LIST) rpcf = sx->list->val;
else return SXE_BADPROTO; else return SXE_BADPROTO;
@ -560,7 +505,7 @@ static int __eval_sysrpc(sxlink_t *co, sexp_t *sx, int builtin)
else rentry = (sxl_rpc_t *)usrtc_node_getdata(node); else rentry = (sxl_rpc_t *)usrtc_node_getdata(node);
/* call it */ /* call it */
return rentry->rpcf((void *)co, sx); return rentry->rpcf((void *)link, sx);
} }
static inline int __eval_syssexp(sxlink_t *co, sexp_t *sx) static inline int __eval_syssexp(sxlink_t *co, sexp_t *sx)
@ -574,32 +519,32 @@ static inline int __eval_builtinsexp(sxlink_t *co, sexp_t *sx)
} }
#ifdef _NO_SXMPMP #ifdef _NO_SXMPMP
#define _CONN_INUSE(co) (co)->usecount++; #define _CONN_INUSE(lo) (lo)->usecount++;
#define _CONN_NOTINUSE(co) (co)->usecount--; #define _CONN_NOTINUSE(lo) (lo)->usecount--;
#define _CONN_UCOUNT(co) (co)->usecount #define _CONN_UCOUNT(lo) (lo)->usecount
#else #else
static inline void _CONN_INUSE(sxlink_t *co) { static inline void _CONN_INUSE(sxlink_t *l) {
pthread_rwlock_wrlock(&co->ssys->rwlock); pthread_rwlock_wrlock(&l->hub->rwlock);
co->usecount++; l->usecount++;
pthread_rwlock_unlock(&co->ssys->rwlock); pthread_rwlock_unlock(&l->hub->rwlock);
} }
static inline void _CONN_NOTINUSE(sxlink_t *co) { static inline void _CONN_NOTINUSE(sxlink_t *l) {
pthread_rwlock_wrlock(&co->ssys->rwlock); pthread_rwlock_wrlock(&l->hub->rwlock);
co->usecount--; l->usecount--;
pthread_rwlock_unlock(&co->ssys->rwlock); pthread_rwlock_unlock(&l->hub->rwlock);
} }
static inline int _CONN_UCOUNT(sxlink_t *co) { static inline int _CONN_UCOUNT(sxlink_t *l) {
int r; int r;
pthread_rwlock_rdlock(&co->ssys->rwlock); pthread_rwlock_rdlock(&l->hub->rwlock);
r = co->usecount; r = l->usecount;
pthread_rwlock_unlock(&co->ssys->rwlock); pthread_rwlock_unlock(&l->hub->rwlock);
return r; return r;
} }
#endif #endif
static void __link_destroy(sxlink_t *co) static void __link_destroy(sxlink_t *l)
{ {
int i = 0, fd; int i = 0, fd;
sxmsg_t *msg, *omsg; sxmsg_t *msg, *omsg;
@ -607,12 +552,12 @@ static void __link_destroy(sxlink_t *co)
list_node_t *iter, *siter; list_node_t *iter, *siter;
sxchnl_t *chan; sxchnl_t *chan;
sxmplv2_head_t *head; sxmplv2_head_t *head;
sxhub_t *ssys = co->ssys; sxhub_t *hub = l->hub;
/* first we will unpin all messages and mark it as errors on */ /* first we will unpin all messages and mark it as errors on */
if(co->pending_messages) { if(l->pending_messages) {
pthread_mutex_lock(&co->write_pending_lock); pthread_mutex_lock(&l->write_pending_lock);
list_for_each_safe(&co->write_pending, iter, siter) { list_for_each_safe(&l->write_pending, iter, siter) {
ppm = container_of(iter, sxppmsg_t, node); ppm = container_of(iter, sxppmsg_t, node);
omsg = ppm->msg; omsg = ppm->msg;
@ -627,9 +572,9 @@ static void __link_destroy(sxlink_t *co)
pthread_mutex_unlock(&omsg->wait); pthread_mutex_unlock(&omsg->wait);
} }
free(ppm); free(ppm);
co->pending_messages--; l->pending_messages--;
} }
pthread_mutex_unlock(&co->write_pending_lock); pthread_mutex_unlock(&l->write_pending_lock);
} }
/* free queue */ /* free queue */
@ -638,55 +583,54 @@ static void __link_destroy(sxlink_t *co)
ERR_free_strings(); ERR_free_strings();
/* update use count */ /* update use count */
_CONN_NOTINUSE(co); _CONN_NOTINUSE(l);
/* ok, let's free other if we can */ /* ok, let's free other if we can */
if(!_CONN_UCOUNT(co)) { if(!_CONN_UCOUNT(l)) {
/* go thru messages */ /* go thru messages */
pthread_mutex_lock(&co->idx_msg_lock); pthread_mutex_lock(&l->idx_msg_lock);
for(i = 0; i < MAX_MSGINPROCESS; i++) { for(i = 0; i < MAX_MSGINPROCESS; i++) {
msg = co->messages[i]; msg = l->messages[i];
if(!msg) continue; if(!msg) continue;
else head = &msg->mhead; else head = &msg->mhead;
head->opcode = SXE_LINKERROR; head->opcode = SXE_LINKERROR;
pthread_mutex_unlock(&msg->wait); pthread_mutex_unlock(&msg->wait);
pthread_mutex_destroy(&msg->wait); pthread_mutex_destroy(&msg->wait);
free(msg); free(msg);
co->messages[i] = NULL; l->messages[i] = NULL;
idx_free(&co->idx_msg, i); idx_free(&l->idx_msg, i);
} }
pthread_mutex_unlock(&co->idx_msg_lock); pthread_mutex_unlock(&l->idx_msg_lock);
/* ok now we will free the channels */ /* ok now we will free the channels */
pthread_mutex_lock(&co->idx_ch_lock); pthread_mutex_lock(&l->idx_ch_lock);
for(i = 0; i < 512; i++) { for(i = 0; i < 512; i++) {
chan = co->channels[i]; chan = l->channels[i];
if(!chan) continue; if(!chan) continue;
idx_free(&co->idx_ch, i); idx_free(&l->idx_ch, i);
free(chan); free(chan);
} }
pthread_mutex_unlock(&co->idx_ch_lock); pthread_mutex_unlock(&l->idx_ch_lock);
if(ssys->on_destroy) ssys->on_destroy(co); if(hub->on_destroy) hub->on_destroy(l);
if(co->pctx->login) free(co->pctx->login); if(l->pctx->login) free(l->pctx->login);
if(co->pctx->passwd) free(co->pctx->passwd); if(l->pctx->passwd) free(l->pctx->passwd);
SSL_set_shutdown(co->ssl, SSL_RECEIVED_SHUTDOWN | SSL_SENT_SHUTDOWN); SSL_set_shutdown(l->ssl, SSL_RECEIVED_SHUTDOWN | SSL_SENT_SHUTDOWN);
fd = SSL_get_fd(co->ssl); fd = SSL_get_fd(l->ssl);
SSL_free(co->ssl); SSL_free(l->ssl);
co->ssl = NULL; l->ssl = NULL;
ERR_remove_thread_state(0); ERR_remove_thread_state(0);
ERR_remove_state(0); ERR_remove_state(0);
ERR_free_strings(); ERR_free_strings();
close(fd); close(fd);
__link_second_free(co); __link_second_free(l);
__link_minimal_free(co); __link_minimal_free(l);
} }
return; return;
@ -866,7 +810,7 @@ static void *__sxmpl_thread(void *b)
/* FIXME: special sxmpv2.1 top layer messages - should be here */ /* FIXME: special sxmpv2.1 top layer messages - should be here */
if(mhead->opcode == SXE_RAPIDMSG) { /* custom pulse */ if(mhead->opcode == SXE_RAPIDMSG) { /* custom pulse */
sx = parse_sexp(bbuf, mhead->payload_length); sx = parse_sexp(bbuf, mhead->payload_length);
if(sx && co->ssys->on_pulse) co->ssys->on_pulse(co, sx); if(sx && co->hub->on_pulse) co->hub->on_pulse(co, sx);
if(sx) destroy_sexp(sx); if(sx) destroy_sexp(sx);
} }
} }
@ -997,11 +941,11 @@ static void *__sxmpl_thread(void *b)
return NULL; return NULL;
} }
sxlink_t *sxlink_master_accept(sxhub_t *ssys, int sck, struct in_addr *addr) sxlink_t *sxlink_master_accept(sxhub_t *hub, int sck, struct in_addr *addr)
{ {
void *buf = NULL; void *buf = NULL;
char *bbuf; char *bbuf;
sxlink_t *co = __link_minimal_alloc(addr); sxlink_t *link = __link_minimal_alloc(addr);
sxmsg_t *msg = NULL; sxmsg_t *msg = NULL;
sxmplv2_head_t *head; sxmplv2_head_t *head;
sxmplv2_bundle_t *bundle; sxmplv2_bundle_t *bundle;
@ -1009,65 +953,28 @@ sxlink_t *sxlink_master_accept(sxhub_t *ssys, int sck, struct in_addr *addr)
size_t rd; size_t rd;
int r = SXE_FAILED, i; int r = SXE_FAILED, i;
if(!co) { if(!link) {
errno = SXE_ENOMEM; errno = SXE_ENOMEM;
return NULL; return NULL;
} } else link->hub = hub;
/* ok, now we need to init ssl stuff */ /* ok, now we need to init ssl stuff */
co->ssys = ssys;
/* check up - do we need to initialize SSL context? */ /* check up - do we need to initialize SSL context? */
if(!ssys->ctx) { r = _sxhub_settls_ctx_s(hub);
/* init SSL certificates and context */ if(r != SXE_SUCCESS) goto __fail;
ssys->ctx = SSL_CTX_new(TLSv1_2_server_method());
if(!ssys->ctx) { r = SXE_ENOMEM; goto __fail; }
else {
/* set verify context */
SSL_CTX_set_verify(ssys->ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT,
__verify_certcall);
/* set verify depth */
SSL_CTX_set_verify_depth(ssys->ctx, VERIFY_DEPTH);
/* set cache policy */
SSL_CTX_set_session_cache_mode(ssys->ctx, SSL_SESS_CACHE_OFF);
SSL_CTX_set_mode(ssys->ctx, SSL_MODE_RELEASE_BUFFERS);
}
/* load certificates */
SSL_CTX_load_verify_locations(ssys->ctx, ssys->rootca, NULL);
/* set the local certificate from CertFile */
if(SSL_CTX_use_certificate_file(ssys->ctx, ssys->certpem,
SSL_FILETYPE_PEM)<=0) {
r = SXE_ESSL;
goto __fail;
}
/* set the private key from KeyFile (may be the same as CertFile) */
if(SSL_CTX_use_PrivateKey_file(ssys->ctx, ssys->certkey,
SSL_FILETYPE_PEM)<=0) {
r = SXE_ESSL;
goto __fail;
}
/* verify private key */
if (!SSL_CTX_check_private_key(ssys->ctx)) {
r = SXE_ESSL;
goto __fail;
}
}
/* now we will create an SSL connection */ /* now we will create an SSL connection */
co->ssl = SSL_new(ssys->ctx); link->ssl = SSL_new(hub->ctx);
if(!co->ssl) { r = SXE_ENOMEM; goto __fail; } if(!link->ssl) { r = SXE_ENOMEM; goto __fail; }
else SSL_set_fd(co->ssl, sck); /* attach connected socket */ else SSL_set_fd(link->ssl, sck); /* attach connected socket */
/* set the context to verify ssl connection */ /* set the context to verify ssl connection */
SSL_set_ex_data(co->ssl, ex_ssldata_index, (void *)co); SSL_set_ex_data(link->ssl, ex_ssldata_index, (void *)link);
SSL_set_accept_state(co->ssl); SSL_set_accept_state(link->ssl);
if(SSL_accept(co->ssl) == -1) { r = SXE_EPERM; goto __fail; } /* leak here ? */ if(SSL_accept(link->ssl) == -1) { r = SXE_EPERM; goto __fail; } /* leak here ? */
/* set connection to the batch mode */ /* set connection to the batch mode */
co->flags |= SXMP_BATCHMODE; link->flags |= SXMP_BATCHMODE;
/* allocate our first buffer */ /* allocate our first buffer */
buf = mmap(NULL, 65536, PROT_READ|PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); buf = mmap(NULL, 65536, PROT_READ|PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
if(buf == MAP_FAILED) { r = SXE_ENOMEM; goto __fail2; } if(buf == MAP_FAILED) { r = SXE_ENOMEM; goto __fail2; }
@ -1075,13 +982,13 @@ sxlink_t *sxlink_master_accept(sxhub_t *ssys, int sck, struct in_addr *addr)
if(!(msg = malloc(sizeof(sxmsg_t)))) { r = SXE_ENOMEM; goto __fail2; } if(!(msg = malloc(sizeof(sxmsg_t)))) { r = SXE_ENOMEM; goto __fail2; }
else { else {
memset(msg, 0, sizeof(sxmsg_t)); memset(msg, 0, sizeof(sxmsg_t));
co->messages[0] = msg; link->messages[0] = msg;
} }
bbuf = (char *)buf; bbuf = (char *)buf;
bbuf += sizeof(sxmplv2_head_t); bbuf += sizeof(sxmplv2_head_t);
while(co->flags & SXMP_BATCHMODE) { while(link->flags & SXMP_BATCHMODE) {
rd = __conn_read(co, buf, sizeof(sxmplv2_head_t)); rd = __conn_read(link, buf, sizeof(sxmplv2_head_t));
if(rd == sizeof(sxmplv2_head_t)) { if(rd == sizeof(sxmplv2_head_t)) {
head = (sxmplv2_head_t *)buf; head = (sxmplv2_head_t *)buf;
@ -1089,15 +996,15 @@ sxlink_t *sxlink_master_accept(sxhub_t *ssys, int sck, struct in_addr *addr)
if(head->opcode != SXE_SUCCESS) { r = head->opcode; goto __fail3; } if(head->opcode != SXE_SUCCESS) { r = head->opcode; goto __fail3; }
else { /* opcode is fine */ else { /* opcode is fine */
/* if we're ready for messaging mode, turn off batch mode */ /* if we're ready for messaging mode, turn off batch mode */
if(co->flags & SXMP_MESSAGINGMODE) { if(link->flags & SXMP_MESSAGINGMODE) {
co->flags &= ~SXMP_BATCHMODE; link->flags &= ~SXMP_BATCHMODE;
break; break;
} }
} }
if(!head->payload_length) continue; /* pass the following check up */ if(!head->payload_length) continue; /* pass the following check up */
rd = __conn_read(co, bbuf, head->payload_length); rd = __conn_read(link, bbuf, head->payload_length);
if(rd == -1) { r = SXE_LINKERROR; goto __fail3; } if(rd == -1) { r = SXE_LINKERROR; goto __fail3; }
if(rd != head->payload_length) { r = SXE_LINKERROR; goto __fail3; } if(rd != head->payload_length) { r = SXE_LINKERROR; goto __fail3; }
bbuf[rd] = '\0'; bbuf[rd] = '\0';
@ -1109,17 +1016,17 @@ sxlink_t *sxlink_master_accept(sxhub_t *ssys, int sck, struct in_addr *addr)
msg->payload = bbuf; msg->payload = bbuf;
msg->mhead.payload_length = 0; msg->mhead.payload_length = 0;
/* deal with it */ /* deal with it */
r = __eval_syssexp(co, sx); r = __eval_syssexp(link, sx);
memcpy(head, &msg->mhead, sizeof(sxmplv2_head_t)); memcpy(head, &msg->mhead, sizeof(sxmplv2_head_t));
head->opcode = r; head->opcode = r;
if(r != SXE_SUCCESS) { /* we finish */ if(r != SXE_SUCCESS) { /* we finish */
head->payload_length = 0; head->payload_length = 0;
__conn_write(co, head, sizeof(sxmplv2_head_t)); __conn_write(link, head, sizeof(sxmplv2_head_t));
destroy_sexp(sx); destroy_sexp(sx);
goto __fail3; goto __fail3;
} }
rd = __conn_write(co, buf, sizeof(sxmplv2_head_t) + msg->mhead.payload_length); rd = __conn_write(link, buf, sizeof(sxmplv2_head_t) + msg->mhead.payload_length);
if(rd == -1) { r = SXE_LINKERROR; goto __fail3; } if(rd == -1) { r = SXE_LINKERROR; goto __fail3; }
if(rd != sizeof(sxmplv2_head_t) + msg->mhead.payload_length) { if(rd != sizeof(sxmplv2_head_t) + msg->mhead.payload_length) {
destroy_sexp(sx); destroy_sexp(sx);
@ -1131,33 +1038,33 @@ sxlink_t *sxlink_master_accept(sxhub_t *ssys, int sck, struct in_addr *addr)
} }
/* if we're there - negotiation is done, going to init messaging mode */ /* if we're there - negotiation is done, going to init messaging mode */
r = __link_second_alloc(co); r = __link_second_alloc(link);
if(r != SXE_SUCCESS) goto __fail3; if(r != SXE_SUCCESS) goto __fail3;
/* free message */ /* free message */
co->messages[0] = NULL; link->messages[0] = NULL;
free(msg); free(msg);
/* and now we're need to create a thread poll */ /* and now we're need to create a thread poll */
if(!(bundle = malloc(sizeof(sxmplv2_bundle_t)))) { r = SXE_ENOMEM; goto __fail4; } if(!(bundle = malloc(sizeof(sxmplv2_bundle_t)))) { r = SXE_ENOMEM; goto __fail4; }
else { else {
bundle->buf = buf; bundle->buf = buf;
bundle->conn = co; bundle->conn = link;
} }
for(i = 0; i < MAX_SXMPLTHREADS; i++) { for(i = 0; i < MAX_SXMPLTHREADS; i++) {
if(bundle == (void *)0xdead) bundle = __sxmpl_bundle_create(co); if(bundle == (void *)0xdead) bundle = __sxmpl_bundle_create(link);
if(!bundle) goto __fail5; if(!bundle) goto __fail5;
r = pthread_create(&co->thrd_poll[i], NULL, __sxmpl_thread, bundle); /* and here, alloc tls */ r = pthread_create(&link->thrd_poll[i], NULL, __sxmpl_thread, bundle); /* and here, alloc tls */
if(r) goto __fail5; if(r) goto __fail5;
else { else {
bundle = (void *)0xdead; bundle = (void *)0xdead;
pthread_detach(co->thrd_poll[i]); pthread_detach(link->thrd_poll[i]);
} }
} }
/* all is done, connection now ready */ /* all is done, connection now ready */
co->flags |= SXMP_ALIVE; link->flags |= SXMP_ALIVE;
r = SXE_SUCCESS; r = SXE_SUCCESS;
errno = r; errno = r;
@ -1165,29 +1072,29 @@ sxlink_t *sxlink_master_accept(sxhub_t *ssys, int sck, struct in_addr *addr)
/* free context for this thread */ /* free context for this thread */
ERR_remove_state(0); ERR_remove_state(0);
return co; return link;
__fail5: __fail5:
r = SXE_ENOMEM; r = SXE_ENOMEM;
/* bundles will be freed by the threads when SSL_read will fails. */ /* bundles will be freed by the threads when SSL_read will fails. */
__fail4: __fail4:
__link_second_free(co); __link_second_free(link);
__fail3: __fail3:
if(ssys->on_destroy) ssys->on_destroy(co); if(hub->on_destroy) hub->on_destroy(link);
__fail2: __fail2:
if(msg) free(msg); if(msg) free(msg);
if(buf != MAP_FAILED) munmap(buf, 65536); if(buf != MAP_FAILED) munmap(buf, 65536);
SSL_shutdown(co->ssl); SSL_shutdown(link->ssl);
__fail: __fail:
if(co) { if(link) {
if(co->ssl) { if(link->ssl) {
ERR_remove_thread_state(0); ERR_remove_thread_state(0);
ERR_remove_state(0); ERR_remove_state(0);
ERR_free_strings(); ERR_free_strings();
SSL_free(co->ssl); SSL_free(link->ssl);
} }
__link_minimal_free(co); __link_minimal_free(link);
} }
close(sck); close(sck);
errno = r; errno = r;
@ -1201,11 +1108,11 @@ enum {
_SYNC_ON_STREAMS = 2, _SYNC_ON_STREAMS = 2,
}; };
sxlink_t *sxlink_connect(sxhub_t *ssys, const char *host, sxlink_t *sxlink_connect(sxhub_t *hub, const char *host,
int port, const char *SSL_cert, const char *login, int port, const char *SSL_cert, const char *login,
const char *passwd) const char *passwd)
{ {
sxlink_t *co = __link_minimal_alloc(NULL); sxlink_t *link = __link_minimal_alloc(NULL);
struct hostent *host_; struct hostent *host_;
struct sockaddr_in addr; struct sockaddr_in addr;
int r = SXE_SUCCESS, sck, i, sync_state = _SYNC_ON_CHANNELS; int r = SXE_SUCCESS, sck, i, sync_state = _SYNC_ON_CHANNELS;
@ -1223,66 +1130,16 @@ sxlink_t *sxlink_connect(sxhub_t *ssys, const char *host,
r = SXE_IGNORED; r = SXE_IGNORED;
if(!host || !SSL_cert) goto __fail; if(!host || !SSL_cert) goto __fail;
if(!co) { r = SXE_ENOMEM; goto __fail; } if(!link) { r = SXE_ENOMEM; goto __fail; }
link->hub = hub;
#ifdef WIN32 #ifdef WIN32
WSAStartup(MAKEWORD(2, 2), &wsaData); WSAStartup(MAKEWORD(2, 2), &wsaData);
#endif #endif
/* ok, now we need to init ssl stuff */
co->ssys = ssys;
/* check up ssl context */ /* check up ssl context */
if(!ssys->ctx) { r = _sxhub_settls_ctx(hub, SSL_cert);
/* init SSL certificates and context */ if(r != SXE_SUCCESS) goto __fail;
ssys->ctx = SSL_CTX_new(TLSv1_2_client_method());
if(!ssys->ctx) { r = SXE_ENOMEM; goto __fail; }
else {
/* set verify context */
SSL_CTX_set_verify(ssys->ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT,
__verify_certcall_dummy);
/* set verify depth */
SSL_CTX_set_verify_depth(ssys->ctx, VERIFY_DEPTH);
}
/* load certificates */
SSL_CTX_load_verify_locations(ssys->ctx, ssys->rootca, NULL);
/* set the local certificate from CertFile */
if(SSL_CTX_use_certificate_file(ssys->ctx, SSL_cert,
SSL_FILETYPE_PEM)<=0) {
r = SXE_ESSL;
goto __fail;
}
/* set the private key from KeyFile (may be the same as CertFile) */
if(SSL_CTX_use_PrivateKey_file(ssys->ctx, SSL_cert,
SSL_FILETYPE_PEM)<=0) {
r = SXE_ESSL;
goto __fail;
}
/* verify private key */
if (!SSL_CTX_check_private_key(ssys->ctx)) {
r = SXE_ESSL;
goto __fail;
}
} else {
/* set the local certificate from CertFile */
if(SSL_CTX_use_certificate_file(ssys->ctx, SSL_cert,
SSL_FILETYPE_PEM)<=0) {
r = SXE_ESSL;
goto __fail;
}
/* set the private key from KeyFile (may be the same as CertFile) */
if(SSL_CTX_use_PrivateKey_file(ssys->ctx, SSL_cert,
SSL_FILETYPE_PEM)<=0) {
r = SXE_ESSL;
goto __fail;
}
/* verify private key */
if (!SSL_CTX_check_private_key(ssys->ctx)) {
r = SXE_ESSL;
goto __fail;
}
}
/* resolve host */ /* resolve host */
#ifdef WIN32 #ifdef WIN32
@ -1315,22 +1172,22 @@ sxlink_t *sxlink_connect(sxhub_t *ssys, const char *host,
} }
/* SSL handshake */ /* SSL handshake */
co->ssl = SSL_new(ssys->ctx); link->ssl = SSL_new(hub->ctx);
if(!co->ssl) { if(!link->ssl) {
close(sck); close(sck);
r = SXE_ENOMEM; r = SXE_ENOMEM;
goto __fail; goto __fail;
} }
SSL_set_fd(co->ssl, sck); /* attach connected socket */ SSL_set_fd(link->ssl, sck); /* attach connected socket */
SSL_set_connect_state(co->ssl); SSL_set_connect_state(link->ssl);
if(SSL_connect(co->ssl) == -1) { if(SSL_connect(link->ssl) == -1) {
r = SXE_EPERM; r = SXE_EPERM;
/* shutdown connection */ /* shutdown connection */
goto __fail; goto __fail;
} /* if success we're ready to use established SSL channel */ } /* if success we're ready to use established SSL channel */
/* set connection to the batch mode */ /* set connection to the batch mode */
co->flags |= SXMP_BATCHMODE; link->flags |= SXMP_BATCHMODE;
/* allocate our first buffer */ /* allocate our first buffer */
buf = mmap(NULL, 65536, PROT_READ|PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); buf = mmap(NULL, 65536, PROT_READ|PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
@ -1339,7 +1196,7 @@ sxlink_t *sxlink_connect(sxhub_t *ssys, const char *host,
if(!(msg = malloc(sizeof(sxmsg_t)))) { r = SXE_ENOMEM; goto __fail2; } if(!(msg = malloc(sizeof(sxmsg_t)))) { r = SXE_ENOMEM; goto __fail2; }
else { else {
memset(msg, 0, sizeof(sxmsg_t)); memset(msg, 0, sizeof(sxmsg_t));
co->messages[0] = msg; link->messages[0] = msg;
} }
bbuf = (char *)buf; bbuf = (char *)buf;
bbuf += sizeof(sxmplv2_head_t); bbuf += sizeof(sxmplv2_head_t);
@ -1352,10 +1209,10 @@ sxlink_t *sxlink_connect(sxhub_t *ssys, const char *host,
head->opcode = SXE_SUCCESS; head->opcode = SXE_SUCCESS;
head->payload_length = ln; head->payload_length = ln;
wr = __conn_write(co, buf, ln + sizeof(sxmplv2_head_t)); wr = __conn_write(link, buf, ln + sizeof(sxmplv2_head_t));
if(wr < 0) goto __fail2; if(wr < 0) goto __fail2;
rd = __conn_read(co, head, sizeof(sxmplv2_head_t)); rd = __conn_read(link, head, sizeof(sxmplv2_head_t));
if(rd < 0) goto __fail2; if(rd < 0) goto __fail2;
if(head->opcode != SXE_SUCCESS) { if(head->opcode != SXE_SUCCESS) {
r = head->opcode; r = head->opcode;
@ -1363,7 +1220,7 @@ sxlink_t *sxlink_connect(sxhub_t *ssys, const char *host,
} }
/* since V2.1 syncronization should be done here */ /* since V2.1 syncronization should be done here */
while(co->flags & SXMP_BATCHMODE) { while(link->flags & SXMP_BATCHMODE) {
head->opcode = SXE_SUCCESS; head->opcode = SXE_SUCCESS;
switch(sync_state) { switch(sync_state) {
@ -1380,86 +1237,86 @@ sxlink_t *sxlink_connect(sxhub_t *ssys, const char *host,
/* write a message */ /* write a message */
head->payload_length = ln; head->payload_length = ln;
wr = __conn_write(co, buf, ln + sizeof(sxmplv2_head_t)); wr = __conn_write(link, buf, ln + sizeof(sxmplv2_head_t));
if(wr < 0) goto __fail2; if(wr < 0) goto __fail2;
rd = __conn_read(co, head, sizeof(sxmplv2_head_t)); rd = __conn_read(link, head, sizeof(sxmplv2_head_t));
if(rd < 0) goto __fail2; if(rd < 0) goto __fail2;
if(head->opcode != SXE_SUCCESS) goto __fail2; if(head->opcode != SXE_SUCCESS) goto __fail2;
if(!head->payload_length) { if(!head->payload_length) {
sync_state++; sync_state++;
continue; continue;
} }
rd = __conn_read(co, bbuf, head->payload_length); rd = __conn_read(link, bbuf, head->payload_length);
if(rd < 0) goto __fail2; if(rd < 0) goto __fail2;
/* perform a parsing of the desired message */ /* perform a parsing of the desired message */
bbuf[rd] = '\0'; bbuf[rd] = '\0';
sx = parse_sexp(bbuf, rd); sx = parse_sexp(bbuf, rd);
if(!sx) { r = SXE_BADPROTO; goto __fail2; } if(!sx) { r = SXE_BADPROTO; goto __fail2; }
r = __eval_syssexp(co, sx); r = __eval_syssexp(link, sx);
if(!r) r = SXE_SUCCESS; if(!r) r = SXE_SUCCESS;
destroy_sexp(sx); destroy_sexp(sx);
if(sync_state != _SYNC_ON_STREAMS) { if(sync_state != _SYNC_ON_STREAMS) {
/* write back */ /* write back */
head->opcode = r; head->opcode = r;
head->payload_length = 0; head->payload_length = 0;
wr = __conn_write(co, head, sizeof(sxmplv2_head_t)); wr = __conn_write(link, head, sizeof(sxmplv2_head_t));
if(wr < 0) { r = SXE_LINKERROR; goto __fail2;} if(wr < 0) { r = SXE_LINKERROR; goto __fail2;}
if(r != SXE_SUCCESS) { r = SXE_LINKERROR; goto __fail2;} if(r != SXE_SUCCESS) { r = SXE_LINKERROR; goto __fail2;}
} }
sync_state++; sync_state++;
} }
/* if we're there - negotiation is done, going to init messaging mode */ /* if we're there - negotiation is done, going to init messaging mode */
r = __link_second_alloc(co); r = __link_second_alloc(link);
if(r != SXE_SUCCESS) goto __fail3; if(r != SXE_SUCCESS) goto __fail3;
/* free message */ /* free message */
co->messages[0] = NULL; link->messages[0] = NULL;
free(msg); free(msg);
/* and now we're need to create a thread poll */ /* and now we're need to create a thread poll */
if(!(bundle = malloc(sizeof(sxmplv2_bundle_t)))) { r = SXE_ENOMEM; goto __fail4; } if(!(bundle = malloc(sizeof(sxmplv2_bundle_t)))) { r = SXE_ENOMEM; goto __fail4; }
else { else {
bundle->buf = buf; bundle->buf = buf;
bundle->conn = co; bundle->conn = link;
} }
for(i = 0; i < MAX_SXMPLTHREADS; i++) { for(i = 0; i < MAX_SXMPLTHREADS; i++) {
if(bundle == (void *)0xdead) bundle = __sxmpl_bundle_create(co); if(bundle == (void *)0xdead) bundle = __sxmpl_bundle_create(link);
if(!bundle) goto __fail5; if(!bundle) goto __fail5;
r = pthread_create(&co->thrd_poll[i], NULL, __sxmpl_thread, bundle); r = pthread_create(&link->thrd_poll[i], NULL, __sxmpl_thread, bundle);
if(r) goto __fail5; if(r) goto __fail5;
else { else {
pthread_detach(co->thrd_poll[i]); pthread_detach(link->thrd_poll[i]);
bundle = (void *)0xdead; bundle = (void *)0xdead;
} }
} }
/* all is done, connection now ready */ /* all is done, connection now ready */
co->flags |= SXMP_ALIVE; link->flags |= SXMP_ALIVE;
return co; return link;
__fail5: __fail5:
r = SXE_ENOMEM; r = SXE_ENOMEM;
/* bundles will be freed by the threads when SSL_read will fails. */ /* bundles will be freed by the threads when SSL_read will fails. */
__fail4: __fail4:
__link_second_free(co); __link_second_free(link);
__fail3: __fail3:
if(ssys->on_destroy) ssys->on_destroy(co); if(hub->on_destroy) hub->on_destroy(link);
__fail2: __fail2:
if(buf != MAP_FAILED) munmap(buf, 65536); if(buf != MAP_FAILED) munmap(buf, 65536);
SSL_shutdown(co->ssl); SSL_shutdown(link->ssl);
ERR_remove_thread_state(0); ERR_remove_thread_state(0);
ERR_remove_state(0); ERR_remove_state(0);
close(sck); close(sck);
__fail: __fail:
if(co) { if(link) {
if(co->ssl) SSL_free(co->ssl); if(link->ssl) SSL_free(link->ssl);
__link_minimal_free(co); __link_minimal_free(link);
} }
errno = r; errno = r;
return NULL; return NULL;

Loading…
Cancel
Save