bugs fixes;
This commit is contained in:
parent
5606c799ae
commit
bd2f05e281
@ -16,6 +16,9 @@
|
|||||||
|
|
||||||
#define MAX_RPC_LIST 512
|
#define MAX_RPC_LIST 512
|
||||||
|
|
||||||
#define MAX_RBBUF_LEN (65536 - sizeof(sntllv2_head_t))
|
#define MAX_SNTLLBUFSIZE 65536
|
||||||
|
#define MAX_RBBUF_LEN (MAX_SNTLLBUFSIZE - sizeof(sntllv2_head_t))
|
||||||
|
|
||||||
|
#define MAX_SNTLLTHREADS 8
|
||||||
|
|
||||||
#endif /* __SNTL_LIMITS_H__ */
|
#endif /* __SNTL_LIMITS_H__ */
|
||||||
|
@ -20,11 +20,13 @@
|
|||||||
#include <sys/time.h>
|
#include <sys/time.h>
|
||||||
#include <sys/types.h>
|
#include <sys/types.h>
|
||||||
#include <fcntl.h>
|
#include <fcntl.h>
|
||||||
|
#include <sched.h>
|
||||||
|
|
||||||
#include <tdata/usrtc.h>
|
#include <tdata/usrtc.h>
|
||||||
#include <tdata/list.h>
|
#include <tdata/list.h>
|
||||||
#include <sexpr/sexp.h>
|
#include <sexpr/sexp.h>
|
||||||
|
|
||||||
|
#include <sntl/limits.h>
|
||||||
#include <sntl/sntllv2.h>
|
#include <sntl/sntllv2.h>
|
||||||
|
|
||||||
#include "internal.h"
|
#include "internal.h"
|
||||||
@ -100,9 +102,16 @@ static inline int __sxmsg_send(chnl_t *channel, const char *data, size_t datalen
|
|||||||
/* ready to send it */
|
/* ready to send it */
|
||||||
if(!pp) {
|
if(!pp) {
|
||||||
r = _sntll_writemsg(co, msg);
|
r = _sntll_writemsg(co, msg);
|
||||||
if(r != SNE_SUCCESS) goto __closemsg;
|
if(r != SNE_SUCCESS) {
|
||||||
|
__unpinmsg:
|
||||||
|
pthread_mutex_lock(&co->idx_msg_lock);
|
||||||
|
idx_free(&co->idx_msg, msgidx);
|
||||||
|
co->messages[msgidx] = NULL;
|
||||||
|
pthread_mutex_unlock(&co->idx_msg_lock);
|
||||||
|
goto __freemsg;
|
||||||
|
}
|
||||||
} else { /* postponed */
|
} else { /* postponed */
|
||||||
if(!(ppm = malloc(sizeof(ppmsg_t)))) { r = SNE_ENOMEM; goto __closemsg; }
|
if(!(ppm = malloc(sizeof(ppmsg_t)))) { r = SNE_ENOMEM; goto __unpinmsg; }
|
||||||
list_init_node(&ppm->node);
|
list_init_node(&ppm->node);
|
||||||
ppm->msg = msg;
|
ppm->msg = msg;
|
||||||
|
|
||||||
@ -113,18 +122,16 @@ static inline int __sxmsg_send(chnl_t *channel, const char *data, size_t datalen
|
|||||||
pthread_mutex_unlock(&co->write_pending_lock);
|
pthread_mutex_unlock(&co->write_pending_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_lock(&msg->wait); /* we will sleep here */
|
//pthread_mutex_lock(&msg->wait); /* we will sleep here */
|
||||||
|
while(pthread_mutex_trylock(&msg->wait)) {
|
||||||
|
//printf("here opcode = %d\n", head->opcode);
|
||||||
|
}
|
||||||
|
|
||||||
if(head->payload_length) {
|
if(head->payload_length) {
|
||||||
*omsg = msg;
|
*omsg = msg;
|
||||||
return head->opcode;
|
return head->opcode;
|
||||||
} else r = head->opcode;
|
} else r = head->opcode;
|
||||||
|
|
||||||
__closemsg:
|
|
||||||
pthread_mutex_lock(&co->idx_msg_lock);
|
|
||||||
idx_free(&co->idx_msg, msgidx);
|
|
||||||
co->messages[msgidx] = NULL;
|
|
||||||
pthread_mutex_unlock(&co->idx_msg_lock);
|
|
||||||
__freemsg:
|
__freemsg:
|
||||||
/* free resources for message */
|
/* free resources for message */
|
||||||
pthread_mutex_unlock(&msg->wait);
|
pthread_mutex_unlock(&msg->wait);
|
||||||
@ -187,7 +194,7 @@ static inline int __sxmsg_reply(sxmsg_t *msg, const char *data,
|
|||||||
if(!(co = ch->connection)) return SNE_FAILED;
|
if(!(co = ch->connection)) return SNE_FAILED;
|
||||||
|
|
||||||
/* test for blocking */
|
/* test for blocking */
|
||||||
for(i = 0; i < 8; i++)
|
for(i = 0; i < MAX_SNTLLTHREADS; i++)
|
||||||
if(pthread_equal(self, co->thrd_poll[i])) return SNE_WOULDBLOCK;
|
if(pthread_equal(self, co->thrd_poll[i])) return SNE_WOULDBLOCK;
|
||||||
|
|
||||||
/* prepare it */
|
/* prepare it */
|
||||||
@ -298,6 +305,7 @@ static inline int __sxmsg_return(sxmsg_t *msg, int opcode, int pp)
|
|||||||
pthread_mutex_unlock(&co->idx_msg_lock);
|
pthread_mutex_unlock(&co->idx_msg_lock);
|
||||||
|
|
||||||
r = _sntll_writemsg(co, msg);
|
r = _sntll_writemsg(co, msg);
|
||||||
|
free(msg);
|
||||||
} else {
|
} else {
|
||||||
if(!(ppm = malloc(sizeof(ppmsg_t)))) return SNE_ENOMEM;
|
if(!(ppm = malloc(sizeof(ppmsg_t)))) return SNE_ENOMEM;
|
||||||
else { /* remove it */
|
else { /* remove it */
|
||||||
|
@ -40,6 +40,7 @@
|
|||||||
#include <tdata/list.h>
|
#include <tdata/list.h>
|
||||||
#include <sexpr/sexp.h>
|
#include <sexpr/sexp.h>
|
||||||
|
|
||||||
|
#include <sntl/limits.h>
|
||||||
#include <sntl/sntllv2.h>
|
#include <sntl/sntllv2.h>
|
||||||
|
|
||||||
#include "internal.h"
|
#include "internal.h"
|
||||||
@ -195,6 +196,7 @@ int _sntll_writemsg(conn_t *co, sxmsg_t *msg)
|
|||||||
sntllv2_head_t *head;
|
sntllv2_head_t *head;
|
||||||
size_t rd;
|
size_t rd;
|
||||||
int r;
|
int r;
|
||||||
|
char *buf = NULL;
|
||||||
|
|
||||||
if(!co || !msg) return SNE_FAILED;
|
if(!co || !msg) return SNE_FAILED;
|
||||||
|
|
||||||
@ -202,21 +204,25 @@ int _sntll_writemsg(conn_t *co, sxmsg_t *msg)
|
|||||||
head = &msg->mhead;
|
head = &msg->mhead;
|
||||||
if(head->payload_length && !msg->payload) return SNE_FAILED;
|
if(head->payload_length && !msg->payload) return SNE_FAILED;
|
||||||
|
|
||||||
|
if(head->payload_length) {
|
||||||
|
buf = malloc(sizeof(sntllv2_head_t) + head->payload_length);
|
||||||
|
memcpy(buf, head, sizeof(sntllv2_head_t));
|
||||||
|
memcpy(buf + sizeof(sntllv2_head_t), msg->payload, head->payload_length);
|
||||||
|
}
|
||||||
|
|
||||||
/* write the head and payload if applicable */
|
/* write the head and payload if applicable */
|
||||||
pthread_mutex_lock(&co->sslinout[1]);
|
pthread_mutex_lock(&co->sslinout[1]);
|
||||||
|
if(!buf)
|
||||||
rd = __conn_write(co, head, sizeof(sntllv2_head_t));
|
rd = __conn_write(co, head, sizeof(sntllv2_head_t));
|
||||||
|
else rd = __conn_write(co, buf, sizeof(sntllv2_head_t) + head->payload_length);
|
||||||
if(rd < 0) {
|
if(rd < 0) {
|
||||||
co->flags |= SNSX_CLOSED;
|
co->flags |= SNSX_CLOSED;
|
||||||
r = SNE_ESSL;
|
r = SNE_ESSL;
|
||||||
} else if(head->payload_length) {
|
|
||||||
rd = __conn_write(co, msg->payload, head->payload_length);
|
|
||||||
/* check up again */
|
|
||||||
if(rd < 0) { co->flags |= SNSX_CLOSED; r = SNE_ESSL; }
|
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(&co->sslinout[1]);
|
pthread_mutex_unlock(&co->sslinout[1]);
|
||||||
|
|
||||||
if(!(co->flags & SNSX_CLOSED)) r = SNE_SUCCESS;
|
if(!(co->flags & SNSX_CLOSED)) r = SNE_SUCCESS;
|
||||||
|
if(buf) free(buf);
|
||||||
return r;
|
return r;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -442,7 +448,7 @@ static int __eval_syssexp(conn_t *co, sexp_t *sx)
|
|||||||
|
|
||||||
static void __connection_destroy(conn_t *co)
|
static void __connection_destroy(conn_t *co)
|
||||||
{
|
{
|
||||||
int i = 0;
|
int i = 0, fd;
|
||||||
sxmsg_t *msg, *omsg;
|
sxmsg_t *msg, *omsg;
|
||||||
ppmsg_t *ppm;
|
ppmsg_t *ppm;
|
||||||
list_node_t *iter, *siter;
|
list_node_t *iter, *siter;
|
||||||
@ -473,6 +479,11 @@ static void __connection_destroy(conn_t *co)
|
|||||||
pthread_mutex_unlock(&co->write_pending_lock);
|
pthread_mutex_unlock(&co->write_pending_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* update use count */
|
||||||
|
_CONN_NOTINUSE(co);
|
||||||
|
|
||||||
|
/* ok, let's free other if we can */
|
||||||
|
if(!_CONN_UCOUNT(co)) {
|
||||||
/* go thru messages */
|
/* go thru messages */
|
||||||
pthread_mutex_lock(&co->idx_msg_lock);
|
pthread_mutex_lock(&co->idx_msg_lock);
|
||||||
for(i = 0; i < 1024; i++) {
|
for(i = 0; i < 1024; i++) {
|
||||||
@ -481,16 +492,13 @@ static void __connection_destroy(conn_t *co)
|
|||||||
else head = &msg->mhead;
|
else head = &msg->mhead;
|
||||||
head->opcode = SNE_LINKERROR;
|
head->opcode = SNE_LINKERROR;
|
||||||
pthread_mutex_unlock(&msg->wait);
|
pthread_mutex_unlock(&msg->wait);
|
||||||
|
pthread_mutex_destroy(&msg->wait);
|
||||||
|
free(msg);
|
||||||
co->messages[i] = NULL;
|
co->messages[i] = NULL;
|
||||||
idx_free(&co->idx_msg, i);
|
idx_free(&co->idx_msg, i);
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(&co->idx_msg_lock);
|
pthread_mutex_unlock(&co->idx_msg_lock);
|
||||||
|
|
||||||
/* update use count */
|
|
||||||
_CONN_NOTINUSE(co);
|
|
||||||
|
|
||||||
/* ok, let's free other if we can */
|
|
||||||
if(!_CONN_UCOUNT(co)) {
|
|
||||||
/* ok now we will free the channels */
|
/* ok now we will free the channels */
|
||||||
pthread_mutex_lock(&co->idx_ch_lock);
|
pthread_mutex_lock(&co->idx_ch_lock);
|
||||||
for(i = 0; i < 512; i++) {
|
for(i = 0; i < 512; i++) {
|
||||||
@ -506,9 +514,10 @@ static void __connection_destroy(conn_t *co)
|
|||||||
if(co->pctx->passwd) free(co->pctx->passwd);
|
if(co->pctx->passwd) free(co->pctx->passwd);
|
||||||
|
|
||||||
SSL_shutdown(co->ssl);
|
SSL_shutdown(co->ssl);
|
||||||
close(SSL_get_fd(co->ssl));
|
fd = SSL_get_fd(co->ssl);
|
||||||
SSL_free(co->ssl);
|
SSL_free(co->ssl);
|
||||||
SSL_CTX_free(co->ctx);
|
SSL_CTX_free(co->ctx);
|
||||||
|
close(fd);
|
||||||
__connection_second_free(co);
|
__connection_second_free(co);
|
||||||
__connection_minimal_free(co);
|
__connection_minimal_free(co);
|
||||||
}
|
}
|
||||||
@ -533,7 +542,9 @@ static void *__sntll_thread(void *b)
|
|||||||
int dispatch = 0, e;
|
int dispatch = 0, e;
|
||||||
size_t rd, wr;
|
size_t rd, wr;
|
||||||
ulong_t mid;
|
ulong_t mid;
|
||||||
|
#ifdef _PERFPROFILE
|
||||||
|
struct timeval beg, end;
|
||||||
|
#endif
|
||||||
/* byte buffer is following head */
|
/* byte buffer is following head */
|
||||||
bbuf += sizeof(sntllv2_head_t);
|
bbuf += sizeof(sntllv2_head_t);
|
||||||
|
|
||||||
@ -549,7 +560,7 @@ static void *__sntll_thread(void *b)
|
|||||||
|
|
||||||
/* check up a thread */
|
/* check up a thread */
|
||||||
if(pthread_equal(self, co->thrd_poll[7])) /* dispatcher */
|
if(pthread_equal(self, co->thrd_poll[7])) /* dispatcher */
|
||||||
dispatch = 1;
|
dispatch = 0;
|
||||||
|
|
||||||
/* update use count */
|
/* update use count */
|
||||||
_CONN_INUSE(co);
|
_CONN_INUSE(co);
|
||||||
@ -595,11 +606,23 @@ static void *__sntll_thread(void *b)
|
|||||||
pthread_mutex_unlock(&(co->sslinout[0]));
|
pthread_mutex_unlock(&(co->sslinout[0]));
|
||||||
goto __finish;
|
goto __finish;
|
||||||
}
|
}
|
||||||
|
#ifdef _PERFPROFILE
|
||||||
|
gettimeofday(&beg, NULL);
|
||||||
|
#endif
|
||||||
rd = __conn_read(co, mhead, sizeof(sntllv2_head_t));
|
rd = __conn_read(co, mhead, sizeof(sntllv2_head_t));
|
||||||
|
#ifdef _PERFPROFILE
|
||||||
|
gettimeofday(&end, NULL);
|
||||||
|
|
||||||
|
if((end.tv_sec - beg.tv_sec) > 0) {
|
||||||
|
printf("connread(head) Seconds: %ld ", end.tv_sec - beg.tv_sec);
|
||||||
|
printf("µS: %ld\n", end.tv_usec + (1000000 - beg.tv_usec));
|
||||||
|
} else printf("connread(head) µS: %ld\n", end.tv_usec - beg.tv_usec);
|
||||||
|
#endif
|
||||||
|
|
||||||
#ifdef _VERBOSE_DEBUG
|
#ifdef _VERBOSE_DEBUG
|
||||||
dumphead(mhead);
|
dumphead(mhead);
|
||||||
#endif
|
#endif
|
||||||
if(rd != sizeof(sntllv2_head_t)) {
|
if(rd < 0) {
|
||||||
__sslproto_error:
|
__sslproto_error:
|
||||||
co->flags |= SNSX_CLOSED;
|
co->flags |= SNSX_CLOSED;
|
||||||
pthread_mutex_unlock(&(co->sslinout[0]));
|
pthread_mutex_unlock(&(co->sslinout[0]));
|
||||||
@ -607,9 +630,21 @@ static void *__sntll_thread(void *b)
|
|||||||
} else {
|
} else {
|
||||||
/* check up if we can read or not */
|
/* check up if we can read or not */
|
||||||
if(mhead->payload_length) {
|
if(mhead->payload_length) {
|
||||||
|
#ifdef _PERFPROFILE
|
||||||
|
gettimeofday(&beg, NULL);
|
||||||
|
#endif
|
||||||
rd = __conn_read(co, bbuf, mhead->payload_length);
|
rd = __conn_read(co, bbuf, mhead->payload_length);
|
||||||
if(rd < 0) goto __sslproto_error;
|
#ifdef _PERFPROFILE
|
||||||
|
gettimeofday(&end, NULL);
|
||||||
|
if((end.tv_sec - beg.tv_sec) > 0) {
|
||||||
|
printf("connread(payload) Seconds: %ld ", end.tv_sec - beg.tv_sec);
|
||||||
|
printf("µS: %ld\n", end.tv_usec + (1000000 - beg.tv_usec));
|
||||||
|
} else printf("connread(payload) µS: %ld\n", end.tv_usec - beg.tv_usec);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
if(rd == -1) goto __sslproto_error;
|
||||||
else pthread_mutex_unlock(&(co->sslinout[0]));
|
else pthread_mutex_unlock(&(co->sslinout[0]));
|
||||||
|
|
||||||
if(rd != mhead->payload_length) {
|
if(rd != mhead->payload_length) {
|
||||||
mid = mhead->msgid;
|
mid = mhead->msgid;
|
||||||
/* if we're need to do something */
|
/* if we're need to do something */
|
||||||
@ -617,9 +652,9 @@ static void *__sntll_thread(void *b)
|
|||||||
mhead->opcode = SNE_INVALINDEX;
|
mhead->opcode = SNE_INVALINDEX;
|
||||||
goto __return_error;
|
goto __return_error;
|
||||||
} else {
|
} else {
|
||||||
pthread_mutex_lock(&co->idx_msg_lock);
|
// pthread_mutex_lock(&co->idx_msg_lock);
|
||||||
msg = co->messages[mid];
|
msg = co->messages[mid];
|
||||||
pthread_mutex_unlock(&co->idx_msg_lock);
|
//thread_mutex_unlock(&co->idx_msg_lock);
|
||||||
}
|
}
|
||||||
if(!msg) {
|
if(!msg) {
|
||||||
if(mhead->attr & SXMSG_OPEN) mhead->opcode = SNE_BADPROTO;
|
if(mhead->attr & SXMSG_OPEN) mhead->opcode = SNE_BADPROTO;
|
||||||
@ -662,9 +697,9 @@ static void *__sntll_thread(void *b)
|
|||||||
goto __again;
|
goto __again;
|
||||||
}
|
}
|
||||||
mid = mhead->msgid;
|
mid = mhead->msgid;
|
||||||
pthread_mutex_lock(&co->idx_msg_lock);
|
//hread_mutex_lock(&co->idx_msg_lock);
|
||||||
msg = co->messages[mid];
|
msg = co->messages[mid];
|
||||||
pthread_mutex_unlock(&co->idx_msg_lock);
|
//hread_mutex_unlock(&co->idx_msg_lock);
|
||||||
if(!msg) goto __inval_idx_nor;
|
if(!msg) goto __inval_idx_nor;
|
||||||
|
|
||||||
/* ok now we'are copy data and unlock wait mutex */
|
/* ok now we'are copy data and unlock wait mutex */
|
||||||
@ -732,6 +767,7 @@ static void *__sntll_thread(void *b)
|
|||||||
pthread_mutex_unlock(&co->idx_msg_lock); goto __inval_idx_nor; }
|
pthread_mutex_unlock(&co->idx_msg_lock); goto __inval_idx_nor; }
|
||||||
|
|
||||||
/* message dialog is closed - remove this right now */
|
/* message dialog is closed - remove this right now */
|
||||||
|
|
||||||
idx_free(&co->idx_msg, mid);
|
idx_free(&co->idx_msg, mid);
|
||||||
co->messages[mid] = NULL;
|
co->messages[mid] = NULL;
|
||||||
pthread_mutex_unlock(&co->idx_msg_lock);
|
pthread_mutex_unlock(&co->idx_msg_lock);
|
||||||
@ -944,7 +980,7 @@ conn_t *connection_master_link(conn_sys_t *ssys, int sck, struct in_addr *addr)
|
|||||||
bundle->conn = co;
|
bundle->conn = co;
|
||||||
}
|
}
|
||||||
int i;
|
int i;
|
||||||
for(i = 0; i < 8; i++) {
|
for(i = 0; i < MAX_SNTLLTHREADS; i++) {
|
||||||
if(bundle == (void *)0xdead) bundle = __sntll_bundle_create(co);
|
if(bundle == (void *)0xdead) bundle = __sntll_bundle_create(co);
|
||||||
if(!bundle) goto __fail5;
|
if(!bundle) goto __fail5;
|
||||||
r = pthread_create(&co->thrd_poll[i], NULL, __sntll_thread, bundle);
|
r = pthread_create(&co->thrd_poll[i], NULL, __sntll_thread, bundle);
|
||||||
@ -1144,7 +1180,6 @@ conn_t *connection_link(conn_sys_t *ssys, const char *host,
|
|||||||
head->payload_length = 0;
|
head->payload_length = 0;
|
||||||
wr = __conn_write(co, head, sizeof(sntllv2_head_t));
|
wr = __conn_write(co, head, sizeof(sntllv2_head_t));
|
||||||
if(wr < 0) {
|
if(wr < 0) {
|
||||||
blub("fuck");
|
|
||||||
r = SNE_LINKERROR; goto __fail2;}
|
r = SNE_LINKERROR; goto __fail2;}
|
||||||
if(r != SNE_SUCCESS) { r = SNE_LINKERROR; goto __fail2;}
|
if(r != SNE_SUCCESS) { r = SNE_LINKERROR; goto __fail2;}
|
||||||
}
|
}
|
||||||
@ -1163,7 +1198,7 @@ conn_t *connection_link(conn_sys_t *ssys, const char *host,
|
|||||||
bundle->buf = buf;
|
bundle->buf = buf;
|
||||||
bundle->conn = co;
|
bundle->conn = co;
|
||||||
}
|
}
|
||||||
for(i = 0; i < 8; i++) {
|
for(i = 0; i < MAX_SNTLLTHREADS; i++) {
|
||||||
if(bundle == (void *)0xdead) bundle = __sntll_bundle_create(co);
|
if(bundle == (void *)0xdead) bundle = __sntll_bundle_create(co);
|
||||||
if(!bundle) goto __fail5;
|
if(!bundle) goto __fail5;
|
||||||
r = pthread_create(&co->thrd_poll[i], NULL, __sntll_thread, bundle);
|
r = pthread_create(&co->thrd_poll[i], NULL, __sntll_thread, bundle);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user