From 205ba8c0a8aad93cfb5b931400a3c411486e9ee1 Mon Sep 17 00:00:00 2001 From: leonid-ed Date: Mon, 27 Jul 2015 14:42:41 +0300 Subject: [PATCH 1/7] added tests/lv2ftp* --- tests/Makefile.am | 17 +- tests/lv2ftpc.c | 353 +++++++++++++++++++++++++++++++++++-- tests/lv2ftpd.c | 438 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 792 insertions(+), 16 deletions(-) diff --git a/tests/Makefile.am b/tests/Makefile.am index 3753f65..295ff01 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -15,19 +15,28 @@ libsntl = ../lib/.libs/libsntl.la if !BUILD_WIN32 -bin_PROGRAMS = lv2sd lv2sc +bin_PROGRAMS = lv2sd lv2sc lv2ftpd lv2ftpc lv2sd_SOURCES = lv2sd.c lv2sd_LDADD = $(LIBTDATA_LIBS) $(LIBSEXPR_LIBS) $(OPENSSL_LIBS) \ - $(LIBUUID_LIBS) $(libsntl) + $(LIBUUID_LIBS) $(libsntl) -lpthread lv2sc_SOURCES = lv2sc.c lv2sc_LDADD = $(LIBTDATA_LIBS) $(LIBSEXPR_LIBS) $(OPENSSL_LIBS) \ - $(LIBUUID_LIBS) $(libsntl) + $(LIBUUID_LIBS) $(libsntl) -lpthread + +lv2ftpd_SOURCES = lv2ftpd.c +lv2ftpd_LDADD = $(LIBTDATA_LIBS) $(LIBSEXPR_LIBS) $(OPENSSL_LIBS) \ + $(LIBUUID_LIBS) $(libsntl) -lpthread + +lv2ftpc_SOURCES = lv2ftpc.c +lv2ftpc_LDADD = $(LIBTDATA_LIBS) $(LIBSEXPR_LIBS) $(OPENSSL_LIBS) \ + $(LIBUUID_LIBS) $(libsntl) -lpthread + else BUILD_WIN32 -bin_PROGRAMS = lv2sc +bin_PROGRAMS = lv2sc lv2sc_SOURCES = lv2sc.c lv2sc_LDADD = $(LIBTDATA_LIBS) $(LIBSEXPR_LIBS) $(OPENSSL_LIBS) \ diff --git a/tests/lv2ftpc.c b/tests/lv2ftpc.c index 1edc43e..01a8294 100644 --- a/tests/lv2ftpc.c +++ b/tests/lv2ftpc.c @@ -1,19 +1,348 @@ -/* - * Secure Network Transport Layer Library v2 implementation. - * (sntllv2) it superseed all versions before due to the: - * - memory consumption - * - new features such as pulse emitting - * - performance optimization - * - * This is a proprietary software. See COPYING for further details. - * - * (c) Askele Group 2013-2015 - * - */ +#include +#define __USE_GNU +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifdef WIN32 +#include +#include +#else +#include +#include +#include +#include +#include +#include +#include +#include +#include +#endif + +#include +#include #include +#include + +#define DEBUG +#define FREE(x) { if (x) { free(x); x = NULL; } } +/* define a little bit */ +#define DEFAULT_PORT 13133 +#define CHANNEL_COUNT 200 +#define CLIENT_COUNT 256 +#define MESSAGES_PER_SESSION 10000 +#define ITERATION_COUNT 1000 + +#define FAILS_ONLY + +struct testdata { + int uc; + pthread_mutex_t ulock; + conn_t *co; +}; + +static int __init_testdata(struct testdata *t, conn_t *co) +{ + t->uc = 0; + pthread_mutex_init(&t->ulock, NULL); + t->co = co; + return 0; +} + +static void __wait_completion(struct testdata *t) +{ + pthread_mutex_lock(&t->ulock); + if(t->uc) { + pthread_mutex_lock(&t->ulock); + } + return; +} + +static int __set_typed_list_callback(conn_t *co, int ch, char *desc) +{ + printf("allowed channel %d (%s)\n", ch, desc); + return SNE_SUCCESS; +} + +static void *__addsthrd(void *a) +{ + struct testdata *t = a; + conn_t *co = t->co; + chnl_t *mch; + sxmsg_t *msg; + char mmbuf[1024]; + size_t ln; + int mr, i; + + pthread_mutex_lock(&t->ulock); + t->uc++; + pthread_mutex_unlock(&t->ulock); + + /* here we go */ + mch = sxchannel_open(co, 12); + if(!mch) { + fprintf(stderr, "Failed to openchannel with %d\n", errno); + goto __fini; + } + + for(i = 0; i < MESSAGES_PER_SESSION; i++) { + ln = snprintf(mmbuf, 1024, "(ar-add (10 10))"); + mr = sxmsg_send(mch, mmbuf, ln, &msg); + switch(mr) { + case SNE_RAPIDMSG: + //fprintf(stdout, "Rapidly replied: %s\n", (char *)sxmsg_payload(msg)); + sxmsg_clean(msg); + break; + case SNE_REPLYREQ: + if(sxmsg_datalen(msg)) fprintf(stdout, "Replied (confirmation required): %s\n", + (char *)sxmsg_payload(msg)); + mr = sxmsg_return(msg, SNE_SUCCESS); + fprintf(stderr, "mr = %d\n", mr); + break; + case SNE_SUCCESS: + fprintf(stdout, "Success.\n"); + break; + default: + fprintf(stderr, "ERROR: %d\n", mr); + break; + } + } + + sxchannel_close(mch); + + __fini: + t->uc--; + if(t->uc <= 1) pthread_mutex_unlock(&t->ulock); + + return NULL; +} + +int msg_send(chnl_t *ch, const char *mmbuf, size_t buflen, sxmsg_t *msg, char **rs) +{ +#ifdef DEBUG + printf("%s: got sx '%s'\n", __FUNCTION__, mmbuf); +#endif /* DEBUG */ + + int mr = sxmsg_send(ch, mmbuf, buflen, &msg); + switch(mr) { + case SNE_RAPIDMSG: + if (rs) *rs = strdup((char *)sxmsg_payload(msg)); + fprintf(stdout, "Rapidly replied: '%s'\n", (char *)sxmsg_payload(msg)); + sxmsg_clean(msg); + break; + case SNE_REPLYREQ: + if(sxmsg_datalen(msg)) fprintf(stdout, "Replied (confirmation required): %s\n", + (char *)sxmsg_payload(msg)); + mr = sxmsg_return(msg, SNE_SUCCESS); + fprintf(stderr, "mr = %d\n", mr); + break; + case SNE_SUCCESS: + fprintf(stdout, "Success.\n"); + break; + default: + fprintf(stderr, "ERROR: %d\n", mr); + break; + } +#ifdef DEBUG + // printf("%s: got rs %d\n", __FUNCTION__, mr); +#endif /* DEBUG */ + return mr; +} int main(int argc, char **argv) { + char *rootca = NULL, *cert = NULL; + int port = DEFAULT_PORT; + char *addr = NULL, *login = NULL, *password = NULL; + int opt; + conn_sys_t *ssys; + conn_t *co; + + while((opt = getopt(argc, argv, "p:r:a:u:l:w:")) != -1) { + switch(opt) { + case 'p': + port = atoi(optarg); + break; + case 'r': + rootca = strdup(optarg); + break; + case 'a': + addr = strdup(optarg); + break; + case 'u': + cert = strdup(optarg); + break; + case 'l': + login = strdup(optarg); + break; + case 'w': + password = strdup(optarg); + break; + default: + fprintf(stderr, "usage: %s [-p ] -r -a -u -l -w \n", argv[0]); + return EINVAL; + } + } + + if(!rootca) { + fprintf(stderr, "Root CA not pointed.\n Failure.\n"); + return EINVAL; + } + + if(!addr) { + fprintf(stderr, "Server address not pointed.\n Failure.\n"); + return EINVAL; + } + + if(!cert) { + fprintf(stderr, "User certificate not pointed.\n Failure.\n"); + return EINVAL; + } + + if(!login) { + fprintf(stderr, "User login not pointed.\n Failure.\n"); + return EINVAL; + } + + if(!password) { + fprintf(stderr, "User password not pointed.\n Failure.\n"); + return EINVAL; + } + + sntl_init(); + /* all is fine let's init connection subsystem */ + ssys = connections_create(); + if(!ssys) { + fprintf(stderr, "Subsystem init failed: %d\n", errno); + return errno; + } + /* set working certificates */ + opt = connections_setsslserts(ssys, rootca, cert, cert); + if(opt) { + fprintf(stderr, "Subsystem init failed (set SSL x.509 pems): %d\n", opt); + return opt; + } + + /* Tests */ + struct timeval beg, end; + /* try to open connection */ + connections_set_channelcall(ssys, __set_typed_list_callback); + + gettimeofday(&beg, NULL); + co = connection_link(ssys, addr, port, cert, login, password); + + if(!co) { + fprintf(stderr, "Failed to connection with %d\n", errno); + return errno; + } + gettimeofday(&end, NULL); + + if((end.tv_sec - beg.tv_sec) > 0) { + printf("Seconds: %ld ", end.tv_sec - beg.tv_sec); + printf("µS: %ld\n", end.tv_usec + (1000000 - beg.tv_usec)); + } else printf("µS: %ld\n", end.tv_usec - beg.tv_usec); + + /* ok now we should open a channel */ + chnl_t *testchannel = sxchannel_open(co, 12); + + if(!testchannel) { + fprintf(stderr, "Failed to openchannel with %d\n", errno); + return errno; + } + gettimeofday(&end, NULL); + + if((end.tv_sec - beg.tv_sec) > 0) { + printf("Seconds: %ld ", end.tv_sec - beg.tv_sec); + printf("µS: %ld\n", end.tv_usec + (1000000 - beg.tv_usec)); + } else printf("µS: %ld\n", end.tv_usec - beg.tv_usec); + + /* ok, send a message */ + char mmbuf[1024]; + char *s_ret = NULL; + sxmsg_t *msg = NULL; + size_t ln; + int mr, ee, stid; + sexp_t *sx_ret = NULL; + + ln = snprintf(mmbuf, 1024, "(dir-open \".\")"); + mr = msg_send(testchannel, mmbuf, ln, msg, &s_ret); + if (mr != SNE_RAPIDMSG) goto __finish; + + sx_ret = parse_sexp(s_ret, strlen(s_ret)); + FREE(s_ret); + if (!sx_ret || !sx_ret->list || strcmp(sx_ret->list->val, "dir-stream") || + !sx_ret->list->next || !sx_ret->list->next->val) + goto __finish; + + stid = atoi(sx_ret->list->next->val); + ln = snprintf(mmbuf, 1024, "(dir-read %d)", stid); + while (1) { + mr = msg_send(testchannel, mmbuf, ln, msg, &s_ret); + if (mr != SNE_RAPIDMSG) goto __finish; + + sx_ret = parse_sexp(s_ret, strlen(s_ret)); + if (!sx_ret || !sx_ret->list || !sx_ret->list->val) + goto __finish; + + if (!strcmp(sx_ret->list->val, "dir-end")) { + ln = snprintf(mmbuf, 1024, "(dir-close %d)", stid); + mr = msg_send(testchannel, mmbuf, ln, msg, &s_ret); + break; + } +#ifdef DEBUG + printf("%s: '%s'\n", __FUNCTION__, s_ret); +#endif /* DEBUG */ + FREE(s_ret); + } + + // switch(mr) { + // case SNE_RAPIDMSG: + // fprintf(stdout, "Rapidly replied: %s\n", (char *)sxmsg_payload(msg)); + // sxmsg_clean(msg); + // break; + // case SNE_REPLYREQ: + // if(sxmsg_datalen(msg)) fprintf(stdout, "Replied (confirmation required): %s\n", + // (char *)sxmsg_payload(msg)); + // mr = sxmsg_return(msg, SNE_SUCCESS); + // fprintf(stderr, "mr = %d\n", mr); + // break; + // case SNE_SUCCESS: + // fprintf(stdout, "Success.\n"); + // break; + // default: + // fprintf(stderr, "ERROR: %d\n", mr); + // break; + // } + +__finish: + ee = sxchannel_close(testchannel); + printf("ee = %d\n", ee); + gettimeofday(&end, NULL); + + if((end.tv_sec - beg.tv_sec) > 0) { + printf("Seconds: %ld ", end.tv_sec - beg.tv_sec); + printf("µS: %ld\n", end.tv_usec + (1000000 - beg.tv_usec)); + } else printf("µS: %ld\n", end.tv_usec - beg.tv_usec); + // sleep(10); + // /* ok, now we need to create many threads */ + // struct testdata trd; + // pthread_t thrd; + // int i; + // __init_testdata(&trd, co); + + // for(i = 0; i < 256; i++) pthread_create(&thrd, NULL, __addsthrd, &trd); + + // __wait_completion(&trd); + + connection_close(co); + return 0; } diff --git a/tests/lv2ftpd.c b/tests/lv2ftpd.c index 1edc43e..6ecbeb7 100644 --- a/tests/lv2ftpd.c +++ b/tests/lv2ftpd.c @@ -11,9 +11,447 @@ * */ +#include +#include +#define __USE_GNU +#include +#include +#include +#include +#ifdef WIN32 +#include +#include +#include +#else +#include +#include +#include +#include +#include +#include +#include +#include +#include +#endif + +#include +#include +#include #include +#include +// #include +#include +#include +#include + +#define DEBUG +#define FREE(x) { if (x) { free(x); x = NULL; } } +#define MAX_STREAMS INT_MAX + +typedef struct __datastream_type { + int dsid; + usrtc_node_t node; + // readdir_r + DIR *dp; + struct dirent dent; + struct dirent *dres; +} datastream_t; + +usrtc_t *_rd_streams; +int _rd_last_id = 0; +pthread_rwlock_t _lock; + +/* helper functions */ +static long __cmp_int(const void *a, const void *b) +{ + return *(int *)a - *(int *)b; +} + +inline void dump_dirent(struct dirent *d, char *buf) +{ + if (d == NULL) return; + strcat(buf, "\""); + strcat(buf, d->d_name); + strcat(buf, "\" "); + switch(d->d_type) + { + case DT_REG: + strcat(buf, "\"regular\""); + break; + case DT_DIR: + strcat(buf, "\"directory\""); + break; + case DT_BLK: + strcat(buf, "\"block\""); + break; + case DT_CHR: + strcat(buf, "\"char\""); + break; + case DT_FIFO: + strcat(buf, "\"fifo\""); + break; + case DT_LNK: + strcat(buf, "\"link\""); + break; + case DT_UNKNOWN: + strcat(buf, "\"unknown\""); + break; + default: + strcat(buf, "\"another\""); + break; + } +} + +int __openlistener(int port) +{ + int sd; + struct sockaddr_in addr; + + sd = socket(PF_INET, SOCK_STREAM, 0); + bzero(&addr, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = INADDR_ANY; + if ( bind(sd, (struct sockaddr*)&addr, sizeof(addr)) != 0 ) { + perror("can't bind port"); + abort(); + } + if ( listen(sd, 10) != 0 ) { + perror("Can't configure listening port"); + abort(); + } + + return sd; +} + +/* + * Validation of the SSL certificate + * this function must be exist. + */ +static int __validate_sslpem(conn_t *co) +{ + return 0; +} + +/* + * validate - authorize user with password + */ +static int __secure_check(conn_t *co) +{ + return SNE_SUCCESS; +} + +/* + * typed list callback + */ +static int __set_typed_list_callback(conn_t *co, int ch, char *desc) +{ + printf("allowed channel %d (%s)\n", ch, desc); + return SNE_SUCCESS; +} + +/* list of rpc calls functions */ +usrtc_t *fulist; + +/* our fake */ +usrtc_t *__rettlist(conn_t *c) +{ + return fulist; +} + +/* RPC functions implementation */ +static int __dir_open(void *m, sexp_t *sx) +{ + sexp_t *lsx = NULL; + sxmsg_t *msg = (sxmsg_t *)m; + char *buf; + size_t ln = 0; + +#ifdef DEBUG + char dbuf[2048]; + print_sexp(dbuf, sizeof(dbuf), sx); + printf("%s: got sx '%s'\n", __FUNCTION__, dbuf); +#endif /* DEBUG */ + + if(sexp_list_cdr(sx, &lsx) || !sx->list->next || !sx->list->next->val) { + printf("Invalid protocol\n"); + return sxmsg_return(msg, SNE_BADPROTO); + } + + char *dir_name = sx->list->next->val; + + if ( usrtc_isfull(_rd_streams) ) { + return sxmsg_return(msg, EINVAL); + } + + datastream_t *item; + if ( !(item = malloc(sizeof(datastream_t))) ) { + return sxmsg_return(msg, ENOMEM); + } + + /* find free id */ + pthread_rwlock_wrlock(&_lock); + do { + if (_rd_last_id == INT_MAX) + _rd_last_id = 1; + else + ++_rd_last_id; + } while (usrtc_lookup(_rd_streams, &_rd_last_id)); + + /* create rd stream */ + DIR *dp = NULL; + if ( !(dp = opendir(dir_name)) ) { + pthread_rwlock_unlock(&_lock); + free(item); + return sxmsg_return(msg, EINVAL); + } + + /* init stream */ + item->dsid = _rd_last_id; + item->dp = dp; + item->dres = NULL; + usrtc_node_init(&(item->node), item); + usrtc_insert(_rd_streams, &(item->node), &item->dsid); + + buf = sxmsg_rapidbuf(msg); + ln = snprintf(buf, MAX_RBBUF_LEN, "(dir-stream %d)", item->dsid); + + pthread_rwlock_unlock(&_lock); + +#ifdef DEBUG + printf("%s: stream %d has been opened\n", __FUNCTION__, item->dsid); +#endif /* DEBUG */ + + return sxmsg_rreply(msg, ln + 1); +} + +static int __dir_read(void *m, sexp_t *sx) +{ + sexp_t *lsx = NULL; + sxmsg_t *msg = (sxmsg_t *)m; + char *buf; + size_t ln = 0; + int stid = -1; + +#ifdef DEBUG + char dbuf[2048]; + print_sexp(dbuf, sizeof(dbuf), sx); + printf("%s: got sx '%s'\n", __FUNCTION__, dbuf); +#endif /* DEBUG */ + + if (sexp_list_cdr(sx, &lsx) || !sx->list->next || !sx->list->next->val + || (0 >= (stid = atoi(sx->list->next->val) )) ) + { + printf("Invalid protocol\n"); + return sxmsg_return(msg, SNE_BADPROTO); + } + + /* get stream item */ + datastream_t *item; + usrtc_node_t *node; + pthread_rwlock_rdlock(&_lock); + if ( !(node = usrtc_lookup(_rd_streams, &stid)) ) { + pthread_rwlock_unlock(&_lock); + return sxmsg_return(msg, ENOENT); + } + pthread_rwlock_unlock(&_lock); + + pthread_rwlock_wrlock(&_lock); + item = usrtc_node_getdata(node); + if (readdir_r(item->dp, &(item->dent), &(item->dres))) { +#ifdef DEBUG + printf("%s: readdir_r() is failed (%d)\n", __FUNCTION__, errno); +#endif /* DEBUG */ + pthread_rwlock_unlock(&_lock); + return sxmsg_return(msg, EINVAL); + } + + if (!item->dres) { + buf = sxmsg_rapidbuf(msg); + ln = snprintf(buf, MAX_RBBUF_LEN, "(dir-end %d)", item->dsid); + goto __finish; + } + + char dump[2048]; + sprintf(dump, "(dir-entry ("); + dump_dirent(item->dres, dump); + strcat(dump, "))"); + +#ifdef DEBUG + printf("%s: dump = '%s'\n", __FUNCTION__, dump); +#endif /* DEBUG */ + + buf = sxmsg_rapidbuf(msg); + ln = snprintf(buf, MAX_RBBUF_LEN, "%s", dump); + +__finish: + pthread_rwlock_unlock(&_lock); + return sxmsg_rreply(msg, ln + 1); +} + +static int __dir_close(void *m, sexp_t *sx) +{ + sexp_t *lsx = NULL; + sxmsg_t *msg = (sxmsg_t *)m; + int stid = -1; + +#ifdef DEBUG + char dbuf[2048]; + print_sexp(dbuf, sizeof(dbuf), sx); + printf("%s: got sx '%s'\n", __FUNCTION__, dbuf); +#endif /* DEBUG */ + + if (sexp_list_cdr(sx, &lsx) || !sx->list->next || !sx->list->next->val + || (0 >= (stid = atoi(sx->list->next->val) )) ) + { + printf("Invalid protocol\n"); + return sxmsg_return(msg, SNE_BADPROTO); + } + + /* get stream item */ + datastream_t *item; + usrtc_node_t *node; + pthread_rwlock_rdlock(&_lock); + if ( !(node = usrtc_lookup(_rd_streams, &stid)) ) { + pthread_rwlock_unlock(&_lock); + return sxmsg_return(msg, ENOENT); + } + pthread_rwlock_unlock(&_lock); + + pthread_rwlock_wrlock(&_lock); + item = usrtc_node_getdata(node); + closedir(item->dp); + FREE(item); + usrtc_delete(_rd_streams, node); + pthread_rwlock_unlock(&_lock); + +#ifdef DEBUG + printf("%s: stream %d has been closed\n", __FUNCTION__, stid); +#endif /* DEBUG */ + + return sxmsg_return(msg, SNE_SUCCESS); +} + +/* define a little bit */ +#define DEFAULT_PORT 13133 + +static void sigpipe_handler(int a) +{ + return; +} + int main(int argc, char **argv) { + signal(SIGPIPE, SIG_IGN); + + char *rootca = NULL, *cert = NULL; + conn_sys_t *ssys = connections_create(); + int port = DEFAULT_PORT; + int opt; + + while((opt = getopt(argc, argv, "p:r:u:")) != -1) { + switch(opt) { + case 'p': + port = atoi(optarg); + break; + case 'r': + rootca = strdup(optarg); + break; + case 'u': + cert = strdup(optarg); + break; + default: + fprintf(stderr, "usage: %s [-p ] -r -u \n", argv[0]); + return EINVAL; + } + } + + if(!rootca) { + fprintf(stderr, "Root CA not pointed.\n Failure.\n"); + return EINVAL; + } + + if(!cert) { + fprintf(stderr, "User certificate not pointed.\n Failure.\n"); + return EINVAL; + } + + sntl_init(); + /* all is fine let's init connection subsystem */ + if(!ssys) { + fprintf(stderr, "Subsystem init failed: %d\n", opt); + return 2; + } + /* set wroking certificates */ + opt = connections_setsslserts(ssys, rootca, cert, cert); + if(opt) { + fprintf(stderr, "Subsystem init failed (set SSL x.509 pems): %d\n", opt); + return opt; + } + + /* clean up */ + free(rootca); + free(cert); + + /* set important callbacks to do the security checking */ + connections_set_authcheck(ssys, __secure_check); + connections_set_sslvalidate(ssys, __validate_sslpem); + /* set a callback, it's optional and doesn't required in server side apps */ + connections_set_channelcall(ssys, __set_typed_list_callback); + + /* ok, now we need to construct RPC lists (channels) */ + if(!(fulist = malloc(sizeof(usrtc_t)))) { + fprintf(stderr, "Cannot allocate memory for RPC lists\n Failure.\n"); + return ENOMEM; + } + opt = sntl_rpclist_init(fulist); + if(opt) { + fprintf(stderr, "Failed to init rpc list\n Failure.\n"); + return opt; + } + + /* we will add one channel with type id 12 "Demo rpc list" */ + opt = sntl_rpclist_add(fulist, 12, "Demo RPC list", NULL); + if(opt) { + fprintf(stderr, "Failed to add typed RPC channel\n Failure.\n"); + return opt; + } + + /* ok, let's add stream functions */ + opt = sntl_rpclist_add_function(fulist, 12, "dir-open", __dir_open); + if(opt) { + __fail: + fprintf(stderr, "Failed to add functions to typed RPC channel\n Failure.\n"); + return opt; + } + opt = sntl_rpclist_add_function(fulist, 12, "dir-read", __dir_read); + if(opt) goto __fail; + opt = sntl_rpclist_add_function(fulist, 12, "dir-close", __dir_close); + if(opt) goto __fail; + + /* ok, setup it */ + connections_set_rpcvalidator(ssys, __rettlist); + + /* create stream tree */ + if(!(_rd_streams = malloc(sizeof(usrtc_t)) )) + return ENOMEM; + usrtc_init(_rd_streams, USRTC_REDBLACK, MAX_STREAMS, __cmp_int); + + /* now we're ready to run the listen process */ + int srv = __openlistener(port); + while(1) { + struct sockaddr_in addr; + socklen_t len = sizeof(addr); + conn_t *co; + + int client = accept(srv, (struct sockaddr*)&addr, &len); /* accept connection as usual */ + co = connection_master_link(ssys, client, NULL); /* create connection, that's all */ + if(!co) { + fprintf(stderr, "Cannot create connetion (%d)\n", opt); + } + } + return 0; } From 0e32b6299010fd42fa043d4868e208cf29a31c1d Mon Sep 17 00:00:00 2001 From: leonid-ed Date: Fri, 31 Jul 2015 14:12:47 +0300 Subject: [PATCH 2/7] tests: fixed lv2ftpd --- tests/lv2ftpd.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/lv2ftpd.c b/tests/lv2ftpd.c index 6ecbeb7..49df47c 100644 --- a/tests/lv2ftpd.c +++ b/tests/lv2ftpd.c @@ -179,15 +179,15 @@ static int __dir_open(void *m, sexp_t *sx) return sxmsg_return(msg, SNE_BADPROTO); } - char *dir_name = sx->list->next->val; + char *dir_name = strdup(sx->list->next->val); if ( usrtc_isfull(_rd_streams) ) { - return sxmsg_return(msg, EINVAL); + return sxmsg_return(msg, SNE_TOOLONG); } datastream_t *item; if ( !(item = malloc(sizeof(datastream_t))) ) { - return sxmsg_return(msg, ENOMEM); + return sxmsg_return(msg, SNE_ENOMEM); } /* find free id */ @@ -204,7 +204,7 @@ static int __dir_open(void *m, sexp_t *sx) if ( !(dp = opendir(dir_name)) ) { pthread_rwlock_unlock(&_lock); free(item); - return sxmsg_return(msg, EINVAL); + return sxmsg_return(msg, SNE_FAILED); } /* init stream */ @@ -253,7 +253,7 @@ static int __dir_read(void *m, sexp_t *sx) pthread_rwlock_rdlock(&_lock); if ( !(node = usrtc_lookup(_rd_streams, &stid)) ) { pthread_rwlock_unlock(&_lock); - return sxmsg_return(msg, ENOENT); + return sxmsg_return(msg, SNE_INVALINDEX); } pthread_rwlock_unlock(&_lock); @@ -264,7 +264,7 @@ static int __dir_read(void *m, sexp_t *sx) printf("%s: readdir_r() is failed (%d)\n", __FUNCTION__, errno); #endif /* DEBUG */ pthread_rwlock_unlock(&_lock); - return sxmsg_return(msg, EINVAL); + return sxmsg_return(msg, SNE_FAILED); } if (!item->dres) { From 5606c799aeed160c1d0d17cd42fdfd8d2830d08f Mon Sep 17 00:00:00 2001 From: leonid-ed Date: Fri, 31 Jul 2015 14:29:02 +0300 Subject: [PATCH 3/7] tests: fixed lv2ftpd one more time --- tests/lv2ftpd.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/lv2ftpd.c b/tests/lv2ftpd.c index 49df47c..183ce89 100644 --- a/tests/lv2ftpd.c +++ b/tests/lv2ftpd.c @@ -322,8 +322,8 @@ static int __dir_close(void *m, sexp_t *sx) pthread_rwlock_wrlock(&_lock); item = usrtc_node_getdata(node); closedir(item->dp); - FREE(item); usrtc_delete(_rd_streams, node); + FREE(item); pthread_rwlock_unlock(&_lock); #ifdef DEBUG From 35019887b56058de909d5cfa9bfe7fed3846d30d Mon Sep 17 00:00:00 2001 From: leonid-ed Date: Fri, 31 Jul 2015 18:57:04 +0300 Subject: [PATCH 4/7] tests: fixed a leak in lv2ftpd --- tests/lv2ftpd.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/lv2ftpd.c b/tests/lv2ftpd.c index 183ce89..c75aeba 100644 --- a/tests/lv2ftpd.c +++ b/tests/lv2ftpd.c @@ -203,10 +203,13 @@ static int __dir_open(void *m, sexp_t *sx) DIR *dp = NULL; if ( !(dp = opendir(dir_name)) ) { pthread_rwlock_unlock(&_lock); - free(item); + FREE(item); + FREE(dir_name); return sxmsg_return(msg, SNE_FAILED); } + FREE(dir_name); + /* init stream */ item->dsid = _rd_last_id; item->dp = dp; From bd2f05e2810378a957b0ae216fc89cca0ca71860 Mon Sep 17 00:00:00 2001 From: Alexander Vdolainen Date: Fri, 31 Jul 2015 20:02:25 +0300 Subject: [PATCH 5/7] bugs fixes; --- include/sntl/limits.h | 5 ++- lib/messagesx.c | 26 ++++++++---- lib/sntllv2.c | 99 +++++++++++++++++++++++++++++-------------- 3 files changed, 88 insertions(+), 42 deletions(-) diff --git a/include/sntl/limits.h b/include/sntl/limits.h index f69a45b..3d9405a 100644 --- a/include/sntl/limits.h +++ b/include/sntl/limits.h @@ -16,6 +16,9 @@ #define MAX_RPC_LIST 512 -#define MAX_RBBUF_LEN (65536 - sizeof(sntllv2_head_t)) +#define MAX_SNTLLBUFSIZE 65536 +#define MAX_RBBUF_LEN (MAX_SNTLLBUFSIZE - sizeof(sntllv2_head_t)) + +#define MAX_SNTLLTHREADS 8 #endif /* __SNTL_LIMITS_H__ */ diff --git a/lib/messagesx.c b/lib/messagesx.c index 6e33d74..90bcc91 100644 --- a/lib/messagesx.c +++ b/lib/messagesx.c @@ -20,11 +20,13 @@ #include #include #include +#include #include #include #include +#include #include #include "internal.h" @@ -100,9 +102,16 @@ static inline int __sxmsg_send(chnl_t *channel, const char *data, size_t datalen /* ready to send it */ if(!pp) { r = _sntll_writemsg(co, msg); - if(r != SNE_SUCCESS) goto __closemsg; + if(r != SNE_SUCCESS) { + __unpinmsg: + pthread_mutex_lock(&co->idx_msg_lock); + idx_free(&co->idx_msg, msgidx); + co->messages[msgidx] = NULL; + pthread_mutex_unlock(&co->idx_msg_lock); + goto __freemsg; + } } else { /* postponed */ - if(!(ppm = malloc(sizeof(ppmsg_t)))) { r = SNE_ENOMEM; goto __closemsg; } + if(!(ppm = malloc(sizeof(ppmsg_t)))) { r = SNE_ENOMEM; goto __unpinmsg; } list_init_node(&ppm->node); ppm->msg = msg; @@ -113,18 +122,16 @@ static inline int __sxmsg_send(chnl_t *channel, const char *data, size_t datalen pthread_mutex_unlock(&co->write_pending_lock); } - pthread_mutex_lock(&msg->wait); /* we will sleep here */ + //pthread_mutex_lock(&msg->wait); /* we will sleep here */ + while(pthread_mutex_trylock(&msg->wait)) { + //printf("here opcode = %d\n", head->opcode); + } if(head->payload_length) { *omsg = msg; return head->opcode; } else r = head->opcode; - __closemsg: - pthread_mutex_lock(&co->idx_msg_lock); - idx_free(&co->idx_msg, msgidx); - co->messages[msgidx] = NULL; - pthread_mutex_unlock(&co->idx_msg_lock); __freemsg: /* free resources for message */ pthread_mutex_unlock(&msg->wait); @@ -187,7 +194,7 @@ static inline int __sxmsg_reply(sxmsg_t *msg, const char *data, if(!(co = ch->connection)) return SNE_FAILED; /* test for blocking */ - for(i = 0; i < 8; i++) + for(i = 0; i < MAX_SNTLLTHREADS; i++) if(pthread_equal(self, co->thrd_poll[i])) return SNE_WOULDBLOCK; /* prepare it */ @@ -298,6 +305,7 @@ static inline int __sxmsg_return(sxmsg_t *msg, int opcode, int pp) pthread_mutex_unlock(&co->idx_msg_lock); r = _sntll_writemsg(co, msg); + free(msg); } else { if(!(ppm = malloc(sizeof(ppmsg_t)))) return SNE_ENOMEM; else { /* remove it */ diff --git a/lib/sntllv2.c b/lib/sntllv2.c index 332b0fc..7cf4669 100644 --- a/lib/sntllv2.c +++ b/lib/sntllv2.c @@ -40,6 +40,7 @@ #include #include +#include #include #include "internal.h" @@ -195,6 +196,7 @@ int _sntll_writemsg(conn_t *co, sxmsg_t *msg) sntllv2_head_t *head; size_t rd; int r; + char *buf = NULL; if(!co || !msg) return SNE_FAILED; @@ -202,21 +204,25 @@ int _sntll_writemsg(conn_t *co, sxmsg_t *msg) head = &msg->mhead; if(head->payload_length && !msg->payload) return SNE_FAILED; + if(head->payload_length) { + buf = malloc(sizeof(sntllv2_head_t) + head->payload_length); + memcpy(buf, head, sizeof(sntllv2_head_t)); + memcpy(buf + sizeof(sntllv2_head_t), msg->payload, head->payload_length); + } + /* write the head and payload if applicable */ pthread_mutex_lock(&co->sslinout[1]); - rd = __conn_write(co, head, sizeof(sntllv2_head_t)); + if(!buf) + rd = __conn_write(co, head, sizeof(sntllv2_head_t)); + else rd = __conn_write(co, buf, sizeof(sntllv2_head_t) + head->payload_length); if(rd < 0) { co->flags |= SNSX_CLOSED; r = SNE_ESSL; - } else if(head->payload_length) { - rd = __conn_write(co, msg->payload, head->payload_length); - /* check up again */ - if(rd < 0) { co->flags |= SNSX_CLOSED; r = SNE_ESSL; } } pthread_mutex_unlock(&co->sslinout[1]); if(!(co->flags & SNSX_CLOSED)) r = SNE_SUCCESS; - + if(buf) free(buf); return r; } @@ -442,7 +448,7 @@ static int __eval_syssexp(conn_t *co, sexp_t *sx) static void __connection_destroy(conn_t *co) { - int i = 0; + int i = 0, fd; sxmsg_t *msg, *omsg; ppmsg_t *ppm; list_node_t *iter, *siter; @@ -473,24 +479,26 @@ static void __connection_destroy(conn_t *co) pthread_mutex_unlock(&co->write_pending_lock); } - /* go thru messages */ - pthread_mutex_lock(&co->idx_msg_lock); - for(i = 0; i < 1024; i++) { - msg = co->messages[i]; - if(!msg) continue; - else head = &msg->mhead; - head->opcode = SNE_LINKERROR; - pthread_mutex_unlock(&msg->wait); - co->messages[i] = NULL; - idx_free(&co->idx_msg, i); - } - pthread_mutex_unlock(&co->idx_msg_lock); - /* update use count */ _CONN_NOTINUSE(co); /* ok, let's free other if we can */ if(!_CONN_UCOUNT(co)) { + /* go thru messages */ + pthread_mutex_lock(&co->idx_msg_lock); + for(i = 0; i < 1024; i++) { + msg = co->messages[i]; + if(!msg) continue; + else head = &msg->mhead; + head->opcode = SNE_LINKERROR; + pthread_mutex_unlock(&msg->wait); + pthread_mutex_destroy(&msg->wait); + free(msg); + co->messages[i] = NULL; + idx_free(&co->idx_msg, i); + } + pthread_mutex_unlock(&co->idx_msg_lock); + /* ok now we will free the channels */ pthread_mutex_lock(&co->idx_ch_lock); for(i = 0; i < 512; i++) { @@ -506,9 +514,10 @@ static void __connection_destroy(conn_t *co) if(co->pctx->passwd) free(co->pctx->passwd); SSL_shutdown(co->ssl); - close(SSL_get_fd(co->ssl)); + fd = SSL_get_fd(co->ssl); SSL_free(co->ssl); SSL_CTX_free(co->ctx); + close(fd); __connection_second_free(co); __connection_minimal_free(co); } @@ -533,7 +542,9 @@ static void *__sntll_thread(void *b) int dispatch = 0, e; size_t rd, wr; ulong_t mid; - +#ifdef _PERFPROFILE + struct timeval beg, end; +#endif /* byte buffer is following head */ bbuf += sizeof(sntllv2_head_t); @@ -549,7 +560,7 @@ static void *__sntll_thread(void *b) /* check up a thread */ if(pthread_equal(self, co->thrd_poll[7])) /* dispatcher */ - dispatch = 1; + dispatch = 0; /* update use count */ _CONN_INUSE(co); @@ -595,11 +606,23 @@ static void *__sntll_thread(void *b) pthread_mutex_unlock(&(co->sslinout[0])); goto __finish; } +#ifdef _PERFPROFILE + gettimeofday(&beg, NULL); +#endif rd = __conn_read(co, mhead, sizeof(sntllv2_head_t)); +#ifdef _PERFPROFILE + gettimeofday(&end, NULL); + + if((end.tv_sec - beg.tv_sec) > 0) { + printf("connread(head) Seconds: %ld ", end.tv_sec - beg.tv_sec); + printf("µS: %ld\n", end.tv_usec + (1000000 - beg.tv_usec)); + } else printf("connread(head) µS: %ld\n", end.tv_usec - beg.tv_usec); +#endif + #ifdef _VERBOSE_DEBUG dumphead(mhead); #endif - if(rd != sizeof(sntllv2_head_t)) { + if(rd < 0) { __sslproto_error: co->flags |= SNSX_CLOSED; pthread_mutex_unlock(&(co->sslinout[0])); @@ -607,9 +630,21 @@ static void *__sntll_thread(void *b) } else { /* check up if we can read or not */ if(mhead->payload_length) { +#ifdef _PERFPROFILE + gettimeofday(&beg, NULL); +#endif rd = __conn_read(co, bbuf, mhead->payload_length); - if(rd < 0) goto __sslproto_error; +#ifdef _PERFPROFILE + gettimeofday(&end, NULL); + if((end.tv_sec - beg.tv_sec) > 0) { + printf("connread(payload) Seconds: %ld ", end.tv_sec - beg.tv_sec); + printf("µS: %ld\n", end.tv_usec + (1000000 - beg.tv_usec)); + } else printf("connread(payload) µS: %ld\n", end.tv_usec - beg.tv_usec); +#endif + + if(rd == -1) goto __sslproto_error; else pthread_mutex_unlock(&(co->sslinout[0])); + if(rd != mhead->payload_length) { mid = mhead->msgid; /* if we're need to do something */ @@ -617,9 +652,9 @@ static void *__sntll_thread(void *b) mhead->opcode = SNE_INVALINDEX; goto __return_error; } else { - pthread_mutex_lock(&co->idx_msg_lock); + // pthread_mutex_lock(&co->idx_msg_lock); msg = co->messages[mid]; - pthread_mutex_unlock(&co->idx_msg_lock); + //thread_mutex_unlock(&co->idx_msg_lock); } if(!msg) { if(mhead->attr & SXMSG_OPEN) mhead->opcode = SNE_BADPROTO; @@ -662,9 +697,9 @@ static void *__sntll_thread(void *b) goto __again; } mid = mhead->msgid; - pthread_mutex_lock(&co->idx_msg_lock); + //hread_mutex_lock(&co->idx_msg_lock); msg = co->messages[mid]; - pthread_mutex_unlock(&co->idx_msg_lock); + //hread_mutex_unlock(&co->idx_msg_lock); if(!msg) goto __inval_idx_nor; /* ok now we'are copy data and unlock wait mutex */ @@ -732,6 +767,7 @@ static void *__sntll_thread(void *b) pthread_mutex_unlock(&co->idx_msg_lock); goto __inval_idx_nor; } /* message dialog is closed - remove this right now */ + idx_free(&co->idx_msg, mid); co->messages[mid] = NULL; pthread_mutex_unlock(&co->idx_msg_lock); @@ -944,7 +980,7 @@ conn_t *connection_master_link(conn_sys_t *ssys, int sck, struct in_addr *addr) bundle->conn = co; } int i; - for(i = 0; i < 8; i++) { + for(i = 0; i < MAX_SNTLLTHREADS; i++) { if(bundle == (void *)0xdead) bundle = __sntll_bundle_create(co); if(!bundle) goto __fail5; r = pthread_create(&co->thrd_poll[i], NULL, __sntll_thread, bundle); @@ -1144,7 +1180,6 @@ conn_t *connection_link(conn_sys_t *ssys, const char *host, head->payload_length = 0; wr = __conn_write(co, head, sizeof(sntllv2_head_t)); if(wr < 0) { - blub("fuck"); r = SNE_LINKERROR; goto __fail2;} if(r != SNE_SUCCESS) { r = SNE_LINKERROR; goto __fail2;} } @@ -1163,7 +1198,7 @@ conn_t *connection_link(conn_sys_t *ssys, const char *host, bundle->buf = buf; bundle->conn = co; } - for(i = 0; i < 8; i++) { + for(i = 0; i < MAX_SNTLLTHREADS; i++) { if(bundle == (void *)0xdead) bundle = __sntll_bundle_create(co); if(!bundle) goto __fail5; r = pthread_create(&co->thrd_poll[i], NULL, __sntll_thread, bundle); From 9c19c112989bc7fe3d8e2a2abe92957952bb0d9d Mon Sep 17 00:00:00 2001 From: Alexander Vdolainen Date: Fri, 31 Jul 2015 20:12:10 +0300 Subject: [PATCH 6/7] avoid of error string loading; --- lib/sntllv2.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/sntllv2.c b/lib/sntllv2.c index 7cf4669..2b4c36c 100644 --- a/lib/sntllv2.c +++ b/lib/sntllv2.c @@ -318,7 +318,7 @@ int sntl_init(void) SSL_library_init(); OpenSSL_add_all_algorithms(); - SSL_load_error_strings(); + //SSL_load_error_strings(); ex_ssldata_index = SSL_get_ex_new_index(0, "__ssldata index", NULL, NULL, NULL); From a40133f23ac9d5de61b1f0eea4925c80392a2f75 Mon Sep 17 00:00:00 2001 From: Alexander Vdolainen Date: Fri, 31 Jul 2015 21:27:52 +0300 Subject: [PATCH 7/7] mutex was returned; --- lib/messagesx.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/messagesx.c b/lib/messagesx.c index 90bcc91..6bf76ca 100644 --- a/lib/messagesx.c +++ b/lib/messagesx.c @@ -122,10 +122,12 @@ static inline int __sxmsg_send(chnl_t *channel, const char *data, size_t datalen pthread_mutex_unlock(&co->write_pending_lock); } - //pthread_mutex_lock(&msg->wait); /* we will sleep here */ + pthread_mutex_lock(&msg->wait); /* we will sleep here */ +#if 0 while(pthread_mutex_trylock(&msg->wait)) { //printf("here opcode = %d\n", head->opcode); } +#endif if(head->payload_length) { *omsg = msg;