added pulses, minor fixes

v0.5.xx
Alexander Vdolainen 10 years ago
parent 6e18271550
commit e9c0d3a2e2

@ -508,16 +508,17 @@ static int __default_ch_set_types(void *cctx, sexp_t *sx)
} else continue; /* ignore */ } else continue; /* ignore */
} }
__send_reply: __send_reply:
snprintf(buf, 1024, "(ch-gl-error (%d))", r); snprintf(buf, 1024, "(ch-gl-error (%d))", r);
if(__conn_write(co, buf, strlen(buf)) < 0) { if(__conn_write(co, buf, strlen(buf)) < 0) {
co->flags &= ~CXCONN_ESTABL; co->flags &= ~CXCONN_ESTABL;
co->flags |= CXCONN_BROKEN; co->flags |= CXCONN_BROKEN;
__wake_up_waiters(co, ESXNOCONNECT); __wake_up_waiters(co, ESXNOCONNECT);
} }
destroy_sexp(sx); destroy_sexp(sx);
return r;
return r;
} }
static int __default_ch_gl_error(void *cctx, sexp_t *sx) static int __default_ch_gl_error(void *cctx, sexp_t *sx)

@ -174,7 +174,9 @@ static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec
__destroy_msg(m); __destroy_msg(m);
} else { } else {
*msg = m; *msg = m;
r = SXOREPLYREQ; if(m->opcode == ESXNOCONNECT)
r = m->opcode;
else r = SXOREPLYREQ;
} }
} }
@ -252,7 +254,6 @@ int msg_reply_timed(sxmsg_t *msg, sexp_t *sx, struct timespec *tio)
return __msg_reply(msg, sx, tio, 0); return __msg_reply(msg, sx, tio, 0);
} }
//TODO: continue. Implement wait for delivery and queue addition
/* /*
* How message sending works: * How message sending works:
* 1. Create a message structure assigned to the channel, * 1. Create a message structure assigned to the channel,
@ -260,7 +261,8 @@ int msg_reply_timed(sxmsg_t *msg, sexp_t *sx, struct timespec *tio)
* 3. Put the message to the queue * 3. Put the message to the queue
* 4. Wait for job execution * 4. Wait for job execution
*/ */
static int __message_send_pulse(chnl_t *ch, sexp_t *sx, struct timespec *tio) static int __message_send_pulse(chnl_t *ch, sexp_t *sx, struct timespec *tio,
int nowait)
{ {
int r = 0; int r = 0;
sxmsg_t *m = NULL; sxmsg_t *m = NULL;
@ -290,12 +292,15 @@ static int __message_send_pulse(chnl_t *ch, sexp_t *sx, struct timespec *tio)
r = pth_queue_add(co->mqueue, (void *)m, USR_MSG); r = pth_queue_add(co->mqueue, (void *)m, USR_MSG);
if(r) return r; /* FIXME: better give up */ if(r) return r; /* FIXME: better give up */
if(m->flags & ESXMSG_PENDING) { if(!nowait) {
if(!tio) pthread_mutex_lock(&(m->wait)); if(m->flags & ESXMSG_PENDING) {
else pthread_mutex_timedlock(&(m->wait), tio); if(!tio) pthread_mutex_lock(&(m->wait));
else pthread_mutex_timedlock(&(m->wait), tio);
}
if(tio && (m->flags & ESXMSG_PENDING))
return SXOTIMEDOUT;
} }
if(tio && (m->flags & ESXMSG_PENDING))
return SXOTIMEDOUT;
if(!m->payload) { if(!m->payload) {
r = m->opcode; r = m->opcode;
/* first remove the message from tree */ /* first remove the message from tree */
@ -316,15 +321,15 @@ static int __message_send_pulse(chnl_t *ch, sexp_t *sx, struct timespec *tio)
int msg_send_pulse(chnl_t *ch, sexp_t *sx) int msg_send_pulse(chnl_t *ch, sexp_t *sx)
{ {
return __message_send_pulse(ch, sx, NULL); return __message_send_pulse(ch, sx, NULL, 0);
} }
int msg_send_pulse_timed(chnl_t *ch, sexp_t *sx, struct timespec *tio) int msg_send_pulse_timed(chnl_t *ch, sexp_t *sx, struct timespec *tio)
{ {
return 0; return __message_send_pulse(ch, sx, tio, 0);
} }
int msg_send_pulse_nowait(chnl_t *ch, sexp_t *sx) int msg_send_pulse_nowait(chnl_t *ch, sexp_t *sx)
{ {
return 0; return __message_send_pulse(ch, sx, NULL, 1);
} }

@ -413,6 +413,7 @@ int pth_dqtpoll_destroy(pth_dqtpoll_t *tpoll, int force)
pthread_rwlock_unlock(&(tpoll->stats_lock)); pthread_rwlock_unlock(&(tpoll->stats_lock));
pth_queue_add(tpoll->queue, &tmpmsg, 0); /* spurious */ pth_queue_add(tpoll->queue, &tmpmsg, 0); /* spurious */
} }
usleep(100); /* just to sleep and free timeslice to others */
} }
/* free all */ /* free all */

Loading…
Cancel
Save