diff --git a/examples/sntlc.c b/examples/sntlc.c index 2cd3488..000c0d7 100644 --- a/examples/sntlc.c +++ b/examples/sntlc.c @@ -24,10 +24,10 @@ /* define a little bit */ #define DEFAULT_PORT 13133 -#define CHANNEL_COUNT 4000 -#define CLIENT_COUNT 2000 -#define MESSAGES_PER_SESSION 10 -#define ITERATION_COUNT 10 +#define CHANNEL_COUNT 200 +#define CLIENT_COUNT 100 +#define MESSAGES_PER_SESSION 10000 +#define ITERATION_COUNT 1000 #define FAILS_ONLY //#define SIMPLE_TESTING @@ -206,6 +206,7 @@ void *test_correct_channel(void *ctx) rc = channel_open(co, &channel, 12); log_assert("channel_open with type 12", rc, 0); log_begin("Test messaging"); + //#if 0 for(i = 0; i < MESSAGES_PER_SESSION; ++i) { a = rand() % 100; b = rand() % 100; @@ -220,10 +221,11 @@ void *test_correct_channel(void *ctx) log_assert("rpc execution", rc, a + b); //destroy_sexp(add_request); } + //#endif log_end("Test messaging"); -// rc = channel_close(channel); -// log_assert("channel_close with type 12", rc, 0); + rc = channel_close(channel); + log_assert("channel_close with type 12", rc, 0); } log_end("Channel testing"); @@ -391,8 +393,8 @@ int main(int argc, char **argv) log_assert("Thread list deallocation", rc, 0); #endif #ifdef SIMPLE_TESTING - //test_channel_handling(co); - test_message_handling(co); + test_channel_handling(co); + //test_message_handling(co); #endif printf("HERE!!!!\n"); log_begin("Connection close"); diff --git a/lib/connection.c b/lib/connection.c index 35ed7f5..4c729d5 100644 --- a/lib/connection.c +++ b/lib/connection.c @@ -135,19 +135,20 @@ static int __conn_write(conn_t *co, void *buf, size_t buf_len) { int r; - __retry: pthread_mutex_lock(&(co->oplock)); + __retry: r = SSL_write(co->ssl, buf, (int)buf_len); switch(SSL_get_error(co->ssl, r)) { case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_WRITE: - printf("--------------NEED to retry the write\n"); goto __retry; break; default: pthread_mutex_unlock(&(co->oplock)); - fprintf(stderr, "(WR)Unknown error on %s (%d)\n", co->uuid, r); - return -1; + if(r < 0) { + fprintf(stderr, "(WR)Unknown error on %s (%d)\n", co->uuid, r); + return -1; + } else return 0; } pthread_mutex_unlock(&(co->oplock)); @@ -537,19 +538,19 @@ static int __default_ch_open(void *cctx, sexp_t *sx) } else rlist = (rpc_typed_list_t *)usrtc_node_getdata(node); /* now we need to check up the channel */ - pthread_mutex_lock(&(co->oplock)); + pthread_rwlock_rdlock(&(co->chnl_lock)); node = usrtc_lookup(co->chnl_tree, &cid); if(node) { - pthread_mutex_unlock(&(co->oplock)); + pthread_rwlock_unlock(&(co->chnl_lock)); r = EEXIST; goto __send_repl; } else { idx_reserve(co->idx_ch, cid); - pthread_mutex_unlock(&(co->oplock)); /* now we should alloc channel */ + pthread_rwlock_unlock(&(co->chnl_lock)); /* now we should alloc channel */ if((r = __alloc_channel(cid, co, rlist, &channel))) { - pthread_mutex_lock(&(co->oplock)); + pthread_rwlock_wrlock(&(co->chnl_lock)); idx_free(co->idx_ch, cid); - pthread_mutex_unlock(&(co->oplock)); + pthread_rwlock_unlock(&(co->chnl_lock)); goto __send_repl; } else { /* now we ready to confirm channel creation */ @@ -705,26 +706,28 @@ static int __default_ch_close(void *cctx, sexp_t *sx) //printf("%s(%ld)\n", __FUNCTION__, cid); /* additional check for type of the channel */ + pthread_rwlock_rdlock(&(co->chnl_lock)); node = usrtc_lookup(co->chnl_tree, &cid); + pthread_rwlock_unlock(&(co->chnl_lock)); if(!node) { r = ENOENT; printf("there is no channel with id=%ld\n", cid); goto __send_repl; } channel = (chnl_t *)usrtc_node_getdata(node); - if(!channel) { - r = ENOENT; - printf("there is no channel with id=%ld\n", cid); - goto __send_repl; - } __send_repl: buf = malloc(2048); snprintf(buf, 2048, "(ch-close-ret ((:id %ld) (:error %d)))", channel->cid, r); - __conn_write(co, buf, strlen(buf)); - //printf("%s: replied %s (%d bytes)\n", __FUNCTION__, buf, nbytes); - free(buf); + + /* check up the message queue */ + pthread_rwlock_rdlock(&(channel->msglock)); + if(usrtc_count(channel->msgs_tree)) { + /* destroy all */ + /* TODO: make a verbose check, because the client cannot be honest :) (medium) */ + } + pthread_rwlock_unlock(&(channel->msglock)); /* remove channel from the search tree */ pthread_rwlock_wrlock(&(co->chnl_lock)); @@ -734,12 +737,15 @@ __send_repl: pthread_rwlock_unlock(&(co->chnl_lock)); idx_allocator_destroy(channel->idx_msg); + free(channel->idx_msg); free(channel->msgs_tree); pthread_mutex_destroy(&(channel->oplock)); pthread_rwlock_destroy(&(channel->msglock)); free(channel); destroy_sexp(sx); + __conn_write(co, buf, strlen(buf)); + free(buf); return 0; } @@ -1987,6 +1993,8 @@ static void __destroy_msg(sxmsg_t *msg) pthread_mutex_lock(&(ch->oplock)); idx_free(ch->idx_msg, msg->mid); pthread_mutex_unlock(&(ch->oplock)); + } else if(msg->flags & ESXMSG_SYS) { + //if(msg->uuid) free(msg->uuid); } pthread_mutex_unlock(&(msg->wait)); @@ -2088,7 +2096,7 @@ int channel_open(conn_t *co, chnl_t **ch, int type) /* ok now we're ready to create a message and push channel to the list */ if((r = __create_sys_msg(&sms, uuid_, nch, pl))) { - __fail_chan: + __fail_chan: /* TODO: destroy the channel*/ goto __fini_op; } else { @@ -2122,7 +2130,6 @@ int channel_open(conn_t *co, chnl_t **ch, int type) goto __fail_chan_r; } else r = 0; nch->flags &= ~ESXCHAN_PENDING; /* mark it as established */ - /* TODO: destroy system message in the channel */ free(pl->cstr); free(pl); __destroy_msg(nch->sysmsg); @@ -2132,12 +2139,19 @@ int channel_open(conn_t *co, chnl_t **ch, int type) if(r) { /* TODO: destroy */ if(uuid_) free(uuid_); if(pl) { - //if(pl->cstr) free(pl->cstr); + if(pl->cstr) free(pl->cstr); free(pl); } pthread_rwlock_wrlock(&(co->chnl_lock)); - //idx_free(co->idx_ch, cid); + idx_free(co->idx_ch, nch->cid); pthread_rwlock_unlock(&(co->chnl_lock)); + idx_allocator_destroy(nch->idx_msg); + free(nch->idx_msg); + free(nch->msgs_tree); + pthread_mutex_destroy(&(nch->oplock)); + pthread_rwlock_destroy(&(nch->msglock)); + + free(nch); } else *ch = nch; return r; @@ -2216,11 +2230,13 @@ __process_smsg: pthread_rwlock_unlock(&(chnl->msglock)); __destroy_msg(chnl->sysmsg); + free(uuid_); free(pl->cstr); free(pl); free(chnl->uuid); idx_allocator_destroy(chnl->idx_msg); + free(chnl->idx_msg); free(chnl->msgs_tree); pthread_mutex_destroy(&(chnl->oplock)); pthread_rwlock_destroy(&(chnl->msglock));