diff --git a/include/sntl/sntllv2.h b/include/sntl/sntllv2.h index 508bd05..4cfa653 100644 --- a/include/sntl/sntllv2.h +++ b/include/sntl/sntllv2.h @@ -231,6 +231,7 @@ int sxmsg_reply_pp(sxmsg_t *msg, const char *data, size_t datalen); int sxmsg_rreply(sxmsg_t *msg, size_t datalen); int sxmsg_return(sxmsg_t *msg, int opcode); int sxmsg_return_pp(sxmsg_t *msg, int opcode); +void sxmsg_clean(sxmsg_t *msg); #ifdef __cplusplus } @@ -261,6 +262,8 @@ int sntl_rpclist_filter(usrtc_t *source, usrtc_t **dest, int flag, int *filter); #define blub(txt) fprintf(stderr, "%s:%d in %s > %s\n", __FILE__, __LINE__, __FUNCTION__, txt) +#define dumphead(head) fprintf(stderr, "id: %d, opcode: %d, attr: %d, len = %d\n", head->msgid, head->opcode, head->attr, head->payload_length) + #endif /* __SNTL_SNTLLV2_H__ */ diff --git a/lib/chansx.c b/lib/chansx.c index cc2df7e..f0e9fdb 100644 --- a/lib/chansx.c +++ b/lib/chansx.c @@ -46,14 +46,14 @@ uint8_t _channel_open(conn_t *co, uint16_t *chid) { chnl_t *chan; - uint16_t typeid = *chid; /* our type */ + int typeid = *chid; /* our type */ uint16_t chidx; usrtc_t *rpc_list = co->rpc_list; usrtc_node_t *node; rpc_typed_list_t *rlist; cx_rpc_list_t *rpclist; - node = usrtc_lookup(rpc_list, (void *)&typeid); + node = usrtc_lookup(rpc_list, &typeid); if(!node) return SNE_EPERM; else rlist = (rpc_typed_list_t *)usrtc_node_getdata(node); @@ -87,6 +87,7 @@ uint8_t _channel_open(conn_t *co, uint16_t *chid) uint8_t _channel_close(conn_t *co, uint16_t chid) { chnl_t *chan; + ulong_t chidx = chid; if(chid > 512) return SNE_INVALINDEX; else chan = co->channels[chid]; @@ -94,8 +95,8 @@ uint8_t _channel_close(conn_t *co, uint16_t chid) if(!chan) return SNE_NOSUCHCHAN; pthread_mutex_lock(&co->idx_ch_lock); - idx_free(&co->idx_ch, chid); - co->channels[chid] = NULL; + idx_free(&co->idx_ch, chidx); + co->channels[chidx] = NULL; pthread_mutex_unlock(&co->idx_ch_lock); free(chan); @@ -117,6 +118,7 @@ chnl_t *sxchannel_open(conn_t *co, int type) if(!(chan = malloc(sizeof(chnl_t)))) { __enomem: + if(chan) free(chan); r = SNE_ENOMEM; goto __reterr; } @@ -146,6 +148,7 @@ chnl_t *sxchannel_open(conn_t *co, int type) pthread_mutex_unlock(&co->idx_msg_lock); if(msgidx == IDX_INVAL) { r = SNE_MMESSAGES; goto __reterr2; } + else head->msgid = msgidx; /* now we're ready to write it */ r = _sntll_writemsg(co, msg); @@ -156,13 +159,15 @@ chnl_t *sxchannel_open(conn_t *co, int type) if(msg->mhead.opcode != SNE_SUCCESS) { r = msg->mhead.opcode; goto __reterr3; } /* ok all is fine */ + msgidx = msg->mhead.reserve; chan->cid = msg->mhead.reserve; pthread_mutex_lock(&co->idx_ch_lock); - idx_reserve(&co->idx_ch, msg->mhead.reserve); - co->channels[msg->mhead.reserve] = chan; + idx_reserve(&co->idx_ch, msgidx); + co->channels[msgidx] = chan; pthread_mutex_unlock(&co->idx_ch_lock); /* destroy a message */ + msgidx = head->msgid; pthread_mutex_lock(&co->idx_msg_lock); idx_free(&co->idx_msg, msgidx); co->messages[msgidx] = NULL; @@ -196,7 +201,7 @@ int sxchannel_close(chnl_t *channel) sxmsg_t *msg; sntllv2_head_t *head; conn_t *co; - int msgidx; + int msgidx, chidx; /* check channel validity */ if(!channel) return SNE_FAILED; @@ -231,9 +236,10 @@ int sxchannel_close(chnl_t *channel) r = head->opcode; /* we will free this anyway */ + chidx = channel->cid; pthread_mutex_lock(&co->idx_ch_lock); - idx_free(&co->idx_ch, channel->cid); - co->channels[channel->cid] = NULL; + idx_free(&co->idx_ch, chidx); + co->channels[chidx] = NULL; pthread_mutex_unlock(&co->idx_ch_lock); free(channel); } diff --git a/lib/messagesx.c b/lib/messagesx.c index 213af6b..03ba12c 100644 --- a/lib/messagesx.c +++ b/lib/messagesx.c @@ -325,3 +325,9 @@ int sxmsg_return_pp(sxmsg_t *msg, int opcode) return __sxmsg_return(msg, opcode, 1); } +void sxmsg_clean(sxmsg_t *msg) +{ + free(msg->payload); + free(msg); + return; +} diff --git a/lib/sntllv2.c b/lib/sntllv2.c index 42fa773..68113df 100644 --- a/lib/sntllv2.c +++ b/lib/sntllv2.c @@ -203,7 +203,7 @@ int _sntll_writemsg(conn_t *co, sxmsg_t *msg) if(head->payload_length && !msg->payload) return SNE_FAILED; /* write the head and payload if applicable */ - pthread_mutex_lock(&co->sslinout[2]); + pthread_mutex_lock(&co->sslinout[1]); rd = __conn_write(co, head, sizeof(sntllv2_head_t)); if(rd < 0) { co->flags |= SNSX_CLOSED; @@ -213,7 +213,7 @@ int _sntll_writemsg(conn_t *co, sxmsg_t *msg) /* check up again */ if(rd < 0) { co->flags |= SNSX_CLOSED; r = SNE_ESSL; } } - pthread_mutex_unlock(&co->sslinout[2]); + pthread_mutex_unlock(&co->sslinout[1]); if(!(co->flags & SNSX_CLOSED)) r = SNE_SUCCESS; @@ -359,6 +359,8 @@ static int __connection_second_alloc(conn_t *co) { usrtc_node_init(&co->csnode, co); + memset(&co->idx_ch, 0, sizeof(idx_allocator_t)); + memset(&co->idx_msg, 0, sizeof(idx_allocator_t)); if((idx_allocator_init(&co->idx_ch, 512, 0))) goto __fail; if((idx_allocator_init(&co->idx_msg, 1024, 0))) goto __fail; @@ -447,6 +449,7 @@ static void *__sntll_thread(void *b) pthread_t self = pthread_self(); int dispatch = 0; size_t rd, wr; + ulong_t mid; /* byte buffer is following head */ bbuf += sizeof(sntllv2_head_t); @@ -472,8 +475,15 @@ static void *__sntll_thread(void *b) while(1) { __again: pthread_mutex_lock(&(co->sslinout[0])); + if(co->flags & SNSX_CLOSED) { + pthread_mutex_unlock(&(co->sslinout[0])); + goto __finish; + } rd = __conn_read(co, mhead, sizeof(sntllv2_head_t)); - if(rd != sizeof(sntllv2_bundle_t)) { +#ifdef _VERBOSE_DEBUG + dumphead(mhead); +#endif + if(rd != sizeof(sntllv2_head_t)) { __sslproto_error: co->flags |= SNSX_CLOSED; pthread_mutex_unlock(&(co->sslinout[0])); @@ -530,7 +540,8 @@ static void *__sntll_thread(void *b) fprintf(stderr, "[sntllv2] Invalid index of the message.\n"); goto __again; } - msg = co->messages[mhead->msgid]; + mid = mhead->msgid; + msg = co->messages[mid]; if(!msg) goto __inval_idx_nor; /* ok now we'are copy data and unlock wait mutex */ @@ -580,10 +591,11 @@ static void *__sntll_thread(void *b) if(mhead->payload_length) msg->payload = bbuf; } - pthread_mutex_lock(&co->idx_ch_lock); - idx_reserve(&co->idx_ch, mhead->msgid); + mid = mhead->msgid; + pthread_mutex_lock(&co->idx_msg_lock); + idx_reserve(&co->idx_msg, mid); co->messages[mhead->msgid] = msg; - pthread_mutex_unlock(&co->idx_ch_lock); + pthread_mutex_unlock(&co->idx_msg_lock); /* now we are able to process the message */ _message_process(msg); @@ -594,10 +606,10 @@ static void *__sntll_thread(void *b) if(!msg) goto __inval_idx_nor; /* message dialog is closed - remove this right now */ - pthread_mutex_lock(&co->idx_ch_lock); - idx_free(&co->idx_ch, mhead->msgid); + pthread_mutex_lock(&co->idx_msg_lock); + idx_free(&co->idx_msg, mhead->msgid); co->messages[mhead->msgid] = NULL; - pthread_mutex_unlock(&co->idx_ch_lock); + pthread_mutex_unlock(&co->idx_msg_lock); if(msg->mhead.attr & SXMSG_TIMEDOUT) { /* nobody wait for it */ /* now just free it */ @@ -622,10 +634,10 @@ static void *__sntll_thread(void *b) if(!msg) goto __inval_idx_nor; if(msg->mhead.attr & SXMSG_TIMEDOUT) { /* nobody wait for it */ - pthread_mutex_lock(&co->idx_ch_lock); - idx_free(&co->idx_ch, mhead->msgid); + pthread_mutex_lock(&co->idx_msg_lock); + idx_free(&co->idx_msg, mhead->msgid); co->messages[mhead->msgid] = NULL; - pthread_mutex_unlock(&co->idx_ch_lock); + pthread_mutex_unlock(&co->idx_msg_lock); /* now just free it */ pthread_mutex_destroy(&msg->wait); @@ -775,7 +787,7 @@ conn_t *connection_master_link(conn_sys_t *ssys, int sck, struct in_addr *addr) head->opcode = r; if(r != SNE_SUCCESS) { /* we finish */ head->payload_length = 0; - __conn_write(co, buf, sizeof(sntllv2_head_t)); + __conn_write(co, head, sizeof(sntllv2_head_t)); destroy_sexp(sx); goto __fail3; } @@ -793,6 +805,10 @@ conn_t *connection_master_link(conn_sys_t *ssys, int sck, struct in_addr *addr) r = __connection_second_alloc(co); if(r != SNE_SUCCESS) goto __fail3; + /* free message */ + co->messages[0] = NULL; + free(msg); + /* and now we're need to create a thread poll */ if(!(bundle = malloc(sizeof(sntllv2_bundle_t)))) { r = SNE_ENOMEM; goto __fail4; } else { @@ -992,18 +1008,27 @@ conn_t *connection_link(conn_sys_t *ssys, const char *host, sx = parse_sexp(bbuf, rd); if(!sx) { r = SNE_BADPROTO; goto __fail2; } r = __eval_syssexp(co, sx); + if(!r) r = SNE_SUCCESS; destroy_sexp(sx); + + /* write back */ head->opcode = r; head->payload_length = 0; wr = __conn_write(co, head, sizeof(sntllv2_head_t)); - if(wr < 0) goto __fail2; - if(r != SNE_SUCCESS) goto __fail2; + if(wr < 0) { + blub("fuck"); + r = SNE_LINKERROR; goto __fail2;} + if(r != SNE_SUCCESS) { r = SNE_LINKERROR; goto __fail2;} } /* if we're there - negotiation is done, going to init messaging mode */ r = __connection_second_alloc(co); if(r != SNE_SUCCESS) goto __fail3; + /* free message */ + co->messages[0] = NULL; + free(msg); + /* and now we're need to create a thread poll */ if(!(bundle = malloc(sizeof(sntllv2_bundle_t)))) { r = SNE_ENOMEM; goto __fail4; } else { diff --git a/tests/lv2sc.c b/tests/lv2sc.c index 4e81fe1..81565a9 100644 --- a/tests/lv2sc.c +++ b/tests/lv2sc.c @@ -34,12 +34,95 @@ /* define a little bit */ #define DEFAULT_PORT 13133 #define CHANNEL_COUNT 200 -#define CLIENT_COUNT 100 +#define CLIENT_COUNT 256 #define MESSAGES_PER_SESSION 10000 #define ITERATION_COUNT 1000 #define FAILS_ONLY +struct testdata { + int uc; + pthread_mutex_t ulock; + conn_t *co; +}; + +static int __init_testdata(struct testdata *t, conn_t *co) +{ + t->uc = 0; + pthread_mutex_init(&t->ulock, NULL); + t->co = co; + return 0; +} + +static void __wait_completion(struct testdata *t) +{ + pthread_mutex_lock(&t->ulock); + if(t->uc) { + pthread_mutex_lock(&t->ulock); + } + return; +} + +static int __set_typed_list_callback(conn_t *co, int ch, char *desc) +{ + printf("allowed channel %d (%s)\n", ch, desc); + return SNE_SUCCESS; +} + +static void *__addsthrd(void *a) +{ + struct testdata *t = a; + conn_t *co = t->co; + chnl_t *mch; + sxmsg_t *msg; + char mmbuf[1024]; + size_t ln; + int mr, i; + + pthread_mutex_lock(&t->ulock); + t->uc++; + pthread_mutex_unlock(&t->ulock); + + /* here we go */ + mch = sxchannel_open(co, 12); + + if(!mch) { + fprintf(stderr, "Failed to openchannel with %d\n", errno); + goto __fini; + } + + for(i = 0; i < MESSAGES_PER_SESSION; i++) { + ln = snprintf(mmbuf, 1024, "(ar-add (10 10))"); + mr = sxmsg_send(mch, mmbuf, ln, &msg); + switch(mr) { + case SNE_RAPIDMSG: + fprintf(stdout, "Rapidly replied: %s\n", (char *)sxmsg_payload(msg)); + sxmsg_clean(msg); + break; + case SNE_REPLYREQ: + if(sxmsg_datalen(msg)) fprintf(stdout, "Replied (confirmation required): %s\n", + sxmsg_payload(msg)); + mr = sxmsg_return(msg, SNE_SUCCESS); + fprintf(stderr, "mr = %d\n", mr); + break; + case SNE_SUCCESS: + fprintf(stdout, "Success.\n"); + break; + default: + fprintf(stderr, "ERROR: %d\n", mr); + break; + } + } + + sxchannel_close(mch); + + __fini: + t->uc--; + if(t->uc <= 1) pthread_mutex_unlock(&t->ulock); + + return NULL; +} + int main(int argc, char **argv) { char *rootca = NULL, *cert = NULL; @@ -116,15 +199,82 @@ int main(int argc, char **argv) } /* Tests */ + struct timeval beg, end; /* try to open connection */ - connections_set_channelcall(ssys, NULL); + connections_set_channelcall(ssys, __set_typed_list_callback); + gettimeofday(&beg, NULL); co = connection_link(ssys, addr, port, cert, login, password); if(!co) { fprintf(stderr, "Failed to connection with %d\n", errno); return errno; } + gettimeofday(&end, NULL); + + if((end.tv_sec - beg.tv_sec) > 0) { + printf("Seconds: %ld ", end.tv_sec - beg.tv_sec); + printf("µS: %ld\n", end.tv_usec + (1000000 - beg.tv_usec)); + } else printf("µS: %ld\n", end.tv_usec - beg.tv_usec); + + /* ok now we should open a channel */ + chnl_t *testchannel = sxchannel_open(co, 12); + + if(!testchannel) { + fprintf(stderr, "Failed to openchannel with %d\n", errno); + return errno; + } + gettimeofday(&end, NULL); + + if((end.tv_sec - beg.tv_sec) > 0) { + printf("Seconds: %ld ", end.tv_sec - beg.tv_sec); + printf("µS: %ld\n", end.tv_usec + (1000000 - beg.tv_usec)); + } else printf("µS: %ld\n", end.tv_usec - beg.tv_usec); + + /* ok, send a message */ + char mmbuf[1024]; + sxmsg_t *msg; + size_t ln; + ln = snprintf(mmbuf, 1024, "(ar-add (10 10))"); + int mr = sxmsg_send(testchannel, mmbuf, ln, &msg); + switch(mr) { + case SNE_RAPIDMSG: + fprintf(stdout, "Rapidly replied: %s\n", (char *)sxmsg_payload(msg)); + sxmsg_clean(msg); + break; + case SNE_REPLYREQ: + if(sxmsg_datalen(msg)) fprintf(stdout, "Replied (confirmation required): %s\n", + sxmsg_payload(msg)); + mr = sxmsg_return(msg, SNE_SUCCESS); + fprintf(stderr, "mr = %d\n", mr); + break; + case SNE_SUCCESS: + fprintf(stdout, "Success.\n"); + break; + default: + fprintf(stderr, "ERROR: %d\n", mr); + break; + } + + int ee = sxchannel_close(testchannel); + printf("ee = %d\n", ee); + gettimeofday(&end, NULL); + + if((end.tv_sec - beg.tv_sec) > 0) { + printf("Seconds: %ld ", end.tv_sec - beg.tv_sec); + printf("µS: %ld\n", end.tv_usec + (1000000 - beg.tv_usec)); + } else printf("µS: %ld\n", end.tv_usec - beg.tv_usec); + sleep(10); + /* ok, now we need to create many threads */ + struct testdata trd; + pthread_t thrd; + int i; + __init_testdata(&trd, co); + + for(i = 0; i < 256; i++) pthread_create(&thrd, NULL, __addsthrd, &trd); + + __wait_completion(&trd); return 0; } + diff --git a/tests/lv2sd.c b/tests/lv2sd.c index e20c845..591e2ec 100644 --- a/tests/lv2sd.c +++ b/tests/lv2sd.c @@ -90,7 +90,7 @@ static int __secure_check(conn_t *co) static int __set_typed_list_callback(conn_t *co, int ch, char *desc) { printf("allowed channel %d (%s)\n", ch, desc); - return 0; + return SNE_SUCCESS; } /* list of rpc calls functions */