diff --git a/.gitignore b/.gitignore index 1a46dbb..7abf740 100644 --- a/.gitignore +++ b/.gitignore @@ -40,3 +40,4 @@ config.sub.dh-orig lib/libsntl.pc examples/sntlc examples/sntld +examples/gentest diff --git a/BUGS b/BUGS new file mode 100644 index 0000000..e69de29 diff --git a/examples/Makefile.am b/examples/Makefile.am index a1269e5..5406081 100644 --- a/examples/Makefile.am +++ b/examples/Makefile.am @@ -13,7 +13,7 @@ AM_CFLAGS = -Wall -g libsntl = ../lib/.libs/libsntl.la -bin_PROGRAMS = sntld sntlc +bin_PROGRAMS = sntld sntlc gentest sntld_SOURCES = sntld.c sntld_LDADD = $(LIBTDATA_LIBS) $(LIBSEXPR_LIBS) $(OPENSSL_LIBS) \ @@ -21,6 +21,10 @@ sntld_LDADD = $(LIBTDATA_LIBS) $(LIBSEXPR_LIBS) $(OPENSSL_LIBS) \ sntlc_SOURCES = sntlc.c sntlc_LDADD = $(LIBTDATA_LIBS) $(LIBSEXPR_LIBS) $(OPENSSL_LIBS) \ $(LIBUUID_LIBS) $(libsntl) +gentest_SOURCES = gentest.c +gentest_LDADD = $(LIBTDATA_LIBS) $(LIBSEXPR_LIBS) $(OPENSSL_LIBS) \ + $(LIBUUID_LIBS) $(libsntl) + #zsyncd_LDFLAGS = \ # -Wl,--export-dynamic diff --git a/examples/gentest.c b/examples/gentest.c new file mode 100644 index 0000000..1e85912 --- /dev/null +++ b/examples/gentest.c @@ -0,0 +1,407 @@ +#include +#define __USE_GNU +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +/* define a little bit */ +#define DEFAULT_PORT 13133 +#define CHANNEL_COUNT 200 +#define CLIENT_COUNT 100 +#define MESSAGES_PER_SESSION 10000 +#define ITERATION_COUNT 1000 + +#define FAILS_ONLY +//#define SIMPLE_TESTING + +static FILE *log_file = NULL; + +inline int log_init(const char *file) +{ + if(log_file) { + fclose(log_file); + } + log_file = fopen(file, "w"); + if(!log_file) return EIO; + return 0; +} + +inline void log_msg(const char *prefix, const char *data) +{ + if(log_file) fprintf(log_file, "[%s]: %s\n", prefix, data); +} + +inline void log_begin(const char *data) +{ +#ifndef FAILS_ONLY + log_msg("BEGIN", data); +#endif +} + +inline void log_end(const char *data) +{ +#ifndef FAILS_ONLY + log_msg("END", data); +#endif +} + +inline void log_info(const char *data) +{ +#ifndef FAILS_ONLY + log_msg("INFO", data); +#endif +} + +inline void log_error(const char *data) +{ + log_msg("FAILED", data); +} + +inline void log_assert(const char *info, int rc, int exp) +{ + if(log_file && (rc) != (exp)) { + fprintf(log_file, "[FAILED]: %s result: %d, expected: %d\n", info, rc, exp); + } +} + +inline void log_close() +{ + if(log_file) { + fflush(log_file); + fclose(log_file); + } +} + +inline void log_flush() +{ + fflush(log_file); +} + +void signal_error(int sig, siginfo_t *si, void *ptr) +{ + void* error_addr; + void* trace[16]; + int x; + int trace_size; + char** messages; + + fprintf(stderr, "Something is wrong: backtrace: \n"); + uintptr_t fptr = (uintptr_t)(si->si_addr); + fprintf(stderr, "Signal: %d, function pointer: 0x%.12lX \n", sig, fptr); + #if __WORDSIZE == 64 + error_addr = (void*)((ucontext_t*)ptr)->uc_mcontext.gregs[REG_RIP]; + #else + error_addr = (void*)((ucontext_t*)ptr)->uc_mcontext.gregs[REG_EIP]; + #endif + + trace_size = backtrace(trace, 16); + trace[1] = error_addr; + + messages = backtrace_symbols(trace, trace_size); + if (messages) + { + for (x = 1; x < trace_size; x++) + { + fprintf(stderr, "%s\n", messages[x]); + } + free(messages); + } + + fprintf(stderr, "end of backtrace\n"); + + exit(1); +} + +typedef struct +{ + pthread_t **threads; + int thread_count; + conn_t *co; +} test_data_t; + +/*static*/ sexp_t *make_request(const char *req) +{ + char *request = strdup(req); + sexp_t *sx = parse_sexp(request, strlen(request)); + free(request); + return sx; +} + +/*static */int allocate_threads(int count, test_data_t *data) +{ + int i = 0; + data->threads = (pthread_t **)malloc(count * sizeof(pthread_t *)); + for(i = 0; i < count; ++i) { + data->threads[i] = (pthread_t *)malloc(sizeof(pthread_t)); + if(!data->threads) return ENOMEM; + } + data->thread_count = count; + return 0; +} +/*static */int deallocate_threads(test_data_t *data) +{ + int i = 0; + for(i = 0; i < data->thread_count; ++i) { + if(!data->threads[i]) return EINVAL; + pthread_join(*data->threads[i], NULL); + free(data->threads[i]); + data->threads[i] = 0; + } + free(data->threads); + data->thread_count = 0; + + return 0; +} + +void *test_invalid_channel(void *ctx) +{ + log_begin("Invalid channel testing"); + conn_t *co = (conn_t *)ctx; + chnl_t *channel = NULL; + int rc = 0, i; + for(i = 0; i < ITERATION_COUNT || ITERATION_COUNT < 0; ++i) { + rc = channel_open(co, &channel, 1); + log_assert("channel_open with type 1", rc, EINVAL); + // TODO: segmentation fault below + //rc = channel_close(channel); + //log_assert("channel_close with type 1", rc, EINVAL); + } + log_end("Invalid channel testing"); + return 0x00; +} + +void *test_correct_channel(void *ctx) +{ + log_begin("Channel testing"); + + conn_t *co = (conn_t *)ctx; + chnl_t *channel = NULL; + int rc = 0, i, j; + char buf[128]; + time_t start, end; + int a, b; + sexp_t *add_request = NULL; + sxmsg_t *msg = NULL; + double exec_time; + + for(j = 0; j < ITERATION_COUNT || ITERATION_COUNT < 0; ++j) { + rc = channel_open(co, &channel, 12); + log_assert("channel_open with type 12", rc, 0); + log_begin("Test messaging"); + //#if 0 + for(i = 0; i < MESSAGES_PER_SESSION; ++i) { + a = rand() % 100; + b = rand() % 100; + sprintf(buf, "(ar-add (%d %d))", a, b); + add_request = make_request(buf); + time(&start); + rc = msg_send(channel, add_request, &msg); + time(&end); + exec_time = difftime(end, start); + sprintf(buf, "rpc execution time: %lf", exec_time); + log_info(buf); + log_assert("rpc execution", rc, a + b); + //destroy_sexp(add_request); + } + //#endif + log_end("Test messaging"); + + rc = channel_close(channel); + log_assert("channel_close with type 12", rc, 0); + } + + log_end("Channel testing"); + return 0x00; +} + +int test_channels(test_data_t *data, int index) +{ + int rc = 0; + if(index < CLIENT_COUNT) { + rc = pthread_create(data->threads[index], NULL, test_correct_channel, data->co); + } else { + rc = pthread_create(data->threads[index], NULL, test_invalid_channel, data->co); + } + + return rc; +} + +void test_channel_handling(conn_t *co) +{ + chnl_t *channel = NULL; + int rc = 0, i = 0; + + for(i = 0; i < ITERATION_COUNT; ++i) { + rc = channel_open(co, &channel, 12); + log_assert("channel open function", rc, 0); + rc = channel_close(channel); + log_assert("channel close function", rc, 0); + } +} + +void test_message_handling(conn_t* co) +{ + chnl_t *channel = NULL; + int rc = 0, i = 0, a = 0, b = 0; + sexp_t *sx = NULL; + char *buf = NULL; + sxmsg_t *msg = NULL; + + buf = malloc(4096); + rc = channel_open(co, &channel, 12); + log_assert("channel open function", rc, 0); + for(i = 0; i < ITERATION_COUNT; ++i) { + a = rand() % 100; + b = rand() % 100; + sprintf(buf, "(ar-add (%d %d))", a, b); + sx = parse_sexp(buf, strlen(buf)); + rc = msg_send(channel, sx, &msg); + log_assert("message send function", rc, a + b); + // destroy_sexp(sx); + } + rc = channel_close(channel); + free(buf); + log_assert("channel close function", rc, 0); +} + +int main(int argc, char **argv) +{ + // set detailed signal handler + struct sigaction sigact; + sigact.sa_flags = SA_SIGINFO; + sigact.sa_sigaction = signal_error; + sigemptyset(&sigact.sa_mask); + sigaction(SIGFPE, &sigact, 0); + sigaction(SIGILL, &sigact, 0); + sigaction(SIGSEGV, &sigact, 0); + sigaction(SIGBUS, &sigact, 0); + + char *rootca = NULL, *cert = NULL; + int port = DEFAULT_PORT; + char *addr = NULL, *login = NULL, *password = NULL; + int opt; +#ifndef SIMPLE_TESTING + int rc, i; +#endif + while((opt = getopt(argc, argv, "p:r:a:u:l:w:")) != -1) { + switch(opt) { + case 'p': + port = atoi(optarg); + break; + case 'r': + rootca = strdup(optarg); + break; + case 'a': + addr = strdup(optarg); + break; + case 'u': + cert = strdup(optarg); + break; + case 'l': + login = strdup(optarg); + break; + case 'w': + password = strdup(optarg); + break; + default: + fprintf(stderr, "usage: %s [-p ] -r -a -u -l -w \n", argv[0]); + return EINVAL; + } + } + + if(!rootca) { + fprintf(stderr, "Root CA not pointed.\n Failure.\n"); + return EINVAL; + } + + if(!addr) { + fprintf(stderr, "Server address not pointed.\n Failure.\n"); + return EINVAL; + } + + if(!cert) { + fprintf(stderr, "User certificate not pointed.\n Failure.\n"); + return EINVAL; + } + + if(!login) { + fprintf(stderr, "User login not pointed.\n Failure.\n"); + return EINVAL; + } + + if(!password) { + fprintf(stderr, "User password not pointed.\n Failure.\n"); + return EINVAL; + } + + /* all is fine let's init connection subsystem */ + opt = connections_subsystem_init(); + if(opt) { + fprintf(stderr, "Subsystem init failed: %d\n", opt); + return opt; + } + /* set working certificates */ + opt = connections_subsystem_setsslserts(rootca, cert, cert); + if(opt) { + fprintf(stderr, "Subsystem init failed (set SSL x.509 pems): %d\n", opt); + return opt; + } + + /* Tests */ + /* try to open connection */ + conn_t *co = malloc(sizeof(conn_t)), *co2 = malloc(sizeof(conn_t)); + perm_ctx_t *ctx = (perm_ctx_t *)malloc(sizeof(perm_ctx_t)); + ctx->login = login; + ctx->passwd = password; + + log_init("test.log"); + + log_begin("Connection initiate"); + log_assert("Connection initiate", connection_initiate(co, addr, port, cert, ctx), 0); + log_end("Connection initiate"); + + log_begin("Connection initiate (second one for test)"); + log_assert("Connection initiate (second)", connection_initiate(co2, addr, port, cert, ctx), 0); + log_end("Connection initiate (second)"); + + printf("HERE!!!!\n"); + + log_begin("Connection close"); + log_assert("Connection close", connection_close(co), 0); + log_end("Connection close"); + + log_begin("Connection close (second)"); + log_assert("Connection close (second)", connection_close(co2), 0); + log_end("Connection close (second)"); + + log_close(); + + free(rootca); + free(cert); + free(co); + free(ctx); + free(password); + free(login); + free(addr); + + return 0; +} diff --git a/include/sntl/connection.h b/include/sntl/connection.h index 79b6e8c..cca9908 100644 --- a/include/sntl/connection.h +++ b/include/sntl/connection.h @@ -134,6 +134,12 @@ typedef struct __sexp_payload_t { #define ESXMSG_ISREPLY (1 << 8) #define ESXMSG_CLOSURE (1 << 9) +/** + * \brief Message used in sntl message passing + * + * This structure used to manage a message within a channel + * of the sntl structure stack. + */ typedef struct __message_t { chnl_t *pch; /** < channel of the message(if applicable) */ ulong_t mid; /** < unique ID within connection context */ @@ -160,6 +166,14 @@ typedef struct __connection_rpc_list_type { char *opt_version; /** < reserved for future implementations */ } cx_rpc_list_t; +/** + * \brief Connection subsystem structure. + * + * This structure used for management and control a set of a + * determined connections with the same RPC lists and the same + * mode (server, client). + * + */ typedef struct __connections_subsys_type { int ex_ssldata_index; /** < index used to work with additional data * provided to the special call during SSL handshake */ @@ -215,6 +229,7 @@ int connection_create(conn_t *co, int sck); int connection_close(conn_t *co); +/* FIXME: for the next versions */ int connection_reinit(conn_t *co); /* channels */ @@ -239,6 +254,9 @@ int msg_send_pulse_timed(chnl_t *ch, sexp_t *sx, struct timespec *tio); int msg_send_pulse_nowait(chnl_t *ch, sexp_t *sx); +/* additional functions */ +#define sntl_msg_get_secctx(m) (m)->pch->connection->pctx + /* RPC List API */ #define SNTL_FILTER_INC 0xa #define SNTL_FILTER_EXC 0xb diff --git a/lib/connection.c b/lib/connection.c index ec515dc..5712623 100644 --- a/lib/connection.c +++ b/lib/connection.c @@ -46,8 +46,10 @@ static void __destroy_msg(sxmsg_t *msg); static int __rpc_callback(void *data) { struct __rpc_job *job = (struct __rpc_job *)data; - - return job->rpcf((void *)(job->msg), job->sx); + int rc = 0; + rc = job->rpcf((void *)(job->msg), job->sx); + free(job); + return rc; } int __alloc_channel(ulong_t cid, conn_t *co, rpc_typed_list_t *rlist, chnl_t **channel) @@ -56,7 +58,7 @@ int __alloc_channel(ulong_t cid, conn_t *co, rpc_typed_list_t *rlist, chnl_t **c chnl_t *ch = malloc(sizeof(chnl_t)); usrtc_t *msg_tree = malloc(sizeof(usrtc_t)); idx_allocator_t *idx_msg = malloc(sizeof(idx_allocator_t)); - + if(!idx_msg) goto __fin_enomem; else if(idx_allocator_init(idx_msg, MAX_MSGINDEX, 0)) goto __fin_enomem; @@ -102,7 +104,7 @@ static int __conn_read(conn_t *co, void *buf, size_t buf_len) { int rfd = SSL_get_fd(co->ssl), r; fd_set readset; - + fprintf(stderr, "\tListening ... on %s\n", co->uuid); /* get prepare to select */ FD_ZERO(&readset); @@ -183,11 +185,10 @@ static long __cmp_int(const void *a, const void *b) static long __cmp_ulong(const void *a, const void *b) { - //printf("(??cmp_ulong)a = %ld b = %ld\n", *(ulong_t *)a , *(ulong_t *)b); return *(ulong_t *)a - *(ulong_t *)b; } -static int __resolvehost(const char *hostname, char *buf, int buf_len, +static int __resolvehost(const char *hostname, char *buf, int buf_len, struct hostent **rhp) { struct hostent *hostbuf = malloc(sizeof(struct hostent)); @@ -1233,13 +1234,18 @@ static int __default_msg_return(void *cctx, sexp_t *sx) } else { pthread_rwlock_unlock(&(chan->msglock)); smsg = (sxmsg_t *)usrtc_node_getdata(node); - pthread_rwlock_wrlock(&(chan->msglock)); - idx_free(chan->idx_msg, msg_id); - usrtc_delete(chan->msgs_tree, node); - pthread_rwlock_unlock(&(chan->msglock)); - //smsg = (sxmsg_t *)usrtc_node_getdata(node); smsg->opcode = opcode; + if(smsg->flags & ESXMSG_ISREPLY) + destroy_sexp((sexp_t *)smsg->payload); smsg->payload = NULL; + smsg->flags |= ESXMSG_CLOSURE; + + /* TODO: can we remove the message from the tree there??? */ + /* first remove the message from tree */ + pthread_rwlock_wrlock(&(chan->msglock)); + usrtc_delete(chan->msgs_tree, &(smsg->pendingq_node)); + pthread_rwlock_unlock(&(chan->msglock)); + pthread_mutex_unlock(&(smsg->wait)); } @@ -1505,7 +1511,7 @@ static void *__msg_queue_thread(void *ctx) if(buf) free(buf); return NULL; } - ; + while(1) { r = pth_queue_get(co->mqueue, NULL, tmp); if(r) { @@ -1559,7 +1565,8 @@ static void *__msg_queue_thread(void *ctx) destroy_sexp(msg->initial_sx); msg->initial_sx = NULL; msg->payload = NULL; - destroy_sexp(msg->payload); + if(msg->flags & ESXMSG_ISREPLY) + destroy_sexp(msg->payload); } else { ; pthread_mutex_unlock(&(msg->wait)); @@ -1606,6 +1613,8 @@ static void *__msg_queue_thread(void *ctx) pthread_rwlock_unlock(&(ch->msglock)); /* destroy */ destroy_sexp(msg->initial_sx); + if(msg->flags & ESXMSG_ISREPLY && msg->payload) + destroy_sexp((sexp_t *)msg->payload); __destroy_msg(msg); } @@ -1864,16 +1873,29 @@ int connection_initiate(conn_t *co, const char *host, int port, struct sockaddr_in addr; usrtc_t *ch_tree, *rpc_tree; pth_queue_t *mqueue = malloc(sizeof(pth_queue_t)); - if(!mqueue) return ENOMEM; pth_dqtpoll_t *tpoll = malloc(sizeof(pth_dqtpoll_t)); - if(!tpoll) return ENOMEM; // TODO: fallback idx_allocator_t *idx_ch = malloc(sizeof(idx_allocator_t)); - if(!idx_ch) return ENOMEM; - if(!co) return EINVAL; - if(!host) return EINVAL; - if(!SSL_cert) return EINVAL; - if(!pctx) return EINVAL; + if(!mqueue) { + __fallenomem: + r = ENOMEM; + __fall0: + if(mqueue) free(mqueue); + if(tpoll) free(tpoll); + if(idx_ch) free(idx_ch); + return r; + } + if(!tpoll) goto __fallenomem; + if(!idx_ch) goto __fallenomem; + + if(!co) { + __falleinval: + r = EINVAL; + goto __fall0; + } + if(!host) goto __falleinval; + if(!SSL_cert) goto __falleinval; + if(!pctx) goto __falleinval; memset(co, 0, sizeof(conn_t)); @@ -1915,13 +1937,13 @@ int connection_initiate(conn_t *co, const char *host, int port, /* set the local certificate from CertFile */ if(SSL_CTX_use_certificate_file(co->ctx, SSL_cert, SSL_FILETYPE_PEM)<=0) { - r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__); + r = EINVAL; goto __fail_3; } /* set the private key from KeyFile (may be the same as CertFile) */ if(SSL_CTX_use_PrivateKey_file(co->ctx, SSL_cert, SSL_FILETYPE_PEM)<=0) { - r = EINVAL; printf("%s:%d\n", __FUNCTION__, __LINE__); + r = EINVAL; goto __fail_3; } /* verify private key */ @@ -2012,7 +2034,6 @@ int connection_initiate(conn_t *co, const char *host, int port, free(buf); /* now we can free the temporary buffer */ /* a listening thread creation (incoming messages) */ - printf("%s:%d r = %d\n", __FUNCTION__, __LINE__, r); if(!r) { /* success let's start a listening thread */ r = pthread_create(&co->cthread, NULL, __cxslave_thread_listener, (void *)co); if(!r) { @@ -2022,14 +2043,13 @@ int connection_initiate(conn_t *co, const char *host, int port, pthread_rwlock_wrlock(&conn_sys->rwlock); usrtc_insert(conn_sys->connections, &co->csnode, (void *)co->uuid); pthread_rwlock_unlock(&conn_sys->rwlock); - //return 0; /* FIXME: */ } r = pthread_create(&co->msgthread, NULL, __msg_queue_thread, (void *)co); if(r) goto __fail_3; - + pth_dqtpoll_run(tpoll); co->tpoll = tpoll; - + return 0; } @@ -2058,7 +2078,7 @@ int connection_create(conn_t *co, int sck) idx_allocator_t *idx_ch = malloc(sizeof(idx_allocator_t)); if(!co) return EINVAL; - else memset(co, 0, sizeof(co)); + else memset(co, 0, sizeof(conn_t)); pth_dqtpoll_init(tpoll, __rpc_callback); // TODO: check it @@ -2600,6 +2620,9 @@ static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcod return ESXNOCONNECT; } + if(msg->flags & ESXMSG_ISREPLY) + destroy_sexp((sexp_t *)msg->payload); + msg->payload = sx; msg->opcode = opcode; msg->flags |= ESXMSG_PENDING; /* pending */ @@ -2622,17 +2645,12 @@ static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcod r = msg->opcode; -#if 0 if(msg->flags & ESXMSG_CLOSURE) { - /* first remove the message from tree */ - pthread_rwlock_wrlock(&(ch->msglock)); - usrtc_delete(ch->msgs_tree, &(msg->pendingq_node)); - pthread_rwlock_unlock(&(ch->msglock)); /* destroy */ destroy_sexp(msg->initial_sx); __destroy_msg(msg); } -#endif + return r; }