pending messages ready;

v0.5.xx
Alexander Vdolainen 10 years ago
parent f3d13966af
commit 617e170087

@ -86,7 +86,7 @@ typedef struct __connection_t {
pthread_mutex_t idx_msg_lock;
list_head_t write_pending; /** < list of messages waiting for write */
pthread_mutex_t write_pending_lock;
volatile uint8_t unused_messages; /** < unused message count */
volatile uint8_t pending_messages; /** < pending message count */
/* Other stuff */
pthread_t thrd_poll[8];
volatile uint8_t flags; /** < flags of the connection */

@ -108,6 +108,7 @@ static inline int __sxmsg_send(chnl_t *channel, const char *data, size_t datalen
/* under locking here */
pthread_mutex_lock(&co->write_pending_lock);
co->pending_messages++;
list_add2tail(&co->write_pending, &ppm->node); /* push it to the FIFO */
pthread_mutex_unlock(&co->write_pending_lock);
}
@ -207,6 +208,7 @@ static inline int __sxmsg_reply(sxmsg_t *msg, const char *data,
/* under locking here */
pthread_mutex_lock(&co->write_pending_lock);
co->pending_messages++;
list_add2tail(&co->write_pending, &ppm->node); /* push it to the FIFO */
pthread_mutex_unlock(&co->write_pending_lock);
}
@ -310,6 +312,7 @@ static inline int __sxmsg_return(sxmsg_t *msg, int opcode, int pp)
/* under locking here */
pthread_mutex_lock(&co->write_pending_lock);
co->pending_messages++;
list_add2tail(&co->write_pending, &ppm->node); /* push it to the FIFO */
pthread_mutex_unlock(&co->write_pending_lock);

@ -443,14 +443,34 @@ static int __eval_syssexp(conn_t *co, sexp_t *sx)
static void __connection_destroy(conn_t *co)
{
int i = 0;
sxmsg_t *msg;
sxmsg_t *msg, *omsg;
ppmsg_t *ppm;
list_node_t *iter, *siter;
chnl_t *chan;
sntllv2_head_t *head;
conn_sys_t *ssys = co->ssys;
/* first we will unpin all messages and mark it as errors on */
if(co->unused_messages) {
/*TODO: kill it all */
if(co->pending_messages) {
pthread_mutex_lock(&co->write_pending_lock);
list_for_each_safe(&co->write_pending, iter, siter) {
ppm = container_of(iter, ppmsg_t, node);
omsg = ppm->msg;
/* ok, now we're able to remove it from list */
list_del(&ppm->node);
if(omsg->mhead.attr & SXMSG_CLOSED) { /* message is closed - destroy it */
pthread_mutex_unlock(&omsg->wait);
pthread_mutex_destroy(&omsg->wait);
free(omsg);
} else { /* wake up */
omsg->mhead.opcode = SNE_LINKERROR;
pthread_mutex_unlock(&omsg->wait);
}
free(ppm);
co->pending_messages--;
}
pthread_mutex_unlock(&co->write_pending_lock);
}
/* go thru messages */
@ -503,11 +523,14 @@ static void *__sntll_thread(void *b)
void *buf = bun->buf;
char *bbuf = (char*)buf;
sntllv2_head_t *mhead = (sntllv2_head_t *)buf;
sxmsg_t *msg;
sxmsg_t *msg, *omsg;
sexp_t *sx;
chnl_t *channel;
list_node_t *iter, *siter;
ppmsg_t *ppm;
pthread_t self = pthread_self();
int dispatch = 0;
struct timespec wtick;
int dispatch = 0, e;
size_t rd, wr;
ulong_t mid;
@ -537,7 +560,37 @@ static void *__sntll_thread(void *b)
*/
while(1) {
__again:
pthread_mutex_lock(&(co->sslinout[0]));
if(co->flags & SNSX_CLOSED) goto __finish; /* go away if required asap */
/* works with pending messages */
if(co->pending_messages) {
pthread_mutex_lock(&co->write_pending_lock);
list_for_each_safe(&co->write_pending, iter, siter) {
ppm = container_of(iter, ppmsg_t, node);
omsg = ppm->msg;
if(_sntll_writemsg(co, omsg) != SNE_SUCCESS) {
pthread_mutex_unlock(&co->write_pending_lock);
goto __finish; /* write failed - finishing ... */
}
/* ok, now we're able to remove it from list */
list_del(&ppm->node);
if(omsg->mhead.attr & SXMSG_CLOSED) { /* message is closed - destroy it */
pthread_mutex_unlock(&omsg->wait);
pthread_mutex_destroy(&omsg->wait);
free(omsg);
}
free(ppm);
co->pending_messages--;
}
pthread_mutex_unlock(&co->write_pending_lock);
}
if(!dispatch) pthread_mutex_lock(&(co->sslinout[0]));
else { /* dispatch thread ticking every ət */
wtick.tv_sec = time(NULL) + 1;
e = pthread_mutex_timedlock(&(co->sslinout[0]), &wtick);
if(e == ETIMEDOUT) goto __again;
}
if(co->flags & SNSX_CLOSED) {
pthread_mutex_unlock(&(co->sslinout[0]));
goto __finish;

Loading…
Cancel
Save