numerious bugfixes;

v0.5.xx
Alexander Vdolainen 10 years ago
parent db24e791a4
commit 434c6cb09f

@ -24,10 +24,10 @@
/* define a little bit */ /* define a little bit */
#define DEFAULT_PORT 13133 #define DEFAULT_PORT 13133
#define CHANNEL_COUNT 4000 #define CHANNEL_COUNT 200
#define CLIENT_COUNT 2000 #define CLIENT_COUNT 100
#define MESSAGES_PER_SESSION 10 #define MESSAGES_PER_SESSION 10000
#define ITERATION_COUNT 10 #define ITERATION_COUNT 1000
#define FAILS_ONLY #define FAILS_ONLY
//#define SIMPLE_TESTING //#define SIMPLE_TESTING
@ -206,6 +206,7 @@ void *test_correct_channel(void *ctx)
rc = channel_open(co, &channel, 12); rc = channel_open(co, &channel, 12);
log_assert("channel_open with type 12", rc, 0); log_assert("channel_open with type 12", rc, 0);
log_begin("Test messaging"); log_begin("Test messaging");
//#if 0
for(i = 0; i < MESSAGES_PER_SESSION; ++i) { for(i = 0; i < MESSAGES_PER_SESSION; ++i) {
a = rand() % 100; a = rand() % 100;
b = rand() % 100; b = rand() % 100;
@ -220,10 +221,11 @@ void *test_correct_channel(void *ctx)
log_assert("rpc execution", rc, a + b); log_assert("rpc execution", rc, a + b);
//destroy_sexp(add_request); //destroy_sexp(add_request);
} }
//#endif
log_end("Test messaging"); log_end("Test messaging");
// rc = channel_close(channel); rc = channel_close(channel);
// log_assert("channel_close with type 12", rc, 0); log_assert("channel_close with type 12", rc, 0);
} }
log_end("Channel testing"); log_end("Channel testing");
@ -391,8 +393,8 @@ int main(int argc, char **argv)
log_assert("Thread list deallocation", rc, 0); log_assert("Thread list deallocation", rc, 0);
#endif #endif
#ifdef SIMPLE_TESTING #ifdef SIMPLE_TESTING
//test_channel_handling(co); test_channel_handling(co);
test_message_handling(co); //test_message_handling(co);
#endif #endif
printf("HERE!!!!\n"); printf("HERE!!!!\n");
log_begin("Connection close"); log_begin("Connection close");

@ -135,19 +135,20 @@ static int __conn_write(conn_t *co, void *buf, size_t buf_len)
{ {
int r; int r;
__retry:
pthread_mutex_lock(&(co->oplock)); pthread_mutex_lock(&(co->oplock));
__retry:
r = SSL_write(co->ssl, buf, (int)buf_len); r = SSL_write(co->ssl, buf, (int)buf_len);
switch(SSL_get_error(co->ssl, r)) { switch(SSL_get_error(co->ssl, r)) {
case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_READ:
case SSL_ERROR_WANT_WRITE: case SSL_ERROR_WANT_WRITE:
printf("--------------NEED to retry the write\n");
goto __retry; goto __retry;
break; break;
default: default:
pthread_mutex_unlock(&(co->oplock)); pthread_mutex_unlock(&(co->oplock));
fprintf(stderr, "(WR)Unknown error on %s (%d)\n", co->uuid, r); if(r < 0) {
return -1; fprintf(stderr, "(WR)Unknown error on %s (%d)\n", co->uuid, r);
return -1;
} else return 0;
} }
pthread_mutex_unlock(&(co->oplock)); pthread_mutex_unlock(&(co->oplock));
@ -537,19 +538,19 @@ static int __default_ch_open(void *cctx, sexp_t *sx)
} else rlist = (rpc_typed_list_t *)usrtc_node_getdata(node); } else rlist = (rpc_typed_list_t *)usrtc_node_getdata(node);
/* now we need to check up the channel */ /* now we need to check up the channel */
pthread_mutex_lock(&(co->oplock)); pthread_rwlock_rdlock(&(co->chnl_lock));
node = usrtc_lookup(co->chnl_tree, &cid); node = usrtc_lookup(co->chnl_tree, &cid);
if(node) { if(node) {
pthread_mutex_unlock(&(co->oplock)); pthread_rwlock_unlock(&(co->chnl_lock));
r = EEXIST; r = EEXIST;
goto __send_repl; goto __send_repl;
} else { } else {
idx_reserve(co->idx_ch, cid); idx_reserve(co->idx_ch, cid);
pthread_mutex_unlock(&(co->oplock)); /* now we should alloc channel */ pthread_rwlock_unlock(&(co->chnl_lock)); /* now we should alloc channel */
if((r = __alloc_channel(cid, co, rlist, &channel))) { if((r = __alloc_channel(cid, co, rlist, &channel))) {
pthread_mutex_lock(&(co->oplock)); pthread_rwlock_wrlock(&(co->chnl_lock));
idx_free(co->idx_ch, cid); idx_free(co->idx_ch, cid);
pthread_mutex_unlock(&(co->oplock)); pthread_rwlock_unlock(&(co->chnl_lock));
goto __send_repl; goto __send_repl;
} else { } else {
/* now we ready to confirm channel creation */ /* now we ready to confirm channel creation */
@ -705,26 +706,28 @@ static int __default_ch_close(void *cctx, sexp_t *sx)
//printf("%s(%ld)\n", __FUNCTION__, cid); //printf("%s(%ld)\n", __FUNCTION__, cid);
/* additional check for type of the channel */ /* additional check for type of the channel */
pthread_rwlock_rdlock(&(co->chnl_lock));
node = usrtc_lookup(co->chnl_tree, &cid); node = usrtc_lookup(co->chnl_tree, &cid);
pthread_rwlock_unlock(&(co->chnl_lock));
if(!node) { if(!node) {
r = ENOENT; r = ENOENT;
printf("there is no channel with id=%ld\n", cid); printf("there is no channel with id=%ld\n", cid);
goto __send_repl; goto __send_repl;
} }
channel = (chnl_t *)usrtc_node_getdata(node); channel = (chnl_t *)usrtc_node_getdata(node);
if(!channel) {
r = ENOENT;
printf("there is no channel with id=%ld\n", cid);
goto __send_repl;
}
__send_repl: __send_repl:
buf = malloc(2048); buf = malloc(2048);
snprintf(buf, 2048, "(ch-close-ret ((:id %ld) (:error %d)))", snprintf(buf, 2048, "(ch-close-ret ((:id %ld) (:error %d)))",
channel->cid, r); channel->cid, r);
__conn_write(co, buf, strlen(buf));
//printf("%s: replied %s (%d bytes)\n", __FUNCTION__, buf, nbytes); /* check up the message queue */
free(buf); pthread_rwlock_rdlock(&(channel->msglock));
if(usrtc_count(channel->msgs_tree)) {
/* destroy all */
/* TODO: make a verbose check, because the client cannot be honest :) (medium) */
}
pthread_rwlock_unlock(&(channel->msglock));
/* remove channel from the search tree */ /* remove channel from the search tree */
pthread_rwlock_wrlock(&(co->chnl_lock)); pthread_rwlock_wrlock(&(co->chnl_lock));
@ -734,12 +737,15 @@ __send_repl:
pthread_rwlock_unlock(&(co->chnl_lock)); pthread_rwlock_unlock(&(co->chnl_lock));
idx_allocator_destroy(channel->idx_msg); idx_allocator_destroy(channel->idx_msg);
free(channel->idx_msg);
free(channel->msgs_tree); free(channel->msgs_tree);
pthread_mutex_destroy(&(channel->oplock)); pthread_mutex_destroy(&(channel->oplock));
pthread_rwlock_destroy(&(channel->msglock)); pthread_rwlock_destroy(&(channel->msglock));
free(channel); free(channel);
destroy_sexp(sx); destroy_sexp(sx);
__conn_write(co, buf, strlen(buf));
free(buf);
return 0; return 0;
} }
@ -1987,6 +1993,8 @@ static void __destroy_msg(sxmsg_t *msg)
pthread_mutex_lock(&(ch->oplock)); pthread_mutex_lock(&(ch->oplock));
idx_free(ch->idx_msg, msg->mid); idx_free(ch->idx_msg, msg->mid);
pthread_mutex_unlock(&(ch->oplock)); pthread_mutex_unlock(&(ch->oplock));
} else if(msg->flags & ESXMSG_SYS) {
//if(msg->uuid) free(msg->uuid);
} }
pthread_mutex_unlock(&(msg->wait)); pthread_mutex_unlock(&(msg->wait));
@ -2088,7 +2096,7 @@ int channel_open(conn_t *co, chnl_t **ch, int type)
/* ok now we're ready to create a message and push channel to the list */ /* ok now we're ready to create a message and push channel to the list */
if((r = __create_sys_msg(&sms, uuid_, nch, pl))) { if((r = __create_sys_msg(&sms, uuid_, nch, pl))) {
__fail_chan: __fail_chan:
/* TODO: destroy the channel*/ /* TODO: destroy the channel*/
goto __fini_op; goto __fini_op;
} else { } else {
@ -2122,7 +2130,6 @@ int channel_open(conn_t *co, chnl_t **ch, int type)
goto __fail_chan_r; goto __fail_chan_r;
} else r = 0; } else r = 0;
nch->flags &= ~ESXCHAN_PENDING; /* mark it as established */ nch->flags &= ~ESXCHAN_PENDING; /* mark it as established */
/* TODO: destroy system message in the channel */
free(pl->cstr); free(pl->cstr);
free(pl); free(pl);
__destroy_msg(nch->sysmsg); __destroy_msg(nch->sysmsg);
@ -2132,12 +2139,19 @@ int channel_open(conn_t *co, chnl_t **ch, int type)
if(r) { /* TODO: destroy */ if(r) { /* TODO: destroy */
if(uuid_) free(uuid_); if(uuid_) free(uuid_);
if(pl) { if(pl) {
//if(pl->cstr) free(pl->cstr); if(pl->cstr) free(pl->cstr);
free(pl); free(pl);
} }
pthread_rwlock_wrlock(&(co->chnl_lock)); pthread_rwlock_wrlock(&(co->chnl_lock));
//idx_free(co->idx_ch, cid); idx_free(co->idx_ch, nch->cid);
pthread_rwlock_unlock(&(co->chnl_lock)); pthread_rwlock_unlock(&(co->chnl_lock));
idx_allocator_destroy(nch->idx_msg);
free(nch->idx_msg);
free(nch->msgs_tree);
pthread_mutex_destroy(&(nch->oplock));
pthread_rwlock_destroy(&(nch->msglock));
free(nch);
} else *ch = nch; } else *ch = nch;
return r; return r;
@ -2216,11 +2230,13 @@ __process_smsg:
pthread_rwlock_unlock(&(chnl->msglock)); pthread_rwlock_unlock(&(chnl->msglock));
__destroy_msg(chnl->sysmsg); __destroy_msg(chnl->sysmsg);
free(uuid_);
free(pl->cstr); free(pl->cstr);
free(pl); free(pl);
free(chnl->uuid); free(chnl->uuid);
idx_allocator_destroy(chnl->idx_msg); idx_allocator_destroy(chnl->idx_msg);
free(chnl->idx_msg);
free(chnl->msgs_tree); free(chnl->msgs_tree);
pthread_mutex_destroy(&(chnl->oplock)); pthread_mutex_destroy(&(chnl->oplock));
pthread_rwlock_destroy(&(chnl->msglock)); pthread_rwlock_destroy(&(chnl->msglock));

Loading…
Cancel
Save