few clean ups, connection.c split;
This commit is contained in:
parent
0c458e0819
commit
34a6b55f6b
@ -168,7 +168,6 @@ static int __ar_multiply(void *data, sexp_t *sx)
|
||||
|
||||
static void sigpipe_handler(int a)
|
||||
{
|
||||
//fprintf(stderr, "\n\n\n\n\nBroken pipe\n\n\n\n");
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -14,7 +14,7 @@ lib_LTLIBRARIES = libsntl.la
|
||||
|
||||
|
||||
libsntl_la_SOURCES = \
|
||||
queue.c mcache.c rpclist.c connection.c
|
||||
support.c queue.c mcache.c rpclist.c message.c connection.c
|
||||
|
||||
libsntl_la_LDFLAGS = -Wl,--export-dynamic
|
||||
|
||||
|
358
lib/connection.c
358
lib/connection.c
@ -40,8 +40,8 @@ conn_sys_t *conn_sys = NULL;
|
||||
static long __cmp_ulong(const void *a, const void *b);
|
||||
|
||||
/* message alloc and destroy */
|
||||
static sxmsg_t *__allocate_msg(int *res);
|
||||
static void __destroy_msg(sxmsg_t *msg);
|
||||
extern sxmsg_t *__allocate_msg(int *res);
|
||||
extern void __destroy_msg(sxmsg_t *msg);
|
||||
|
||||
/* examination */
|
||||
static inline int __exam_connection(conn_t *co)
|
||||
@ -1096,9 +1096,9 @@ static int __default_msg_pulse(void *cctx, sexp_t *sx)
|
||||
__destroy_msg(smsg);
|
||||
return r;
|
||||
}
|
||||
|
||||
|
||||
//msg_return(smsg, r);
|
||||
|
||||
|
||||
/* put to the IN queue */
|
||||
return r;
|
||||
}
|
||||
@ -1695,22 +1695,7 @@ static void *__msg_queue_thread(void *ctx)
|
||||
}
|
||||
|
||||
/* this function is an ugly implementation to get C string with uuid */
|
||||
static char *__generate_uuid(void)
|
||||
{
|
||||
char *uuidc = NULL;
|
||||
uuid_t uuid;
|
||||
int len, i = 0;
|
||||
|
||||
len = sizeof(char)*(sizeof(uuid_t)*2) + sizeof(char);
|
||||
if(!(uuidc = malloc(len))) return NULL;
|
||||
|
||||
uuid_generate_time_safe(uuid);
|
||||
|
||||
for(i = 0; i < sizeof(uuid_t); i++)
|
||||
snprintf(uuidc+(2*i*sizeof(char)), len, "%02x", uuid[i]);
|
||||
|
||||
return uuidc;
|
||||
}
|
||||
extern char *__generate_uuid(void);
|
||||
|
||||
/* this is a callback to perform a custom SSL certs chain validation,
|
||||
* as I promised here the comments, a lot of ...
|
||||
@ -1751,7 +1736,7 @@ static int __verify_certcall(int preverify_ok, X509_STORE_CTX *ctx)
|
||||
co->pctx = malloc(sizeof(perm_ctx_t));
|
||||
co->pctx->certid =
|
||||
ASN1_INTEGER_get((const ASN1_INTEGER *)X509_get_serialNumber(ctx->current_cert));
|
||||
printf("Certificate ID: %lu\n", co->pctx->certid);
|
||||
//printf("Certificate ID: %lu\n", co->pctx->certid);
|
||||
/* now we're need to check the ssl cert */
|
||||
if(conn_sys->validate_sslpem) {
|
||||
if(conn_sys->validate_sslpem(co)) return 0;
|
||||
@ -2157,7 +2142,7 @@ int connection_create(conn_t *co, int sck)
|
||||
|
||||
if(!co) return EINVAL;
|
||||
else memset(co, 0, sizeof(conn_t));
|
||||
|
||||
|
||||
pth_dqtpoll_init(tpoll, __rpc_callback); // TODO: check it
|
||||
|
||||
if(!idx_ch) return ENOMEM;
|
||||
@ -2287,7 +2272,7 @@ int connection_create(conn_t *co, int sck)
|
||||
if(r) goto __fail_3;
|
||||
r = pthread_create(&co->rmsgthread, NULL, __rmsg_queue_thread, (void *)co);
|
||||
if(r) goto __fail_3;
|
||||
|
||||
|
||||
pth_dqtpoll_run(tpoll);
|
||||
co->tpoll = tpoll;
|
||||
}
|
||||
@ -2315,93 +2300,9 @@ int connection_reinit(conn_t *co) /* TODO: the next version */
|
||||
return ENOSYS;
|
||||
}
|
||||
|
||||
static sxmsg_t *__allocate_msg(int *res)
|
||||
{
|
||||
sxmsg_t *msg = malloc(sizeof(sxmsg_t));
|
||||
int r = 0;
|
||||
extern int __create_reg_msg(sxmsg_t **msg, chnl_t *ch);
|
||||
|
||||
if(!msg) {
|
||||
*res = ENOMEM;
|
||||
return NULL;
|
||||
} else {
|
||||
memset(msg, 0, sizeof(sxmsg_t));
|
||||
if((r = pthread_mutex_init(&(msg->wait), NULL))) {
|
||||
free(msg);
|
||||
*res = r;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
usrtc_node_init(&(msg->chnl_node), msg);
|
||||
usrtc_node_init(&(msg->poll_node), msg);
|
||||
usrtc_node_init(&(msg->pendingq_node), msg);
|
||||
}
|
||||
|
||||
*res = 0;
|
||||
|
||||
return msg;
|
||||
}
|
||||
|
||||
static void __destroy_msg(sxmsg_t *msg)
|
||||
{
|
||||
chnl_t *ch = msg->pch;
|
||||
|
||||
if(msg->flags & ESXMSG_USR) {
|
||||
pthread_mutex_lock(&(ch->oplock));
|
||||
idx_free(ch->idx_msg, msg->mid);
|
||||
pthread_mutex_unlock(&(ch->oplock));
|
||||
} else if(msg->flags & ESXMSG_SYS) {
|
||||
//if(msg->uuid) free(msg->uuid);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&(msg->wait));
|
||||
pthread_mutex_destroy(&(msg->wait));
|
||||
free(msg);
|
||||
return;
|
||||
}
|
||||
|
||||
static int __create_reg_msg(sxmsg_t **msg, chnl_t *ch)
|
||||
{
|
||||
int r = 0;
|
||||
sxmsg_t *sm = __allocate_msg(&r);
|
||||
|
||||
if(r) return r;
|
||||
else {
|
||||
sm->pch = ch;
|
||||
sm->flags = (ESXMSG_USR | ESXMSG_PENDING);
|
||||
|
||||
/* ok allocate message ID */
|
||||
pthread_mutex_lock(&(ch->oplock));
|
||||
sm->mid = idx_allocate(ch->idx_msg);
|
||||
pthread_mutex_unlock(&(ch->oplock));
|
||||
|
||||
pthread_mutex_lock(&(sm->wait));
|
||||
*msg = sm;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int __create_sys_msg(sxmsg_t **msg, char *uuid, chnl_t *ch, sxpayload_t *data)
|
||||
{
|
||||
int r = 0;
|
||||
sxmsg_t *m = __allocate_msg(&r);
|
||||
|
||||
if(r) return r;
|
||||
else {
|
||||
/* fill values */
|
||||
m->pch = ch;
|
||||
m->uuid = uuid;
|
||||
m->payload = data;
|
||||
/* set the right flags */
|
||||
m->flags = (ESXMSG_SYS | ESXMSG_PENDING);
|
||||
/* we need to lock the wait mutex */
|
||||
pthread_mutex_lock(&(m->wait));
|
||||
|
||||
*msg = m;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
extern int __create_sys_msg(sxmsg_t **msg, char *uuid, chnl_t *ch, sxpayload_t *data);
|
||||
|
||||
/* channels */
|
||||
int channel_open(conn_t *co, chnl_t **ch, int type)
|
||||
@ -2469,7 +2370,7 @@ int channel_open(conn_t *co, chnl_t **ch, int type)
|
||||
|
||||
/* put system message to the run queue */
|
||||
/* first form the message */
|
||||
snprintf(pl->cstr, sizeof(char)*ESX_SYSMSG_SIZE,
|
||||
snprintf(pl->cstr, sizeof(char)*ESX_SYSMSG_SIZE,
|
||||
"(ch-open ((:id %ld)(:uuid %s)(:type %d)))", nch->cid, nch->uuid, type);
|
||||
nch->sysmsg = sms; /* assign system message to the channel */
|
||||
/* put it */
|
||||
@ -2526,11 +2427,11 @@ int channel_close(chnl_t *chnl)
|
||||
conn_t *co = chnl->connection;
|
||||
sxmsg_t *sms;
|
||||
sxpayload_t *pl;
|
||||
|
||||
|
||||
if(!(co->flags & CXCONN_ESTABL)) {
|
||||
return ESXNOCONNECT;
|
||||
}
|
||||
|
||||
|
||||
uuid_ = __generate_uuid();
|
||||
|
||||
pthread_rwlock_rdlock(&(co->chnl_lock));
|
||||
@ -2613,236 +2514,3 @@ __process_smsg:
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* message passing */
|
||||
|
||||
/*
|
||||
* How message sending works:
|
||||
* 1. Create a message structure assigned to the channel,
|
||||
* 2. Put S-expression context to it
|
||||
* 3. Put the message to the queue
|
||||
* 4. expect the result waiting on the lock mutex
|
||||
*/
|
||||
static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec *tio)
|
||||
{
|
||||
int r = 0;
|
||||
sxmsg_t *m = NULL;
|
||||
conn_t *co = ch->connection;
|
||||
|
||||
if(!(co->flags & CXCONN_ESTABL)) {
|
||||
destroy_sexp(sx);
|
||||
return ESXNOCONNECT;
|
||||
}
|
||||
|
||||
*msg = NULL;
|
||||
|
||||
r = __create_reg_msg(&m, ch);
|
||||
if(r) return r;
|
||||
else {
|
||||
/* put the message to the search tree */
|
||||
pthread_rwlock_wrlock(&(ch->msglock));
|
||||
usrtc_insert(ch->msgs_tree, &(m->pendingq_node), &(m->mid));
|
||||
pthread_rwlock_unlock(&(ch->msglock));
|
||||
|
||||
/* message assign */
|
||||
m->opcode = 0;
|
||||
m->payload = (void *)sx;
|
||||
/* assign initial sx */
|
||||
m->initial_sx = sx;
|
||||
|
||||
/* put the message to the run queue */
|
||||
r = pth_queue_add(co->mqueue, (void *)m, USR_MSG);
|
||||
if(r) return r; /* FIXME: better give up */
|
||||
|
||||
if(m->flags & ESXMSG_PENDING) {
|
||||
if(!tio) pthread_mutex_lock(&(m->wait));
|
||||
else pthread_mutex_timedlock(&(m->wait), tio);
|
||||
}
|
||||
if(tio && (m->flags & ESXMSG_PENDING))
|
||||
return SXOTIMEDOUT;
|
||||
if(!m->payload) {
|
||||
r = m->opcode;
|
||||
/* first remove the message from tree */
|
||||
pthread_rwlock_wrlock(&(ch->msglock));
|
||||
usrtc_delete(ch->msgs_tree, &(m->pendingq_node));
|
||||
pthread_rwlock_unlock(&(ch->msglock));
|
||||
/* destroy s expression */
|
||||
destroy_sexp(m->initial_sx);
|
||||
/* destroy */
|
||||
__destroy_msg(m);
|
||||
} else {
|
||||
*msg = m;
|
||||
r = SXOREPLYREQ;
|
||||
}
|
||||
}
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
int msg_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg)
|
||||
{
|
||||
return __message_send(ch, sx, msg, NULL);
|
||||
}
|
||||
|
||||
int msg_send_timed(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec *tio)
|
||||
{
|
||||
return __message_send(ch, sx, msg, tio);
|
||||
}
|
||||
|
||||
static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcode)
|
||||
{
|
||||
int r = 0;
|
||||
chnl_t *ch = msg->pch;
|
||||
conn_t *co = ch->connection;
|
||||
|
||||
if(!(co->flags & CXCONN_ESTABL)) {
|
||||
destroy_sexp(sx);
|
||||
return ESXNOCONNECT;
|
||||
}
|
||||
|
||||
if(msg->flags & ESXMSG_ISREPLY)
|
||||
destroy_sexp((sexp_t *)msg->payload);
|
||||
|
||||
msg->payload = sx;
|
||||
msg->opcode = opcode;
|
||||
msg->flags |= ESXMSG_PENDING; /* pending */
|
||||
msg->flags |= ESXMSG_ISREPLY; /* this is a reply */
|
||||
|
||||
if(!sx) msg->flags &= ~ESXMSG_PENDING;
|
||||
else msg->flags |= ESXMSG_RMONRETR;
|
||||
|
||||
/* put the message to the queue */
|
||||
r = pth_queue_add(co->mqueue, (void *)msg, USR_MSG);
|
||||
if(r) return r; /* FIXME: better give up */
|
||||
if(!sx) return 0;
|
||||
|
||||
if(msg->flags & ESXMSG_PENDING) {
|
||||
if(!tio) pthread_mutex_lock(&(msg->wait));
|
||||
else pthread_mutex_timedlock(&(msg->wait), tio);
|
||||
}
|
||||
|
||||
if(tio && (msg->flags & ESXMSG_PENDING))
|
||||
return SXOTIMEDOUT;
|
||||
|
||||
r = msg->opcode;
|
||||
|
||||
if(msg->flags & ESXMSG_CLOSURE) {
|
||||
/* destroy */
|
||||
destroy_sexp(msg->initial_sx);
|
||||
__destroy_msg(msg);
|
||||
}
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
int msg_return(sxmsg_t *msg, int opcode)
|
||||
{
|
||||
return __msg_reply(msg, NULL, NULL, opcode);
|
||||
}
|
||||
|
||||
int msg_reply(sxmsg_t *msg, sexp_t *sx)
|
||||
{
|
||||
return __msg_reply(msg, sx, NULL, 0);
|
||||
}
|
||||
|
||||
int msg_reply_timed(sxmsg_t *msg, sexp_t *sx, struct timespec *tio)
|
||||
{
|
||||
return __msg_reply(msg, sx, tio, 0);
|
||||
}
|
||||
|
||||
//TODO: continue. Implement wait for delivery and queue addition
|
||||
/*
|
||||
* How message sending works:
|
||||
* 1. Create a message structure assigned to the channel,
|
||||
* 2. Put S-expression context to it
|
||||
* 3. Put the message to the queue
|
||||
* 4. Wait for job execution
|
||||
*/
|
||||
static int __message_send_pulse(chnl_t *ch, sexp_t *sx, struct timespec *tio)
|
||||
{
|
||||
int r = 0;
|
||||
sxmsg_t *m = NULL;
|
||||
conn_t *co = ch->connection;
|
||||
|
||||
if(!(co->flags & CXCONN_ESTABL)) {
|
||||
destroy_sexp(sx);
|
||||
return ESXNOCONNECT;
|
||||
}
|
||||
|
||||
r = __create_reg_msg(&m, ch);
|
||||
if(r) return r;
|
||||
else {
|
||||
/* put the message to the search tree */
|
||||
pthread_rwlock_wrlock(&(ch->msglock));
|
||||
usrtc_insert(ch->msgs_tree, &(m->pendingq_node), &(m->mid));
|
||||
pthread_rwlock_unlock(&(ch->msglock));
|
||||
|
||||
/* message assign */
|
||||
m->opcode = 0;
|
||||
m->payload = (void *)sx;
|
||||
/* assign initial sx */
|
||||
m->initial_sx = sx;
|
||||
m->flags |= ESXMSG_PULSE;
|
||||
|
||||
/* put the message to the run queue */
|
||||
r = pth_queue_add(co->mqueue, (void *)m, USR_MSG);
|
||||
if(r) return r; /* FIXME: better give up */
|
||||
|
||||
if(m->flags & ESXMSG_PENDING) {
|
||||
if(!tio) pthread_mutex_lock(&(m->wait));
|
||||
else pthread_mutex_timedlock(&(m->wait), tio);
|
||||
}
|
||||
if(tio && (m->flags & ESXMSG_PENDING))
|
||||
return SXOTIMEDOUT;
|
||||
if(!m->payload) {
|
||||
r = m->opcode;
|
||||
/* first remove the message from tree */
|
||||
pthread_rwlock_wrlock(&(ch->msglock));
|
||||
usrtc_delete(ch->msgs_tree, &(m->pendingq_node));
|
||||
pthread_rwlock_unlock(&(ch->msglock));
|
||||
/* destroy s expression */
|
||||
destroy_sexp(m->initial_sx);
|
||||
/* destroy */
|
||||
__destroy_msg(m);
|
||||
} else {
|
||||
r = SXOREPLYREQ;
|
||||
}
|
||||
}
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
int msg_send_pulse(chnl_t *ch, sexp_t *sx)
|
||||
{
|
||||
return __message_send_pulse(ch, sx, NULL);
|
||||
}
|
||||
|
||||
int msg_send_pulse_timed(chnl_t *ch, sexp_t *sx, struct timespec *tio)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
int msg_send_pulse_nowait(chnl_t *ch, sexp_t *sx)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* sexp helpers */
|
||||
int sexp_list_car(sexp_t *expr, sexp_t **sx)
|
||||
{
|
||||
if (!SEXP_IS_LIST(expr) || expr->list->ty != SEXP_VALUE) return 1;
|
||||
|
||||
*sx = expr->list;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int sexp_list_cdr(sexp_t *expr, sexp_t **sx)
|
||||
{
|
||||
/* Dummy function. Can we do cdr properly? */
|
||||
if (!SEXP_IS_LIST(expr) || expr->list->ty != SEXP_VALUE) return 1;
|
||||
|
||||
if (!expr->list->next) *sx = NULL;
|
||||
else *sx = expr->list->next;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
330
lib/message.c
Normal file
330
lib/message.c
Normal file
@ -0,0 +1,330 @@
|
||||
/*
|
||||
* Secure Network Transport Layer Library implementation.
|
||||
* This is a proprietary software. See COPYING for further details.
|
||||
*
|
||||
* (c) 2013-2014 Copyright Askele, inc. <http://askele.com>
|
||||
* (c) 2013-2014 Copyright Askele Ingria, inc. <http://askele-ingria.com>
|
||||
*/
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
#include <errno.h>
|
||||
#include <string.h>
|
||||
#include <pthread.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/time.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/select.h>
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
#include <netdb.h>
|
||||
|
||||
#include <uuid/uuid.h>
|
||||
#include <openssl/ssl.h>
|
||||
#include <openssl/err.h>
|
||||
|
||||
#include <tdata/usrtc.h>
|
||||
#include <sexpr/sexp.h>
|
||||
|
||||
#include <sntl/connection.h>
|
||||
|
||||
void __destroy_msg(sxmsg_t *msg)
|
||||
{
|
||||
chnl_t *ch = msg->pch;
|
||||
|
||||
if(msg->flags & ESXMSG_USR) {
|
||||
pthread_mutex_lock(&(ch->oplock));
|
||||
idx_free(ch->idx_msg, msg->mid);
|
||||
pthread_mutex_unlock(&(ch->oplock));
|
||||
} else if(msg->flags & ESXMSG_SYS) {
|
||||
//if(msg->uuid) free(msg->uuid);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&(msg->wait));
|
||||
pthread_mutex_destroy(&(msg->wait));
|
||||
free(msg);
|
||||
return;
|
||||
}
|
||||
|
||||
sxmsg_t *__allocate_msg(int *res)
|
||||
{
|
||||
sxmsg_t *msg = malloc(sizeof(sxmsg_t));
|
||||
int r = 0;
|
||||
|
||||
if(!msg) {
|
||||
*res = ENOMEM;
|
||||
return NULL;
|
||||
} else {
|
||||
memset(msg, 0, sizeof(sxmsg_t));
|
||||
if((r = pthread_mutex_init(&(msg->wait), NULL))) {
|
||||
free(msg);
|
||||
*res = r;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
usrtc_node_init(&(msg->chnl_node), msg);
|
||||
usrtc_node_init(&(msg->poll_node), msg);
|
||||
usrtc_node_init(&(msg->pendingq_node), msg);
|
||||
}
|
||||
|
||||
*res = 0;
|
||||
|
||||
return msg;
|
||||
}
|
||||
|
||||
int __create_reg_msg(sxmsg_t **msg, chnl_t *ch)
|
||||
{
|
||||
int r = 0;
|
||||
sxmsg_t *sm = __allocate_msg(&r);
|
||||
|
||||
if(r) return r;
|
||||
else {
|
||||
sm->pch = ch;
|
||||
sm->flags = (ESXMSG_USR | ESXMSG_PENDING);
|
||||
|
||||
/* ok allocate message ID */
|
||||
pthread_mutex_lock(&(ch->oplock));
|
||||
sm->mid = idx_allocate(ch->idx_msg);
|
||||
pthread_mutex_unlock(&(ch->oplock));
|
||||
|
||||
pthread_mutex_lock(&(sm->wait));
|
||||
*msg = sm;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int __create_sys_msg(sxmsg_t **msg, char *uuid, chnl_t *ch, sxpayload_t *data)
|
||||
{
|
||||
int r = 0;
|
||||
sxmsg_t *m = __allocate_msg(&r);
|
||||
|
||||
if(r) return r;
|
||||
else {
|
||||
/* fill values */
|
||||
m->pch = ch;
|
||||
m->uuid = uuid;
|
||||
m->payload = data;
|
||||
/* set the right flags */
|
||||
m->flags = (ESXMSG_SYS | ESXMSG_PENDING);
|
||||
/* we need to lock the wait mutex */
|
||||
pthread_mutex_lock(&(m->wait));
|
||||
|
||||
*msg = m;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* message passing */
|
||||
|
||||
/*
|
||||
* How message sending works:
|
||||
* 1. Create a message structure assigned to the channel,
|
||||
* 2. Put S-expression context to it
|
||||
* 3. Put the message to the queue
|
||||
* 4. expect the result waiting on the lock mutex
|
||||
*/
|
||||
static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec *tio)
|
||||
{
|
||||
int r = 0;
|
||||
sxmsg_t *m = NULL;
|
||||
conn_t *co = ch->connection;
|
||||
|
||||
if(!(co->flags & CXCONN_ESTABL)) {
|
||||
destroy_sexp(sx);
|
||||
return ESXNOCONNECT;
|
||||
}
|
||||
|
||||
*msg = NULL;
|
||||
|
||||
r = __create_reg_msg(&m, ch);
|
||||
if(r) return r;
|
||||
else {
|
||||
/* put the message to the search tree */
|
||||
pthread_rwlock_wrlock(&(ch->msglock));
|
||||
usrtc_insert(ch->msgs_tree, &(m->pendingq_node), &(m->mid));
|
||||
pthread_rwlock_unlock(&(ch->msglock));
|
||||
|
||||
/* message assign */
|
||||
m->opcode = 0;
|
||||
m->payload = (void *)sx;
|
||||
/* assign initial sx */
|
||||
m->initial_sx = sx;
|
||||
|
||||
/* put the message to the run queue */
|
||||
r = pth_queue_add(co->mqueue, (void *)m, USR_MSG);
|
||||
if(r) return r; /* FIXME: better give up */
|
||||
|
||||
if(m->flags & ESXMSG_PENDING) {
|
||||
if(!tio) pthread_mutex_lock(&(m->wait));
|
||||
else pthread_mutex_timedlock(&(m->wait), tio);
|
||||
}
|
||||
if(tio && (m->flags & ESXMSG_PENDING))
|
||||
return SXOTIMEDOUT;
|
||||
if(!m->payload) {
|
||||
r = m->opcode;
|
||||
/* first remove the message from tree */
|
||||
pthread_rwlock_wrlock(&(ch->msglock));
|
||||
usrtc_delete(ch->msgs_tree, &(m->pendingq_node));
|
||||
pthread_rwlock_unlock(&(ch->msglock));
|
||||
/* destroy s expression */
|
||||
destroy_sexp(m->initial_sx);
|
||||
/* destroy */
|
||||
__destroy_msg(m);
|
||||
} else {
|
||||
*msg = m;
|
||||
r = SXOREPLYREQ;
|
||||
}
|
||||
}
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
int msg_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg)
|
||||
{
|
||||
return __message_send(ch, sx, msg, NULL);
|
||||
}
|
||||
|
||||
int msg_send_timed(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec *tio)
|
||||
{
|
||||
return __message_send(ch, sx, msg, tio);
|
||||
}
|
||||
|
||||
static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcode)
|
||||
{
|
||||
int r = 0;
|
||||
chnl_t *ch = msg->pch;
|
||||
conn_t *co = ch->connection;
|
||||
|
||||
if(!(co->flags & CXCONN_ESTABL)) {
|
||||
destroy_sexp(sx);
|
||||
return ESXNOCONNECT;
|
||||
}
|
||||
|
||||
if(msg->flags & ESXMSG_ISREPLY)
|
||||
destroy_sexp((sexp_t *)msg->payload);
|
||||
|
||||
msg->payload = sx;
|
||||
msg->opcode = opcode;
|
||||
msg->flags |= ESXMSG_PENDING; /* pending */
|
||||
msg->flags |= ESXMSG_ISREPLY; /* this is a reply */
|
||||
|
||||
if(!sx) msg->flags &= ~ESXMSG_PENDING;
|
||||
else msg->flags |= ESXMSG_RMONRETR;
|
||||
|
||||
/* put the message to the queue */
|
||||
r = pth_queue_add(co->mqueue, (void *)msg, USR_MSG);
|
||||
if(r) return r; /* FIXME: better give up */
|
||||
if(!sx) return 0;
|
||||
|
||||
if(msg->flags & ESXMSG_PENDING) {
|
||||
if(!tio) pthread_mutex_lock(&(msg->wait));
|
||||
else pthread_mutex_timedlock(&(msg->wait), tio);
|
||||
}
|
||||
|
||||
if(tio && (msg->flags & ESXMSG_PENDING))
|
||||
return SXOTIMEDOUT;
|
||||
|
||||
r = msg->opcode;
|
||||
|
||||
if(msg->flags & ESXMSG_CLOSURE) {
|
||||
/* destroy */
|
||||
destroy_sexp(msg->initial_sx);
|
||||
__destroy_msg(msg);
|
||||
}
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
int msg_return(sxmsg_t *msg, int opcode)
|
||||
{
|
||||
return __msg_reply(msg, NULL, NULL, opcode);
|
||||
}
|
||||
|
||||
int msg_reply(sxmsg_t *msg, sexp_t *sx)
|
||||
{
|
||||
return __msg_reply(msg, sx, NULL, 0);
|
||||
}
|
||||
|
||||
int msg_reply_timed(sxmsg_t *msg, sexp_t *sx, struct timespec *tio)
|
||||
{
|
||||
return __msg_reply(msg, sx, tio, 0);
|
||||
}
|
||||
|
||||
//TODO: continue. Implement wait for delivery and queue addition
|
||||
/*
|
||||
* How message sending works:
|
||||
* 1. Create a message structure assigned to the channel,
|
||||
* 2. Put S-expression context to it
|
||||
* 3. Put the message to the queue
|
||||
* 4. Wait for job execution
|
||||
*/
|
||||
static int __message_send_pulse(chnl_t *ch, sexp_t *sx, struct timespec *tio)
|
||||
{
|
||||
int r = 0;
|
||||
sxmsg_t *m = NULL;
|
||||
conn_t *co = ch->connection;
|
||||
|
||||
if(!(co->flags & CXCONN_ESTABL)) {
|
||||
destroy_sexp(sx);
|
||||
return ESXNOCONNECT;
|
||||
}
|
||||
|
||||
r = __create_reg_msg(&m, ch);
|
||||
if(r) return r;
|
||||
else {
|
||||
/* put the message to the search tree */
|
||||
pthread_rwlock_wrlock(&(ch->msglock));
|
||||
usrtc_insert(ch->msgs_tree, &(m->pendingq_node), &(m->mid));
|
||||
pthread_rwlock_unlock(&(ch->msglock));
|
||||
|
||||
/* message assign */
|
||||
m->opcode = 0;
|
||||
m->payload = (void *)sx;
|
||||
/* assign initial sx */
|
||||
m->initial_sx = sx;
|
||||
m->flags |= ESXMSG_PULSE;
|
||||
|
||||
/* put the message to the run queue */
|
||||
r = pth_queue_add(co->mqueue, (void *)m, USR_MSG);
|
||||
if(r) return r; /* FIXME: better give up */
|
||||
|
||||
if(m->flags & ESXMSG_PENDING) {
|
||||
if(!tio) pthread_mutex_lock(&(m->wait));
|
||||
else pthread_mutex_timedlock(&(m->wait), tio);
|
||||
}
|
||||
if(tio && (m->flags & ESXMSG_PENDING))
|
||||
return SXOTIMEDOUT;
|
||||
if(!m->payload) {
|
||||
r = m->opcode;
|
||||
/* first remove the message from tree */
|
||||
pthread_rwlock_wrlock(&(ch->msglock));
|
||||
usrtc_delete(ch->msgs_tree, &(m->pendingq_node));
|
||||
pthread_rwlock_unlock(&(ch->msglock));
|
||||
/* destroy s expression */
|
||||
destroy_sexp(m->initial_sx);
|
||||
/* destroy */
|
||||
__destroy_msg(m);
|
||||
} else {
|
||||
r = SXOREPLYREQ;
|
||||
}
|
||||
}
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
int msg_send_pulse(chnl_t *ch, sexp_t *sx)
|
||||
{
|
||||
return __message_send_pulse(ch, sx, NULL);
|
||||
}
|
||||
|
||||
int msg_send_pulse_timed(chnl_t *ch, sexp_t *sx, struct timespec *tio)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
int msg_send_pulse_nowait(chnl_t *ch, sexp_t *sx)
|
||||
{
|
||||
return 0;
|
||||
}
|
68
lib/support.c
Normal file
68
lib/support.c
Normal file
@ -0,0 +1,68 @@
|
||||
/*
|
||||
* Secure Network Transport Layer Library implementation.
|
||||
* This is a proprietary software. See COPYING for further details.
|
||||
*
|
||||
* (c) 2013-2014 Copyright Askele, inc. <http://askele.com>
|
||||
* (c) 2013-2014 Copyright Askele Ingria, inc. <http://askele-ingria.com>
|
||||
*/
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
#include <errno.h>
|
||||
#include <string.h>
|
||||
#include <pthread.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/time.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/select.h>
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
#include <netdb.h>
|
||||
|
||||
#include <uuid/uuid.h>
|
||||
#include <openssl/ssl.h>
|
||||
#include <openssl/err.h>
|
||||
|
||||
#include <tdata/usrtc.h>
|
||||
#include <sexpr/sexp.h>
|
||||
|
||||
#include <sntl/connection.h>
|
||||
|
||||
/* this function is an ugly implementation to get C string with uuid */
|
||||
char *__generate_uuid(void)
|
||||
{
|
||||
char *uuidc = NULL;
|
||||
uuid_t uuid;
|
||||
int len, i = 0;
|
||||
|
||||
len = sizeof(char)*(sizeof(uuid_t)*2) + sizeof(char);
|
||||
if(!(uuidc = malloc(len))) return NULL;
|
||||
|
||||
uuid_generate_time_safe(uuid);
|
||||
|
||||
for(i = 0; i < sizeof(uuid_t); i++)
|
||||
snprintf(uuidc+(2*i*sizeof(char)), len, "%02x", uuid[i]);
|
||||
|
||||
return uuidc;
|
||||
}
|
||||
|
||||
/* sexp helpers */
|
||||
int sexp_list_car(sexp_t *expr, sexp_t **sx)
|
||||
{
|
||||
if (!SEXP_IS_LIST(expr) || expr->list->ty != SEXP_VALUE) return 1;
|
||||
|
||||
*sx = expr->list;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int sexp_list_cdr(sexp_t *expr, sexp_t **sx)
|
||||
{
|
||||
/* Dummy function. Can we do cdr properly? */
|
||||
if (!SEXP_IS_LIST(expr) || expr->list->ty != SEXP_VALUE) return 1;
|
||||
|
||||
if (!expr->list->next) *sx = NULL;
|
||||
else *sx = expr->list->next;
|
||||
|
||||
return 0;
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user