|
|
|
/*
|
|
|
|
* Secure X Message Passing Library v2 implementation.
|
|
|
|
* (sxmplv2) it supersedes all previous versions due to:
|
|
|
|
* - memory consumption
|
|
|
|
* - new features such as pulse emitting
|
|
|
|
* - performance optimization
|
|
|
|
*
|
|
|
|
* (c) Askele Group 2013-2015 <http://askele.com>
|
|
|
|
* (c) Alexander Vdolainen 2013-2015 <avdolainen@gmail.com>
|
|
|
|
*
|
|
|
|
* libsxmp is free software: you can redistribute it and/or modify it
|
|
|
|
* under the terms of the GNU Lesser General Public License as published
|
|
|
|
* by the Free Software Foundation, either version 3 of the License, or
|
|
|
|
* (at your option) any later version.
|
|
|
|
*
|
|
|
|
* libsxmp is distributed in the hope that it will be useful, but
|
|
|
|
* WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
|
|
|
|
* See the GNU Lesser General Public License for more details.
|
|
|
|
*
|
|
|
|
* You should have received a copy of the GNU Lesser General Public License
|
|
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
*
|
|
|
|
*/
|
|
|
|
|
|
|
|
#include <stdlib.h>
|
|
|
|
#include <stdio.h>
|
|
|
|
#include <errno.h>
|
|
|
|
#include <string.h>
|
|
|
|
#include <pthread.h>
|
|
|
|
#include <sys/stat.h>
|
|
|
|
#include <sys/time.h>
|
|
|
|
#include <sys/types.h>
|
|
|
|
#include <sys/mman.h>
|
|
|
|
#include <fcntl.h>
|
|
|
|
|
|
|
|
#ifdef WIN32
|
|
|
|
#include <Winsock2.h>
|
|
|
|
#define EBADE 1
|
|
|
|
#define NETDB_SUCCESS 0
|
|
|
|
#else
|
|
|
|
#include <sys/select.h>
|
|
|
|
#include <netdb.h>
|
|
|
|
#include <unistd.h>
|
|
|
|
#include <uuid/uuid.h>
|
|
|
|
#endif
|
|
|
|
|
|
|
|
#include <openssl/ssl.h>
|
|
|
|
#include <openssl/err.h>
|
|
|
|
#include <openssl/engine.h>
|
|
|
|
|
|
|
|
#include <tdata/usrtc.h>
|
|
|
|
#include <tdata/list.h>
|
|
|
|
#include <sexpr/sexp.h>
|
|
|
|
|
|
|
|
#include <sxmp/limits.h>
|
|
|
|
#include <sxmp/sxmp.h>
|
|
|
|
|
|
|
|
#include "internal.h"
|
|
|
|
|
|
|
|
typedef struct __sxmpl_bundle_type {
|
|
|
|
void *buf;
|
|
|
|
sxlink_t *conn;
|
|
|
|
} sxmplv2_bundle_t;
|
|
|
|
|
|
|
|
/* networking helpers */
|
|
|
|
#ifndef WIN32
|
|
|
|
int __resolvehost(const char *hostname, char *buf, int buf_len,
|
|
|
|
struct hostent **rhp)
|
|
|
|
{
|
|
|
|
struct hostent *hostbuf ;
|
|
|
|
struct hostent *hp = *rhp = NULL;
|
|
|
|
int herr = 0, hres = 0;
|
|
|
|
|
|
|
|
hostbuf = malloc(sizeof(struct hostent));
|
|
|
|
if(!hostbuf) return NO_ADDRESS;
|
|
|
|
hres = gethostbyname_r(hostname, hostbuf,
|
|
|
|
buf, buf_len, &hp, &herr);
|
|
|
|
|
|
|
|
if(hres) return NO_ADDRESS;
|
|
|
|
*rhp = hp;
|
|
|
|
|
|
|
|
return NETDB_SUCCESS;
|
|
|
|
}
|
|
|
|
#endif
|
|
|
|
|
|
|
|
/*
 * Read up to buf_len bytes from the SSL connection of the link.
 *
 * The socket is switched to non-blocking mode and the call waits in
 * select() whenever the SSL layer reports that it needs the socket to
 * become readable or writable, then retries the read.
 *
 * Returns the number of bytes SSL_read() delivered, 0 when select()
 * reports that nothing is ready, or -1 when the link is (or becomes)
 * closed or an unrecoverable error happens. On SSL level failures the
 * SXMP_CLOSED flag is raised on the link.
 */
static int __conn_read(sxlink_t *co, void *buf, size_t buf_len)
{
  int rfd = SSL_get_fd(co->ssl), r;
  fd_set readset, writeset;
  int read_blocked = 0, read_blocked_on_write = 0;
#ifndef WIN32
  int ofcmode;

  /* First we make the socket nonblocking; a failed F_GETFL must not be
   * OR-ed with flags and written back */
  ofcmode = fcntl(rfd, F_GETFL, 0);
  if(ofcmode == -1 || fcntl(rfd, F_SETFL, ofcmode | O_NDELAY))
    fprintf(stderr, "[sxmplv2] (RD)Couldn't make socket nonblocking\n");
#endif

 __retry:

  do {
  __try_again:
    if(co->flags & SXMP_CLOSED) {
      return -1;
    }
    r = SSL_read(co->ssl, buf, (int)buf_len);
    switch(SSL_get_error(co->ssl, r)) {
    case SSL_ERROR_NONE:
      return r;
    case SSL_ERROR_WANT_READ:
      /* get prepared to select */
      read_blocked = 1;
      break;
    case SSL_ERROR_WANT_WRITE: /* here we blocked on write */
      read_blocked_on_write = 1;
      break;
    case SSL_ERROR_SYSCALL:
      if(errno == EAGAIN || errno == EINTR) goto __try_again;
      fprintf(stderr, "[sxmplv2] (RD)SSL syscall error.\n");
      goto __close_conn;
    case SSL_ERROR_WANT_CONNECT:
    case SSL_ERROR_WANT_ACCEPT:
      fprintf(stderr, "[sxmplv2] (RD)SSL negotiation required. Trying again.\n");
      goto __try_again;
    case SSL_ERROR_SSL:
      fprintf(stderr, "[sxmplv2] (RD)SSL error occured. Connection will be closed.\n");
      goto __close_conn;
    case SSL_ERROR_ZERO_RETURN:
      fprintf(stderr, "[sxmplv2] (RD)SSL connection is cleary closed.\n");
      /* fallthrough */
    default:
    __close_conn:
      ERR_free_strings();
      co->flags |= SXMP_CLOSED;
      fprintf(stderr, "[sxmplv2] (RD)Unknown error on %s (errno = %d)\n", co->uuid, errno);
      ERR_remove_state(0);
      return -1;
    }
  } while(SSL_pending(co->ssl) && !read_blocked);

 __select_retry:

  if(read_blocked) {
    FD_ZERO(&readset);
    FD_SET(rfd, &readset);
    /* waits until something will be ready to read */
    r = select(rfd + 1, &readset, NULL, NULL, NULL);
    if(r < 0) {
      if(errno == EINTR || errno == EAGAIN) goto __select_retry;
      fprintf(stderr, "[sxmplv2] (RD)Select (%d) on %s\n", errno, co->uuid);
      ERR_remove_state(0);
      return -1;
    }
    if(!r) {
      fprintf(stderr, "[sxmplv2] (RD)Nothing to wait for\n");
      ERR_remove_state(0);
      return 0;
    }
    read_blocked = 0;
    if(r && FD_ISSET(rfd, &readset)) goto __retry; /* try to read again */
  }
  if(read_blocked_on_write) { /* the SSL layer has to write first */
    FD_ZERO(&writeset);
    FD_SET(rfd, &writeset);

    /* wait for writability only: waiting for readability as well could
     * end with a select() count being returned as if it was a number of
     * bytes read */
    r = select(rfd + 1, NULL, &writeset, NULL, NULL);
    if(r < 0) {
      if(errno == EINTR || errno == EAGAIN) goto __select_retry;
      fprintf(stderr, "[sxmplv2] (RD)Select (%d) on %s\n", errno, co->uuid);
      ERR_remove_state(0);
      return -1;
    }
    read_blocked_on_write = 0;
    if(r) goto __retry;
  }

  return r;
}
|
|
|
|
|
|
|
|
/*
 * Write buf_len bytes to the SSL connection of the link.
 *
 * When the SSL layer cannot proceed right now the call waits in select()
 * for the socket to become writable and retries.
 *
 * Returns the positive value SSL_write() reported on success and -1 on
 * failure or when the link is already closed. On failure the SXMP_CLOSED
 * flag is raised on the link.
 */
static int __conn_write(sxlink_t *co, void *buf, size_t buf_len)
{
  int r, sel, rfd = SSL_get_fd(co->ssl);
  fd_set writeset;

 __retry:
  if(co->flags & SXMP_CLOSED) {
    return -1;
  }
  r = SSL_write(co->ssl, buf, (int)buf_len);
  switch(SSL_get_error(co->ssl, r)) {
  case SSL_ERROR_NONE:
    return r;
  case SSL_ERROR_WANT_READ:
  case SSL_ERROR_WANT_WRITE:
    /* here we should block */
    FD_ZERO(&writeset);
    FD_SET(rfd, &writeset);
    sel = select(rfd + 1, NULL, &writeset, NULL, NULL);
    if(sel > 0) goto __retry;
    /* an interrupted wait is not an error; the descriptor set is not
     * valid after a failed select(), so it is not inspected */
    if(sel < 0 && (errno == EINTR || errno == EAGAIN)) goto __retry;
    break;
  case SSL_ERROR_SYSCALL:
    if(errno == EAGAIN || errno == EINTR) goto __retry;
    break;
  default:
    break;
  }

  /* every path reaching this point is a failed write, including the ones
   * where SSL_write() returned 0: mark the link closed */
  ERR_free_strings();
  co->flags |= SXMP_CLOSED;
  fprintf(stderr, "[sxmplv2] (WR)Unknown error on %s (%d)\n", co->uuid, r);
  ERR_remove_state(0);

  return -1;
}
|
|
|
|
|
|
|
|
/*
 * Send a message (head and, if present, payload) over the link.
 *
 * The head and the payload are copied to one temporary buffer so they
 * are passed to the SSL layer with a single write. Writers are
 * serialised with co->sslinout[1].
 *
 * Returns SXE_SUCCESS when the link is still open after the write,
 * SXE_FAILED on invalid arguments (NULL link/message, or a payload length
 * without a payload), SXE_ENOMEM when the temporary buffer cannot be
 * allocated and SXE_ESSL when the link is closed.
 */
int _sxmpl_writemsg(sxlink_t *co, sxmsg_t *msg)
{
  sxmplv2_head_t *head;
  size_t len = sizeof(sxmplv2_head_t);
  int wr, r = SXE_SUCCESS;
  char *buf = NULL;

  if(!co || !msg) return SXE_FAILED;

  /* check message for validity */
  head = &msg->mhead;
  if(head->payload_length && !msg->payload) return SXE_FAILED;

  if(head->payload_length) {
    len += head->payload_length;
    buf = malloc(len);
    if(!buf) return SXE_ENOMEM;
    memcpy(buf, head, sizeof(sxmplv2_head_t));
    memcpy(buf + sizeof(sxmplv2_head_t), msg->payload, head->payload_length);
  }

  /* write the head and payload if applicable */
  pthread_mutex_lock(&co->sslinout[1]);
  if(!buf) wr = __conn_write(co, head, len);
  else wr = __conn_write(co, buf, len);
  /* the result is kept in a signed variable, otherwise this test can
   * never be true */
  if(wr < 0) co->flags |= SXMP_CLOSED;
  pthread_mutex_unlock(&co->sslinout[1]);

  if(co->flags & SXMP_CLOSED) r = SXE_ESSL;
  free(buf);

  return r;
}
|
|
|
|
|
|
|
|
/*
 * Allocate a bundle for the given link together with a 65536 byte
 * anonymous private mapping used as its buffer.
 * Returns the new bundle or NULL if either allocation fails.
 */
static sxmplv2_bundle_t *__sxmpl_bundle_create(sxlink_t *co)
{
  sxmplv2_bundle_t *bundle = calloc(1, sizeof(sxmplv2_bundle_t));
  void *area;

  if(bundle == NULL) return NULL;

  area = mmap(NULL, 65536, PROT_READ|PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
  bundle->buf = area;
  if(area == MAP_FAILED) {
    free(bundle);
    return NULL;
  }

  bundle->conn = co;

  return bundle;
}
|
|
|
|
|
|
|
|
/*
 * Release a bundle: unmap its 65536 byte buffer, then free the bundle
 * itself. The link the bundle points to is not touched.
 */
static void __sxmpl_bundle_destroy(sxmplv2_bundle_t *n)
{
  void *area = n->buf;

  munmap(area, 65536);
  free(n);
}
|
|
|
|
|
|
|
|
/**
 * Index of the SSL ex-data slot holding the additional data that is
 * handed to the special callback invoked during the SSL handshake.
 */
static int ex_ssldata_index;

/* returns a C string holding a UUID (the implementation lives elsewhere
 * and is admittedly not a pretty one) */
extern char *__generate_uuid(void);
|
|
|
|
|
|
|
|
/* this is a callback to perform a custom SSL certs chain validation,
|
|
|
|
* as I promised here the comments, a lot of ...
|
|
|
|
* The first shit: 0 means validation failed, 1 otherwise
|
|
|
|
* The second shit: X509 API, I guess u will love it ;-)
|
|
|
|
* openssl calls this function for each certificate in chain,
|
|
|
|
* since our case is a simple (depth of chain is one, since we're
|
|
|
|
* don't care for public certificates lists or I cannot find any reasons to
|
|
|
|
* do it ...), amount of calls reduced, and in this case we're interested
|
|
|
|
* only in top of chain i.e. actual certificate used on client side,
|
|
|
|
* the validity of signing for other certificates within chain is
|
|
|
|
* guaranteed by the ssl itself.
|
|
|
|
* u know, we need to lookup in database, or elsewhere... some information
|
|
|
|
* about client certificate, and decide - is it valid, or not?, if so
|
|
|
|
* yep I mean it's valid, we can assign it's long fucking number to
|
|
|
|
* security context, to use in ongoing full scaled connection handshaking.
|
|
|
|
*/
|
|
|
|
static int __verify_certcall(int preverify_ok, X509_STORE_CTX *ctx)
|
|
|
|
{
|
|
|
|
// X509 *cert = X509_STORE_CTX_get_current_cert(ctx);
|
|
|
|
int err = X509_STORE_CTX_get_error(ctx), depth = X509_STORE_CTX_get_error_depth(ctx);
|
|
|
|
SSL *ssl = X509_STORE_CTX_get_ex_data(ctx, SSL_get_ex_data_X509_STORE_CTX_idx());
|
|
|
|
sxlink_t *co = SSL_get_ex_data(ssl, ex_ssldata_index); /* this is a custom data we're set before */
|
|
|
|
sxhub_t *ssys = co->ssys;
|
|
|
|
|
|
|
|
/* now we need to check for certificates with a long chain,
|
|
|
|
* so since we have a short one, reject long ones */
|
|
|
|
if(depth > VERIFY_DEPTH) { /* longer than we expect */
|
|
|
|
preverify_ok = 0; /* yep, 0 means error for those function callback in openssl, fucking set */
|
|
|
|
err = X509_V_ERR_CERT_CHAIN_TOO_LONG;
|
|
|
|
X509_STORE_CTX_set_error(ctx, err);
|
|
|
|
}
|
|
|
|
|
|
|
|
if(!preverify_ok) return 0;
|
|
|
|
|
|
|
|
/* ok, now we're on top of SSL (depth == 0) certs chain,
|
|
|
|
* and we can validate client certificate */
|
|
|
|
if(!depth) {
|
|
|
|
co->pctx->certid =
|
|
|
|
ASN1_INTEGER_get((const ASN1_INTEGER *)X509_get_serialNumber(ctx->current_cert));
|
|
|
|
//X509_STORE_CTX_free(ctx);
|
|
|
|
//X509_free(ctx->current_cert);
|
|
|
|
/* now we're need to check the ssl cert */
|
|
|
|
if(ssys->validate_sslpem) {
|
|
|
|
if(ssys->validate_sslpem(co)) return 0;
|
|
|
|
else return 1;
|
|
|
|
} else return 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
return preverify_ok;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* dummy just to check the server side */
|
|
|
|
static int __verify_certcall_dummy(int preverify_ok, X509_STORE_CTX *ctx)
|
|
|
|
{
|
|
|
|
return preverify_ok;
|
|
|
|
}
|
|
|
|
|
|
|
|
static pthread_mutex_t *lock_cs;
|
|
|
|
static long *lock_count;
|
|
|
|
|
|
|
|
static void pthreads_locking_callback(int mode, int type, const char *file, int line)
|
|
|
|
{
|
|
|
|
if (mode & CRYPTO_LOCK) {
|
|
|
|
pthread_mutex_lock(&(lock_cs[type]));
|
|
|
|
lock_count[type]++;
|
|
|
|
} else {
|
|
|
|
pthread_mutex_unlock(&(lock_cs[type]));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
 * Thread id callback installed into OpenSSL: reports the id of the
 * calling thread as a number.
 */
static void pthreads_thread_id(CRYPTO_THREADID *tid)
{
  unsigned long id;

#ifdef WIN32
  id = (unsigned long)GetCurrentThreadId();
#else
  id = (unsigned long)pthread_self();
#endif
  CRYPTO_THREADID_set_numeric(tid, id);
}
|
|
|
|
|
|
|
|
int sxmp_init(void)
|
|
|
|
{
|
|
|
|
int i;
|
|
|
|
|
|
|
|
/* init SSL library */
|
|
|
|
SSL_library_init();
|
|
|
|
|
|
|
|
OpenSSL_add_all_algorithms();
|
|
|
|
SSL_load_error_strings();
|
|
|
|
|
|
|
|
ex_ssldata_index = SSL_get_ex_new_index(0, "__ssldata index", NULL, NULL, NULL);
|
|
|
|
|
|
|
|
/* here we go - init all */
|
|
|
|
lock_cs = OPENSSL_malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t));
|
|
|
|
lock_count = OPENSSL_malloc(CRYPTO_num_locks() * sizeof(long));
|
|
|
|
for (i = 0; i < CRYPTO_num_locks(); i++) {
|
|
|
|
lock_count[i] = 0;
|
|
|
|
pthread_mutex_init(&(lock_cs[i]), NULL);
|
|
|
|
}
|
|
|
|
|
|
|
|
CRYPTO_THREADID_set_callback(pthreads_thread_id);
|
|
|
|
CRYPTO_set_locking_callback(pthreads_locking_callback);
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
void sxmp_finalize(void)
|
|
|
|
{
|
|
|
|
int i;
|
|
|
|
|
|
|
|
CRYPTO_set_locking_callback(NULL);
|
|
|
|
|
|
|
|
for (i = 0; i < CRYPTO_num_locks(); i++) {
|
|
|
|
pthread_mutex_destroy(&(lock_cs[i]));
|
|
|
|
}
|
|
|
|
OPENSSL_free(lock_cs);
|
|
|
|
OPENSSL_free(lock_count);
|
|
|
|
|
|
|
|
ERR_free_strings();
|
|
|
|
ENGINE_cleanup();
|
|
|
|
CRYPTO_cleanup_all_ex_data();
|
|
|
|
EVP_cleanup();
|
|
|
|
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
sxlink_t *__link_minimal_alloc(struct in_addr *addr)
|
|
|
|
{
|
|
|
|
sxlink_t *co = malloc(sizeof(sxlink_t));
|
|
|
|
int r;
|
|
|
|
|
|
|
|
if(!co) { r = ENOMEM; goto __fail; }
|
|
|
|
else memset(co, 0, sizeof(sxlink_t));
|
|
|
|
|
|
|
|
if(!(co->messages = malloc(sizeof(uintptr_t)*1024))) { r = ENOMEM; goto __fail; }
|
|
|
|
else memset(co->messages, 0, sizeof(uintptr_t)*1024);
|
|
|
|
|
|
|
|
if(!(co->pctx = malloc(sizeof(sxsession_ctx_t)))) { r = ENOMEM; goto __fail; }
|
|
|
|
else memset(co->pctx, 0, sizeof(sxsession_ctx_t));
|
|
|
|
if(addr) {
|
|
|
|
if(!(co->pctx->addr = malloc(sizeof(struct in_addr)))) { r = ENOMEM; goto __fail; }
|
|
|
|
|
|
|
|
memcpy(co->pctx->addr, addr, sizeof(struct in_addr));
|
|
|
|
}
|
|
|
|
|
|
|
|
if(!(co->uuid = __generate_uuid())) { r = ENOMEM; goto __fail; }
|
|
|
|
|
|
|
|
return co;
|
|
|
|
|
|
|
|
__fail:
|
|
|
|
if(co) {
|
|
|
|
if(co->pctx) {
|
|
|
|
if(co->pctx->addr) free(co->pctx->addr);
|
|
|
|
free(co->pctx);
|
|
|
|
}
|
|
|
|
if(co->messages) free(co->messages);
|
|
|
|
free(co);
|
|
|
|
}
|
|
|
|
errno = r;
|
|
|
|
return NULL;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
 * Second stage of link creation: initialises the tree node of the link,
 * the index allocators (512 channels, 1024 messages), the zeroed channel
 * table, the mutexes and the list of pending writes.
 *
 * Returns SXE_SUCCESS, or SXE_ENOMEM if an index allocator or the channel
 * table cannot be set up; in that case both index allocators are passed
 * to idx_allocator_destroy().
 */
static int __link_second_alloc(sxlink_t *co)
{
  pthread_mutex_t *locks[5];
  size_t k;

  usrtc_node_init(&co->csnode, co);

  memset(&co->idx_ch, 0, sizeof(idx_allocator_t));
  memset(&co->idx_msg, 0, sizeof(idx_allocator_t));
  if(idx_allocator_init(&co->idx_ch, 512, 0) != 0) goto __fail;
  if(idx_allocator_init(&co->idx_msg, 1024, 0) != 0) goto __fail;

  co->channels = calloc(1, sizeof(uintptr_t)*512);
  if(co->channels == NULL) goto __fail;

  /* init mutexes */
  locks[0] = &co->idx_ch_lock;
  locks[1] = &co->idx_msg_lock;
  locks[2] = &co->write_pending_lock;
  locks[3] = &co->sslinout[0];
  locks[4] = &co->sslinout[1];
  for(k = 0; k < sizeof(locks)/sizeof(locks[0]); k++)
    pthread_mutex_init(locks[k], NULL);

  /* init list */
  list_init_head(&co->write_pending);

  return SXE_SUCCESS;

 __fail:
  idx_allocator_destroy(&co->idx_msg);
  idx_allocator_destroy(&co->idx_ch);

  return SXE_ENOMEM;
}
|
|
|
|
|
|
|
|
/*
 * Undo __link_second_alloc(): frees the channel table, destroys both
 * index allocators and all the mutexes of the link.
 */
static void __link_second_free(sxlink_t *co)
{
  pthread_mutex_t *locks[5];
  size_t k;

  if(co->channels != NULL) free(co->channels);
  idx_allocator_destroy(&co->idx_msg);
  idx_allocator_destroy(&co->idx_ch);

  locks[0] = &co->idx_ch_lock;
  locks[1] = &co->idx_msg_lock;
  locks[2] = &co->write_pending_lock;
  locks[3] = &co->sslinout[0];
  locks[4] = &co->sslinout[1];
  for(k = 0; k < sizeof(locks)/sizeof(locks[0]); k++)
    pthread_mutex_destroy(locks[k]);
}
|
|
|
|
|
|
|
|
/*
 * Undo __link_minimal_alloc(): frees the session context (with the
 * stored address), the message table, the uuid and the link itself.
 * A NULL link is accepted and ignored.
 */
static void __link_minimal_free(sxlink_t *co)
{
  if(co == NULL) return;

  if(co->pctx != NULL) {
    if(co->pctx->addr != NULL) free(co->pctx->addr);
    free(co->pctx);
  }
  if(co->messages != NULL) free(co->messages);
  free(co->uuid);
  free(co);
}
|
|
|
|
|
|
|
|
/*
 * Evaluate a system s-expression: the first element of the list names
 * the function, which is looked up in the hub's system RPC tree and
 * called with the link and the whole expression.
 *
 * Returns the value of the called function, SXE_BADPROTO when the
 * expression is not a list or has no leading value to use as a name,
 * and SXE_ENORPC when no such function is registered.
 */
static int __eval_syssexp(sxlink_t *co, sexp_t *sx)
{
  sxl_rpclist_t *rpc_list = co->ssys->system_rpc;
  usrtc_node_t *node;
  sxl_rpc_t *rentry;
  char *rpcf;

  if(sx->ty != SEXP_LIST) return SXE_BADPROTO;
  /* an empty list, or a list starting with something that carries no
   * value, cannot name a function */
  if(!sx->list || !sx->list->val) return SXE_BADPROTO;
  rpcf = sx->list->val;

  /* find an appropriate function */
  node = usrtc_lookup(rpc_list->rpc_tree, rpcf);
  if(!node) return SXE_ENORPC;
  rentry = (sxl_rpc_t *)usrtc_node_getdata(node);

  /* call it */
  return rentry->rpcf((void *)co, sx);
}
|
|
|
|
|
|
|
|
#ifdef _NO_SXMPMP
|
|
|
|
#define _CONN_INUSE(co) (co)->usecount++;
|
|
|
|
#define _CONN_NOTINUSE(co) (co)->usecount--;
|
|
|
|
#define _CONN_UCOUNT(co) (co)->usecount
|
|
|
|
#else
|
|
|
|
static inline void _CONN_INUSE(sxlink_t *co) {
|
|
|
|
pthread_rwlock_wrlock(&co->ssys->rwlock);
|
|
|
|
co->usecount++;
|
|
|
|
pthread_rwlock_unlock(&co->ssys->rwlock);
|
|
|
|
}
|
|
|
|
|
|
|
|
static inline void _CONN_NOTINUSE(sxlink_t *co) {
|
|
|
|
pthread_rwlock_wrlock(&co->ssys->rwlock);
|
|
|
|
co->usecount--;
|
|
|
|
pthread_rwlock_unlock(&co->ssys->rwlock);
|
|
|
|
}
|
|
|
|
|
|
|
|
static inline int _CONN_UCOUNT(sxlink_t *co) {
|
|
|
|
int r;
|
|
|
|
pthread_rwlock_rdlock(&co->ssys->rwlock);
|
|
|
|
r = co->usecount;
|
|
|
|
pthread_rwlock_unlock(&co->ssys->rwlock);
|
|
|
|
return r;
|
|
|
|
}
|
|
|
|
#endif
|
|
|
|
|
|
|
|
/*
 * Called by a link thread which is going away.
 *
 * Every call drains the list of pending writes: closed messages are
 * destroyed, the others get SXE_LINKERROR as the opcode and their wait
 * mutex is released. Then the use counter of the link is decreased.
 *
 * Only when the counter has reached zero the link itself is torn down:
 * all registered messages and channels are released, the on_destroy hook
 * of the hub is called, the SSL object is freed, the socket is closed and
 * both allocation stages of the link are undone.
 */
static void __link_destroy(sxlink_t *co)
{
  sxhub_t *hub = co->ssys;
  list_node_t *iter, *siter;
  sxppmsg_t *pending;
  sxmsg_t *m;
  sxchnl_t *ch;
  int idx, sock;

  /* first unpin all the pending messages and mark them as failed */
  if(co->pending_messages) {
    pthread_mutex_lock(&co->write_pending_lock);
    list_for_each_safe(&co->write_pending, iter, siter) {
      pending = container_of(iter, sxppmsg_t, node);
      m = pending->msg;

      /* take it off the list before touching the message */
      list_del(&pending->node);
      if(!(m->mhead.attr & SXMSG_CLOSED)) { /* somebody waits - wake up */
        m->mhead.opcode = SXE_LINKERROR;
        pthread_mutex_unlock(&m->wait);
      } else { /* message is closed - destroy it */
        pthread_mutex_unlock(&m->wait);
        pthread_mutex_destroy(&m->wait);
        free(m);
      }
      free(pending);
      co->pending_messages--;
    }
    pthread_mutex_unlock(&co->write_pending_lock);
  }

  /* drop the OpenSSL error state */
  ERR_remove_state(0);
  ERR_remove_thread_state(0);
  ERR_free_strings();

  /* update use count */
  _CONN_NOTINUSE(co);

  /* somebody still uses the link - the rest is done by the last user */
  if(_CONN_UCOUNT(co)) return;

  /* go through the messages */
  pthread_mutex_lock(&co->idx_msg_lock);
  for(idx = 0; idx < 1024; idx++) {
    m = co->messages[idx];
    if(m == NULL) continue;

    m->mhead.opcode = SXE_LINKERROR;
    pthread_mutex_unlock(&m->wait);
    pthread_mutex_destroy(&m->wait);
    free(m);
    co->messages[idx] = NULL;
    idx_free(&co->idx_msg, idx);
  }
  pthread_mutex_unlock(&co->idx_msg_lock);

  /* now the channels */
  pthread_mutex_lock(&co->idx_ch_lock);
  for(idx = 0; idx < 512; idx++) {
    ch = co->channels[idx];
    if(ch == NULL) continue;

    idx_free(&co->idx_ch, idx);
    free(ch);
  }
  pthread_mutex_unlock(&co->idx_ch_lock);

  if(hub->on_destroy) hub->on_destroy(co);
  if(co->pctx->login) free(co->pctx->login);
  if(co->pctx->passwd) free(co->pctx->passwd);

  SSL_set_shutdown(co->ssl, SSL_RECEIVED_SHUTDOWN | SSL_SENT_SHUTDOWN);

  /* remember the socket, it is closed after the SSL object is gone */
  sock = SSL_get_fd(co->ssl);

  SSL_free(co->ssl);
  co->ssl = NULL;

  ERR_remove_thread_state(0);
  ERR_remove_state(0);

  ERR_free_strings();

  close(sock);
  __link_second_free(co);
  __link_minimal_free(co);
}
|
|
|
|
|
|
|
|
static void *__sxmpl_thread(void *b)
|
|
|
|
{
|
|
|
|
sxmplv2_bundle_t *bun = (sxmplv2_bundle_t *)b;
|
|
|
|
sxlink_t *co = bun->conn;
|
|
|
|
void *buf = bun->buf;
|
|
|
|
char *bbuf = (char*)buf;
|
|
|
|
sxmplv2_head_t *mhead = (sxmplv2_head_t *)buf;
|
|
|
|
sxmsg_t *msg, *omsg;
|
|
|
|
sexp_t *sx;
|
|
|
|
sxchnl_t *channel;
|
|
|
|
list_node_t *iter, *siter;
|
|
|
|
sxppmsg_t *ppm;
|
|
|
|
pthread_t self = pthread_self();
|
|
|
|
struct timespec wtick;
|
|
|
|
int dispatch = 0, e;
|
|
|
|
size_t rd, wr;
|
|
|
|
ulong_t mid;
|
|
|
|
#ifdef _PERFPROFILE
|
|
|
|
struct timeval beg, end;
|
|
|
|
#endif
|
|
|
|
/* byte buffer is following head */
|
|
|
|
bbuf += sizeof(sxmplv2_head_t);
|
|
|
|
|
|
|
|
__wait_alive:
|
|
|
|
/* flag test - FIXME: make it atomic (it will works atomically on x86, btw on others not) */
|
|
|
|
if(!(co->flags & SXMP_ALIVE)) {
|
|
|
|
if(co->flags & SXMP_CLOSED) goto __finish;
|
|
|
|
else {
|
|
|
|
usleep(20);
|
|
|
|
goto __wait_alive;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/* check up a thread */
|
|
|
|
if(pthread_equal(self, co->thrd_poll[7])) /* dispatcher */
|
|
|
|
dispatch = 1;
|
|
|
|
|
|
|
|
/* update use count */
|
|
|
|
_CONN_INUSE(co);
|
|
|
|
|
|
|
|
/* the following logic : (except dispatcher)
|
|
|
|
* 1. check up pending write -> if exists write one and start again., otherwise go next
|
|
|
|
* 2. read from ssl connection (we will sleep if other already acquire the lock)
|
|
|
|
*/
|
|
|
|
while(1) {
|
|
|
|
__again:
|
|
|
|
if(co->flags & SXMP_CLOSED) goto __finish; /* go away if required asap */
|
|
|
|
/* works with pending messages */
|
|
|
|
if(co->pending_messages && !(co->flags & SXMP_CLOSED)) {
|
|
|
|
pthread_mutex_lock(&co->write_pending_lock);
|
|
|
|
list_for_each_safe(&co->write_pending, iter, siter) {
|
|
|
|
ppm = container_of(iter, sxppmsg_t, node);
|
|
|
|
omsg = ppm->msg;
|
|
|
|
if(_sxmpl_writemsg(co, omsg) != SXE_SUCCESS) {
|
|
|
|
pthread_mutex_unlock(&co->write_pending_lock);
|
|
|
|
goto __finish; /* write failed - finishing ... */
|
|
|
|
}
|
|
|
|
|
|
|
|
/* ok, now we're able to remove it from list */
|
|
|
|
list_del(&ppm->node);
|
|
|
|
if(omsg->mhead.attr & SXMSG_CLOSED) { /* message is closed - destroy it */
|
|
|
|
pthread_mutex_unlock(&omsg->wait);
|
|
|
|
pthread_mutex_destroy(&omsg->wait);
|
|
|
|
free(omsg);
|
|
|
|
}
|
|
|
|
free(ppm);
|
|
|
|
co->pending_messages--;
|
|
|
|
}
|
|
|
|
pthread_mutex_unlock(&co->write_pending_lock);
|
|
|
|
}
|
|
|
|
|
|
|
|
if(!dispatch) pthread_mutex_lock(&(co->sslinout[0]));
|
|
|
|
else { /* dispatch thread ticking every ət */
|
|
|
|
wtick.tv_sec = time(NULL) + 1;
|
|
|
|
e = pthread_mutex_timedlock(&(co->sslinout[0]), &wtick);
|
|
|
|
if(e == ETIMEDOUT) goto __again;
|
|
|
|
}
|
|
|
|
if(co->flags & SXMP_CLOSED) {
|
|
|
|
pthread_mutex_unlock(&(co->sslinout[0]));
|
|
|
|
goto __finish;
|
|
|
|
}
|
|
|
|
#ifdef _PERFPROFILE
|
|
|
|
gettimeofday(&beg, NULL);
|
|
|
|
#endif
|
|
|
|
rd = __conn_read(co, mhead, sizeof(sxmplv2_head_t));
|
|
|
|
#ifdef _PERFPROFILE
|
|
|
|
gettimeofday(&end, NULL);
|
|
|
|
|
|
|
|
if((end.tv_sec - beg.tv_sec) > 0) {
|
|
|
|
printf("connread(head) Seconds: %ld ", end.tv_sec - beg.tv_sec);
|
|
|
|
printf("µS: %ld\n", end.tv_usec + (1000000 - beg.tv_usec));
|
|
|
|
} else printf("connread(head) µS: %ld\n", end.tv_usec - beg.tv_usec);
|
|
|
|
#endif
|
|
|
|
|
|
|
|
if(co->flags & SXMP_CLOSED) {
|
|
|
|
pthread_mutex_unlock(&(co->sslinout[0]));
|
|
|
|
goto __finish; /* go away if required asap */
|
|
|
|
}
|
|
|
|
|
|
|
|
#ifdef _VERBOSE_DEBUG
|
|
|
|
dumphead(mhead);
|
|
|
|
#endif
|
|
|
|
if(rd < 0) {
|
|
|
|
__sslproto_error:
|
|
|
|
co->flags |= SXMP_CLOSED;
|
|
|
|
pthread_mutex_unlock(&(co->sslinout[0]));
|
|
|
|
goto __finish;
|
|
|
|
} else {
|
|
|
|
/* check up if we can read or not */
|
|
|
|
if(mhead->payload_length) {
|
|
|
|
#ifdef _PERFPROFILE
|
|
|
|
gettimeofday(&beg, NULL);
|
|
|
|
#endif
|
|
|
|
rd = __conn_read(co, bbuf, mhead->payload_length);
|
|
|
|
#ifdef _PERFPROFILE
|
|
|
|
gettimeofday(&end, NULL);
|
|
|
|
if((end.tv_sec - beg.tv_sec) > 0) {
|
|
|
|
printf("connread(payload) Seconds: %ld ", end.tv_sec - beg.tv_sec);
|
|
|
|
printf("µS: %ld\n", end.tv_usec + (1000000 - beg.tv_usec));
|
|
|
|
} else printf("connread(payload) µS: %ld\n", end.tv_usec - beg.tv_usec);
|
|
|
|
#endif
|
|
|
|
|
|
|
|
if(rd == -1) goto __sslproto_error;
|
|
|
|
else pthread_mutex_unlock(&(co->sslinout[0]));
|
|
|
|
|
|
|
|
if(rd != mhead->payload_length) {
|
|
|
|
mid = mhead->msgid;
|
|
|
|
/* if we're need to do something */
|
|
|
|
if(mhead->msgid >= 1024) {
|
|
|
|
mhead->opcode = SXE_INVALINDEX;
|
|
|
|
goto __return_error;
|
|
|
|
} else {
|
|
|
|
// pthread_mutex_lock(&co->idx_msg_lock);
|
|
|
|
msg = co->messages[mid];
|
|
|
|
//thread_mutex_unlock(&co->idx_msg_lock);
|
|
|
|
}
|
|
|
|
if(!msg) {
|
|
|
|
if(mhead->attr & SXMSG_OPEN) mhead->opcode = SXE_BADPROTO;
|
|
|
|
else {
|
|
|
|
if((mhead->attr & SXMSG_PROTO) || (mhead->attr & SXMSG_LINK))
|
|
|
|
mhead->opcode = SXE_BADPROTO;
|
|
|
|
else mhead->opcode = SXE_NOSUCHMSG;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
__return_error:
|
|
|
|
mhead->attr |= SXMSG_CLOSED;
|
|
|
|
mhead->payload_length = 0;
|
|
|
|
pthread_mutex_lock(&(co->sslinout[1]));
|
|
|
|
wr = __conn_write(co, mhead, sizeof(sxmplv2_head_t));
|
|
|
|
pthread_mutex_unlock(&(co->sslinout[1]));
|
|
|
|
if(wr == -1) goto __finish;
|
|
|
|
else goto __again;
|
|
|
|
}
|
|
|
|
} else pthread_mutex_unlock(&(co->sslinout[0]));
|
|
|
|
/* take a message */
|
|
|
|
if(mhead->attr & SXMSG_PROTO) { /* protocol message i.e. channel open/close */
|
|
|
|
/* ok, check up the side */
|
|
|
|
if(mhead->attr & SXMSG_REPLYREQ) { /* means we're not initiators and we don't need to allocate a message */
|
|
|
|
if(mhead->attr & SXMSG_OPEN)
|
|
|
|
mhead->opcode = _channel_open(co, &mhead->reserve);
|
|
|
|
else mhead->opcode = _channel_close(co, mhead->reserve);
|
|
|
|
|
|
|
|
/* set flags */
|
|
|
|
mhead->payload_length = 0;
|
|
|
|
mhead->attr &= ~SXMSG_REPLYREQ;
|
|
|
|
pthread_mutex_lock(&(co->sslinout[1]));
|
|
|
|
wr = __conn_write(co, mhead, sizeof(sxmplv2_head_t));
|
|
|
|
pthread_mutex_unlock(&(co->sslinout[1]));
|
|
|
|
if(wr < 0) goto __finish;
|
|
|
|
} else { /* it's came back */
|
|
|
|
/* reply came ... */
|
|
|
|
if(mhead->msgid >= 1024) {
|
|
|
|
__inval_idx_nor:
|
|
|
|
fprintf(stderr, "[sxmplv2] Invalid index of the message.\n");
|
|
|
|
goto __again;
|
|
|
|
}
|
|
|
|
mid = mhead->msgid;
|
|
|
|
//hread_mutex_lock(&co->idx_msg_lock);
|
|
|
|
msg = co->messages[mid];
|
|
|
|
//hread_mutex_unlock(&co->idx_msg_lock);
|
|
|
|
if(!msg) goto __inval_idx_nor;
|
|
|
|
|
|
|
|
/* ok now we'are copy data and unlock wait mutex */
|
|
|
|
memcpy(&msg->mhead, mhead, sizeof(sxmplv2_head_t));
|
|
|
|
pthread_mutex_unlock(&msg->wait);
|
|
|
|
}
|
|
|
|
} else if(mhead->attr & SXMSG_LINK) { /* link layer messages */
|
|
|
|
if(mhead->attr & SXMSG_CLOSED) goto __finish; /* close the link */
|
|
|
|
if(mhead->attr & SXMSG_PULSE) { /* it's a link pulse messages */
|
|
|
|
/* TODO: syncronization and so on */
|
|
|
|
if(mhead->opcode == SXE_RAPIDMSG) { /* custom pulse */
|
|
|
|
sx = parse_sexp(bbuf, mhead->payload_length);
|
|
|
|
if(sx && co->ssys->on_pulse) co->ssys->on_pulse(co, sx);
|
|
|
|
if(sx) destroy_sexp(sx);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else { /* regular messages */
|
|
|
|
if((mhead->attr & SXMSG_OPEN) && (mhead->attr & SXMSG_REPLYREQ)) { /* dialog initiation */
|
|
|
|
channel = co->channels[mhead->reserve];
|
|
|
|
if(!channel) { /* ok, we'are failed */
|
|
|
|
mhead->opcode = SXE_NOSUCHCHAN;
|
|
|
|
__ret_regerr:
|
|
|
|
mhead->payload_length = 0;
|
|
|
|
mhead->attr &= ~SXMSG_REPLYREQ;
|
|
|
|
mhead->attr &= ~SXMSG_OPEN;
|
|
|
|
mhead->attr |= SXMSG_CLOSED;
|
|
|
|
pthread_mutex_lock(&(co->sslinout[1]));
|
|
|
|
wr = __conn_write(co, mhead, sizeof(sxmplv2_head_t));
|
|
|
|
pthread_mutex_unlock(&(co->sslinout[1]));
|
|
|
|
if(wr < 0) goto __finish;
|
|
|
|
else goto __again;
|
|
|
|
}
|
|
|
|
/* if message is busy - fails */
|
|
|
|
mid = mhead->msgid;
|
|
|
|
msg = co->messages[mid];
|
|
|
|
if(msg) { mhead->opcode = SXE_EBUSY; goto __ret_regerr; }
|
|
|
|
|
|
|
|
/* now we will take a deal */
|
|
|
|
if(!(msg = malloc(sizeof(sxmsg_t)))) {
|
|
|
|
mhead->opcode = SXE_ENOMEM; goto __ret_regerr;
|
|
|
|
} else {
|
|
|
|
/* set mutex and channel */
|
|
|
|
pthread_mutex_init(&msg->wait, NULL);
|
|
|
|
pthread_mutex_lock(&msg->wait);
|
|
|
|
msg->pch = channel;
|
|
|
|
/* copy header only */
|
|
|
|
memcpy(&msg->mhead, mhead, sizeof(sxmplv2_head_t));
|
|
|
|
if(mhead->payload_length) msg->payload = bbuf;
|
|
|
|
}
|
|
|
|
|
|
|
|
pthread_mutex_lock(&co->idx_msg_lock);
|
|
|
|
idx_reserve(&co->idx_msg, mid);
|
|
|
|
co->messages[mid] = msg;
|
|
|
|
pthread_mutex_unlock(&co->idx_msg_lock);
|
|
|
|
|
|
|
|
/* now we are able to process the message */
|
|
|
|
_message_process(msg);
|
|
|
|
} else if(mhead->attr & SXMSG_CLOSED) {
|
|
|
|
/* check for the message */
|
|
|
|
if(mhead->msgid >= 1024) goto __inval_idx_nor;
|
|
|
|
mid = mhead->msgid;
|
|
|
|
pthread_mutex_lock(&co->idx_msg_lock);
|
|
|
|
msg = co->messages[mid];
|
|
|
|
if(!msg) {
|
|
|
|
pthread_mutex_unlock(&co->idx_msg_lock); goto __inval_idx_nor; }
|
|
|
|
|
|
|
|
/* message dialog is closed - remove this right now */
|
|
|
|
|
|
|
|
idx_free(&co->idx_msg, mid);
|
|
|
|
co->messages[mid] = NULL;
|
|
|
|
pthread_mutex_unlock(&co->idx_msg_lock);
|
|
|
|
|
|
|
|
if(msg->mhead.attr & SXMSG_TIMEDOUT) { /* nobody wait for it */
|
|
|
|
/* now just free it */
|
|
|
|
pthread_mutex_unlock(&msg->wait);
|
|
|
|
pthread_mutex_destroy(&msg->wait);
|
|
|
|
free(msg);
|
|
|
|
} else {
|
|
|
|
memcpy(&msg->mhead, mhead, sizeof(sxmplv2_head_t));
|
|
|
|
if(mhead->payload_length) {
|
|
|
|
msg->payload = malloc(mhead->payload_length);
|
|
|
|
if(msg->payload) memcpy(msg->payload, bbuf, mhead->payload_length);
|
|
|
|
else msg->mhead.opcode = SXE_ENOMEM;
|
|
|
|
}
|
|
|
|
|
|
|
|
pthread_mutex_unlock(&msg->wait); /* wake up thread waiting for */
|
|
|
|
}
|
|
|
|
} else if((!(mhead->attr & SXMSG_CLOSED) && !(mhead->attr & SXMSG_OPEN)) &&
|
|
|
|
(mhead->attr & SXMSG_REPLYREQ)) { /* ongoing dialog */
|
|
|
|
/* check for the message */
|
|
|
|
if(mhead->msgid >= 1024) goto __inval_idx_nor;
|
|
|
|
mid = mhead->msgid;
|
|
|
|
msg = co->messages[mid];
|
|
|
|
if(!msg) goto __inval_idx_nor;
|
|
|
|
|
|
|
|
if(msg->mhead.attr & SXMSG_TIMEDOUT) { /* nobody wait for it */
|
|
|
|
pthread_mutex_lock(&co->idx_msg_lock);
|
|
|
|
idx_free(&co->idx_msg, mid);
|
|
|
|
co->messages[mid] = NULL;
|
|
|
|
pthread_mutex_unlock(&co->idx_msg_lock);
|
|
|
|
|
|
|
|
/* now just free it */
|
|
|
|
pthread_mutex_destroy(&msg->wait);
|
|
|
|
free(msg);
|
|
|
|
|
|
|
|
/* we must reply */
|
|
|
|
mhead->opcode = SXE_ETIMEDOUT;
|
|
|
|
goto __ret_regerr;
|
|
|
|
} else {
|
|
|
|
memcpy(&msg->mhead, mhead, sizeof(sxmplv2_head_t));
|
|
|
|
if(mhead->payload_length) {
|
|
|
|
msg->payload = malloc(mhead->payload_length);
|
|
|
|
if(msg->payload) memcpy(msg->payload, bbuf, mhead->payload_length);
|
|
|
|
else {
|
|
|
|
mhead->opcode = msg->mhead.opcode = SXE_ENOMEM; /* we will return it to waitee */
|
|
|
|
msg->mhead.attr &= ~SXMSG_REPLYREQ; /* doesn't need to reply */
|
|
|
|
/* reply here now */
|
|
|
|
mhead->payload_length = 0;
|
|
|
|
mhead->attr &= ~SXMSG_REPLYREQ;
|
|
|
|
mhead->attr &= ~SXMSG_OPEN;
|
|
|
|
mhead->attr |= SXMSG_CLOSED;
|
|
|
|
pthread_mutex_lock(&(co->sslinout[1]));
|
|
|
|
wr = __conn_write(co, mhead, sizeof(sxmplv2_head_t));
|
|
|
|
pthread_mutex_unlock(&(co->sslinout[1]));
|
|
|
|
if(wr < 0) goto __finish;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
pthread_mutex_unlock(&msg->wait); /* wake up thread waiting for */
|
|
|
|
}
|
|
|
|
} else { mhead->opcode = SXE_BADPROTO; goto __ret_regerr; }
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
__finish:
|
|
|
|
|
|
|
|
co->flags |= SXMP_CLOSED;
|
|
|
|
__link_destroy(co);
|
|
|
|
__sxmpl_bundle_destroy(b); /* destroy bundle */
|
|
|
|
return NULL;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
 * Accept an incoming link on an already connected socket (master side).
 *
 * Steps performed:
 *  - lazily create and configure the hub SSL context (first call only);
 *  - run the TLS handshake on sck;
 *  - serve the batch-mode negotiation: every received s-expression is
 *    evaluated with __eval_syssexp() and answered, until the link has
 *    SXMP_MESSAGINGMODE set;
 *  - allocate the second stage of the link and start MAX_SXMPLTHREADS
 *    detached worker threads.
 *
 * ssys - hub the new link belongs to
 * sck  - connected socket; it is closed on every failure that happens after
 *        the link structure has been allocated
 * addr - peer address, passed to __link_minimal_alloc()
 *
 * Returns the new link with SXMP_ALIVE set, or NULL with errno holding the
 * failure code.
 */
sxlink_t *sxlink_master_accept(sxhub_t *ssys, int sck, struct in_addr *addr)
{
  void *buf = NULL;
  char *bbuf;
  sxlink_t *co = __link_minimal_alloc(addr);
  sxmsg_t *msg = NULL;
  sxmplv2_head_t *head;
  sxmplv2_bundle_t *bundle;
  sexp_t *sx;
  size_t rd;
  int r = SXE_FAILED, i;

  if(!co) {
    errno = SXE_ENOMEM;
    return NULL;
  }

  /* ok, now we need to init ssl stuff */
  co->ssys = ssys;

  /* check up - do we need to initialize SSL context? */
  if(!ssys->ctx) {
    /*
     * Configure a private context first and publish it only when it is
     * complete: otherwise a certificate error would leave a half-configured
     * context behind and later calls would skip certificate loading.
     */
    SSL_CTX *ctx = SSL_CTX_new(TLSv1_2_server_method());

    if(!ctx) { r = SXE_ENOMEM; goto __fail; }

    /* set verify context and depth */
    SSL_CTX_set_verify(ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT,
                       __verify_certcall);
    SSL_CTX_set_verify_depth(ctx, VERIFY_DEPTH);

    /* set cache policy */
    SSL_CTX_set_session_cache_mode(ctx, SSL_SESS_CACHE_OFF);
    SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS);

    /* load certificates */
    SSL_CTX_load_verify_locations(ctx, ssys->rootca, NULL);

    /* local certificate, private key, and check that they match */
    if(SSL_CTX_use_certificate_file(ctx, ssys->certpem, SSL_FILETYPE_PEM) <= 0 ||
       SSL_CTX_use_PrivateKey_file(ctx, ssys->certkey, SSL_FILETYPE_PEM) <= 0 ||
       !SSL_CTX_check_private_key(ctx)) {
      SSL_CTX_free(ctx);
      r = SXE_ESSL;
      goto __fail;
    }

    ssys->ctx = ctx;
  }

  /* now we will create an SSL connection */
  co->ssl = SSL_new(ssys->ctx);
  if(!co->ssl) { r = SXE_ENOMEM; goto __fail; }
  SSL_set_fd(co->ssl, sck); /* attach connected socket */

  /* set the context to verify ssl connection */
  SSL_set_ex_data(co->ssl, ex_ssldata_index, (void *)co);
  SSL_set_accept_state(co->ssl);
  /* SSL_accept() returns 1 on success only; 0 and negatives are failures */
  if(SSL_accept(co->ssl) <= 0) { r = SXE_EPERM; goto __fail; }

  /* ok, now we are able to allocate and so on */
  /* set connection to the batch mode */
  co->flags |= SXMP_BATCHMODE;

  /* allocate our first buffer */
  buf = mmap(NULL, 65536, PROT_READ|PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
  if(buf == MAP_FAILED) { r = SXE_ENOMEM; goto __fail2; }

  /* allocate first message */
  if(!(msg = malloc(sizeof(sxmsg_t)))) { r = SXE_ENOMEM; goto __fail2; }
  memset(msg, 0, sizeof(sxmsg_t));
  co->messages[0] = msg;

  /* header lives at the start of the buffer, payload right behind it */
  head = (sxmplv2_head_t *)buf;
  bbuf = (char *)buf + sizeof(sxmplv2_head_t);

  while(co->flags & SXMP_BATCHMODE) {
    rd = __conn_read(co, buf, sizeof(sxmplv2_head_t));
    if(rd != sizeof(sxmplv2_head_t)) { r = SXE_LINKERROR; goto __fail3; }

    /* check for returns */
    if(head->opcode != SXE_SUCCESS) { r = head->opcode; goto __fail3; }

    /* if we're ready for messaging mode, turn off batch mode */
    if(co->flags & SXMP_MESSAGINGMODE) {
      co->flags &= ~SXMP_BATCHMODE;
      break;
    }

    if(!head->payload_length) continue; /* nothing to evaluate */

    /* the payload plus its terminating NUL must fit behind the header */
    if(head->payload_length > 65535 - sizeof(sxmplv2_head_t)) {
      r = SXE_BADPROTO;
      goto __fail3;
    }

    /* a failed read (-1) never equals the expected length either */
    rd = __conn_read(co, bbuf, head->payload_length);
    if(rd != head->payload_length) { r = SXE_LINKERROR; goto __fail3; }

    bbuf[rd] = '\0';
    sx = parse_sexp(bbuf, rd);
    if(!sx) { r = SXE_BADPROTO; goto __fail3; }

    /* initialize message */
    msg->payload = bbuf;
    msg->mhead.payload_length = 0;

    /* deal with it; the expression is not needed once it is evaluated */
    r = __eval_syssexp(co, sx);
    destroy_sexp(sx);

    memcpy(head, &msg->mhead, sizeof(sxmplv2_head_t));
    head->opcode = r;
    if(r != SXE_SUCCESS) { /* we finish: report the error, header only */
      head->payload_length = 0;
      __conn_write(co, head, sizeof(sxmplv2_head_t));
      goto __fail3;
    }

    rd = __conn_write(co, buf, sizeof(sxmplv2_head_t) + msg->mhead.payload_length);
    if(rd != sizeof(sxmplv2_head_t) + msg->mhead.payload_length) {
      r = SXE_LINKERROR;
      goto __fail3;
    }
  }

  /* if we're there - negotiation is done, going to init messaging mode */
  r = __link_second_alloc(co);
  if(r != SXE_SUCCESS) goto __fail3;

  /* free message; reset the pointer so the failure path cannot free it twice */
  co->messages[0] = NULL;
  free(msg);
  msg = NULL;

  /* and now we need to create a thread pool; the first bundle reuses buf */
  if(!(bundle = malloc(sizeof(sxmplv2_bundle_t)))) { r = SXE_ENOMEM; goto __fail4; }
  bundle->buf = buf;
  bundle->conn = co;

  for(i = 0; i < MAX_SXMPLTHREADS; i++) {
    /* 0xdead marks "previous bundle was handed over to a thread" */
    if(bundle == (void *)0xdead) bundle = __sxmpl_bundle_create(co);
    if(!bundle) goto __fail5;
    r = pthread_create(&co->thrd_poll[i], NULL, __sxmpl_thread, bundle); /* and here, alloc tls */
    if(r) goto __fail5;

    bundle = (void *)0xdead;
    pthread_detach(co->thrd_poll[i]);
  }

  /* all is done, connection now ready */
  co->flags |= SXMP_ALIVE;

  r = SXE_SUCCESS;
  errno = r;

  /* free context for this thread */
  ERR_remove_state(0);

  return co;

 __fail5:
  r = SXE_ENOMEM;
  /* release the bundle no thread took over; the first one only borrows buf */
  if(bundle) {
    if(i == 0) free(bundle);
    else __sxmpl_bundle_destroy(bundle);
  }
  /* bundles will be freed by the threads when SSL_read will fails. */
  /* NOTE(review): when i > 0 the started threads still reference co and buf
   * while both are released below - needs a proper join/ownership scheme. */
 __fail4:
  __link_second_free(co);
 __fail3:
  if(ssys->on_destroy) ssys->on_destroy(co);
 __fail2:
  free(msg);
  if(buf != MAP_FAILED) munmap(buf, 65536);
  SSL_shutdown(co->ssl);
 __fail:
  if(co) {
    if(co->ssl) {
      ERR_remove_thread_state(0);
      ERR_remove_state(0);
      ERR_free_strings();

      SSL_free(co->ssl);
    }
    __link_minimal_free(co);
  }
  close(sck);
  errno = r;

  return NULL;
}
|
|
|
|
|
|
|
|
/*
 * Connect to a master as a client and bring the link to messaging mode.
 *
 * Steps performed:
 *  - create the hub SSL context if there is none yet, then (re)load the
 *    client certificate and key;
 *  - resolve host, connect a TCP socket and run the TLS handshake;
 *  - batch-mode negotiation: send the credentials, request the channel list
 *    and evaluate the answer with __eval_syssexp();
 *  - allocate the second stage of the link and start MAX_SXMPLTHREADS
 *    detached worker threads.
 *
 * ssys     - hub the new link belongs to
 * host     - host name to connect to (required)
 * port     - TCP port
 * SSL_cert - PEM file used for both the certificate and the private key
 *            (required)
 * login    - login name, "nil" is sent when NULL
 * passwd   - password, "nil" is sent when NULL
 *
 * Returns the new link with SXMP_ALIVE set, or NULL with errno holding the
 * failure code.
 */
sxlink_t *sxlink_connect(sxhub_t *ssys, const char *host,
                         int port, const char *SSL_cert, const char *login,
                         const char *passwd)
{
  sxlink_t *co = __link_minimal_alloc(NULL);
  struct hostent *host_;
  struct sockaddr_in addr;
  int r = SXE_SUCCESS, sck;
#ifdef WIN32
  WSADATA wsaData;
#endif
  char hostbuf[2048];
  void *buf = NULL;
  char *bbuf;
  sxmplv2_head_t *head;
  sxmplv2_bundle_t *bundle;
  sxmsg_t *msg = NULL;
  sexp_t *sx;
  size_t rd, wr, ln;
  /* largest payload that fits behind the header together with its NUL */
  const size_t payload_max = 65535 - sizeof(sxmplv2_head_t);
  int i;

  r = SXE_IGNORED;
  if(!host || !SSL_cert) goto __fail;
  if(!co) { r = SXE_ENOMEM; goto __fail; }

#ifdef WIN32
  WSAStartup(MAKEWORD(2, 2), &wsaData);
#endif

  /* ok, now we need to init ssl stuff */
  co->ssys = ssys;

  /* check up ssl context */
  if(!ssys->ctx) {
    /* init SSL context */
    ssys->ctx = SSL_CTX_new(TLSv1_2_client_method());
    if(!ssys->ctx) { r = SXE_ENOMEM; goto __fail; }

    /* set verify context and depth */
    SSL_CTX_set_verify(ssys->ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT,
                       __verify_certcall_dummy);
    SSL_CTX_set_verify_depth(ssys->ctx, VERIFY_DEPTH);

    /* load certificates */
    SSL_CTX_load_verify_locations(ssys->ctx, ssys->rootca, NULL);
  }

  /* the certificate and key are (re)loaded for a new and a reused context */
  /* set the local certificate from CertFile */
  if(SSL_CTX_use_certificate_file(ssys->ctx, SSL_cert, SSL_FILETYPE_PEM) <= 0) {
    r = SXE_ESSL;
    goto __fail;
  }
  /* set the private key from KeyFile (the same file as CertFile here) */
  if(SSL_CTX_use_PrivateKey_file(ssys->ctx, SSL_cert, SSL_FILETYPE_PEM) <= 0) {
    r = SXE_ESSL;
    goto __fail;
  }
  /* verify private key */
  if(!SSL_CTX_check_private_key(ssys->ctx)) {
    r = SXE_ESSL;
    goto __fail;
  }

  /* resolve host */
#ifdef WIN32
  host_ = gethostbyname(host);
  if(!host_) {
    r = SXE_FAILED;
    goto __fail;
  }
#else
  r = __resolvehost(host, hostbuf, 2048, &host_);
  if(r) {
    r = SXE_FAILED;
    goto __fail;
  }
#endif

  /* create a socket */
  sck = socket(PF_INET, SOCK_STREAM, 0);
  if(sck < 0) { r = SXE_FAILED; goto __fail; }

  /* try to connect it */
  memset(&addr, 0, sizeof(addr));
  addr.sin_family = AF_INET;
  addr.sin_port = htons(port);
  /* copy instead of casting: h_addr carries no alignment guarantee */
  memcpy(&addr.sin_addr.s_addr, host_->h_addr, sizeof(addr.sin_addr.s_addr));
  r = connect(sck, (struct sockaddr*)&addr, sizeof(addr));
  if(r) {
    r = SXE_FAILED; /* couldn't connect to the desired host */
    goto __fail_sck;
  }

  /* SSL handshake */
  co->ssl = SSL_new(ssys->ctx);
  if(!co->ssl) { r = SXE_ENOMEM; goto __fail_sck; }
  SSL_set_fd(co->ssl, sck); /* attach connected socket */
  SSL_set_connect_state(co->ssl);
  /* SSL_connect() returns 1 on success only; 0 and negatives are failures */
  if(SSL_connect(co->ssl) <= 0) {
    r = SXE_EPERM;
    goto __fail_sck;
  } /* if success we're ready to use established SSL channel */

  /* set connection to the batch mode */
  co->flags |= SXMP_BATCHMODE;

  /* allocate our first buffer */
  buf = mmap(NULL, 65536, PROT_READ|PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
  if(buf == MAP_FAILED) { r = SXE_ENOMEM; goto __fail2; }

  /* allocate first message */
  if(!(msg = malloc(sizeof(sxmsg_t)))) { r = SXE_ENOMEM; goto __fail2; }
  memset(msg, 0, sizeof(sxmsg_t));
  co->messages[0] = msg;

  /* header lives at the start of the buffer, payload right behind it */
  head = (sxmplv2_head_t *)buf;
  bbuf = (char *)buf + sizeof(sxmplv2_head_t);

  while(co->flags & SXMP_BATCHMODE) {
    /* form a message -- credentials */
    /* NOTE(review): login/passwd are embedded unescaped; a '"' inside them
     * changes the s-expression that is sent. */
    ln = snprintf(bbuf, payload_max, "(auth-set-credentials \"%s\" \"%s\")",
                  login ? login : "nil", passwd ? passwd : "nil");
    /* a negative result wraps to a huge size_t and is rejected here as well */
    if(ln >= payload_max) { r = SXE_FAILED; goto __fail2; }

    head->opcode = SXE_SUCCESS;
    head->payload_length = ln;
    wr = __conn_write(co, buf, ln + sizeof(sxmplv2_head_t));
    if(wr != ln + sizeof(sxmplv2_head_t)) { r = SXE_LINKERROR; goto __fail2; }

    rd = __conn_read(co, head, sizeof(sxmplv2_head_t));
    if(rd != sizeof(sxmplv2_head_t)) { r = SXE_LINKERROR; goto __fail2; }
    if(head->opcode != SXE_SUCCESS) {
      r = head->opcode;
      goto __fail2;
    }

    /* ok, get available channels */
    head->opcode = SXE_SUCCESS;
    ln = snprintf(bbuf, payload_max, "(get-channels-list)");
    head->payload_length = ln;
    wr = __conn_write(co, buf, ln + sizeof(sxmplv2_head_t));
    if(wr != ln + sizeof(sxmplv2_head_t)) { r = SXE_LINKERROR; goto __fail2; }

    rd = __conn_read(co, head, sizeof(sxmplv2_head_t));
    if(rd != sizeof(sxmplv2_head_t)) { r = SXE_LINKERROR; goto __fail2; }
    if(head->opcode != SXE_SUCCESS) { r = head->opcode; goto __fail2; }

    /* an answer is mandatory and must fit into the buffer with its NUL */
    if(!head->payload_length || head->payload_length > payload_max) {
      r = SXE_BADPROTO;
      goto __fail2;
    }
    rd = __conn_read(co, bbuf, head->payload_length);
    if(rd != head->payload_length) { r = SXE_LINKERROR; goto __fail2; }

    /* perform a parsing of the desired message */
    bbuf[rd] = '\0';
    sx = parse_sexp(bbuf, rd);
    if(!sx) { r = SXE_BADPROTO; goto __fail2; }
    r = __eval_syssexp(co, sx);
    if(!r) r = SXE_SUCCESS;
    destroy_sexp(sx);

    /* write back */
    head->opcode = r;
    head->payload_length = 0;
    wr = __conn_write(co, head, sizeof(sxmplv2_head_t));
    if(wr != sizeof(sxmplv2_head_t) || r != SXE_SUCCESS) {
      r = SXE_LINKERROR;
      goto __fail2;
    }
  }

  /* if we're there - negotiation is done, going to init messaging mode */
  r = __link_second_alloc(co);
  if(r != SXE_SUCCESS) goto __fail3;

  /* free message; reset the pointer so the failure path cannot free it twice */
  co->messages[0] = NULL;
  free(msg);
  msg = NULL;

  /* and now we need to create a thread pool; the first bundle reuses buf */
  if(!(bundle = malloc(sizeof(sxmplv2_bundle_t)))) { r = SXE_ENOMEM; goto __fail4; }
  bundle->buf = buf;
  bundle->conn = co;

  for(i = 0; i < MAX_SXMPLTHREADS; i++) {
    /* 0xdead marks "previous bundle was handed over to a thread" */
    if(bundle == (void *)0xdead) bundle = __sxmpl_bundle_create(co);
    if(!bundle) goto __fail5;
    r = pthread_create(&co->thrd_poll[i], NULL, __sxmpl_thread, bundle);
    if(r) goto __fail5;

    pthread_detach(co->thrd_poll[i]);
    bundle = (void *)0xdead;
  }

  /* all is done, connection now ready */
  co->flags |= SXMP_ALIVE;

  return co;

 __fail5:
  r = SXE_ENOMEM;
  /* release the bundle no thread took over; the first one only borrows buf */
  if(bundle) {
    if(i == 0) free(bundle);
    else __sxmpl_bundle_destroy(bundle);
  }
  /* bundles will be freed by the threads when SSL_read will fails. */
  /* NOTE(review): when i > 0 the started threads still reference co and buf
   * while both are released below - needs a proper join/ownership scheme. */
 __fail4:
  __link_second_free(co);
 __fail3:
  if(ssys->on_destroy) ssys->on_destroy(co);
 __fail2:
  free(msg);
  if(buf != MAP_FAILED) munmap(buf, 65536);
  SSL_shutdown(co->ssl);
  ERR_remove_thread_state(0);
  ERR_remove_state(0);
 __fail_sck:
  close(sck);
 __fail:
  if(co) {
    if(co->ssl) SSL_free(co->ssl);
    __link_minimal_free(co);
  }
  errno = r;
  return NULL;
}
|
|
|
|
|
|
|
|
/*
 * Announce the end of the link to the peer and mark it closed locally.
 *
 * A header-only message carrying SXMSG_LINK | SXMSG_CLOSED is written while
 * holding co->sslinout[1]; the result of the write is not checked. After
 * that SXMP_CLOSED is set on the link and the caller is delayed for 20 ms.
 *
 * Always returns SXE_SUCCESS.
 */
int sxlink_close(sxlink_t *co)
{
  sxmplv2_head_t bye;

  /* every header field is zero except the attributes */
  memset(&bye, 0, sizeof bye);
  bye.attr = SXMSG_LINK | SXMSG_CLOSED;

  /* send it under the lock taken around writes */
  pthread_mutex_lock(&co->sslinout[1]);
  __conn_write(co, &bye, sizeof bye);
  pthread_mutex_unlock(&co->sslinout[1]);

  /* no reply is awaited: flag the link as closed right away */
  co->flags |= SXMP_CLOSED;

  /* TODO: wait until all threads will finish */
  usleep(20000);

  return SXE_SUCCESS;
}
|