From e9c0d3a2e27858e4ee5bd05b00dc8c7ae0d6f1f2 Mon Sep 17 00:00:00 2001 From: Alexander Vdolainen Date: Mon, 2 Feb 2015 15:13:52 +0200 Subject: [PATCH] added pulses, minor fixes --- lib/connection.c | 5 +++-- lib/message.c | 27 ++++++++++++++++----------- lib/queue.c | 1 + 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/lib/connection.c b/lib/connection.c index 0ee6816..97afff2 100644 --- a/lib/connection.c +++ b/lib/connection.c @@ -508,16 +508,17 @@ static int __default_ch_set_types(void *cctx, sexp_t *sx) } else continue; /* ignore */ } -__send_reply: + __send_reply: snprintf(buf, 1024, "(ch-gl-error (%d))", r); if(__conn_write(co, buf, strlen(buf)) < 0) { co->flags &= ~CXCONN_ESTABL; co->flags |= CXCONN_BROKEN; __wake_up_waiters(co, ESXNOCONNECT); } + destroy_sexp(sx); + return r; - } static int __default_ch_gl_error(void *cctx, sexp_t *sx) diff --git a/lib/message.c b/lib/message.c index f3058d5..0dac07c 100644 --- a/lib/message.c +++ b/lib/message.c @@ -174,7 +174,9 @@ static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec __destroy_msg(m); } else { *msg = m; - r = SXOREPLYREQ; + if(m->opcode == ESXNOCONNECT) + r = m->opcode; + else r = SXOREPLYREQ; } } @@ -252,7 +254,6 @@ int msg_reply_timed(sxmsg_t *msg, sexp_t *sx, struct timespec *tio) return __msg_reply(msg, sx, tio, 0); } -//TODO: continue. Implement wait for delivery and queue addition /* * How message sending works: * 1. Create a message structure assigned to the channel, @@ -260,7 +261,8 @@ int msg_reply_timed(sxmsg_t *msg, sexp_t *sx, struct timespec *tio) * 3. Put the message to the queue * 4. Wait for job execution */ -static int __message_send_pulse(chnl_t *ch, sexp_t *sx, struct timespec *tio) +static int __message_send_pulse(chnl_t *ch, sexp_t *sx, struct timespec *tio, + int nowait) { int r = 0; sxmsg_t *m = NULL; @@ -290,12 +292,15 @@ static int __message_send_pulse(chnl_t *ch, sexp_t *sx, struct timespec *tio) r = pth_queue_add(co->mqueue, (void *)m, USR_MSG); if(r) return r; /* FIXME: better give up */ - if(m->flags & ESXMSG_PENDING) { - if(!tio) pthread_mutex_lock(&(m->wait)); - else pthread_mutex_timedlock(&(m->wait), tio); + if(!nowait) { + if(m->flags & ESXMSG_PENDING) { + if(!tio) pthread_mutex_lock(&(m->wait)); + else pthread_mutex_timedlock(&(m->wait), tio); + } + if(tio && (m->flags & ESXMSG_PENDING)) + return SXOTIMEDOUT; } - if(tio && (m->flags & ESXMSG_PENDING)) - return SXOTIMEDOUT; + if(!m->payload) { r = m->opcode; /* first remove the message from tree */ @@ -316,15 +321,15 @@ static int __message_send_pulse(chnl_t *ch, sexp_t *sx, struct timespec *tio) int msg_send_pulse(chnl_t *ch, sexp_t *sx) { - return __message_send_pulse(ch, sx, NULL); + return __message_send_pulse(ch, sx, NULL, 0); } int msg_send_pulse_timed(chnl_t *ch, sexp_t *sx, struct timespec *tio) { - return 0; + return __message_send_pulse(ch, sx, tio, 0); } int msg_send_pulse_nowait(chnl_t *ch, sexp_t *sx) { - return 0; + return __message_send_pulse(ch, sx, NULL, 1); } diff --git a/lib/queue.c b/lib/queue.c index d270dc6..da6d9fb 100644 --- a/lib/queue.c +++ b/lib/queue.c @@ -413,6 +413,7 @@ int pth_dqtpoll_destroy(pth_dqtpoll_t *tpoll, int force) pthread_rwlock_unlock(&(tpoll->stats_lock)); pth_queue_add(tpoll->queue, &tmpmsg, 0); /* spurious */ } + usleep(100); /* just to sleep and free timeslice to others */ } /* free all */