diff --git a/include/sntl/sntllv2.h b/include/sntl/sntllv2.h index 9486481..0731e8a 100644 --- a/include/sntl/sntllv2.h +++ b/include/sntl/sntllv2.h @@ -86,7 +86,7 @@ typedef struct __connection_t { pthread_mutex_t idx_msg_lock; list_head_t write_pending; /** < list of messages waiting for write */ pthread_mutex_t write_pending_lock; - volatile uint8_t unused_messages; /** < unused message count */ + volatile uint8_t pending_messages; /** < pending message count */ /* Other stuff */ pthread_t thrd_poll[8]; volatile uint8_t flags; /** < flags of the connection */ diff --git a/lib/messagesx.c b/lib/messagesx.c index c63d83d..6e33d74 100644 --- a/lib/messagesx.c +++ b/lib/messagesx.c @@ -108,6 +108,7 @@ static inline int __sxmsg_send(chnl_t *channel, const char *data, size_t datalen /* under locking here */ pthread_mutex_lock(&co->write_pending_lock); + co->pending_messages++; list_add2tail(&co->write_pending, &ppm->node); /* push it to the FIFO */ pthread_mutex_unlock(&co->write_pending_lock); } @@ -207,6 +208,7 @@ static inline int __sxmsg_reply(sxmsg_t *msg, const char *data, /* under locking here */ pthread_mutex_lock(&co->write_pending_lock); + co->pending_messages++; list_add2tail(&co->write_pending, &ppm->node); /* push it to the FIFO */ pthread_mutex_unlock(&co->write_pending_lock); } @@ -310,6 +312,7 @@ static inline int __sxmsg_return(sxmsg_t *msg, int opcode, int pp) /* under locking here */ pthread_mutex_lock(&co->write_pending_lock); + co->pending_messages++; list_add2tail(&co->write_pending, &ppm->node); /* push it to the FIFO */ pthread_mutex_unlock(&co->write_pending_lock); diff --git a/lib/sntllv2.c b/lib/sntllv2.c index f701311..332b0fc 100644 --- a/lib/sntllv2.c +++ b/lib/sntllv2.c @@ -443,14 +443,34 @@ static int __eval_syssexp(conn_t *co, sexp_t *sx) static void __connection_destroy(conn_t *co) { int i = 0; - sxmsg_t *msg; + sxmsg_t *msg, *omsg; + ppmsg_t *ppm; + list_node_t *iter, *siter; chnl_t *chan; sntllv2_head_t *head; conn_sys_t *ssys = co->ssys; /* first we will unpin all messages and mark it as errors on */ - if(co->unused_messages) { - /*TODO: kill it all */ + if(co->pending_messages) { + pthread_mutex_lock(&co->write_pending_lock); + list_for_each_safe(&co->write_pending, iter, siter) { + ppm = container_of(iter, ppmsg_t, node); + omsg = ppm->msg; + + /* ok, now we're able to remove it from list */ + list_del(&ppm->node); + if(omsg->mhead.attr & SXMSG_CLOSED) { /* message is closed - destroy it */ + pthread_mutex_unlock(&omsg->wait); + pthread_mutex_destroy(&omsg->wait); + free(omsg); + } else { /* wake up */ + omsg->mhead.opcode = SNE_LINKERROR; + pthread_mutex_unlock(&omsg->wait); + } + free(ppm); + co->pending_messages--; + } + pthread_mutex_unlock(&co->write_pending_lock); } /* go thru messages */ @@ -503,11 +523,14 @@ static void *__sntll_thread(void *b) void *buf = bun->buf; char *bbuf = (char*)buf; sntllv2_head_t *mhead = (sntllv2_head_t *)buf; - sxmsg_t *msg; + sxmsg_t *msg, *omsg; sexp_t *sx; chnl_t *channel; + list_node_t *iter, *siter; + ppmsg_t *ppm; pthread_t self = pthread_self(); - int dispatch = 0; + struct timespec wtick; + int dispatch = 0, e; size_t rd, wr; ulong_t mid; @@ -537,7 +560,37 @@ static void *__sntll_thread(void *b) */ while(1) { __again: - pthread_mutex_lock(&(co->sslinout[0])); + if(co->flags & SNSX_CLOSED) goto __finish; /* go away if required asap */ + /* works with pending messages */ + if(co->pending_messages) { + pthread_mutex_lock(&co->write_pending_lock); + list_for_each_safe(&co->write_pending, iter, siter) { + ppm = container_of(iter, ppmsg_t, node); + omsg = ppm->msg; + if(_sntll_writemsg(co, omsg) != SNE_SUCCESS) { + pthread_mutex_unlock(&co->write_pending_lock); + goto __finish; /* write failed - finishing ... */ + } + + /* ok, now we're able to remove it from list */ + list_del(&ppm->node); + if(omsg->mhead.attr & SXMSG_CLOSED) { /* message is closed - destroy it */ + pthread_mutex_unlock(&omsg->wait); + pthread_mutex_destroy(&omsg->wait); + free(omsg); + } + free(ppm); + co->pending_messages--; + } + pthread_mutex_unlock(&co->write_pending_lock); + } + + if(!dispatch) pthread_mutex_lock(&(co->sslinout[0])); + else { /* dispatch thread ticking every ət */ + wtick.tv_sec = time(NULL) + 1; + e = pthread_mutex_timedlock(&(co->sslinout[0]), &wtick); + if(e == ETIMEDOUT) goto __again; + } if(co->flags & SNSX_CLOSED) { pthread_mutex_unlock(&(co->sslinout[0])); goto __finish;