bugs fixed;

v0.5.xx
Alexander Vdolainen 10 years ago
parent 438633a911
commit e68b7290b8

@ -231,6 +231,7 @@ int sxmsg_reply_pp(sxmsg_t *msg, const char *data, size_t datalen);
int sxmsg_rreply(sxmsg_t *msg, size_t datalen); int sxmsg_rreply(sxmsg_t *msg, size_t datalen);
int sxmsg_return(sxmsg_t *msg, int opcode); int sxmsg_return(sxmsg_t *msg, int opcode);
int sxmsg_return_pp(sxmsg_t *msg, int opcode); int sxmsg_return_pp(sxmsg_t *msg, int opcode);
void sxmsg_clean(sxmsg_t *msg);
#ifdef __cplusplus #ifdef __cplusplus
} }
@ -261,6 +262,8 @@ int sntl_rpclist_filter(usrtc_t *source, usrtc_t **dest, int flag, int *filter);
#define blub(txt) fprintf(stderr, "%s:%d in %s > %s\n", __FILE__, __LINE__, __FUNCTION__, txt) #define blub(txt) fprintf(stderr, "%s:%d in %s > %s\n", __FILE__, __LINE__, __FUNCTION__, txt)
#define dumphead(head) fprintf(stderr, "id: %d, opcode: %d, attr: %d, len = %d\n", head->msgid, head->opcode, head->attr, head->payload_length)
#endif /* __SNTL_SNTLLV2_H__ */ #endif /* __SNTL_SNTLLV2_H__ */

@ -46,14 +46,14 @@
uint8_t _channel_open(conn_t *co, uint16_t *chid) uint8_t _channel_open(conn_t *co, uint16_t *chid)
{ {
chnl_t *chan; chnl_t *chan;
uint16_t typeid = *chid; /* our type */ int typeid = *chid; /* our type */
uint16_t chidx; uint16_t chidx;
usrtc_t *rpc_list = co->rpc_list; usrtc_t *rpc_list = co->rpc_list;
usrtc_node_t *node; usrtc_node_t *node;
rpc_typed_list_t *rlist; rpc_typed_list_t *rlist;
cx_rpc_list_t *rpclist; cx_rpc_list_t *rpclist;
node = usrtc_lookup(rpc_list, (void *)&typeid); node = usrtc_lookup(rpc_list, &typeid);
if(!node) return SNE_EPERM; if(!node) return SNE_EPERM;
else rlist = (rpc_typed_list_t *)usrtc_node_getdata(node); else rlist = (rpc_typed_list_t *)usrtc_node_getdata(node);
@ -87,6 +87,7 @@ uint8_t _channel_open(conn_t *co, uint16_t *chid)
uint8_t _channel_close(conn_t *co, uint16_t chid) uint8_t _channel_close(conn_t *co, uint16_t chid)
{ {
chnl_t *chan; chnl_t *chan;
ulong_t chidx = chid;
if(chid > 512) return SNE_INVALINDEX; if(chid > 512) return SNE_INVALINDEX;
else chan = co->channels[chid]; else chan = co->channels[chid];
@ -94,8 +95,8 @@ uint8_t _channel_close(conn_t *co, uint16_t chid)
if(!chan) return SNE_NOSUCHCHAN; if(!chan) return SNE_NOSUCHCHAN;
pthread_mutex_lock(&co->idx_ch_lock); pthread_mutex_lock(&co->idx_ch_lock);
idx_free(&co->idx_ch, chid); idx_free(&co->idx_ch, chidx);
co->channels[chid] = NULL; co->channels[chidx] = NULL;
pthread_mutex_unlock(&co->idx_ch_lock); pthread_mutex_unlock(&co->idx_ch_lock);
free(chan); free(chan);
@ -117,6 +118,7 @@ chnl_t *sxchannel_open(conn_t *co, int type)
if(!(chan = malloc(sizeof(chnl_t)))) { if(!(chan = malloc(sizeof(chnl_t)))) {
__enomem: __enomem:
if(chan) free(chan);
r = SNE_ENOMEM; r = SNE_ENOMEM;
goto __reterr; goto __reterr;
} }
@ -146,6 +148,7 @@ chnl_t *sxchannel_open(conn_t *co, int type)
pthread_mutex_unlock(&co->idx_msg_lock); pthread_mutex_unlock(&co->idx_msg_lock);
if(msgidx == IDX_INVAL) { r = SNE_MMESSAGES; goto __reterr2; } if(msgidx == IDX_INVAL) { r = SNE_MMESSAGES; goto __reterr2; }
else head->msgid = msgidx;
/* now we're ready to write it */ /* now we're ready to write it */
r = _sntll_writemsg(co, msg); r = _sntll_writemsg(co, msg);
@ -156,13 +159,15 @@ chnl_t *sxchannel_open(conn_t *co, int type)
if(msg->mhead.opcode != SNE_SUCCESS) { r = msg->mhead.opcode; goto __reterr3; } if(msg->mhead.opcode != SNE_SUCCESS) { r = msg->mhead.opcode; goto __reterr3; }
/* ok all is fine */ /* ok all is fine */
msgidx = msg->mhead.reserve;
chan->cid = msg->mhead.reserve; chan->cid = msg->mhead.reserve;
pthread_mutex_lock(&co->idx_ch_lock); pthread_mutex_lock(&co->idx_ch_lock);
idx_reserve(&co->idx_ch, msg->mhead.reserve); idx_reserve(&co->idx_ch, msgidx);
co->channels[msg->mhead.reserve] = chan; co->channels[msgidx] = chan;
pthread_mutex_unlock(&co->idx_ch_lock); pthread_mutex_unlock(&co->idx_ch_lock);
/* destroy a message */ /* destroy a message */
msgidx = head->msgid;
pthread_mutex_lock(&co->idx_msg_lock); pthread_mutex_lock(&co->idx_msg_lock);
idx_free(&co->idx_msg, msgidx); idx_free(&co->idx_msg, msgidx);
co->messages[msgidx] = NULL; co->messages[msgidx] = NULL;
@ -196,7 +201,7 @@ int sxchannel_close(chnl_t *channel)
sxmsg_t *msg; sxmsg_t *msg;
sntllv2_head_t *head; sntllv2_head_t *head;
conn_t *co; conn_t *co;
int msgidx; int msgidx, chidx;
/* check channel validity */ /* check channel validity */
if(!channel) return SNE_FAILED; if(!channel) return SNE_FAILED;
@ -231,9 +236,10 @@ int sxchannel_close(chnl_t *channel)
r = head->opcode; r = head->opcode;
/* we will free this anyway */ /* we will free this anyway */
chidx = channel->cid;
pthread_mutex_lock(&co->idx_ch_lock); pthread_mutex_lock(&co->idx_ch_lock);
idx_free(&co->idx_ch, channel->cid); idx_free(&co->idx_ch, chidx);
co->channels[channel->cid] = NULL; co->channels[chidx] = NULL;
pthread_mutex_unlock(&co->idx_ch_lock); pthread_mutex_unlock(&co->idx_ch_lock);
free(channel); free(channel);
} }

@ -325,3 +325,9 @@ int sxmsg_return_pp(sxmsg_t *msg, int opcode)
return __sxmsg_return(msg, opcode, 1); return __sxmsg_return(msg, opcode, 1);
} }
void sxmsg_clean(sxmsg_t *msg)
{
free(msg->payload);
free(msg);
return;
}

@ -203,7 +203,7 @@ int _sntll_writemsg(conn_t *co, sxmsg_t *msg)
if(head->payload_length && !msg->payload) return SNE_FAILED; if(head->payload_length && !msg->payload) return SNE_FAILED;
/* write the head and payload if applicable */ /* write the head and payload if applicable */
pthread_mutex_lock(&co->sslinout[2]); pthread_mutex_lock(&co->sslinout[1]);
rd = __conn_write(co, head, sizeof(sntllv2_head_t)); rd = __conn_write(co, head, sizeof(sntllv2_head_t));
if(rd < 0) { if(rd < 0) {
co->flags |= SNSX_CLOSED; co->flags |= SNSX_CLOSED;
@ -213,7 +213,7 @@ int _sntll_writemsg(conn_t *co, sxmsg_t *msg)
/* check up again */ /* check up again */
if(rd < 0) { co->flags |= SNSX_CLOSED; r = SNE_ESSL; } if(rd < 0) { co->flags |= SNSX_CLOSED; r = SNE_ESSL; }
} }
pthread_mutex_unlock(&co->sslinout[2]); pthread_mutex_unlock(&co->sslinout[1]);
if(!(co->flags & SNSX_CLOSED)) r = SNE_SUCCESS; if(!(co->flags & SNSX_CLOSED)) r = SNE_SUCCESS;
@ -359,6 +359,8 @@ static int __connection_second_alloc(conn_t *co)
{ {
usrtc_node_init(&co->csnode, co); usrtc_node_init(&co->csnode, co);
memset(&co->idx_ch, 0, sizeof(idx_allocator_t));
memset(&co->idx_msg, 0, sizeof(idx_allocator_t));
if((idx_allocator_init(&co->idx_ch, 512, 0))) goto __fail; if((idx_allocator_init(&co->idx_ch, 512, 0))) goto __fail;
if((idx_allocator_init(&co->idx_msg, 1024, 0))) goto __fail; if((idx_allocator_init(&co->idx_msg, 1024, 0))) goto __fail;
@ -447,6 +449,7 @@ static void *__sntll_thread(void *b)
pthread_t self = pthread_self(); pthread_t self = pthread_self();
int dispatch = 0; int dispatch = 0;
size_t rd, wr; size_t rd, wr;
ulong_t mid;
/* byte buffer is following head */ /* byte buffer is following head */
bbuf += sizeof(sntllv2_head_t); bbuf += sizeof(sntllv2_head_t);
@ -472,8 +475,15 @@ static void *__sntll_thread(void *b)
while(1) { while(1) {
__again: __again:
pthread_mutex_lock(&(co->sslinout[0])); pthread_mutex_lock(&(co->sslinout[0]));
if(co->flags & SNSX_CLOSED) {
pthread_mutex_unlock(&(co->sslinout[0]));
goto __finish;
}
rd = __conn_read(co, mhead, sizeof(sntllv2_head_t)); rd = __conn_read(co, mhead, sizeof(sntllv2_head_t));
if(rd != sizeof(sntllv2_bundle_t)) { #ifdef _VERBOSE_DEBUG
dumphead(mhead);
#endif
if(rd != sizeof(sntllv2_head_t)) {
__sslproto_error: __sslproto_error:
co->flags |= SNSX_CLOSED; co->flags |= SNSX_CLOSED;
pthread_mutex_unlock(&(co->sslinout[0])); pthread_mutex_unlock(&(co->sslinout[0]));
@ -530,7 +540,8 @@ static void *__sntll_thread(void *b)
fprintf(stderr, "[sntllv2] Invalid index of the message.\n"); fprintf(stderr, "[sntllv2] Invalid index of the message.\n");
goto __again; goto __again;
} }
msg = co->messages[mhead->msgid]; mid = mhead->msgid;
msg = co->messages[mid];
if(!msg) goto __inval_idx_nor; if(!msg) goto __inval_idx_nor;
/* ok now we'are copy data and unlock wait mutex */ /* ok now we'are copy data and unlock wait mutex */
@ -580,10 +591,11 @@ static void *__sntll_thread(void *b)
if(mhead->payload_length) msg->payload = bbuf; if(mhead->payload_length) msg->payload = bbuf;
} }
pthread_mutex_lock(&co->idx_ch_lock); mid = mhead->msgid;
idx_reserve(&co->idx_ch, mhead->msgid); pthread_mutex_lock(&co->idx_msg_lock);
idx_reserve(&co->idx_msg, mid);
co->messages[mhead->msgid] = msg; co->messages[mhead->msgid] = msg;
pthread_mutex_unlock(&co->idx_ch_lock); pthread_mutex_unlock(&co->idx_msg_lock);
/* now we are able to process the message */ /* now we are able to process the message */
_message_process(msg); _message_process(msg);
@ -594,10 +606,10 @@ static void *__sntll_thread(void *b)
if(!msg) goto __inval_idx_nor; if(!msg) goto __inval_idx_nor;
/* message dialog is closed - remove this right now */ /* message dialog is closed - remove this right now */
pthread_mutex_lock(&co->idx_ch_lock); pthread_mutex_lock(&co->idx_msg_lock);
idx_free(&co->idx_ch, mhead->msgid); idx_free(&co->idx_msg, mhead->msgid);
co->messages[mhead->msgid] = NULL; co->messages[mhead->msgid] = NULL;
pthread_mutex_unlock(&co->idx_ch_lock); pthread_mutex_unlock(&co->idx_msg_lock);
if(msg->mhead.attr & SXMSG_TIMEDOUT) { /* nobody wait for it */ if(msg->mhead.attr & SXMSG_TIMEDOUT) { /* nobody wait for it */
/* now just free it */ /* now just free it */
@ -622,10 +634,10 @@ static void *__sntll_thread(void *b)
if(!msg) goto __inval_idx_nor; if(!msg) goto __inval_idx_nor;
if(msg->mhead.attr & SXMSG_TIMEDOUT) { /* nobody wait for it */ if(msg->mhead.attr & SXMSG_TIMEDOUT) { /* nobody wait for it */
pthread_mutex_lock(&co->idx_ch_lock); pthread_mutex_lock(&co->idx_msg_lock);
idx_free(&co->idx_ch, mhead->msgid); idx_free(&co->idx_msg, mhead->msgid);
co->messages[mhead->msgid] = NULL; co->messages[mhead->msgid] = NULL;
pthread_mutex_unlock(&co->idx_ch_lock); pthread_mutex_unlock(&co->idx_msg_lock);
/* now just free it */ /* now just free it */
pthread_mutex_destroy(&msg->wait); pthread_mutex_destroy(&msg->wait);
@ -775,7 +787,7 @@ conn_t *connection_master_link(conn_sys_t *ssys, int sck, struct in_addr *addr)
head->opcode = r; head->opcode = r;
if(r != SNE_SUCCESS) { /* we finish */ if(r != SNE_SUCCESS) { /* we finish */
head->payload_length = 0; head->payload_length = 0;
__conn_write(co, buf, sizeof(sntllv2_head_t)); __conn_write(co, head, sizeof(sntllv2_head_t));
destroy_sexp(sx); destroy_sexp(sx);
goto __fail3; goto __fail3;
} }
@ -793,6 +805,10 @@ conn_t *connection_master_link(conn_sys_t *ssys, int sck, struct in_addr *addr)
r = __connection_second_alloc(co); r = __connection_second_alloc(co);
if(r != SNE_SUCCESS) goto __fail3; if(r != SNE_SUCCESS) goto __fail3;
/* free message */
co->messages[0] = NULL;
free(msg);
/* and now we're need to create a thread poll */ /* and now we're need to create a thread poll */
if(!(bundle = malloc(sizeof(sntllv2_bundle_t)))) { r = SNE_ENOMEM; goto __fail4; } if(!(bundle = malloc(sizeof(sntllv2_bundle_t)))) { r = SNE_ENOMEM; goto __fail4; }
else { else {
@ -992,18 +1008,27 @@ conn_t *connection_link(conn_sys_t *ssys, const char *host,
sx = parse_sexp(bbuf, rd); sx = parse_sexp(bbuf, rd);
if(!sx) { r = SNE_BADPROTO; goto __fail2; } if(!sx) { r = SNE_BADPROTO; goto __fail2; }
r = __eval_syssexp(co, sx); r = __eval_syssexp(co, sx);
if(!r) r = SNE_SUCCESS;
destroy_sexp(sx); destroy_sexp(sx);
/* write back */
head->opcode = r; head->opcode = r;
head->payload_length = 0; head->payload_length = 0;
wr = __conn_write(co, head, sizeof(sntllv2_head_t)); wr = __conn_write(co, head, sizeof(sntllv2_head_t));
if(wr < 0) goto __fail2; if(wr < 0) {
if(r != SNE_SUCCESS) goto __fail2; blub("fuck");
r = SNE_LINKERROR; goto __fail2;}
if(r != SNE_SUCCESS) { r = SNE_LINKERROR; goto __fail2;}
} }
/* if we're there - negotiation is done, going to init messaging mode */ /* if we're there - negotiation is done, going to init messaging mode */
r = __connection_second_alloc(co); r = __connection_second_alloc(co);
if(r != SNE_SUCCESS) goto __fail3; if(r != SNE_SUCCESS) goto __fail3;
/* free message */
co->messages[0] = NULL;
free(msg);
/* and now we're need to create a thread poll */ /* and now we're need to create a thread poll */
if(!(bundle = malloc(sizeof(sntllv2_bundle_t)))) { r = SNE_ENOMEM; goto __fail4; } if(!(bundle = malloc(sizeof(sntllv2_bundle_t)))) { r = SNE_ENOMEM; goto __fail4; }
else { else {

@ -34,12 +34,95 @@
/* define a little bit */ /* define a little bit */
#define DEFAULT_PORT 13133 #define DEFAULT_PORT 13133
#define CHANNEL_COUNT 200 #define CHANNEL_COUNT 200
#define CLIENT_COUNT 100 #define CLIENT_COUNT 256
#define MESSAGES_PER_SESSION 10000 #define MESSAGES_PER_SESSION 10000
#define ITERATION_COUNT 1000 #define ITERATION_COUNT 1000
#define FAILS_ONLY #define FAILS_ONLY
struct testdata {
int uc;
pthread_mutex_t ulock;
conn_t *co;
};
static int __init_testdata(struct testdata *t, conn_t *co)
{
t->uc = 0;
pthread_mutex_init(&t->ulock, NULL);
t->co = co;
return 0;
}
static void __wait_completion(struct testdata *t)
{
pthread_mutex_lock(&t->ulock);
if(t->uc) {
pthread_mutex_lock(&t->ulock);
}
return;
}
static int __set_typed_list_callback(conn_t *co, int ch, char *desc)
{
printf("allowed channel %d (%s)\n", ch, desc);
return SNE_SUCCESS;
}
static void *__addsthrd(void *a)
{
struct testdata *t = a;
conn_t *co = t->co;
chnl_t *mch;
sxmsg_t *msg;
char mmbuf[1024];
size_t ln;
int mr, i;
pthread_mutex_lock(&t->ulock);
t->uc++;
pthread_mutex_unlock(&t->ulock);
/* here we go */
mch = sxchannel_open(co, 12);
if(!mch) {
fprintf(stderr, "Failed to openchannel with %d\n", errno);
goto __fini;
}
for(i = 0; i < MESSAGES_PER_SESSION; i++) {
ln = snprintf(mmbuf, 1024, "(ar-add (10 10))");
mr = sxmsg_send(mch, mmbuf, ln, &msg);
switch(mr) {
case SNE_RAPIDMSG:
fprintf(stdout, "Rapidly replied: %s\n", (char *)sxmsg_payload(msg));
sxmsg_clean(msg);
break;
case SNE_REPLYREQ:
if(sxmsg_datalen(msg)) fprintf(stdout, "Replied (confirmation required): %s\n",
sxmsg_payload(msg));
mr = sxmsg_return(msg, SNE_SUCCESS);
fprintf(stderr, "mr = %d\n", mr);
break;
case SNE_SUCCESS:
fprintf(stdout, "Success.\n");
break;
default:
fprintf(stderr, "ERROR: %d\n", mr);
break;
}
}
sxchannel_close(mch);
__fini:
t->uc--;
if(t->uc <= 1) pthread_mutex_unlock(&t->ulock);
return NULL;
}
int main(int argc, char **argv) int main(int argc, char **argv)
{ {
char *rootca = NULL, *cert = NULL; char *rootca = NULL, *cert = NULL;
@ -116,15 +199,82 @@ int main(int argc, char **argv)
} }
/* Tests */ /* Tests */
struct timeval beg, end;
/* try to open connection */ /* try to open connection */
connections_set_channelcall(ssys, NULL); connections_set_channelcall(ssys, __set_typed_list_callback);
gettimeofday(&beg, NULL);
co = connection_link(ssys, addr, port, cert, login, password); co = connection_link(ssys, addr, port, cert, login, password);
if(!co) { if(!co) {
fprintf(stderr, "Failed to connection with %d\n", errno); fprintf(stderr, "Failed to connection with %d\n", errno);
return errno; return errno;
} }
gettimeofday(&end, NULL);
if((end.tv_sec - beg.tv_sec) > 0) {
printf("Seconds: %ld ", end.tv_sec - beg.tv_sec);
printf("µS: %ld\n", end.tv_usec + (1000000 - beg.tv_usec));
} else printf("µS: %ld\n", end.tv_usec - beg.tv_usec);
/* ok now we should open a channel */
chnl_t *testchannel = sxchannel_open(co, 12);
if(!testchannel) {
fprintf(stderr, "Failed to openchannel with %d\n", errno);
return errno;
}
gettimeofday(&end, NULL);
if((end.tv_sec - beg.tv_sec) > 0) {
printf("Seconds: %ld ", end.tv_sec - beg.tv_sec);
printf("µS: %ld\n", end.tv_usec + (1000000 - beg.tv_usec));
} else printf("µS: %ld\n", end.tv_usec - beg.tv_usec);
/* ok, send a message */
char mmbuf[1024];
sxmsg_t *msg;
size_t ln;
ln = snprintf(mmbuf, 1024, "(ar-add (10 10))");
int mr = sxmsg_send(testchannel, mmbuf, ln, &msg);
switch(mr) {
case SNE_RAPIDMSG:
fprintf(stdout, "Rapidly replied: %s\n", (char *)sxmsg_payload(msg));
sxmsg_clean(msg);
break;
case SNE_REPLYREQ:
if(sxmsg_datalen(msg)) fprintf(stdout, "Replied (confirmation required): %s\n",
sxmsg_payload(msg));
mr = sxmsg_return(msg, SNE_SUCCESS);
fprintf(stderr, "mr = %d\n", mr);
break;
case SNE_SUCCESS:
fprintf(stdout, "Success.\n");
break;
default:
fprintf(stderr, "ERROR: %d\n", mr);
break;
}
int ee = sxchannel_close(testchannel);
printf("ee = %d\n", ee);
gettimeofday(&end, NULL);
if((end.tv_sec - beg.tv_sec) > 0) {
printf("Seconds: %ld ", end.tv_sec - beg.tv_sec);
printf("µS: %ld\n", end.tv_usec + (1000000 - beg.tv_usec));
} else printf("µS: %ld\n", end.tv_usec - beg.tv_usec);
sleep(10);
/* ok, now we need to create many threads */
struct testdata trd;
pthread_t thrd;
int i;
__init_testdata(&trd, co);
for(i = 0; i < 256; i++) pthread_create(&thrd, NULL, __addsthrd, &trd);
__wait_completion(&trd);
return 0; return 0;
} }

@ -90,7 +90,7 @@ static int __secure_check(conn_t *co)
static int __set_typed_list_callback(conn_t *co, int ch, char *desc) static int __set_typed_list_callback(conn_t *co, int ch, char *desc)
{ {
printf("allowed channel %d (%s)\n", ch, desc); printf("allowed channel %d (%s)\n", ch, desc);
return 0; return SNE_SUCCESS;
} }
/* list of rpc calls functions */ /* list of rpc calls functions */

Loading…
Cancel
Save