diff --git a/lib/internal.h b/lib/internal.h index fa2565d..0a4794f 100644 --- a/lib/internal.h +++ b/lib/internal.h @@ -38,6 +38,7 @@ /* link related */ int _sxmpl_writemsg(sxlink_t *co, sxmsg_t *msg); +int _sxmpl_rapidwrite(sxlink_t *link, sxmsg_t *msg); /* channel operations */ uint8_t _channel_open(sxlink_t *co, uint16_t *chid); diff --git a/lib/message.c b/lib/message.c index 4c96350..038dfc2 100644 --- a/lib/message.c +++ b/lib/message.c @@ -279,7 +279,7 @@ int sxmsg_rreply(sxmsg_t *msg, size_t datalen) head->attr = 0; head->attr |= SXMSG_CLOSED; head->opcode = SXE_RAPIDMSG; - head->payload_length = datalen; + head->payload_length = (uint16_t)datalen; mid = head->msgid; pthread_mutex_lock(&co->idx_msg_lock); @@ -287,7 +287,7 @@ int sxmsg_rreply(sxmsg_t *msg, size_t datalen) co->messages[mid] = NULL; pthread_mutex_unlock(&co->idx_msg_lock); - r = _sxmpl_writemsg(co, msg); + r = _sxmpl_rapidwrite(co, msg); pthread_mutex_unlock(&msg->wait); /* we able to invalidate it */ pthread_mutex_destroy(&msg->wait); diff --git a/lib/sxmplv2.c b/lib/sxmplv2.c index d78b5cd..ff233e7 100644 --- a/lib/sxmplv2.c +++ b/lib/sxmplv2.c @@ -253,6 +253,29 @@ int _sxmpl_writemsg(sxlink_t *co, sxmsg_t *msg) return r; } +int _sxmpl_rapidwrite(sxlink_t *link, sxmsg_t *msg) +{ + char *buf = msg->payload - sizeof(sxmplv2_head_t); + size_t rd; + int r; + + if(!link || !msg) return SXE_FAILED; + + memcpy(buf, &msg->mhead, sizeof(sxmplv2_head_t)); + + pthread_mutex_lock(&link->sslinout[1]); + rd = __conn_write(link, buf, sizeof(sxmplv2_head_t) + msg->mhead.payload_length); + if(rd < 0) { + link->flags |= SXMP_CLOSED; + r = SXE_ESSL; + } + pthread_mutex_unlock(&link->sslinout[1]); + + if(!(link->flags & SXMP_CLOSED)) r = SXE_SUCCESS; + + return r; +} + static sxmplv2_bundle_t *__sxmpl_bundle_create(sxlink_t *co) { sxmplv2_bundle_t *n = malloc(sizeof(sxmplv2_bundle_t)); @@ -689,7 +712,7 @@ static void *__sxmpl_thread(void *b) pthread_t self = pthread_self(); struct timespec wtick; int dispatch = 0, e; - size_t rd, wr; + size_t rd, wr, total_rd; ulong_t mid = 0; #ifdef _PERFPROFILE struct timeval beg, end; @@ -788,7 +811,13 @@ static void *__sxmpl_thread(void *b) #ifdef _PERFPROFILE gettimeofday(&beg, NULL); #endif - rd = __conn_read(co, bbuf, mhead->payload_length); + total_rd = 0; + while(total_rd != mhead->payload_length) { + total_rd += __conn_read(co, bbuf + total_rd, mhead->payload_length); + if(total_rd == -1) goto __sslproto_error; + } + rd = total_rd; + #ifdef _PERFPROFILE gettimeofday(&end, NULL); if((end.tv_sec - beg.tv_sec) > 0) { @@ -873,7 +902,8 @@ static void *__sxmpl_thread(void *b) } } else { /* regular messages */ if((mhead->attr & SXMSG_OPEN) && (mhead->attr & SXMSG_REPLYREQ)) { /* dialog initiation */ - channel = co->channels[mhead->reserve]; + if(mhead->reserve >= MAX_CHANNELSOPENED) channel = NULL; + else channel = co->channels[mhead->reserve]; if(!channel) { /* ok, we'are failed */ mhead->opcode = SXE_NOSUCHCHAN; __ret_regerr: @@ -914,7 +944,7 @@ static void *__sxmpl_thread(void *b) _message_process(msg); } else if(mhead->attr & SXMSG_CLOSED) { /* check for the message */ - if(mhead->msgid >= 1024) goto __inval_idx_nor; + if(mhead->msgid >= MAX_MSGINPROCESS) goto __inval_idx_nor; mid = mhead->msgid; pthread_mutex_lock(&co->idx_msg_lock); msg = co->messages[mid];