more compilable sources;

v0.5.xx
Alexander Vdolainen 10 years ago
parent 1872fddbbf
commit 7b025730d7

1
.gitignore vendored

@ -49,3 +49,4 @@ aclocal
coq coq
*.log *.log
*.crt *.crt
lib/libsntllv2.pc

@ -54,7 +54,7 @@ AM_CONDITIONAL(BUILD_WIN32, test "x$enable_win32_build" = "xyes")
AC_OUTPUT([ AC_OUTPUT([
Makefile Makefile
lib/libsntl.pc lib/libsntllv2.pc
lib/Makefile lib/Makefile
include/Makefile include/Makefile
examples/Makefile]) examples/Makefile])

@ -0,0 +1,228 @@
/*
* Secure Network Transport Layer Library v2 implementation.
* (sntllv2) it superseed all versions before due to the:
* - memory consumption
* - new features such as pulse emitting
* - performance optimization
*
* This is a proprietary software. See COPYING for further details.
*
* (c) Askele Group 2013-2015 <http://askele.com>
*
*/
#ifndef __SNTL_SNTLLV2_H__
#define __SNTL_SNTLLV2_H__
#include <stdint.h>
#include <time.h>
#include <sys/types.h>
#include <pthread.h>
#include <openssl/ssl.h>
#include <tdata/usrtc.h>
#include <tdata/idx_allocator.h>
#include <tdata/list.h>
#include <sexpr/sexp.h>
#include <sexpr/faststack.h>
#include <sntl/errno.h>
#define VERIFY_DEPTH 1 /* FIXME: */
typedef struct __perm_context_type {
char *login;
char *passwd;
uint64_t certid;
struct in_addr *addr;
void *priv;
} perm_ctx_t;
/* 8 byte header */
typedef struct __sntllv2_head_type {
uint16_t msgid;
uint16_t payload_length;
uint8_t attr;
uint8_t opcode;
uint16_t reserve;
}__attribute__((packed)) sntllv2_head_t;
struct __connections_subsys_type;
struct __channel_t;
struct __message_t;
/* flags for the connection link */
#define SNSX_BATCHMODE (1 << 1)
#define SNSX_MESSAGINGMODE (1 << 2)
#define SNSX_ALIVE (1 << 3)
#define SNSX_CLOSED (1 << 4)
/*
* älä jätä kommentteja omalla kielellä! yksinkertaisia englanti sijaan!
* i found somebody who write comments and messages in non-english,
* it's a fucking practice - forget it.
*/
typedef struct __connection_t {
/* General section */
struct __connections_subsys_type *ssys; /* < connections subsystem */
char *uuid; /** < uuid of the connection */
/* Channels section */
idx_allocator_t idx_ch; /** < index allocation for channels */
pthread_mutex_t idx_ch_lock; /** < mutex for allocating and deallocating channels */
struct __channel_t **channels; /** < channels O(1) storage */
/* RPC section */
usrtc_t *rpc_list; /** < search tree of possible RPC typed lists */
/* SSL related section */
SSL_CTX *ctx; /** < SSL context */
SSL *ssl; /** < SSL connection */
int ssl_data_index; /** < SSL index for the custom data */
pthread_mutex_t sslinout[2]; /** < SSL related locks for in and out */
/* Security section */
perm_ctx_t *pctx; /** < higher layer authentification context */
/* Messages section */
struct __message_t **messages; /** < messages O(1) storage */
idx_allocator_t idx_msg;
pthread_mutex_t idx_msg_lock;
list_head_t write_pending; /** < list of messages waiting for write */
pthread_mutex_t write_pending_lock;
volatile uint8_t unused_messages; /** < unused message count */
/* Other stuff */
pthread_t thrd_poll[8];
volatile uint8_t flags; /** < flags of the connection */
usrtc_node_t csnode; /** < node to store the connection within list */
} conn_t;
struct __connection_rpc_list_type;
struct __message_t;
typedef struct __channel_t {
uint16_t cid; /** < ID of the channel */
conn_t *connection; /** < pointer to the parent connection */
struct __connection_rpc_list_type *rpc_list; /** < rpc functions list */
int flags; /** < flags of the channel */
} chnl_t;
/* message flags */
#define SXMSG_OPEN (1 << 1)
#define SXMSG_CLOSED (1 << 2)
#define SXMSG_PROTO (1 << 3)
#define SXMSG_LINK (1 << 4)
#define SXMSG_REPLYREQ (1 << 5)
#define SXMSG_PULSE (1 << 6)
#define SXMSG_TIMEDOUT (1 << 7)
/**
* \brief Message used in sntl message passing
*
* This structure used to manage a message within a channel
* of the sntl structure stack.
*/
typedef struct __message_t {
chnl_t *pch; /** < channel of the message(if applicable) */
pthread_mutex_t wait; /** < special wait mutex, used for pending list and sync */
sntllv2_head_t mhead;
void *payload; /** < payload */
} sxmsg_t;
#define sxmsg_payload(m) (m)->payload
#define sxmsg_datalen(m) (m)->mhead.payload_length
#define sxmsg_rapidbuf(m) (m)->payload
#define sxmsg_retcode(m) (m)->mhead.opcode
#define sxmsg_waitlock(m) pthread_mutex_lock(&((m)->wait))
#define sxmsg_waitunlock(m) pthread_mutex_unlock(&((m)->wait))
typedef struct __connection_rpc_entry_type {
char *name;
int (*rpcf)(void *, sexp_t *);
usrtc_node_t node;
} cx_rpc_t;
typedef struct __connection_rpc_list_type {
usrtc_t *rpc_tree; /** < search tree for the rpc lookup */
char *opt_version; /** < reserved for future implementations */
} cx_rpc_list_t;
#define MAX_CONNECTIONS 32768
/**
* \brief Connection subsystem structure.
*
* This structure used for management and control a set of a
* determined connections with the same RPC lists and the same
* mode (server, client).
*
*/
typedef struct __connections_subsys_type {
usrtc_t *connections;
pthread_rwlock_t rwlock;
char *rootca, *certpem, *certkey; /* path name to the certificates */
cx_rpc_list_t *system_rpc;
/* special functions pointers */
int (*validate_sslpem)(conn_t *); /** < this function used to validate SSL certificate while SSL handshake */
int (*secure_check)(conn_t *); /** < this function authorize user to login,
* and also should check SSL cert and user, and already made sessions */
usrtc_t* (*get_rpc_typed_list_tree)(conn_t *); /** < this function is used to set RPC list of the functions */
int (*set_typed_list_callback)(conn_t *, int, char *); /** < this function is a callback
* during setting up a typed channel */
void (*on_destroy)(conn_t *); /** < callback on connection destroy */
void (*on_pulse)(conn_t *, sxmsg_t *); /** < callback on pulse emit */
void *priv;
} conn_sys_t;
#define connections_set_sslvalidate(c, f) (c)->validate_sslpem = (f)
#define connections_set_authcheck(c, f) (c)->secure_check = (f)
#define connections_set_rpcvalidator(c, f) (c)->get_rpc_typed_list_tree = (f)
#define connections_set_channelcall(c, f) (c)->set_typed_list_callback = (f)
#define connections_set_ondestroy(c, f) (c)->on_destroy = (f)
#define connections_set_onpulse(c, f) (c)->on_pulse = (f)
typedef struct __rpc_typed_list_type {
int type_id;
char *description;
cx_rpc_list_t *rpc_list;
usrtc_node_t lnode;
} rpc_typed_list_t;
/* API */
int connections_init(conn_sys_t *ssys);
conn_sys_t *connections_create(void);
int connections_destroy(conn_sys_t *ssys);
int connections_free(conn_sys_t *ssys);
/* create links */
conn_t *connection_master_link(conn_sys_t *ssys, int sck, struct in_addr *addr);
conn_t *connection_link(conn_sys_t *ssys, const char *host,
int port, const char *SSL_cert, const char *login,
const char *passwd);
int connection_close(conn_t *co);
/* channels */
chnl_t *sxchannel_open(conn_t *co, int type);
int sxchannel_close(chnl_t *channel);
/* messages */
/*
* creates a message with a payload.
* Will return a error code, and, if applicable, pointer to message
*/
int sxmsg_send(chnl_t *channel, const char *data, size_t datalen, sxmsg_t **msg);
/* the same - postponed message i.e. will be written to the queue - not to write immendatly */
int sxmsg_send_pp(chnl_t *channel, const char *data, size_t datalen, sxmsg_t **msg);
/* send a pulse message */
int sxmsg_pulse(conn_t *co, const char *data, size_t datalen);
/* the same but will be postponed */
int sxmsg_pulse_pp(conn_t *co, const char *data, size_t datalen);
int sxmsg_reply(sxmsg_t *msg, const char *data, size_t datalen);
int sxmsg_reply_pp(sxmsg_t *msg, const char *data, size_t datalen);
int sxmsg_rreply(sxmsg_t *msg, const char *data, size_t datalen);
int sxmsg_rreply_pp(sxmsg_t *msg, const char *data, size_t datalen);
int sxmsg_return(sxmsg_t *msg, int opcode);
int sxmsg_return_pp(sxmsg_t *msg, int opcode);
#endif /* __SNTL_SNTLLV2_H__ */

@ -10,25 +10,25 @@ AM_CFLAGS =\
-Wall\ -Wall\
-g -g
lib_LTLIBRARIES = libsntl.la lib_LTLIBRARIES = libsntllv2.la
libsntl_la_SOURCES = \ libsntllv2_la_SOURCES = \
support.c queue.c rpclist.c message.c channel.c connection.c connex.c sntllv2.c
libsntl_la_LDFLAGS = -Wl,--export-dynamic libsntllv2_la_LDFLAGS = -Wl,--export-dynamic
libsntl_la_LIBADD = -lpthread -lcrypto $(LIBTDATA_LIBS) $(LIBSEXPR_LIBS) $(OPENSSL_LIBS) libsntllv2_la_LIBADD = -lpthread -lcrypto $(LIBTDATA_LIBS) $(LIBSEXPR_LIBS) $(OPENSSL_LIBS)
if BUILD_WIN32 if BUILD_WIN32
libsntl_la_LIBADD += -luuid libsntllv2_la_LIBADD += -luuid
else else
libsntl_la_LIBADD += $(LIBUUID_LIBS) libsntllv2_la_LIBADD += $(LIBUUID_LIBS)
endif !BUILD_WIN32 endif !BUILD_WIN32
pkgconfigdir = $(libdir)/pkgconfig pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = libsntl.pc pkgconfig_DATA = libsntllv2.pc
EXTRA_DIST = \ EXTRA_DIST = \
libsntl.pc.in libsntllv2.pc.in

@ -0,0 +1,50 @@
/*
* Secure Network Transport Layer Library v2 implementation.
* (sntllv2) it superseed all versions before due to the:
* - memory consumption
* - new features such as pulse emitting
* - performance optimization
*
* This is a proprietary software. See COPYING for further details.
*
* (c) Askele Group 2013-2015 <http://askele.com>
*
*/
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <fcntl.h>
#ifdef WIN32
#include <Winsock2.h>
#define EBADE 1
#define NETDB_SUCCESS 0
#else
#include <sys/select.h>
#include <netdb.h>
#include <unistd.h>
#include <uuid/uuid.h>
#endif
#include <openssl/ssl.h>
#include <openssl/err.h>
#include <tdata/usrtc.h>
#include <sexpr/sexp.h>
#include <sntl/sntllv2.h>
/* locally used functions */
uint8_t _channel_open(conn_t *co, uint16_t *chid)
{
}
uint8_t _channel_close(conn_t *co, uint16_t chid)
{
}

@ -54,6 +54,12 @@ static int ex_ssldata_index; /** < index used to work with additional data
int sntl_init(void) int sntl_init(void)
{ {
/* init SSL library */
SSL_library_init();
OpenSSL_add_all_algorithms();
SSL_load_error_strings();
ex_ssldata_index = SSL_get_ex_new_index(0, "__ssldata index", NULL, NULL, NULL); ex_ssldata_index = SSL_get_ex_new_index(0, "__ssldata index", NULL, NULL, NULL);
return 0; return 0;

@ -38,7 +38,7 @@
#include <tdata/usrtc.h> #include <tdata/usrtc.h>
#include <sexpr/sexp.h> #include <sexpr/sexp.h>
#include <sntl/connection.h> #include <sntl/sntllv2.h>
static int __insert_rpc_function(usrtc_t *tree, const char *name, int (*rpcf)(void *, sexp_t *)) static int __insert_rpc_function(usrtc_t *tree, const char *name, int (*rpcf)(void *, sexp_t *))
{ {
@ -117,12 +117,12 @@ static int __get_channels_list(void *cctx, sexp_t *sx)
{ {
conn_t *co = (conn_t *)cctx; conn_t *co = (conn_t *)cctx;
conn_sys_t *ssys = co->ssys; conn_sys_t *ssys = co->ssys;
sx_msg_t *msg = co->messages[0]; sxmsg_t *msg = co->messages[0];
char *buf = msg->payload; char *buf = msg->payload;
usrtc_node_t *node; usrtc_node_t *node;
rpc_typed_list_t *list_ent; rpc_typed_list_t *list_ent;
size_t maxlen = 65535 - sizeof(sntllv2_head_t); size_t maxlen = 65535 - sizeof(sntllv2_head_t);
size_t len, ulen = 0; size_t ulen = 0;
/* we will avoid S-exp scanning here */ /* we will avoid S-exp scanning here */
@ -140,7 +140,7 @@ static int __get_channels_list(void *cctx, sexp_t *sx)
list_ent->type_id, list_ent->description); list_ent->type_id, list_ent->description);
} }
ulen += snprintf(buf + ulen, maxlen - ulen, ")"); ulen += snprintf(buf + ulen, maxlen - ulen, ")");
msg->payload_length = ulen + sizeof(sntllv2_head_t); msg->mhead.payload_length = ulen + sizeof(sntllv2_head_t);
/* we're ready for messaging mode */ /* we're ready for messaging mode */
co->flags |= SNSX_MESSAGINGMODE; co->flags |= SNSX_MESSAGINGMODE;
@ -201,6 +201,11 @@ static int __init_systemrpc_tree(usrtc_t *rtree)
return ENOMEM; return ENOMEM;
} }
static long __cmp_cstr(const void *a, const void *b)
{
return (long)strcmp((const char *)a, (const char *)b);
}
int connections_init(conn_sys_t *ssys) int connections_init(conn_sys_t *ssys)
{ {
int r = 0; int r = 0;

@ -5,9 +5,9 @@ datarootdir=@datarootdir@
datadir=@datadir@ datadir=@datadir@
includedir=@includedir@ includedir=@includedir@
Name: libsntl Name: libsntllv2
Description: Secure Network Transport Layer library implementation Description: Secure Network Transport Layer library implementation
Version: @VERSION@ Version: @VERSION@
Requires: Requires:
Libs: -L${libdir} -lsntl Libs: -L${libdir} -lsntllv2
Cflags: -I${includedir} Cflags: -I${includedir}

@ -19,6 +19,7 @@
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/time.h> #include <sys/time.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/mman.h>
#include <fcntl.h> #include <fcntl.h>
#ifdef WIN32 #ifdef WIN32
@ -36,9 +37,10 @@
#include <openssl/err.h> #include <openssl/err.h>
#include <tdata/usrtc.h> #include <tdata/usrtc.h>
#include <tdata/list.h>
#include <sexpr/sexp.h> #include <sexpr/sexp.h>
#include <sntl/connection.h> #include <sntl/sntllv2.h>
typedef struct __sntll_bundle_type { typedef struct __sntll_bundle_type {
void *buf; void *buf;
@ -56,7 +58,7 @@ static int __conn_read(conn_t *co, void *buf, size_t buf_len)
ofcmode = fcntl(rfd, F_GETFL,0); ofcmode = fcntl(rfd, F_GETFL,0);
ofcmode |= O_NDELAY; ofcmode |= O_NDELAY;
if(fcntl(rfd, F_SETFL, ofcmode)) if(fcntl(rfd, F_SETFL, ofcmode))
fprintf(stderr, "Couldn't make socket nonblocking"); fprintf(stderr, "[sntllv2] (RD)Couldn't make socket nonblocking");
#endif #endif
__retry: __retry:
@ -78,24 +80,24 @@ static int __conn_read(conn_t *co, void *buf, size_t buf_len)
case SSL_ERROR_SYSCALL: case SSL_ERROR_SYSCALL:
if(errno == EAGAIN || errno == EINTR) goto __try_again; if(errno == EAGAIN || errno == EINTR) goto __try_again;
else { else {
fprintf(stderr, "SSL syscall error.\n"); fprintf(stderr, "[sntllv2] (RD)SSL syscall error.\n");
goto __close_conn; goto __close_conn;
} }
break; break;
case SSL_ERROR_WANT_CONNECT: case SSL_ERROR_WANT_CONNECT:
case SSL_ERROR_WANT_ACCEPT: case SSL_ERROR_WANT_ACCEPT:
fprintf(stderr, "SSL negotiation required. Trying again.\n"); fprintf(stderr, "[sntllv2] (RD)SSL negotiation required. Trying again.\n");
goto __try_again; goto __try_again;
break; break;
case SSL_ERROR_SSL: case SSL_ERROR_SSL:
fprintf(stderr, "SSL error occured. Connection will be closed.\n"); fprintf(stderr, "[sntllv2] (RD)SSL error occured. Connection will be closed.\n");
goto __close_conn; goto __close_conn;
break; break;
case SSL_ERROR_ZERO_RETURN: case SSL_ERROR_ZERO_RETURN:
fprintf(stderr, "SSL connection is cleary closed.\n"); fprintf(stderr, "[sntllv2] (RD)SSL connection is cleary closed.\n");
default: default:
__close_conn: __close_conn:
fprintf(stderr, "(RD)Unknown error on %s (errno = %d)\n", co->uuid, errno); fprintf(stderr, "[sntllv2] (RD)Unknown error on %s (errno = %d)\n", co->uuid, errno);
return -1; return -1;
} }
} while(SSL_pending(co->ssl) && !read_blocked); } while(SSL_pending(co->ssl) && !read_blocked);
@ -109,11 +111,11 @@ static int __conn_read(conn_t *co, void *buf, size_t buf_len)
r = select(rfd + 1, &readset, NULL, NULL, NULL); r = select(rfd + 1, &readset, NULL, NULL, NULL);
if(r < 0) { if(r < 0) {
if(errno == EINTR || errno == EAGAIN) goto __select_retry; if(errno == EINTR || errno == EAGAIN) goto __select_retry;
printf("(RD) select (%d) on %s\n", errno, co->uuid); fprintf(stderr, "[sntllv2] (RD)Select (%d) on %s\n", errno, co->uuid);
return -1; return -1;
} }
if(!r) { if(!r) {
printf("Nothing to wait for\n"); fprintf(stderr, "[sntllv2] (RD)Nothing to wait for\n");
return 0; return 0;
} }
read_blocked = 0; read_blocked = 0;
@ -130,7 +132,7 @@ static int __conn_read(conn_t *co, void *buf, size_t buf_len)
if(r && FD_ISSET(rfd, &writeset)) goto __retry; if(r && FD_ISSET(rfd, &writeset)) goto __retry;
} }
return 0; return r;
} }
static int __conn_write(conn_t *co, void *buf, size_t buf_len) static int __conn_write(conn_t *co, void *buf, size_t buf_len)
@ -154,15 +156,14 @@ static int __conn_write(conn_t *co, void *buf, size_t buf_len)
else goto __close_conn; else goto __close_conn;
break; break;
default: default:
pthread_mutex_unlock(&(co->oplock));
__close_conn: __close_conn:
if(r < 0) { if(r < 0) {
fprintf(stderr, "[sntllv2] (WR)Unknown error on %s (%d)\n", co->uuid, r); fprintf(stderr, "[sntllv2] (WR)Unknown error on %s (%d)\n", co->uuid, r);
return -1; return -1;
} else return 0; } else return r;
} }
return 0; return r;
} }
static sntllv2_bundle_t *__sntll_bundle_create(conn_t *co) static sntllv2_bundle_t *__sntll_bundle_create(conn_t *co)
@ -185,7 +186,7 @@ static sntllv2_bundle_t *__sntll_bundle_create(conn_t *co)
static void __sntll_bundle_destroy(sntllv2_bundle_t *n) static void __sntll_bundle_destroy(sntllv2_bundle_t *n)
{ {
munmap(n->buf, 65536) munmap(n->buf, 65536);
free(n); free(n);
return; return;
} }
@ -311,14 +312,14 @@ static int __connection_second_alloc(conn_t *co)
else memset(co->channels, 0, sizeof(uintptr_t)*512); else memset(co->channels, 0, sizeof(uintptr_t)*512);
/* init mutexes */ /* init mutexes */
co->idx_ch_lock = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_init(&co->idx_ch_lock, NULL);
co->idx_msg_lock = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_init(&co->idx_msg_lock, NULL);
co->write_pending_lock = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_init(&co->write_pending_lock, NULL);
co->sslinout[0] = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_init(&co->sslinout[0], NULL);
co->sslinout[1] = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_init(&co->sslinout[1], NULL);
/* init list */ /* init list */
list_head_init(&co->write_pending); list_init_head(&co->write_pending);
return SNE_SUCCESS; return SNE_SUCCESS;
@ -334,11 +335,11 @@ static void __connection_second_free(conn_t *co)
idx_allocator_destroy(&co->idx_msg); idx_allocator_destroy(&co->idx_msg);
idx_allocator_destroy(&co->idx_ch); idx_allocator_destroy(&co->idx_ch);
pthread_mutex_destroy(co->idx_ch_lock); pthread_mutex_destroy(&co->idx_ch_lock);
pthread_mutex_destroy(co->idx_msg_lock); pthread_mutex_destroy(&co->idx_msg_lock);
pthread_mutex_destroy(co->write_pending_lock); pthread_mutex_destroy(&co->write_pending_lock);
pthread_mutex_destroy(co->sslinout[0]); pthread_mutex_destroy(&co->sslinout[0]);
pthread_mutex_destroy(co->sslinout[1]); pthread_mutex_destroy(&co->sslinout[1]);
return; return;
} }
@ -382,7 +383,7 @@ static int __eval_syssexp(conn_t *co, sexp_t *sx)
static void *__sntll_thread(void *b) static void *__sntll_thread(void *b)
{ {
sntllv2_bundle_t *bun = (sntllv2_bundle_t *)b; sntllv2_bundle_t *bun = (sntllv2_bundle_t *)b;
conn_t *co = bun->co; conn_t *co = bun->conn;
void *buf = bun->buf; void *buf = bun->buf;
char *bbuf = (char*)buf; char *bbuf = (char*)buf;
sntllv2_head_t *mhead = (sntllv2_head_t *)buf; sntllv2_head_t *mhead = (sntllv2_head_t *)buf;
@ -390,6 +391,7 @@ static void *__sntll_thread(void *b)
chnl_t *channel; chnl_t *channel;
pthread_t self = pthread_self(); pthread_t self = pthread_self();
int dispatch = 0; int dispatch = 0;
size_t rd, wr;
/* byte buffer is following head */ /* byte buffer is following head */
bbuf += sizeof(sntllv2_head_t); bbuf += sizeof(sntllv2_head_t);
@ -481,7 +483,7 @@ static void *__sntll_thread(void *b)
pthread_mutex_unlock(&msg->wait); pthread_mutex_unlock(&msg->wait);
} }
} else if(mhead->attr & SXMSG_LINK) { /* link layer messages */ } else if(mhead->attr & SXMSG_LINK) { /* link layer messages */
if(mhead->attr & SXMSG_CLOSE) goto __finish; /* close the link */ if(mhead->attr & SXMSG_CLOSED) goto __finish; /* close the link */
if(mhead->attr & SXMSG_PULSE) { /* it's a link pulse messages */ if(mhead->attr & SXMSG_PULSE) { /* it's a link pulse messages */
/* TODO: syncronization and so on */ /* TODO: syncronization and so on */
} }
@ -494,7 +496,7 @@ static void *__sntll_thread(void *b)
mhead->payload_length = 0; mhead->payload_length = 0;
mhead->attr &= ~SXMSG_REPLYREQ; mhead->attr &= ~SXMSG_REPLYREQ;
mhead->attr &= ~SXMSG_OPEN; mhead->attr &= ~SXMSG_OPEN;
mhead->attr |= SXMSG_CLOSE; mhead->attr |= SXMSG_CLOSED;
pthread_mutex_lock(&(co->sslinout[1])); pthread_mutex_lock(&(co->sslinout[1]));
wr = __conn_write(co, mhead, sizeof(sntllv2_head_t)); wr = __conn_write(co, mhead, sizeof(sntllv2_head_t));
pthread_mutex_unlock(&(co->sslinout[1])); pthread_mutex_unlock(&(co->sslinout[1]));
@ -510,7 +512,7 @@ static void *__sntll_thread(void *b)
mhead->opcode = SNE_ENOMEM; goto __ret_regerr; mhead->opcode = SNE_ENOMEM; goto __ret_regerr;
} else { } else {
/* set mutex and channel */ /* set mutex and channel */
msg->wait = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_init(&msg->wait, NULL);
msg->pch = channel; msg->pch = channel;
/* copy header only */ /* copy header only */
memcpy(&msg->mhead, mhead, sizeof(sntllv2_head_t)); memcpy(&msg->mhead, mhead, sizeof(sntllv2_head_t));
@ -524,7 +526,7 @@ static void *__sntll_thread(void *b)
/* now we are able to process the message */ /* now we are able to process the message */
__message_process(msg); __message_process(msg);
} else if(mhead->attr & SXMSG_CLOSE) { } else if(mhead->attr & SXMSG_CLOSED) {
/* check for the message */ /* check for the message */
if(mhead->msgid >= 1024) goto __inval_idx_nor; if(mhead->msgid >= 1024) goto __inval_idx_nor;
msg = co->messages[mhead->msgid]; msg = co->messages[mhead->msgid];
@ -548,7 +550,7 @@ static void *__sntll_thread(void *b)
} }
pthread_mutex_unlock(&msg->wait); /* wake up thread waiting for */ pthread_mutex_unlock(&msg->wait); /* wake up thread waiting for */
} }
} else if((!(mhead->attr & SXMSG_CLOSE) && !(mhead->attr & SXMSG_OPEN)) && } else if((!(mhead->attr & SXMSG_CLOSED) && !(mhead->attr & SXMSG_OPEN)) &&
(mhead->attr & SXMSG_REPLYREQ)) { /* ongoing dialog */ (mhead->attr & SXMSG_REPLYREQ)) { /* ongoing dialog */
/* check for the message */ /* check for the message */
if(mhead->msgid >= 1024) goto __inval_idx_nor; if(mhead->msgid >= 1024) goto __inval_idx_nor;
@ -580,7 +582,7 @@ static void *__sntll_thread(void *b)
mhead->payload_length = 0; mhead->payload_length = 0;
mhead->attr &= ~SXMSG_REPLYREQ; mhead->attr &= ~SXMSG_REPLYREQ;
mhead->attr &= ~SXMSG_OPEN; mhead->attr &= ~SXMSG_OPEN;
mhead->attr |= SXMSG_CLOSE; mhead->attr |= SXMSG_CLOSED;
pthread_mutex_lock(&(co->sslinout[1])); pthread_mutex_lock(&(co->sslinout[1]));
wr = __conn_write(co, mhead, sizeof(sntllv2_head_t)); wr = __conn_write(co, mhead, sizeof(sntllv2_head_t));
pthread_mutex_unlock(&(co->sslinout[1])); pthread_mutex_unlock(&(co->sslinout[1]));
@ -588,9 +590,8 @@ static void *__sntll_thread(void *b)
} }
} }
pthread_mutex_unlock(&msg->wait); /* wake up thread waiting for */ pthread_mutex_unlock(&msg->wait); /* wake up thread waiting for */
} else }
{ mhead->opcode = SNE_BADPROTO; goto __ret_regerr; } } else { mhead->opcode = SNE_BADPROTO; goto __ret_regerr; }
}
} }
} }
} }
@ -600,20 +601,21 @@ static void *__sntll_thread(void *b)
return NULL; return NULL;
} }
/* FIXME: AWARE coc and co - fix it */ conn_t *connection_master_link(conn_sys_t *ssys, int sck, struct in_addr *addr)
int connection_create_fapi_m(conn_sys_t *ssys, conn_t *co, int sck,
struct in_addr *addr)
{ {
void *buf = NULL; void *buf = NULL;
char *bbuf; char *bbuf;
conn_t *coc = __connection_minimal_alloc(addr); conn_t *co = __connection_minimal_alloc(addr);
sxmsg_t *msg = NULL; sxmsg_t *msg = NULL;
sntllv2_head_t *head; sntllv2_head_t *head;
sntllv2_bundle_t *bundle; sntllv2_bundle_t *bundle;
size_t rd; size_t rd;
int r = SNE_FAILED; int r = SNE_FAILED;
if(!coc) return SNE_ENOMEM; if(!co) {
errno = SNE_ENOMEM;
return NULL;
}
/* ok, now we need to init ssl stuff */ /* ok, now we need to init ssl stuff */
co->ssys = ssys; co->ssys = ssys;
@ -671,7 +673,7 @@ int connection_create_fapi_m(conn_sys_t *ssys, conn_t *co, int sck,
if(!(msg = malloc(sizeof(sxmsg_t)))) { r = SNE_ENOMEM; goto __fail2; } if(!(msg = malloc(sizeof(sxmsg_t)))) { r = SNE_ENOMEM; goto __fail2; }
else { else {
memset(msg, 0, sizeof(sxmsg_t)); memset(msg, 0, sizeof(sxmsg_t));
coc->messages[0] = msg; co->messages[0] = msg;
} }
bbuf = (char *)buf; bbuf = (char *)buf;
bbuf += sizeof(sntllv2_head_t); bbuf += sizeof(sntllv2_head_t);
@ -699,7 +701,7 @@ int connection_create_fapi_m(conn_sys_t *ssys, conn_t *co, int sck,
/* initialize message */ /* initialize message */
msg->payload = bbuf; msg->payload = bbuf;
msg->payload_length = 0; msg->mhead.payload_length = 0;
/* deal with it */ /* deal with it */
r = __eval_syssexp(co, sx); r = __eval_syssexp(co, sx);
head->opcode = r; head->opcode = r;
@ -709,8 +711,8 @@ int connection_create_fapi_m(conn_sys_t *ssys, conn_t *co, int sck,
destroy_sexp(sx); destroy_sexp(sx);
goto __fail3; goto __fail3;
} }
rd = __conn_write(co, buf, sizeof(sntllv2_head_t) + msg->payload_length); rd = __conn_write(co, buf, sizeof(sntllv2_head_t) + msg->mhead.payload_length);
if(rd != sizeof(sntllv2_head_t) + msg->payload_length) { if(rd != sizeof(sntllv2_head_t) + msg->mhead.payload_length) {
destroy_sexp(sx); destroy_sexp(sx);
goto __fail3; goto __fail3;
} }
@ -729,18 +731,22 @@ int connection_create_fapi_m(conn_sys_t *ssys, conn_t *co, int sck,
bundle->buf = buf; bundle->buf = buf;
bundle->conn = co; bundle->conn = co;
} }
int i;
for(i = 0; i < 8; i++) { for(i = 0; i < 8; i++) {
if(bundle == 0xdead) bundle = __sntll_bundle_create(co); if(bundle == (void *)0xdead) bundle = __sntll_bundle_create(co);
if(!bundle) goto __fail5; if(!bundle) goto __fail5;
r = pthread_create(&thrd_poll[i], NULL, __sntll_thread, bundle); r = pthread_create(&co->thrd_poll[i], NULL, __sntll_thread, bundle);
if(r) goto __fail5; if(r) goto __fail5;
else bundle = 0xdead; else bundle = (void *)0xdead;
} }
/* all is done, connection now ready */ /* all is done, connection now ready */
co->flags |= SNSX_ALIVE; co->flags |= SNSX_ALIVE;
return SNE_SUCCESS; r = SNE_SUCCESS;
errno = r;
return co;
__fail5: __fail5:
r = SNE_ENOMEM; r = SNE_ENOMEM;
@ -754,20 +760,22 @@ int connection_create_fapi_m(conn_sys_t *ssys, conn_t *co, int sck,
if(buf != MAP_FAILED) munmap(buf, 65536); if(buf != MAP_FAILED) munmap(buf, 65536);
SSL_shutdown(co->ssl); SSL_shutdown(co->ssl);
__fail: __fail:
if(coc) { if(co) {
if(co->ssl) SSL_free(co->ssl); if(co->ssl) SSL_free(co->ssl);
if(co->ctx) SSL_CTX_free(co->ctx); if(co->ctx) SSL_CTX_free(co->ctx);
__connection_minimal_free(coc); __connection_minimal_free(co);
} }
close(sck); close(sck);
return r; errno = r;
return NULL;
} }
conn_t *connection_link(conn_sys_t *ssys, const char *host, conn_t *connection_link(conn_sys_t *ssys, const char *host,
int port, const char *SSL_cert, const char *login, int port, const char *SSL_cert, const char *login,
const char *passwd) const char *passwd)
{ {
conn_t *co = __connection_minimal_alloc(addr); conn_t *co = __connection_minimal_alloc(NULL);
struct hostent *host_; struct hostent *host_;
struct sockaddr_in addr; struct sockaddr_in addr;
int r = SNE_SUCCESS, sck; int r = SNE_SUCCESS, sck;
@ -779,7 +787,9 @@ conn_t *connection_link(conn_sys_t *ssys, const char *host,
char *bbuf; char *bbuf;
sntllv2_head_t *head; sntllv2_head_t *head;
sntllv2_bundle_t *bundle; sntllv2_bundle_t *bundle;
size_t rd; sxmsg_t *msg;
size_t rd, wr;
int i;
r = SNE_IGNORED; r = SNE_IGNORED;
if(!host || !SSL_cert) goto __fail; if(!host || !SSL_cert) goto __fail;
@ -928,11 +938,11 @@ conn_t *connection_link(conn_sys_t *ssys, const char *host,
bundle->conn = co; bundle->conn = co;
} }
for(i = 0; i < 8; i++) { for(i = 0; i < 8; i++) {
if(bundle == 0xdead) bundle = __sntll_bundle_create(co); if(bundle == (void *)0xdead) bundle = __sntll_bundle_create(co);
if(!bundle) goto __fail5; if(!bundle) goto __fail5;
r = pthread_create(&thrd_poll[i], NULL, __sntll_thread, bundle); r = pthread_create(&co->thrd_poll[i], NULL, __sntll_thread, bundle);
if(r) goto __fail5; if(r) goto __fail5;
else bundle = 0xdead; else bundle = (void *)0xdead;
} }
/* all is done, connection now ready */ /* all is done, connection now ready */
@ -940,6 +950,13 @@ conn_t *connection_link(conn_sys_t *ssys, const char *host,
return co; return co;
__fail5:
r = SNE_ENOMEM;
/* bundles will be freed by the threads when SSL_read will fails. */
__fail4:
__connection_second_free(co);
__fail3:
if(ssys->on_destroy) ssys->on_destroy(co);
__fail2: __fail2:
if(buf != MAP_FAILED) munmap(buf, 65536); if(buf != MAP_FAILED) munmap(buf, 65536);
SSL_shutdown(co->ssl); SSL_shutdown(co->ssl);

Loading…
Cancel
Save