From bd2f05e2810378a957b0ae216fc89cca0ca71860 Mon Sep 17 00:00:00 2001 From: Alexander Vdolainen Date: Fri, 31 Jul 2015 20:02:25 +0300 Subject: [PATCH] bugs fixes; --- include/sntl/limits.h | 5 ++- lib/messagesx.c | 26 ++++++++---- lib/sntllv2.c | 99 +++++++++++++++++++++++++++++-------------- 3 files changed, 88 insertions(+), 42 deletions(-) diff --git a/include/sntl/limits.h b/include/sntl/limits.h index f69a45b..3d9405a 100644 --- a/include/sntl/limits.h +++ b/include/sntl/limits.h @@ -16,6 +16,9 @@ #define MAX_RPC_LIST 512 -#define MAX_RBBUF_LEN (65536 - sizeof(sntllv2_head_t)) +#define MAX_SNTLLBUFSIZE 65536 +#define MAX_RBBUF_LEN (MAX_SNTLLBUFSIZE - sizeof(sntllv2_head_t)) + +#define MAX_SNTLLTHREADS 8 #endif /* __SNTL_LIMITS_H__ */ diff --git a/lib/messagesx.c b/lib/messagesx.c index 6e33d74..90bcc91 100644 --- a/lib/messagesx.c +++ b/lib/messagesx.c @@ -20,11 +20,13 @@ #include #include #include +#include #include #include #include +#include #include #include "internal.h" @@ -100,9 +102,16 @@ static inline int __sxmsg_send(chnl_t *channel, const char *data, size_t datalen /* ready to send it */ if(!pp) { r = _sntll_writemsg(co, msg); - if(r != SNE_SUCCESS) goto __closemsg; + if(r != SNE_SUCCESS) { + __unpinmsg: + pthread_mutex_lock(&co->idx_msg_lock); + idx_free(&co->idx_msg, msgidx); + co->messages[msgidx] = NULL; + pthread_mutex_unlock(&co->idx_msg_lock); + goto __freemsg; + } } else { /* postponed */ - if(!(ppm = malloc(sizeof(ppmsg_t)))) { r = SNE_ENOMEM; goto __closemsg; } + if(!(ppm = malloc(sizeof(ppmsg_t)))) { r = SNE_ENOMEM; goto __unpinmsg; } list_init_node(&ppm->node); ppm->msg = msg; @@ -113,18 +122,16 @@ static inline int __sxmsg_send(chnl_t *channel, const char *data, size_t datalen pthread_mutex_unlock(&co->write_pending_lock); } - pthread_mutex_lock(&msg->wait); /* we will sleep here */ + //pthread_mutex_lock(&msg->wait); /* we will sleep here */ + while(pthread_mutex_trylock(&msg->wait)) { + //printf("here opcode = %d\n", head->opcode); + } if(head->payload_length) { *omsg = msg; return head->opcode; } else r = head->opcode; - __closemsg: - pthread_mutex_lock(&co->idx_msg_lock); - idx_free(&co->idx_msg, msgidx); - co->messages[msgidx] = NULL; - pthread_mutex_unlock(&co->idx_msg_lock); __freemsg: /* free resources for message */ pthread_mutex_unlock(&msg->wait); @@ -187,7 +194,7 @@ static inline int __sxmsg_reply(sxmsg_t *msg, const char *data, if(!(co = ch->connection)) return SNE_FAILED; /* test for blocking */ - for(i = 0; i < 8; i++) + for(i = 0; i < MAX_SNTLLTHREADS; i++) if(pthread_equal(self, co->thrd_poll[i])) return SNE_WOULDBLOCK; /* prepare it */ @@ -298,6 +305,7 @@ static inline int __sxmsg_return(sxmsg_t *msg, int opcode, int pp) pthread_mutex_unlock(&co->idx_msg_lock); r = _sntll_writemsg(co, msg); + free(msg); } else { if(!(ppm = malloc(sizeof(ppmsg_t)))) return SNE_ENOMEM; else { /* remove it */ diff --git a/lib/sntllv2.c b/lib/sntllv2.c index 332b0fc..7cf4669 100644 --- a/lib/sntllv2.c +++ b/lib/sntllv2.c @@ -40,6 +40,7 @@ #include #include +#include #include #include "internal.h" @@ -195,6 +196,7 @@ int _sntll_writemsg(conn_t *co, sxmsg_t *msg) sntllv2_head_t *head; size_t rd; int r; + char *buf = NULL; if(!co || !msg) return SNE_FAILED; @@ -202,21 +204,25 @@ int _sntll_writemsg(conn_t *co, sxmsg_t *msg) head = &msg->mhead; if(head->payload_length && !msg->payload) return SNE_FAILED; + if(head->payload_length) { + buf = malloc(sizeof(sntllv2_head_t) + head->payload_length); + memcpy(buf, head, sizeof(sntllv2_head_t)); + memcpy(buf + sizeof(sntllv2_head_t), msg->payload, head->payload_length); + } + /* write the head and payload if applicable */ pthread_mutex_lock(&co->sslinout[1]); - rd = __conn_write(co, head, sizeof(sntllv2_head_t)); + if(!buf) + rd = __conn_write(co, head, sizeof(sntllv2_head_t)); + else rd = __conn_write(co, buf, sizeof(sntllv2_head_t) + head->payload_length); if(rd < 0) { co->flags |= SNSX_CLOSED; r = SNE_ESSL; - } else if(head->payload_length) { - rd = __conn_write(co, msg->payload, head->payload_length); - /* check up again */ - if(rd < 0) { co->flags |= SNSX_CLOSED; r = SNE_ESSL; } } pthread_mutex_unlock(&co->sslinout[1]); if(!(co->flags & SNSX_CLOSED)) r = SNE_SUCCESS; - + if(buf) free(buf); return r; } @@ -442,7 +448,7 @@ static int __eval_syssexp(conn_t *co, sexp_t *sx) static void __connection_destroy(conn_t *co) { - int i = 0; + int i = 0, fd; sxmsg_t *msg, *omsg; ppmsg_t *ppm; list_node_t *iter, *siter; @@ -473,24 +479,26 @@ static void __connection_destroy(conn_t *co) pthread_mutex_unlock(&co->write_pending_lock); } - /* go thru messages */ - pthread_mutex_lock(&co->idx_msg_lock); - for(i = 0; i < 1024; i++) { - msg = co->messages[i]; - if(!msg) continue; - else head = &msg->mhead; - head->opcode = SNE_LINKERROR; - pthread_mutex_unlock(&msg->wait); - co->messages[i] = NULL; - idx_free(&co->idx_msg, i); - } - pthread_mutex_unlock(&co->idx_msg_lock); - /* update use count */ _CONN_NOTINUSE(co); /* ok, let's free other if we can */ if(!_CONN_UCOUNT(co)) { + /* go thru messages */ + pthread_mutex_lock(&co->idx_msg_lock); + for(i = 0; i < 1024; i++) { + msg = co->messages[i]; + if(!msg) continue; + else head = &msg->mhead; + head->opcode = SNE_LINKERROR; + pthread_mutex_unlock(&msg->wait); + pthread_mutex_destroy(&msg->wait); + free(msg); + co->messages[i] = NULL; + idx_free(&co->idx_msg, i); + } + pthread_mutex_unlock(&co->idx_msg_lock); + /* ok now we will free the channels */ pthread_mutex_lock(&co->idx_ch_lock); for(i = 0; i < 512; i++) { @@ -506,9 +514,10 @@ static void __connection_destroy(conn_t *co) if(co->pctx->passwd) free(co->pctx->passwd); SSL_shutdown(co->ssl); - close(SSL_get_fd(co->ssl)); + fd = SSL_get_fd(co->ssl); SSL_free(co->ssl); SSL_CTX_free(co->ctx); + close(fd); __connection_second_free(co); __connection_minimal_free(co); } @@ -533,7 +542,9 @@ static void *__sntll_thread(void *b) int dispatch = 0, e; size_t rd, wr; ulong_t mid; - +#ifdef _PERFPROFILE + struct timeval beg, end; +#endif /* byte buffer is following head */ bbuf += sizeof(sntllv2_head_t); @@ -549,7 +560,7 @@ static void *__sntll_thread(void *b) /* check up a thread */ if(pthread_equal(self, co->thrd_poll[7])) /* dispatcher */ - dispatch = 1; + dispatch = 0; /* update use count */ _CONN_INUSE(co); @@ -595,11 +606,23 @@ static void *__sntll_thread(void *b) pthread_mutex_unlock(&(co->sslinout[0])); goto __finish; } +#ifdef _PERFPROFILE + gettimeofday(&beg, NULL); +#endif rd = __conn_read(co, mhead, sizeof(sntllv2_head_t)); +#ifdef _PERFPROFILE + gettimeofday(&end, NULL); + + if((end.tv_sec - beg.tv_sec) > 0) { + printf("connread(head) Seconds: %ld ", end.tv_sec - beg.tv_sec); + printf("µS: %ld\n", end.tv_usec + (1000000 - beg.tv_usec)); + } else printf("connread(head) µS: %ld\n", end.tv_usec - beg.tv_usec); +#endif + #ifdef _VERBOSE_DEBUG dumphead(mhead); #endif - if(rd != sizeof(sntllv2_head_t)) { + if(rd < 0) { __sslproto_error: co->flags |= SNSX_CLOSED; pthread_mutex_unlock(&(co->sslinout[0])); @@ -607,9 +630,21 @@ static void *__sntll_thread(void *b) } else { /* check up if we can read or not */ if(mhead->payload_length) { +#ifdef _PERFPROFILE + gettimeofday(&beg, NULL); +#endif rd = __conn_read(co, bbuf, mhead->payload_length); - if(rd < 0) goto __sslproto_error; +#ifdef _PERFPROFILE + gettimeofday(&end, NULL); + if((end.tv_sec - beg.tv_sec) > 0) { + printf("connread(payload) Seconds: %ld ", end.tv_sec - beg.tv_sec); + printf("µS: %ld\n", end.tv_usec + (1000000 - beg.tv_usec)); + } else printf("connread(payload) µS: %ld\n", end.tv_usec - beg.tv_usec); +#endif + + if(rd == -1) goto __sslproto_error; else pthread_mutex_unlock(&(co->sslinout[0])); + if(rd != mhead->payload_length) { mid = mhead->msgid; /* if we're need to do something */ @@ -617,9 +652,9 @@ static void *__sntll_thread(void *b) mhead->opcode = SNE_INVALINDEX; goto __return_error; } else { - pthread_mutex_lock(&co->idx_msg_lock); + // pthread_mutex_lock(&co->idx_msg_lock); msg = co->messages[mid]; - pthread_mutex_unlock(&co->idx_msg_lock); + //thread_mutex_unlock(&co->idx_msg_lock); } if(!msg) { if(mhead->attr & SXMSG_OPEN) mhead->opcode = SNE_BADPROTO; @@ -662,9 +697,9 @@ static void *__sntll_thread(void *b) goto __again; } mid = mhead->msgid; - pthread_mutex_lock(&co->idx_msg_lock); + //hread_mutex_lock(&co->idx_msg_lock); msg = co->messages[mid]; - pthread_mutex_unlock(&co->idx_msg_lock); + //hread_mutex_unlock(&co->idx_msg_lock); if(!msg) goto __inval_idx_nor; /* ok now we'are copy data and unlock wait mutex */ @@ -732,6 +767,7 @@ static void *__sntll_thread(void *b) pthread_mutex_unlock(&co->idx_msg_lock); goto __inval_idx_nor; } /* message dialog is closed - remove this right now */ + idx_free(&co->idx_msg, mid); co->messages[mid] = NULL; pthread_mutex_unlock(&co->idx_msg_lock); @@ -944,7 +980,7 @@ conn_t *connection_master_link(conn_sys_t *ssys, int sck, struct in_addr *addr) bundle->conn = co; } int i; - for(i = 0; i < 8; i++) { + for(i = 0; i < MAX_SNTLLTHREADS; i++) { if(bundle == (void *)0xdead) bundle = __sntll_bundle_create(co); if(!bundle) goto __fail5; r = pthread_create(&co->thrd_poll[i], NULL, __sntll_thread, bundle); @@ -1144,7 +1180,6 @@ conn_t *connection_link(conn_sys_t *ssys, const char *host, head->payload_length = 0; wr = __conn_write(co, head, sizeof(sntllv2_head_t)); if(wr < 0) { - blub("fuck"); r = SNE_LINKERROR; goto __fail2;} if(r != SNE_SUCCESS) { r = SNE_LINKERROR; goto __fail2;} } @@ -1163,7 +1198,7 @@ conn_t *connection_link(conn_sys_t *ssys, const char *host, bundle->buf = buf; bundle->conn = co; } - for(i = 0; i < 8; i++) { + for(i = 0; i < MAX_SNTLLTHREADS; i++) { if(bundle == (void *)0xdead) bundle = __sntll_bundle_create(co); if(!bundle) goto __fail5; r = pthread_create(&co->thrd_poll[i], NULL, __sntll_thread, bundle);