From eba6c0b9fbd7d8f1ce36515510148917aa7995db Mon Sep 17 00:00:00 2001 From: Alexander Vdolainen Date: Fri, 24 Jul 2015 18:34:24 +0300 Subject: [PATCH] fixed bugs with message handling on heavyloaded connection; --- lib/chansx.c | 18 +++++++++--------- lib/messagesx.c | 18 ++++++++++-------- lib/sntllv2.c | 34 ++++++++++++++++++++++------------ tests/lv2sc.c | 2 +- 4 files changed, 42 insertions(+), 30 deletions(-) diff --git a/lib/chansx.c b/lib/chansx.c index f0e9fdb..7ffac6f 100644 --- a/lib/chansx.c +++ b/lib/chansx.c @@ -90,7 +90,7 @@ uint8_t _channel_close(conn_t *co, uint16_t chid) ulong_t chidx = chid; if(chid > 512) return SNE_INVALINDEX; - else chan = co->channels[chid]; + else chan = co->channels[chidx]; if(!chan) return SNE_NOSUCHCHAN; @@ -109,7 +109,7 @@ chnl_t *sxchannel_open(conn_t *co, int type) chnl_t *chan = NULL; sxmsg_t *msg = NULL; sntllv2_head_t *head; - int msgidx, r; + int msgidx, r, ccid; if(!co) { r = SNE_FAILED; @@ -159,15 +159,14 @@ chnl_t *sxchannel_open(conn_t *co, int type) if(msg->mhead.opcode != SNE_SUCCESS) { r = msg->mhead.opcode; goto __reterr3; } /* ok all is fine */ - msgidx = msg->mhead.reserve; chan->cid = msg->mhead.reserve; + ccid = chan->cid; pthread_mutex_lock(&co->idx_ch_lock); - idx_reserve(&co->idx_ch, msgidx); - co->channels[msgidx] = chan; + idx_reserve(&co->idx_ch, ccid); + co->channels[ccid] = chan; pthread_mutex_unlock(&co->idx_ch_lock); /* destroy a message */ - msgidx = head->msgid; pthread_mutex_lock(&co->idx_msg_lock); idx_free(&co->idx_msg, msgidx); co->messages[msgidx] = NULL; @@ -201,13 +200,14 @@ int sxchannel_close(chnl_t *channel) sxmsg_t *msg; sntllv2_head_t *head; conn_t *co; - int msgidx, chidx; + int msgidx = 0, chidx = 0; /* check channel validity */ if(!channel) return SNE_FAILED; else if(!(co = channel->connection)) return SNE_FAILED; if(channel->cid > 512) return SNE_IGNORED; - if(channel != co->channels[channel->cid]) return SNE_IGNORED; + else chidx = channel->cid; + if(channel != co->channels[chidx]) return SNE_IGNORED; if(!(msg = malloc(sizeof(sxmsg_t)))) return SNE_ENOMEM; head = &msg->mhead; @@ -229,6 +229,7 @@ int sxchannel_close(chnl_t *channel) pthread_mutex_unlock(&co->idx_msg_lock); if(msgidx == IDX_INVAL) { r = SNE_MMESSAGES; goto __reterr2; } + else head->msgid = msgidx; r = _sntll_writemsg(co, msg); if(r == SNE_SUCCESS) { @@ -236,7 +237,6 @@ int sxchannel_close(chnl_t *channel) r = head->opcode; /* we will free this anyway */ - chidx = channel->cid; pthread_mutex_lock(&co->idx_ch_lock); idx_free(&co->idx_ch, chidx); co->channels[chidx] = NULL; diff --git a/lib/messagesx.c b/lib/messagesx.c index c49155e..c63d83d 100644 --- a/lib/messagesx.c +++ b/lib/messagesx.c @@ -239,7 +239,7 @@ int sxmsg_rreply(sxmsg_t *msg, size_t datalen) chnl_t *ch; conn_t *co; sntllv2_head_t *head; - int r; + int r, mid; /* a little bit of paranoid tests */ if(!msg) return SNE_FAILED; @@ -253,9 +253,10 @@ int sxmsg_rreply(sxmsg_t *msg, size_t datalen) head->opcode = SNE_RAPIDMSG; head->payload_length = datalen; + mid = head->msgid; pthread_mutex_lock(&co->idx_msg_lock); - idx_free(&co->idx_msg, head->msgid); - co->messages[head->msgid] = NULL; + idx_free(&co->idx_msg, mid); + co->messages[mid] = NULL; pthread_mutex_unlock(&co->idx_msg_lock); r = _sntll_writemsg(co, msg); @@ -273,7 +274,7 @@ static inline int __sxmsg_return(sxmsg_t *msg, int opcode, int pp) conn_t *co; sntllv2_head_t *head; ppmsg_t *ppm; - int r; + int r, mid; /* a little bit of paranoid tests */ if(!msg) return SNE_FAILED; @@ -285,12 +286,13 @@ static inline int __sxmsg_return(sxmsg_t *msg, int opcode, int pp) head->attr |= SXMSG_CLOSED; head->opcode = opcode; head->payload_length = 0; + mid = head->msgid; if(!pp) { /* free index */ pthread_mutex_lock(&co->idx_msg_lock); - idx_free(&co->idx_msg, head->msgid); - co->messages[head->msgid] = NULL; + idx_free(&co->idx_msg, mid); + co->messages[mid] = NULL; pthread_mutex_unlock(&co->idx_msg_lock); r = _sntll_writemsg(co, msg); @@ -298,8 +300,8 @@ static inline int __sxmsg_return(sxmsg_t *msg, int opcode, int pp) if(!(ppm = malloc(sizeof(ppmsg_t)))) return SNE_ENOMEM; else { /* remove it */ pthread_mutex_lock(&co->idx_msg_lock); - idx_free(&co->idx_msg, head->msgid); - co->messages[head->msgid] = NULL; + idx_free(&co->idx_msg, mid); + co->messages[mid] = NULL; pthread_mutex_unlock(&co->idx_msg_lock); } diff --git a/lib/sntllv2.c b/lib/sntllv2.c index 68113df..c033ea3 100644 --- a/lib/sntllv2.c +++ b/lib/sntllv2.c @@ -495,11 +495,16 @@ static void *__sntll_thread(void *b) if(rd < 0) goto __sslproto_error; else pthread_mutex_unlock(&(co->sslinout[0])); if(rd != mhead->payload_length) { + mid = mhead->msgid; /* if we're need to do something */ if(mhead->msgid >= 1024) { mhead->opcode = SNE_INVALINDEX; goto __return_error; - } else msg = co->messages[mhead->msgid]; + } else { + pthread_mutex_lock(&co->idx_msg_lock); + msg = co->messages[mid]; + pthread_mutex_unlock(&co->idx_msg_lock); + } if(!msg) { if(mhead->attr & SXMSG_OPEN) mhead->opcode = SNE_BADPROTO; else { @@ -541,7 +546,9 @@ static void *__sntll_thread(void *b) goto __again; } mid = mhead->msgid; + pthread_mutex_lock(&co->idx_msg_lock); msg = co->messages[mid]; + pthread_mutex_unlock(&co->idx_msg_lock); if(!msg) goto __inval_idx_nor; /* ok now we'are copy data and unlock wait mutex */ @@ -575,7 +582,8 @@ static void *__sntll_thread(void *b) else goto __again; } /* if message is busy - fails */ - msg = co->messages[mhead->msgid]; + mid = mhead->msgid; + msg = co->messages[mid]; if(msg) { mhead->opcode = SNE_EBUSY; goto __ret_regerr; } /* now we will take a deal */ @@ -591,10 +599,9 @@ static void *__sntll_thread(void *b) if(mhead->payload_length) msg->payload = bbuf; } - mid = mhead->msgid; pthread_mutex_lock(&co->idx_msg_lock); idx_reserve(&co->idx_msg, mid); - co->messages[mhead->msgid] = msg; + co->messages[mid] = msg; pthread_mutex_unlock(&co->idx_msg_lock); /* now we are able to process the message */ @@ -602,13 +609,15 @@ static void *__sntll_thread(void *b) } else if(mhead->attr & SXMSG_CLOSED) { /* check for the message */ if(mhead->msgid >= 1024) goto __inval_idx_nor; - msg = co->messages[mhead->msgid]; - if(!msg) goto __inval_idx_nor; + mid = mhead->msgid; + pthread_mutex_lock(&co->idx_msg_lock); + msg = co->messages[mid]; + if(!msg) { + pthread_mutex_unlock(&co->idx_msg_lock); goto __inval_idx_nor; } /* message dialog is closed - remove this right now */ - pthread_mutex_lock(&co->idx_msg_lock); - idx_free(&co->idx_msg, mhead->msgid); - co->messages[mhead->msgid] = NULL; + idx_free(&co->idx_msg, mid); + co->messages[mid] = NULL; pthread_mutex_unlock(&co->idx_msg_lock); if(msg->mhead.attr & SXMSG_TIMEDOUT) { /* nobody wait for it */ @@ -630,13 +639,14 @@ static void *__sntll_thread(void *b) (mhead->attr & SXMSG_REPLYREQ)) { /* ongoing dialog */ /* check for the message */ if(mhead->msgid >= 1024) goto __inval_idx_nor; - msg = co->messages[mhead->msgid]; + mid = mhead->msgid; + msg = co->messages[mid]; if(!msg) goto __inval_idx_nor; if(msg->mhead.attr & SXMSG_TIMEDOUT) { /* nobody wait for it */ pthread_mutex_lock(&co->idx_msg_lock); - idx_free(&co->idx_msg, mhead->msgid); - co->messages[mhead->msgid] = NULL; + idx_free(&co->idx_msg, mid); + co->messages[mid] = NULL; pthread_mutex_unlock(&co->idx_msg_lock); /* now just free it */ diff --git a/tests/lv2sc.c b/tests/lv2sc.c index 81565a9..b2e11b9 100644 --- a/tests/lv2sc.c +++ b/tests/lv2sc.c @@ -96,7 +96,7 @@ static void *__addsthrd(void *a) mr = sxmsg_send(mch, mmbuf, ln, &msg); switch(mr) { case SNE_RAPIDMSG: - fprintf(stdout, "Rapidly replied: %s\n", (char *)sxmsg_payload(msg)); + //fprintf(stdout, "Rapidly replied: %s\n", (char *)sxmsg_payload(msg)); sxmsg_clean(msg); break; case SNE_REPLYREQ: