[core] bugfix: fixed stupid bug on big messages;

v0.5.xx
Alexander Vdolainen 9 years ago
parent b21dce9d51
commit c6b795039a

@ -38,6 +38,7 @@
/* link related */
int _sxmpl_writemsg(sxlink_t *co, sxmsg_t *msg);
int _sxmpl_rapidwrite(sxlink_t *link, sxmsg_t *msg);
/* channel operations */
uint8_t _channel_open(sxlink_t *co, uint16_t *chid);

@ -279,7 +279,7 @@ int sxmsg_rreply(sxmsg_t *msg, size_t datalen)
head->attr = 0;
head->attr |= SXMSG_CLOSED;
head->opcode = SXE_RAPIDMSG;
head->payload_length = datalen;
head->payload_length = (uint16_t)datalen;
mid = head->msgid;
pthread_mutex_lock(&co->idx_msg_lock);
@ -287,7 +287,7 @@ int sxmsg_rreply(sxmsg_t *msg, size_t datalen)
co->messages[mid] = NULL;
pthread_mutex_unlock(&co->idx_msg_lock);
r = _sxmpl_writemsg(co, msg);
r = _sxmpl_rapidwrite(co, msg);
pthread_mutex_unlock(&msg->wait); /* we able to invalidate it */
pthread_mutex_destroy(&msg->wait);

@ -253,6 +253,29 @@ int _sxmpl_writemsg(sxlink_t *co, sxmsg_t *msg)
return r;
}
int _sxmpl_rapidwrite(sxlink_t *link, sxmsg_t *msg)
{
char *buf = msg->payload - sizeof(sxmplv2_head_t);
size_t rd;
int r;
if(!link || !msg) return SXE_FAILED;
memcpy(buf, &msg->mhead, sizeof(sxmplv2_head_t));
pthread_mutex_lock(&link->sslinout[1]);
rd = __conn_write(link, buf, sizeof(sxmplv2_head_t) + msg->mhead.payload_length);
if(rd < 0) {
link->flags |= SXMP_CLOSED;
r = SXE_ESSL;
}
pthread_mutex_unlock(&link->sslinout[1]);
if(!(link->flags & SXMP_CLOSED)) r = SXE_SUCCESS;
return r;
}
static sxmplv2_bundle_t *__sxmpl_bundle_create(sxlink_t *co)
{
sxmplv2_bundle_t *n = malloc(sizeof(sxmplv2_bundle_t));
@ -689,7 +712,7 @@ static void *__sxmpl_thread(void *b)
pthread_t self = pthread_self();
struct timespec wtick;
int dispatch = 0, e;
size_t rd, wr;
size_t rd, wr, total_rd;
ulong_t mid = 0;
#ifdef _PERFPROFILE
struct timeval beg, end;
@ -788,7 +811,13 @@ static void *__sxmpl_thread(void *b)
#ifdef _PERFPROFILE
gettimeofday(&beg, NULL);
#endif
rd = __conn_read(co, bbuf, mhead->payload_length);
total_rd = 0;
while(total_rd != mhead->payload_length) {
total_rd += __conn_read(co, bbuf + total_rd, mhead->payload_length);
if(total_rd == -1) goto __sslproto_error;
}
rd = total_rd;
#ifdef _PERFPROFILE
gettimeofday(&end, NULL);
if((end.tv_sec - beg.tv_sec) > 0) {
@ -873,7 +902,8 @@ static void *__sxmpl_thread(void *b)
}
} else { /* regular messages */
if((mhead->attr & SXMSG_OPEN) && (mhead->attr & SXMSG_REPLYREQ)) { /* dialog initiation */
channel = co->channels[mhead->reserve];
if(mhead->reserve >= MAX_CHANNELSOPENED) channel = NULL;
else channel = co->channels[mhead->reserve];
if(!channel) { /* ok, we'are failed */
mhead->opcode = SXE_NOSUCHCHAN;
__ret_regerr:
@ -914,7 +944,7 @@ static void *__sxmpl_thread(void *b)
_message_process(msg);
} else if(mhead->attr & SXMSG_CLOSED) {
/* check for the message */
if(mhead->msgid >= 1024) goto __inval_idx_nor;
if(mhead->msgid >= MAX_MSGINPROCESS) goto __inval_idx_nor;
mid = mhead->msgid;
pthread_mutex_lock(&co->idx_msg_lock);
msg = co->messages[mid];

Loading…
Cancel
Save