fixed bugs with message handling on heavyloaded connection;

v0.5.xx
Alexander Vdolainen 10 years ago
parent 06dec797d4
commit eba6c0b9fb

@ -90,7 +90,7 @@ uint8_t _channel_close(conn_t *co, uint16_t chid)
ulong_t chidx = chid;
if(chid > 512) return SNE_INVALINDEX;
else chan = co->channels[chid];
else chan = co->channels[chidx];
if(!chan) return SNE_NOSUCHCHAN;
@ -109,7 +109,7 @@ chnl_t *sxchannel_open(conn_t *co, int type)
chnl_t *chan = NULL;
sxmsg_t *msg = NULL;
sntllv2_head_t *head;
int msgidx, r;
int msgidx, r, ccid;
if(!co) {
r = SNE_FAILED;
@ -159,15 +159,14 @@ chnl_t *sxchannel_open(conn_t *co, int type)
if(msg->mhead.opcode != SNE_SUCCESS) { r = msg->mhead.opcode; goto __reterr3; }
/* ok all is fine */
msgidx = msg->mhead.reserve;
chan->cid = msg->mhead.reserve;
ccid = chan->cid;
pthread_mutex_lock(&co->idx_ch_lock);
idx_reserve(&co->idx_ch, msgidx);
co->channels[msgidx] = chan;
idx_reserve(&co->idx_ch, ccid);
co->channels[ccid] = chan;
pthread_mutex_unlock(&co->idx_ch_lock);
/* destroy a message */
msgidx = head->msgid;
pthread_mutex_lock(&co->idx_msg_lock);
idx_free(&co->idx_msg, msgidx);
co->messages[msgidx] = NULL;
@ -201,13 +200,14 @@ int sxchannel_close(chnl_t *channel)
sxmsg_t *msg;
sntllv2_head_t *head;
conn_t *co;
int msgidx, chidx;
int msgidx = 0, chidx = 0;
/* check channel validity */
if(!channel) return SNE_FAILED;
else if(!(co = channel->connection)) return SNE_FAILED;
if(channel->cid > 512) return SNE_IGNORED;
if(channel != co->channels[channel->cid]) return SNE_IGNORED;
else chidx = channel->cid;
if(channel != co->channels[chidx]) return SNE_IGNORED;
if(!(msg = malloc(sizeof(sxmsg_t)))) return SNE_ENOMEM;
head = &msg->mhead;
@ -229,6 +229,7 @@ int sxchannel_close(chnl_t *channel)
pthread_mutex_unlock(&co->idx_msg_lock);
if(msgidx == IDX_INVAL) { r = SNE_MMESSAGES; goto __reterr2; }
else head->msgid = msgidx;
r = _sntll_writemsg(co, msg);
if(r == SNE_SUCCESS) {
@ -236,7 +237,6 @@ int sxchannel_close(chnl_t *channel)
r = head->opcode;
/* we will free this anyway */
chidx = channel->cid;
pthread_mutex_lock(&co->idx_ch_lock);
idx_free(&co->idx_ch, chidx);
co->channels[chidx] = NULL;

@ -239,7 +239,7 @@ int sxmsg_rreply(sxmsg_t *msg, size_t datalen)
chnl_t *ch;
conn_t *co;
sntllv2_head_t *head;
int r;
int r, mid;
/* a little bit of paranoid tests */
if(!msg) return SNE_FAILED;
@ -253,9 +253,10 @@ int sxmsg_rreply(sxmsg_t *msg, size_t datalen)
head->opcode = SNE_RAPIDMSG;
head->payload_length = datalen;
mid = head->msgid;
pthread_mutex_lock(&co->idx_msg_lock);
idx_free(&co->idx_msg, head->msgid);
co->messages[head->msgid] = NULL;
idx_free(&co->idx_msg, mid);
co->messages[mid] = NULL;
pthread_mutex_unlock(&co->idx_msg_lock);
r = _sntll_writemsg(co, msg);
@ -273,7 +274,7 @@ static inline int __sxmsg_return(sxmsg_t *msg, int opcode, int pp)
conn_t *co;
sntllv2_head_t *head;
ppmsg_t *ppm;
int r;
int r, mid;
/* a little bit of paranoid tests */
if(!msg) return SNE_FAILED;
@ -285,12 +286,13 @@ static inline int __sxmsg_return(sxmsg_t *msg, int opcode, int pp)
head->attr |= SXMSG_CLOSED;
head->opcode = opcode;
head->payload_length = 0;
mid = head->msgid;
if(!pp) {
/* free index */
pthread_mutex_lock(&co->idx_msg_lock);
idx_free(&co->idx_msg, head->msgid);
co->messages[head->msgid] = NULL;
idx_free(&co->idx_msg, mid);
co->messages[mid] = NULL;
pthread_mutex_unlock(&co->idx_msg_lock);
r = _sntll_writemsg(co, msg);
@ -298,8 +300,8 @@ static inline int __sxmsg_return(sxmsg_t *msg, int opcode, int pp)
if(!(ppm = malloc(sizeof(ppmsg_t)))) return SNE_ENOMEM;
else { /* remove it */
pthread_mutex_lock(&co->idx_msg_lock);
idx_free(&co->idx_msg, head->msgid);
co->messages[head->msgid] = NULL;
idx_free(&co->idx_msg, mid);
co->messages[mid] = NULL;
pthread_mutex_unlock(&co->idx_msg_lock);
}

@ -495,11 +495,16 @@ static void *__sntll_thread(void *b)
if(rd < 0) goto __sslproto_error;
else pthread_mutex_unlock(&(co->sslinout[0]));
if(rd != mhead->payload_length) {
mid = mhead->msgid;
/* if we're need to do something */
if(mhead->msgid >= 1024) {
mhead->opcode = SNE_INVALINDEX;
goto __return_error;
} else msg = co->messages[mhead->msgid];
} else {
pthread_mutex_lock(&co->idx_msg_lock);
msg = co->messages[mid];
pthread_mutex_unlock(&co->idx_msg_lock);
}
if(!msg) {
if(mhead->attr & SXMSG_OPEN) mhead->opcode = SNE_BADPROTO;
else {
@ -541,7 +546,9 @@ static void *__sntll_thread(void *b)
goto __again;
}
mid = mhead->msgid;
pthread_mutex_lock(&co->idx_msg_lock);
msg = co->messages[mid];
pthread_mutex_unlock(&co->idx_msg_lock);
if(!msg) goto __inval_idx_nor;
/* ok now we'are copy data and unlock wait mutex */
@ -575,7 +582,8 @@ static void *__sntll_thread(void *b)
else goto __again;
}
/* if message is busy - fails */
msg = co->messages[mhead->msgid];
mid = mhead->msgid;
msg = co->messages[mid];
if(msg) { mhead->opcode = SNE_EBUSY; goto __ret_regerr; }
/* now we will take a deal */
@ -591,10 +599,9 @@ static void *__sntll_thread(void *b)
if(mhead->payload_length) msg->payload = bbuf;
}
mid = mhead->msgid;
pthread_mutex_lock(&co->idx_msg_lock);
idx_reserve(&co->idx_msg, mid);
co->messages[mhead->msgid] = msg;
co->messages[mid] = msg;
pthread_mutex_unlock(&co->idx_msg_lock);
/* now we are able to process the message */
@ -602,13 +609,15 @@ static void *__sntll_thread(void *b)
} else if(mhead->attr & SXMSG_CLOSED) {
/* check for the message */
if(mhead->msgid >= 1024) goto __inval_idx_nor;
msg = co->messages[mhead->msgid];
if(!msg) goto __inval_idx_nor;
mid = mhead->msgid;
pthread_mutex_lock(&co->idx_msg_lock);
msg = co->messages[mid];
if(!msg) {
pthread_mutex_unlock(&co->idx_msg_lock); goto __inval_idx_nor; }
/* message dialog is closed - remove this right now */
pthread_mutex_lock(&co->idx_msg_lock);
idx_free(&co->idx_msg, mhead->msgid);
co->messages[mhead->msgid] = NULL;
idx_free(&co->idx_msg, mid);
co->messages[mid] = NULL;
pthread_mutex_unlock(&co->idx_msg_lock);
if(msg->mhead.attr & SXMSG_TIMEDOUT) { /* nobody wait for it */
@ -630,13 +639,14 @@ static void *__sntll_thread(void *b)
(mhead->attr & SXMSG_REPLYREQ)) { /* ongoing dialog */
/* check for the message */
if(mhead->msgid >= 1024) goto __inval_idx_nor;
msg = co->messages[mhead->msgid];
mid = mhead->msgid;
msg = co->messages[mid];
if(!msg) goto __inval_idx_nor;
if(msg->mhead.attr & SXMSG_TIMEDOUT) { /* nobody wait for it */
pthread_mutex_lock(&co->idx_msg_lock);
idx_free(&co->idx_msg, mhead->msgid);
co->messages[mhead->msgid] = NULL;
idx_free(&co->idx_msg, mid);
co->messages[mid] = NULL;
pthread_mutex_unlock(&co->idx_msg_lock);
/* now just free it */

@ -96,7 +96,7 @@ static void *__addsthrd(void *a)
mr = sxmsg_send(mch, mmbuf, ln, &msg);
switch(mr) {
case SNE_RAPIDMSG:
fprintf(stdout, "Rapidly replied: %s\n", (char *)sxmsg_payload(msg));
//fprintf(stdout, "Rapidly replied: %s\n", (char *)sxmsg_payload(msg));
sxmsg_clean(msg);
break;
case SNE_REPLYREQ:

Loading…
Cancel
Save