more commits;
This commit is contained in:
parent
5fd1ecc796
commit
bbc0516191
@ -33,17 +33,9 @@
|
|||||||
#include <sexpr/sexp.h>
|
#include <sexpr/sexp.h>
|
||||||
#include <sexpr/faststack.h>
|
#include <sexpr/faststack.h>
|
||||||
|
|
||||||
|
#include <sntl/errno.h>
|
||||||
#include <sntl/pth_queue.h>
|
#include <sntl/pth_queue.h>
|
||||||
|
|
||||||
/* TODO: remove to the special separate file - error codes */
|
|
||||||
#define ESXOREPLYREQ 44 /* protocol require reply with expression,
|
|
||||||
* or expression return for the request */
|
|
||||||
#define ESXOTIMEDOUT 45 /* timedout */
|
|
||||||
#define ESXRCBADPROT 46 /* invalid protocol */
|
|
||||||
#define ESXNOCONNECT 47 /* connection is lost */
|
|
||||||
#define ESXNOCHANSUP 48
|
|
||||||
#define ESXRAPIDREPLY 49
|
|
||||||
|
|
||||||
#define VERIFY_DEPTH 1 /* FIXME: */
|
#define VERIFY_DEPTH 1 /* FIXME: */
|
||||||
|
|
||||||
#define MAX_CONNECTIONS 32768
|
#define MAX_CONNECTIONS 32768
|
||||||
@ -88,9 +80,9 @@ typedef struct __connection_t {
|
|||||||
struct __connections_subsys_type *ssys; /* < connections subsystem */
|
struct __connections_subsys_type *ssys; /* < connections subsystem */
|
||||||
char *uuid; /** < uuid of the connection */
|
char *uuid; /** < uuid of the connection */
|
||||||
/* Channels section */
|
/* Channels section */
|
||||||
idx_allocator_t *idx_ch; /** < index allocation for channels */
|
idx_allocator_t idx_ch; /** < index allocation for channels */
|
||||||
pthread_mutex_t idx_ch_lock; /** < mutex for allocating and deallocating channels */
|
pthread_mutex_t idx_ch_lock; /** < mutex for allocating and deallocating channels */
|
||||||
struct __channel_t **channels; /** < channels O(1) storage */
|
volatile struct __channel_t **channels; /** < channels O(1) storage */
|
||||||
/* RPC section */
|
/* RPC section */
|
||||||
usrtc_t *rpc_list; /** < search tree of possible RPC typed lists */
|
usrtc_t *rpc_list; /** < search tree of possible RPC typed lists */
|
||||||
/* SSL related section */
|
/* SSL related section */
|
||||||
@ -101,15 +93,15 @@ typedef struct __connection_t {
|
|||||||
/* Security section */
|
/* Security section */
|
||||||
perm_ctx_t *pctx; /** < higher layer authentification context */
|
perm_ctx_t *pctx; /** < higher layer authentification context */
|
||||||
/* Messages section */
|
/* Messages section */
|
||||||
struct __message_t **messages; /** < messages O(1) storage */
|
struct __message_t** volatile messages; /** < messages O(1) storage */
|
||||||
idx_allocator_t *idx_msg;
|
idx_allocator_t idx_msg;
|
||||||
pthread_mutex_t idx_msg_lock;
|
pthread_mutex_t idx_msg_lock;
|
||||||
list_head_t write_pending; /** < list of messages waiting for write */
|
list_head_t write_pending; /** < list of messages waiting for write */
|
||||||
pthread_mutex_t write_pending_lock;
|
pthread_mutex_t write_pending_lock;
|
||||||
uint8_t unused_messages; /** < unused message count */
|
volatile uint8_t unused_messages; /** < unused message count */
|
||||||
/* Other stuff */
|
/* Other stuff */
|
||||||
pthread_t thrd_poll[8];
|
pthread_t thrd_poll[8];
|
||||||
uint8_t flags; /** < flags of the connection */
|
volatile uint8_t flags; /** < flags of the connection */
|
||||||
usrtc_node_t csnode; /** < node to store the connection within list */
|
usrtc_node_t csnode; /** < node to store the connection within list */
|
||||||
} conn_t;
|
} conn_t;
|
||||||
|
|
||||||
@ -155,11 +147,8 @@ typedef struct __sexp_payload_t {
|
|||||||
typedef struct __message_t {
|
typedef struct __message_t {
|
||||||
chnl_t *pch; /** < channel of the message(if applicable) */
|
chnl_t *pch; /** < channel of the message(if applicable) */
|
||||||
pthread_mutex_t wait; /** < special wait mutex, used for pending list and sync */
|
pthread_mutex_t wait; /** < special wait mutex, used for pending list and sync */
|
||||||
|
sntllv2_head_t mhead;
|
||||||
void *payload; /** < payload */
|
void *payload; /** < payload */
|
||||||
uint16_t payload_length; /** < payload length */
|
|
||||||
uint8_t opcode; /** < opcode for system and pulse messages */
|
|
||||||
uint16_t flags; /** < flags of the message (type, state etc ...)*/
|
|
||||||
uint16_t idx;
|
|
||||||
} sxmsg_t;
|
} sxmsg_t;
|
||||||
|
|
||||||
typedef struct __connection_rpc_entry_type {
|
typedef struct __connection_rpc_entry_type {
|
||||||
|
@ -32,6 +32,12 @@
|
|||||||
#define SNE_IGNORED 213
|
#define SNE_IGNORED 213
|
||||||
#define SNE_REPLYREQ 214
|
#define SNE_REPLYREQ 214
|
||||||
#define SNE_RAPIDMSG 215
|
#define SNE_RAPIDMSG 215
|
||||||
|
#define SNE_ESSL 216
|
||||||
|
#define SNE_NOCHANNELS 217
|
||||||
|
#define SNE_MCHANNELS 218
|
||||||
|
#define SNE_MMESSAGES 219
|
||||||
|
#define SNE_LINKBROKEN 220
|
||||||
|
#define SNE_INVALINDEX 221
|
||||||
|
|
||||||
/* some old errors for compatibility */
|
/* some old errors for compatibility */
|
||||||
#define ESXOREPLYREQ SNE_REPLYREQ /* protocol require reply with expression,
|
#define ESXOREPLYREQ SNE_REPLYREQ /* protocol require reply with expression,
|
||||||
|
@ -126,6 +126,11 @@ static int __get_channels_list(void *cctx, sexp_t *sx)
|
|||||||
|
|
||||||
/* we will avoid S-exp scanning here */
|
/* we will avoid S-exp scanning here */
|
||||||
|
|
||||||
|
/* call the function */
|
||||||
|
if(ssys->get_rpc_typed_list_tree)
|
||||||
|
co->rpc_list = ssys->get_rpc_typed_list_tree(co);
|
||||||
|
if(!co->rpc_list) return SNE_EPERM;
|
||||||
|
|
||||||
buf += sizeof(sntllv2_head_t);
|
buf += sizeof(sntllv2_head_t);
|
||||||
ulen += snprintf(buf + ulen, maxlen - ulen, "(set-channels-list ");
|
ulen += snprintf(buf + ulen, maxlen - ulen, "(set-channels-list ");
|
||||||
for(node = usrtc_first(co->rpc_list); node != NULL;
|
for(node = usrtc_first(co->rpc_list); node != NULL;
|
||||||
|
335
lib/sntllv2.c
335
lib/sntllv2.c
@ -40,6 +40,36 @@
|
|||||||
|
|
||||||
#include <sntl/connection.h>
|
#include <sntl/connection.h>
|
||||||
|
|
||||||
|
typedef struct __sntll_bundle_type {
|
||||||
|
void *buf;
|
||||||
|
conn_t *conn;
|
||||||
|
} sntllv2_bundle_t;
|
||||||
|
|
||||||
|
static sntllv2_bundle_t *__sntll_bundle_create(conn_t *co)
|
||||||
|
{
|
||||||
|
sntllv2_bundle_t *n = malloc(sizeof(sntllv2_bundle_t));
|
||||||
|
|
||||||
|
if(!n) return NULL;
|
||||||
|
else memset(n, 0, sizeof(sntllv2_bundle_t));
|
||||||
|
|
||||||
|
n->buf = mmap(NULL, 65536, PROT_READ|PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
|
||||||
|
if(n->buf == MAP_FAILED) {
|
||||||
|
free(n);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
n->conn = co;
|
||||||
|
|
||||||
|
return n;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void __sntll_bundle_destroy(sntllv2_bundle_t *n)
|
||||||
|
{
|
||||||
|
munmap(n->buf, 65536)
|
||||||
|
free(n);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
static int ex_ssldata_index; /** < index used to work with additional data
|
static int ex_ssldata_index; /** < index used to work with additional data
|
||||||
* provided to the special call during SSL handshake */
|
* provided to the special call during SSL handshake */
|
||||||
|
|
||||||
@ -150,6 +180,49 @@ conn_t *__connection_minimal_alloc(struct in_addr *addr)
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int __connection_second_alloc(conn_t *co)
|
||||||
|
{
|
||||||
|
usrtc_node_init(&co->csnode, co);
|
||||||
|
|
||||||
|
if((idx_allocator_init(&co->idx_ch, 512, 0))) goto __fail;
|
||||||
|
if((idx_allocator_init(&co->idx_msg, 1024, 0))) goto __fail;
|
||||||
|
|
||||||
|
if(!(co->channels = malloc(sizeof(uintptr_t)*512))) goto __fail;
|
||||||
|
else memset(co->channels, 0, sizeof(uintptr_t)*512);
|
||||||
|
|
||||||
|
/* init mutexes */
|
||||||
|
co->idx_ch_lock = PTHREAD_MUTEX_INITIALIZER;
|
||||||
|
co->idx_msg_lock = PTHREAD_MUTEX_INITIALIZER;
|
||||||
|
co->write_pending_lock = PTHREAD_MUTEX_INITIALIZER;
|
||||||
|
co->sslinout[0] = PTHREAD_MUTEX_INITIALIZER;
|
||||||
|
co->sslinout[1] = PTHREAD_MUTEX_INITIALIZER;
|
||||||
|
|
||||||
|
/* init list */
|
||||||
|
list_head_init(&co->write_pending);
|
||||||
|
|
||||||
|
return SNE_SUCCESS;
|
||||||
|
|
||||||
|
__fail:
|
||||||
|
idx_allocator_destroy(&co->idx_msg);
|
||||||
|
idx_allocator_destroy(&co->idx_ch);
|
||||||
|
return SNE_ENOMEM;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void __connection_second_free(conn_t *co)
|
||||||
|
{
|
||||||
|
if(co->channels) free(co->channels);
|
||||||
|
idx_allocator_destroy(&co->idx_msg);
|
||||||
|
idx_allocator_destroy(&co->idx_ch);
|
||||||
|
|
||||||
|
pthread_mutex_destroy(co->idx_ch_lock);
|
||||||
|
pthread_mutex_destroy(co->idx_msg_lock);
|
||||||
|
pthread_mutex_destroy(co->write_pending_lock);
|
||||||
|
pthread_mutex_destroy(co->sslinout[0]);
|
||||||
|
pthread_mutex_destroy(co->sslinout[1]);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
static void __connection_minimal_free(conn_t *co)
|
static void __connection_minimal_free(conn_t *co)
|
||||||
{
|
{
|
||||||
if(co) {
|
if(co) {
|
||||||
@ -186,6 +259,227 @@ static int __eval_syssexp(conn_t *co, sexp_t *sx)
|
|||||||
return rentry->rpcf((void *)co, sx);
|
return rentry->rpcf((void *)co, sx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void *__sntll_thread(void *b)
|
||||||
|
{
|
||||||
|
sntllv2_bundle_t *bun = (sntllv2_bundle_t *)b;
|
||||||
|
conn_t *co = bun->co;
|
||||||
|
void *buf = bun->buf;
|
||||||
|
char *bbuf = (char*)buf;
|
||||||
|
sntllv2_head_t *mhead = (sntllv2_head_t *)buf;
|
||||||
|
sxmsg_t *msg;
|
||||||
|
chnl_t *channel;
|
||||||
|
pthread_t self = pthread_self();
|
||||||
|
int dispatch = 0;
|
||||||
|
|
||||||
|
/* byte buffer is following head */
|
||||||
|
bbuf += sizeof(sntllv2_head_t);
|
||||||
|
|
||||||
|
__wait_alive:
|
||||||
|
/* flag test - FIXME: make it atomic (it will works atomically on x86, btw on others not) */
|
||||||
|
if(!(co->flags & SNSX_ALIVE)) {
|
||||||
|
if(co->flags & SNSX_CLOSED) goto __finish;
|
||||||
|
else {
|
||||||
|
usleep(20);
|
||||||
|
goto __wait_alive;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* check up a thread */
|
||||||
|
if(pthread_equal(self, co->thrd_poll[7])) /* dispatcher */
|
||||||
|
dispatch = 1;
|
||||||
|
|
||||||
|
/* the following logic : (except dispatcher)
|
||||||
|
* 1. check up pending write -> if exists write one and start again., otherwise go next
|
||||||
|
* 2. read from ssl connection (we will sleep if other already acquire the lock)
|
||||||
|
*/
|
||||||
|
while(1) {
|
||||||
|
__again:
|
||||||
|
pthread_mutex_lock(&(co->sslinout[0]));
|
||||||
|
rd = __conn_read(co, mhead, sizeof(sntllv2_head_t));
|
||||||
|
if(rd != sizeof(sntllv2_bundle_t)) {
|
||||||
|
__sslproto_error:
|
||||||
|
co->flags |= SNSX_CLOSED;
|
||||||
|
pthread_mutex_unlock(&(co->sslinout[0]));
|
||||||
|
goto __finish;
|
||||||
|
} else {
|
||||||
|
/* check up if we can read or not */
|
||||||
|
if(mhead->payload_length) {
|
||||||
|
rd = __conn_read(co, bbuf, mhead->payload_length);
|
||||||
|
if(rd < 0) goto __sslproto_error;
|
||||||
|
else pthread_mutex_unlock(&(co->sslinout[0]));
|
||||||
|
if(rd != mhead->payload_length) {
|
||||||
|
/* if we're need to do something */
|
||||||
|
if(mhead->msgid >= 1024) {
|
||||||
|
mhead->opcode = SNE_INVALINDEX;
|
||||||
|
goto __return_error;
|
||||||
|
} else msg = co->messages[mhead->msgid];
|
||||||
|
if(!msg) {
|
||||||
|
if(mhead->attr & SXMSG_OPEN) mhead->opcode = SNE_BADPROTO;
|
||||||
|
else {
|
||||||
|
if((mhead->attr & SXMSG_PROTO) || (mhead->attr & SXMSG_LINK))
|
||||||
|
mhead->opcode = SNE_BADPROTO;
|
||||||
|
else mhead->opcode = SNE_NOSUCHMSG;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
__return_error:
|
||||||
|
mhead->attr |= SXMSG_CLOSED;
|
||||||
|
mhead->payload_length = 0;
|
||||||
|
pthread_mutex_lock(&(co->sslinout[1]));
|
||||||
|
wr = __conn_write(co, mhead, sizeof(sntllv2_head_t));
|
||||||
|
pthread_mutex_unlock(&(co->sslinout[1]));
|
||||||
|
if(wr < 0) goto __finish;
|
||||||
|
else goto __again;
|
||||||
|
}
|
||||||
|
} else pthread_mutex_unlock(&(co->sslinout[0]));
|
||||||
|
/* take a message */
|
||||||
|
if(mhead->attr & SXMSG_PROTO) { /* protocol message i.e. channel open/close */
|
||||||
|
/* ok, check up the side */
|
||||||
|
if(mhead->attr & SXMSG_REPLYREQ) { /* means we're not initiators and we don't need to allocate a message */
|
||||||
|
if(mhead->attr & SXMSG_OPEN)
|
||||||
|
mhead->opcode = _channel_open(co, &mhead->reserve);
|
||||||
|
else mhead->opcode = _channel_close(co, mhead->reserve);
|
||||||
|
|
||||||
|
/* set flags */
|
||||||
|
mhead->payload_length = 0;
|
||||||
|
mhead->attr &= ~SXMSG_REPLYREQ;
|
||||||
|
pthread_mutex_lock(&(co->sslinout[1]));
|
||||||
|
wr = __conn_write(co, mhead, sizeof(sntllv2_head_t));
|
||||||
|
pthread_mutex_unlock(&(co->sslinout[1]));
|
||||||
|
if(wr < 0) goto __finish;
|
||||||
|
} else { /* it's came back */
|
||||||
|
/* reply came ... */
|
||||||
|
if(mhead->msgid >= 1024) {
|
||||||
|
__inval_idx_nor:
|
||||||
|
fprintf(stderr, "[sntllv2] Invalid index of the message.\n");
|
||||||
|
goto __again;
|
||||||
|
}
|
||||||
|
msg = co->messages[mhead->msgid];
|
||||||
|
if(!msg) goto __inval_idx_nor;
|
||||||
|
|
||||||
|
/* ok now we'are copy data and unlock wait mutex */
|
||||||
|
memcpy(&msg->mhead, mhead, sizeof(sntllv2_head_t));
|
||||||
|
pthread_mutex_unlock(&msg->wait);
|
||||||
|
}
|
||||||
|
} else if(mhead->attr & SXMSG_LINK) { /* link layer messages */
|
||||||
|
if(mhead->attr & SXMSG_CLOSE) goto __finish; /* close the link */
|
||||||
|
if(mhead->attr & SXMSG_PULSE) { /* it's a link pulse messages */
|
||||||
|
/* TODO: syncronization and so on */
|
||||||
|
}
|
||||||
|
} else { /* regular messages */
|
||||||
|
if((mhead->attr & SXMSG_OPEN) && (mhead->attr & SXMSG_REPLYREQ)) { /* dialog initiation */
|
||||||
|
channel = co->channels[mhead->reserve];
|
||||||
|
if(!channel) { /* ok, we'are failed */
|
||||||
|
mhead->opcode = SNE_NOSUCHCHAN;
|
||||||
|
__ret_regerr:
|
||||||
|
mhead->payload_length = 0;
|
||||||
|
mhead->attr &= ~SXMSG_REPLYREQ;
|
||||||
|
mhead->attr &= ~SXMSG_OPEN;
|
||||||
|
mhead->attr |= SXMSG_CLOSE;
|
||||||
|
pthread_mutex_lock(&(co->sslinout[1]));
|
||||||
|
wr = __conn_write(co, mhead, sizeof(sntllv2_head_t));
|
||||||
|
pthread_mutex_unlock(&(co->sslinout[1]));
|
||||||
|
if(wr < 0) goto __finish;
|
||||||
|
else goto __again;
|
||||||
|
}
|
||||||
|
/* if message is busy - fails */
|
||||||
|
msg = co->messages[mhead->msgid];
|
||||||
|
if(msg) { mhead->opcode = SNE_EBUSY; goto __ret_regerr; }
|
||||||
|
|
||||||
|
/* now we will take a deal */
|
||||||
|
if(!(msg = malloc(sizeof(sxmsg_t)))) {
|
||||||
|
mhead->opcode = SNE_ENOMEM; goto __ret_regerr;
|
||||||
|
} else {
|
||||||
|
/* set mutex and channel */
|
||||||
|
msg->wait = PTHREAD_MUTEX_INITIALIZER;
|
||||||
|
msg->pch = channel;
|
||||||
|
/* copy header only */
|
||||||
|
memcpy(&msg->mhead, mhead, sizeof(sntllv2_head_t));
|
||||||
|
msg->payload = bbuf;
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_lock(&co->idx_ch_lock);
|
||||||
|
idx_reserve(&co->idx_ch, mhead->msgid);
|
||||||
|
co->messages[mhead->msgid] = msg;
|
||||||
|
pthread_mutex_unlock(&co->idx_ch_lock);
|
||||||
|
|
||||||
|
/* now we are able to process the message */
|
||||||
|
__message_process(msg);
|
||||||
|
} else if(mhead->attr & SXMSG_CLOSE) {
|
||||||
|
/* check for the message */
|
||||||
|
if(mhead->msgid >= 1024) goto __inval_idx_nor;
|
||||||
|
msg = co->messages[mhead->msgid];
|
||||||
|
if(!msg) goto __inval_idx_nor;
|
||||||
|
|
||||||
|
if(msg->mhead.attr & SXMSG_TIMEDOUT) { /* nobody wait for it */
|
||||||
|
pthread_mutex_lock(&co->idx_ch_lock);
|
||||||
|
idx_free(&co->idx_ch, mhead->msgid);
|
||||||
|
co->messages[mhead->msgid] = NULL;
|
||||||
|
pthread_mutex_unlock(&co->idx_ch_lock);
|
||||||
|
|
||||||
|
/* now just free it */
|
||||||
|
pthread_mutex_destroy(&msg->wait);
|
||||||
|
free(msg);
|
||||||
|
} else {
|
||||||
|
memcpy(&msg->mhead, mhead, sizeof(sntllv2_head_t));
|
||||||
|
if(mhead->payload_length) {
|
||||||
|
msg->payload = malloc(mhead->payload_length);
|
||||||
|
if(msg->payload) memcpy(msg->payload, bbuf, mhead->payload_length);
|
||||||
|
else msg->mhead.opcode = SNE_ENOMEM;
|
||||||
|
}
|
||||||
|
pthread_mutex_unlock(&msg->wait); /* wake up thread waiting for */
|
||||||
|
}
|
||||||
|
} else if((!(mhead->attr & SXMSG_CLOSE) && !(mhead->attr & SXMSG_OPEN)) &&
|
||||||
|
(mhead->attr & SXMSG_REPLYREQ)) { /* ongoing dialog */
|
||||||
|
/* check for the message */
|
||||||
|
if(mhead->msgid >= 1024) goto __inval_idx_nor;
|
||||||
|
msg = co->messages[mhead->msgid];
|
||||||
|
if(!msg) goto __inval_idx_nor;
|
||||||
|
|
||||||
|
if(msg->mhead.attr & SXMSG_TIMEDOUT) { /* nobody wait for it */
|
||||||
|
pthread_mutex_lock(&co->idx_ch_lock);
|
||||||
|
idx_free(&co->idx_ch, mhead->msgid);
|
||||||
|
co->messages[mhead->msgid] = NULL;
|
||||||
|
pthread_mutex_unlock(&co->idx_ch_lock);
|
||||||
|
|
||||||
|
/* now just free it */
|
||||||
|
pthread_mutex_destroy(&msg->wait);
|
||||||
|
free(msg);
|
||||||
|
|
||||||
|
/* we must reply */
|
||||||
|
mhead->opcode = SNE_ETIMEDOUT;
|
||||||
|
goto __ret_regerr;
|
||||||
|
} else {
|
||||||
|
memcpy(&msg->mhead, mhead, sizeof(sntllv2_head_t));
|
||||||
|
if(mhead->payload_length) {
|
||||||
|
msg->payload = malloc(mhead->payload_length);
|
||||||
|
if(msg->payload) memcpy(msg->payload, bbuf, mhead->payload_length);
|
||||||
|
else {
|
||||||
|
mhead->opcode = msg->mhead.opcode = SNE_ENOMEM; /* we will return it to waitee */
|
||||||
|
msg->mhead.attr &= ~SXMSG_REPLYREQ; /* doesn't need to reply */
|
||||||
|
/* reply here now */
|
||||||
|
mhead->payload_length = 0;
|
||||||
|
mhead->attr &= ~SXMSG_REPLYREQ;
|
||||||
|
mhead->attr &= ~SXMSG_OPEN;
|
||||||
|
mhead->attr |= SXMSG_CLOSE;
|
||||||
|
pthread_mutex_lock(&(co->sslinout[1]));
|
||||||
|
wr = __conn_write(co, mhead, sizeof(sntllv2_head_t));
|
||||||
|
pthread_mutex_unlock(&(co->sslinout[1]));
|
||||||
|
if(wr < 0) goto __finish;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pthread_mutex_unlock(&msg->wait); /* wake up thread waiting for */
|
||||||
|
} else
|
||||||
|
{ mhead->opcode = SNE_BADPROTO; goto __ret_regerr; }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
__finish:
|
||||||
|
__sntll_bundle_destroy(b); /* destroy bundle */
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
int connection_create_fapi_m(conn_sys_t *ssys, conn_t *co, int sck,
|
int connection_create_fapi_m(conn_sys_t *ssys, conn_t *co, int sck,
|
||||||
struct in_addr *addr)
|
struct in_addr *addr)
|
||||||
{
|
{
|
||||||
@ -194,7 +488,9 @@ int connection_create_fapi_m(conn_sys_t *ssys, conn_t *co, int sck,
|
|||||||
conn_t *coc = __connection_minimal_alloc(addr);
|
conn_t *coc = __connection_minimal_alloc(addr);
|
||||||
sx_msg_t *msg = NULL;
|
sx_msg_t *msg = NULL;
|
||||||
sntllv2_head_t *head;
|
sntllv2_head_t *head;
|
||||||
|
sntllv2_bundle_t *bundle;
|
||||||
size_t rd;
|
size_t rd;
|
||||||
|
int r = SNE_FAILED;
|
||||||
|
|
||||||
if(!coc) return SNE_ENOMEM;
|
if(!coc) return SNE_ENOMEM;
|
||||||
|
|
||||||
@ -203,7 +499,7 @@ int connection_create_fapi_m(conn_sys_t *ssys, conn_t *co, int sck,
|
|||||||
|
|
||||||
/* init SSL certificates and context */
|
/* init SSL certificates and context */
|
||||||
co->ctx = SSL_CTX_new(TLSv1_2_server_method());
|
co->ctx = SSL_CTX_new(TLSv1_2_server_method());
|
||||||
if(!co->ctx) { goto __fail; }
|
if(!co->ctx) { r = SNE_ENOMEM; goto __fail; }
|
||||||
else {
|
else {
|
||||||
/* set verify context */
|
/* set verify context */
|
||||||
SSL_CTX_set_verify(co->ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT,
|
SSL_CTX_set_verify(co->ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT,
|
||||||
@ -218,37 +514,40 @@ int connection_create_fapi_m(conn_sys_t *ssys, conn_t *co, int sck,
|
|||||||
if(SSL_CTX_use_certificate_file(co->ctx, ssys->certpem,
|
if(SSL_CTX_use_certificate_file(co->ctx, ssys->certpem,
|
||||||
SSL_FILETYPE_PEM)<=0) {
|
SSL_FILETYPE_PEM)<=0) {
|
||||||
ERR_print_errors_fp(stderr);
|
ERR_print_errors_fp(stderr);
|
||||||
|
r = SNE_ESSL;
|
||||||
goto __fail;
|
goto __fail;
|
||||||
}
|
}
|
||||||
/* set the private key from KeyFile (may be the same as CertFile) */
|
/* set the private key from KeyFile (may be the same as CertFile) */
|
||||||
if(SSL_CTX_use_PrivateKey_file(co->ctx, ssys->certkey,
|
if(SSL_CTX_use_PrivateKey_file(co->ctx, ssys->certkey,
|
||||||
SSL_FILETYPE_PEM)<=0) {
|
SSL_FILETYPE_PEM)<=0) {
|
||||||
|
r = SNE_ESSL;
|
||||||
goto __fail;
|
goto __fail;
|
||||||
}
|
}
|
||||||
/* verify private key */
|
/* verify private key */
|
||||||
if (!SSL_CTX_check_private_key(co->ctx)) {
|
if (!SSL_CTX_check_private_key(co->ctx)) {
|
||||||
|
r = SNE_ESSL;
|
||||||
goto __fail;
|
goto __fail;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* now we will create an SSL connection */
|
/* now we will create an SSL connection */
|
||||||
co->ssl = SSL_new(co->ctx);
|
co->ssl = SSL_new(co->ctx);
|
||||||
if(!co->ssl) goto __fail;
|
if(!co->ssl) { r = SNE_ENOMEM; goto __fail; }
|
||||||
else SSL_set_fd(co->ssl, sck); /* attach connected socket */
|
else SSL_set_fd(co->ssl, sck); /* attach connected socket */
|
||||||
|
|
||||||
SSL_set_accept_state(co->ssl);
|
SSL_set_accept_state(co->ssl);
|
||||||
/* set the context to verify ssl connection */
|
/* set the context to verify ssl connection */
|
||||||
SSL_set_ex_data(co->ssl, ex_ssldata_index, (void *)co);
|
SSL_set_ex_data(co->ssl, ex_ssldata_index, (void *)co);
|
||||||
SSL_set_accept_state(co->ssl);
|
SSL_set_accept_state(co->ssl);
|
||||||
if(SSL_accept(co->ssl) == -1) goto __fail;
|
if(SSL_accept(co->ssl) == -1) { r = SNE_EPERM; goto __fail; }
|
||||||
|
|
||||||
/* ok, now we are able to allocate and so on */
|
/* ok, now we are able to allocate and so on */
|
||||||
/* set connection to the batch mode */
|
/* set connection to the batch mode */
|
||||||
co->flags |= SNSX_BATCHMODE;
|
co->flags |= SNSX_BATCHMODE;
|
||||||
/* allocate our first buffer */
|
/* allocate our first buffer */
|
||||||
buf = mmap(NULL, 65536, PROT_READ|PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
|
buf = mmap(NULL, 65536, PROT_READ|PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
|
||||||
if(buf == MAP_FAILED) goto __fail2;
|
if(buf == MAP_FAILED) { r = SNE_ENOMEM; goto __fail2; }
|
||||||
/* allocate first message */
|
/* allocate first message */
|
||||||
if(!(msg = malloc(sizeof(sx_msg_t)))) goto __fail2;
|
if(!(msg = malloc(sizeof(sx_msg_t)))) { r = SNE_ENOMEM; goto __fail2; }
|
||||||
else {
|
else {
|
||||||
memset(msg, 0, sizeof(sx_msg_t));
|
memset(msg, 0, sizeof(sx_msg_t));
|
||||||
coc->messages[0] = msg;
|
coc->messages[0] = msg;
|
||||||
@ -263,7 +562,7 @@ int connection_create_fapi_m(conn_sys_t *ssys, conn_t *co, int sck,
|
|||||||
head = (sntllv2_head_t *)buf;
|
head = (sntllv2_head_t *)buf;
|
||||||
|
|
||||||
/* check for returns */
|
/* check for returns */
|
||||||
if(head->opcode != SNE_SUCCESS) goto __fail3;
|
if(head->opcode != SNE_SUCCESS) { r = head->opcode; goto __fail3; }
|
||||||
else { /* opcode is fine */
|
else { /* opcode is fine */
|
||||||
/* if we're ready for messaging mode, turn off batch mode */
|
/* if we're ready for messaging mode, turn off batch mode */
|
||||||
if(co->flags & SNSX_MESSAGINGMODE) co->flags &= ~SNSX_BATCHMODE;
|
if(co->flags & SNSX_MESSAGINGMODE) co->flags &= ~SNSX_BATCHMODE;
|
||||||
@ -272,7 +571,7 @@ int connection_create_fapi_m(conn_sys_t *ssys, conn_t *co, int sck,
|
|||||||
if(!head->payload_length) continue; /* pass the following check up */
|
if(!head->payload_length) continue; /* pass the following check up */
|
||||||
|
|
||||||
rd = __conn_read(co, bbuf, head->payload_length);
|
rd = __conn_read(co, bbuf, head->payload_length);
|
||||||
if(rd != head->payload_length) goto __fail3;
|
if(rd != head->payload_length) { r = SNE_LINKERROR; goto __fail3; }
|
||||||
bbuf[rd] = '\0';
|
bbuf[rd] = '\0';
|
||||||
sx = parse_sexp(bbuf, rd);
|
sx = parse_sexp(bbuf, rd);
|
||||||
if(!sx) goto __fail3;
|
if(!sx) goto __fail3;
|
||||||
@ -296,7 +595,7 @@ int connection_create_fapi_m(conn_sys_t *ssys, conn_t *co, int sck,
|
|||||||
}
|
}
|
||||||
|
|
||||||
destroy_sexp(sx);
|
destroy_sexp(sx);
|
||||||
} else goto __fail3;
|
} else { r = SNE_LINKERROR; goto __fail3; }
|
||||||
}
|
}
|
||||||
|
|
||||||
/* if we're there - negotiation is done, going to init messaging mode */
|
/* if we're there - negotiation is done, going to init messaging mode */
|
||||||
@ -304,9 +603,27 @@ int connection_create_fapi_m(conn_sys_t *ssys, conn_t *co, int sck,
|
|||||||
if(r != SNE_SUCCESS) goto __fail3;
|
if(r != SNE_SUCCESS) goto __fail3;
|
||||||
|
|
||||||
/* and now we're need to create a thread poll */
|
/* and now we're need to create a thread poll */
|
||||||
|
if(!(bundle = malloc(sizeof(sntllv2_bundle_t)))) { r = SNE_ENOMEM; goto __fail4; }
|
||||||
|
else {
|
||||||
|
bundle->buf = buf;
|
||||||
|
bundle->conn = co;
|
||||||
|
}
|
||||||
|
for(i = 0; i < 8; i++) {
|
||||||
|
if(bundle == 0xdead) bundle = __sntll_bundle_create(co);
|
||||||
|
if(!bundle) goto __fail5;
|
||||||
|
r = pthread_create(&thrd_poll[i], NULL, __sntll_thread, bundle);
|
||||||
|
if(r) goto __fail5;
|
||||||
|
else bundle = 0xdead;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* all is done, connection now ready */
|
||||||
|
co->flags |= SNSX_ALIVE;
|
||||||
|
|
||||||
return SNE_SUCCESS;
|
return SNE_SUCCESS;
|
||||||
|
|
||||||
|
__fail5:
|
||||||
|
r = SNE_ENOMEM;
|
||||||
|
/* bundles will be freed by the threads when SSL_read will fails. */
|
||||||
__fail4:
|
__fail4:
|
||||||
__connection_second_free(co);
|
__connection_second_free(co);
|
||||||
__fail3:
|
__fail3:
|
||||||
@ -322,5 +639,5 @@ int connection_create_fapi_m(conn_sys_t *ssys, conn_t *co, int sck,
|
|||||||
__connection_minimal_free(coc);
|
__connection_minimal_free(coc);
|
||||||
}
|
}
|
||||||
close(sck);
|
close(sck);
|
||||||
return SNE_FAILED;
|
return r;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user