connection read/write fixed, few race conditions fixed, added faster (rapid) message;
This commit is contained in:
parent
5ffcd38303
commit
27f1faed01
@ -23,12 +23,13 @@
|
||||
#include <sntl/pth_queue.h>
|
||||
|
||||
/* error codes */
|
||||
#define SXOREPLYREQ 44 /* protocol require reply with expression,
|
||||
* or expression return for the request */
|
||||
#define SXOTIMEDOUT 45 /* timedout */
|
||||
#define ESXRCBADPROT 46 /* invalid protocol */
|
||||
#define ESXNOCONNECT 47 /* connection is lost */
|
||||
#define ESXNOCHANSUP 48
|
||||
#define ESXOREPLYREQ 44 /* protocol require reply with expression,
|
||||
* or expression return for the request */
|
||||
#define ESXOTIMEDOUT 45 /* timedout */
|
||||
#define ESXRCBADPROT 46 /* invalid protocol */
|
||||
#define ESXNOCONNECT 47 /* connection is lost */
|
||||
#define ESXNOCHANSUP 48
|
||||
#define ESXRAPIDREPLY 49
|
||||
|
||||
/* sexp helpers */
|
||||
#define SEXP_IS_LIST(sx) \
|
||||
@ -136,6 +137,7 @@ typedef struct __sexp_payload_t {
|
||||
#define ESXMSG_CLOSURE (1 << 9)
|
||||
#define ESXMSG_RMONRETR (1 << 10)
|
||||
#define ESXMSG_KILLTHRD (1 << 11)
|
||||
#define ESXMSG_ISRAPID (1 << 12)
|
||||
|
||||
/**
|
||||
* \brief Message used in sntl message passing
|
||||
@ -147,8 +149,6 @@ typedef struct __message_t {
|
||||
chnl_t *pch; /** < channel of the message(if applicable) */
|
||||
ulong_t mid; /** < unique ID within connection context */
|
||||
char *uuid; /** < UUID of the message, used for special messages */
|
||||
usrtc_node_t chnl_node; /** < node for channel search tree */
|
||||
usrtc_node_t poll_node; /** < node for the poll of the messages */
|
||||
usrtc_node_t pendingq_node; /** < node for the pending queue */
|
||||
pthread_mutex_t wait; /** < special wait mutex, used for sync */
|
||||
void *payload; /** < payload */
|
||||
@ -251,6 +251,12 @@ int msg_reply(sxmsg_t *msg, sexp_t *sx);
|
||||
|
||||
int msg_reply_timed(sxmsg_t *msg, sexp_t *sx, struct timespec *tio);
|
||||
|
||||
/* reply with S expression without confirmation of delivery and applying */
|
||||
int msg_reply_rapid(sxmsg_t *msg, sexp_t *sx);
|
||||
|
||||
/* this is required to clean the message in case if it's a rapid message */
|
||||
int msg_rapid_clean(sxmsg_t *msg);
|
||||
|
||||
int msg_send_pulse(chnl_t *ch, sexp_t *sx);
|
||||
|
||||
int msg_send_pulse_timed(chnl_t *ch, sexp_t *sx, struct timespec *tio);
|
||||
|
256
lib/connection.c
256
lib/connection.c
@ -70,6 +70,7 @@ static int __rpc_callback(void *data)
|
||||
int rc = 0;
|
||||
|
||||
rc = job->rpcf((void *)(job->msg), job->sx);
|
||||
|
||||
free(job);
|
||||
|
||||
return rc;
|
||||
@ -81,47 +82,61 @@ extern int __alloc_channel(ulong_t cid, conn_t *co, rpc_typed_list_t *rlist,
|
||||
static int __conn_read(conn_t *co, void *buf, size_t buf_len)
|
||||
{
|
||||
int rfd = SSL_get_fd(co->ssl), r;
|
||||
fd_set readset;
|
||||
fd_set readset, writeset;
|
||||
int ofcmode, read_blocked = 0, read_blocked_on_write = 0;
|
||||
|
||||
//fprintf(stderr, "\tListening ... on %s\n", co->uuid);
|
||||
/* get prepare to select */
|
||||
FD_ZERO(&readset);
|
||||
FD_SET(rfd, &readset);
|
||||
/* waits until something will be ready to read */
|
||||
r = select(FD_SETSIZE, &readset, NULL, NULL, NULL);
|
||||
if(r < 0) {
|
||||
printf("select (%d)\n", errno);
|
||||
return -1;
|
||||
}
|
||||
if(!r) {
|
||||
printf("Nothing to wait for\n");
|
||||
return 0;
|
||||
}
|
||||
/* First we make the socket nonblocking */
|
||||
ofcmode = fcntl(rfd, F_GETFL,0);
|
||||
ofcmode |= O_NDELAY;
|
||||
if(fcntl(rfd, F_SETFL, ofcmode))
|
||||
fprintf(stderr, "Couldn't make socket nonblocking");
|
||||
|
||||
if(r && FD_ISSET(rfd, &readset)) {
|
||||
do {
|
||||
//pthread_mutex_lock(&(co->oplock));
|
||||
/* ok, now we're ready to perform SSL_read */
|
||||
r = SSL_read(co->ssl, buf, (int)buf_len);
|
||||
switch(SSL_get_error(co->ssl, r)) {
|
||||
case SSL_ERROR_NONE:
|
||||
//printf("Read done (f:%d)\n", rfd);
|
||||
/* this is means we're get ridden it all */
|
||||
return r; break;
|
||||
case SSL_ERROR_ZERO_RETURN:
|
||||
printf("No data to read\n");
|
||||
/* no data to read ... */
|
||||
return 0; break;
|
||||
case SSL_ERROR_WANT_READ:
|
||||
case SSL_ERROR_WANT_WRITE:
|
||||
printf("Bypass until SSL buffer not ready.\n");
|
||||
return 0;
|
||||
default: /* seems the connection lost */
|
||||
fprintf(stderr, "(RD)Unknown error on %s\n", co->uuid);
|
||||
return -1;
|
||||
}
|
||||
//pthread_mutex_unlock(&(co->oplock));
|
||||
} while(SSL_pending(co->ssl));
|
||||
__retry:
|
||||
|
||||
do {
|
||||
r = SSL_read(co->ssl, buf, (int)buf_len);
|
||||
switch(SSL_get_error (co->ssl, r)) {
|
||||
case SSL_ERROR_NONE:
|
||||
return r;
|
||||
break;
|
||||
case SSL_ERROR_WANT_READ:
|
||||
/* get prepare to select */
|
||||
read_blocked = 1;
|
||||
break;
|
||||
case SSL_ERROR_WANT_WRITE: /* here we blocked on write */
|
||||
read_blocked_on_write = 1;
|
||||
break;
|
||||
default:
|
||||
fprintf(stderr, "(RD)Unknown error on %s\n", co->uuid);
|
||||
return -1;
|
||||
}
|
||||
} while(SSL_pending(co->ssl) && !read_blocked);
|
||||
|
||||
if(read_blocked) {
|
||||
FD_ZERO(&readset);
|
||||
FD_SET(rfd, &readset);
|
||||
/* waits until something will be ready to read */
|
||||
r = select(rfd + 1, &readset, NULL, NULL, NULL);
|
||||
if(r < 0) {
|
||||
printf("select (%d)\n", errno);
|
||||
return -1;
|
||||
}
|
||||
if(!r) {
|
||||
printf("Nothing to wait for\n");
|
||||
return 0;
|
||||
}
|
||||
read_blocked = 0;
|
||||
if(r && FD_ISSET(rfd, &readset)) goto __retry; /* try to read again */
|
||||
}
|
||||
if(read_blocked_on_write) { /* we was blocked on write */
|
||||
FD_ZERO(&readset);
|
||||
FD_ZERO(&writeset);
|
||||
FD_SET(rfd, &readset);
|
||||
FD_SET(rfd, &writeset);
|
||||
|
||||
r = select(rfd + 1, &readset, &writeset, NULL, NULL);
|
||||
read_blocked_on_write = 0;
|
||||
if(r && FD_ISSET(rfd, &writeset)) goto __retry;
|
||||
}
|
||||
|
||||
return 0;
|
||||
@ -129,24 +144,29 @@ static int __conn_read(conn_t *co, void *buf, size_t buf_len)
|
||||
|
||||
static int __conn_write(conn_t *co, void *buf, size_t buf_len)
|
||||
{
|
||||
int r;
|
||||
int r, rfd = SSL_get_fd(co->ssl);
|
||||
fd_set writeset;
|
||||
|
||||
pthread_mutex_lock(&(co->oplock));
|
||||
//pthread_mutex_lock(&(co->oplock));
|
||||
__retry:
|
||||
r = SSL_write(co->ssl, buf, (int)buf_len);
|
||||
switch(SSL_get_error(co->ssl, r)) {
|
||||
case SSL_ERROR_WANT_READ:
|
||||
case SSL_ERROR_WANT_WRITE:
|
||||
goto __retry;
|
||||
/* here we should block */
|
||||
FD_ZERO(&writeset);
|
||||
FD_SET(rfd, &writeset);
|
||||
r = select(rfd + 1, NULL, &writeset, NULL, NULL);
|
||||
if(r && FD_ISSET(rfd, &writeset)) goto __retry;
|
||||
break;
|
||||
default:
|
||||
pthread_mutex_unlock(&(co->oplock));
|
||||
//pthread_mutex_unlock(&(co->oplock));
|
||||
if(r < 0) {
|
||||
fprintf(stderr, "(WR)Unknown error on %s (%d)\n", co->uuid, r);
|
||||
return -1;
|
||||
} else return 0;
|
||||
}
|
||||
pthread_mutex_unlock(&(co->oplock));
|
||||
//pthread_mutex_unlock(&(co->oplock));
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -861,7 +881,7 @@ static int __default_ch_close_ret(void *cctx, sexp_t *sx)
|
||||
/* take length of the list */
|
||||
llen = sexp_list_length(lsx);
|
||||
if(!llen) return EINVAL; /* !! other side will not set any security attributes */
|
||||
;
|
||||
|
||||
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
|
||||
if(SEXP_IS_LIST(sx_iter)) {
|
||||
sexp_list_car(sx_iter, &sx_in);
|
||||
@ -892,7 +912,7 @@ static int __default_ch_close_ret(void *cctx, sexp_t *sx)
|
||||
}
|
||||
} else continue; /* ignore */
|
||||
}
|
||||
;
|
||||
|
||||
/* try to find desired channel to intercept message */
|
||||
pthread_rwlock_rdlock(&(co->chnl_lock));
|
||||
node = usrtc_lookup(co->chnl_tree, (void *)&id);
|
||||
@ -902,9 +922,9 @@ static int __default_ch_close_ret(void *cctx, sexp_t *sx)
|
||||
chan = (chnl_t *)usrtc_node_getdata(node);
|
||||
sms = chan->sysmsg;
|
||||
}
|
||||
;
|
||||
|
||||
__mark_msg:
|
||||
;
|
||||
|
||||
if(!sms) return r;
|
||||
sms->flags &= ~ESXMSG_PENDING; /* the message is done */
|
||||
sms->opcode = err;
|
||||
@ -1134,7 +1154,6 @@ static int __default_msg(void *cctx, sexp_t *sx)
|
||||
usrtc_insert(chan->msgs_tree, &(smsg->pendingq_node), &(smsg->mid));
|
||||
pthread_rwlock_unlock(&(chan->msglock));
|
||||
} else {
|
||||
//printf(">>>>>>>>>>>>>>>>>>>msg_id = %lu\n", msg_id);
|
||||
pthread_rwlock_unlock(&(chan->msglock));
|
||||
smsg = (sxmsg_t *)usrtc_node_getdata(node);
|
||||
msg_return(smsg, EEXIST);
|
||||
@ -1230,7 +1249,7 @@ static int __default_msg_return(void *cctx, sexp_t *sx)
|
||||
r = ESXRCBADPROT;
|
||||
goto __finish;
|
||||
}
|
||||
//printf("chnl_id = %ld\n", chnl_id);
|
||||
|
||||
if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT;
|
||||
else chan = (chnl_t *)usrtc_node_getdata(node);
|
||||
/* lookup for the message */
|
||||
@ -1256,8 +1275,114 @@ static int __default_msg_return(void *cctx, sexp_t *sx)
|
||||
usrtc_delete(chan->msgs_tree, &(smsg->pendingq_node));
|
||||
pthread_rwlock_unlock(&(chan->msglock));
|
||||
}
|
||||
if(smsg->flags & ESXMSG_PENDING) pthread_mutex_unlock(&(smsg->wait));
|
||||
else {
|
||||
if(smsg->flags & ESXMSG_PENDING) {
|
||||
destroy_sexp(sx);
|
||||
pthread_mutex_unlock(&(smsg->wait));
|
||||
return r;
|
||||
} else {
|
||||
/* nobody want it */
|
||||
destroy_sexp(smsg->initial_sx);
|
||||
__destroy_msg(smsg);
|
||||
}
|
||||
}
|
||||
|
||||
__finish:
|
||||
destroy_sexp(sx);
|
||||
return r;
|
||||
}
|
||||
|
||||
static int __default_msg_rapid(void *cctx, sexp_t *sx)
|
||||
{
|
||||
conn_t *co = (conn_t *)cctx;
|
||||
usrtc_node_t *node = NULL;
|
||||
chnl_t *chan = NULL;
|
||||
int r = 0;
|
||||
sexp_t *lsx = NULL, *sx_iter = NULL;
|
||||
sexp_t *sx_sublist = NULL, *sx_value = NULL;
|
||||
ulong_t chnl_id = -1;
|
||||
ulong_t msg_id = -1;
|
||||
sexp_t *msg = NULL;
|
||||
sxmsg_t *smsg = NULL;
|
||||
int idx;
|
||||
|
||||
/* get parameters from the message */
|
||||
if(sexp_list_cdr(sx, &lsx)) {
|
||||
r = ESXRCBADPROT;
|
||||
goto __finish;
|
||||
}
|
||||
if(!SEXP_IS_LIST(lsx)) {
|
||||
r = ESXRCBADPROT;
|
||||
goto __finish;
|
||||
}
|
||||
|
||||
/* get parameters */
|
||||
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
|
||||
if(SEXP_IS_LIST(sx_iter)) {
|
||||
sx_sublist = sx_iter;
|
||||
continue;
|
||||
} else {
|
||||
if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) {
|
||||
if(strcmp(sx_iter->val, ":chid")) {
|
||||
continue; // ignore it
|
||||
}
|
||||
sx_value = sx_iter->next;
|
||||
if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) {
|
||||
continue;
|
||||
}
|
||||
chnl_id = atol(sx_value->val);
|
||||
} else continue; // ignore it
|
||||
}
|
||||
}
|
||||
lsx = sx_sublist;
|
||||
/* find message id */
|
||||
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
|
||||
if(SEXP_IS_LIST(sx_iter)) {
|
||||
msg = sx_iter;
|
||||
continue;
|
||||
} else {
|
||||
if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) {
|
||||
if(strcmp(sx_iter->val, ":msgid")) {
|
||||
continue; // ignore
|
||||
}
|
||||
sx_value = sx_iter->next;
|
||||
if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) {
|
||||
continue;
|
||||
}
|
||||
msg_id = atol(sx_value->val);
|
||||
} else continue; // ignore it
|
||||
}
|
||||
}
|
||||
|
||||
if(msg_id < 0 || chnl_id < 0) {
|
||||
r = ESXRCBADPROT;
|
||||
goto __finish;
|
||||
}
|
||||
|
||||
if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT;
|
||||
else chan = (chnl_t *)usrtc_node_getdata(node);
|
||||
/* lookup for the message */
|
||||
pthread_rwlock_rdlock(&(chan->msglock));
|
||||
if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) {
|
||||
pthread_rwlock_unlock(&(chan->msglock));
|
||||
r = ENOENT;
|
||||
goto __finish;
|
||||
} else {
|
||||
pthread_rwlock_unlock(&(chan->msglock));
|
||||
smsg = (sxmsg_t *)usrtc_node_getdata(node);
|
||||
smsg->opcode = ESXRAPIDREPLY;
|
||||
smsg->payload = copy_sexp(msg);
|
||||
smsg->flags |= ESXMSG_ISREPLY;
|
||||
/* are we'are ready to remove this ? */
|
||||
if(smsg->flags & ESXMSG_RMONRETR) {
|
||||
pthread_rwlock_wrlock(&(chan->msglock));
|
||||
usrtc_delete(chan->msgs_tree, &(smsg->pendingq_node));
|
||||
pthread_rwlock_unlock(&(chan->msglock));
|
||||
}
|
||||
if(smsg->flags & ESXMSG_PENDING) {
|
||||
destroy_sexp(sx);
|
||||
pthread_mutex_unlock(&(smsg->wait));
|
||||
return r;
|
||||
} else {
|
||||
/* nobody want it */
|
||||
destroy_sexp(smsg->initial_sx);
|
||||
__destroy_msg(smsg);
|
||||
@ -1347,7 +1472,7 @@ static int __default_msg_reply(void *cctx, sexp_t *sx)
|
||||
} else {
|
||||
pthread_rwlock_unlock(&(chan->msglock));
|
||||
smsg = (sxmsg_t *)usrtc_node_getdata(node);
|
||||
smsg->opcode = SXOREPLYREQ;
|
||||
smsg->opcode = ESXOREPLYREQ;
|
||||
smsg->payload = copy_sexp(msg);
|
||||
smsg->flags |= ESXMSG_ISREPLY;
|
||||
pthread_mutex_unlock(&(smsg->wait));
|
||||
@ -1377,6 +1502,7 @@ static int __init_systemrpc_tree(usrtc_t *rtree)
|
||||
if(__insert_rpc_function(rtree, "ch-msg-pulse-ret", __default_msg_pulse_ret)) goto __fail;
|
||||
if(__insert_rpc_function(rtree, "ch-msg", __default_msg)) goto __fail;
|
||||
if(__insert_rpc_function(rtree, "ch-msg-rete", __default_msg_return)) goto __fail;
|
||||
if(__insert_rpc_function(rtree, "ch-msg-rapid", __default_msg_rapid)) goto __fail;
|
||||
if(__insert_rpc_function(rtree, "ch-msg-repl", __default_msg_reply)) goto __fail;
|
||||
|
||||
return 0;
|
||||
@ -1442,7 +1568,6 @@ static void *__cxmaster_thread_listener(void *wctx)
|
||||
|
||||
while((r = __conn_read(co, buf, 4096)) != -1) {
|
||||
buf[r] = '\0';
|
||||
// if(r) printf("Got the message %s (%d bytes)\n", buf, r);
|
||||
r = __eval_cstr(buf, conn_sys->system_rpc, co);
|
||||
}
|
||||
co->flags &= ~CXCONN_ESTABL;
|
||||
@ -1499,7 +1624,6 @@ static void *__rmsg_queue_thread(void *ctx)
|
||||
/* get the function name */
|
||||
if(sx->ty == SEXP_LIST) rpcf = sx->list->val;
|
||||
else {
|
||||
//rpcf = sx->val;
|
||||
r = ESXRCBADPROT;
|
||||
goto __err_ret;
|
||||
}
|
||||
@ -1562,6 +1686,7 @@ static void *__msg_queue_thread(void *ctx)
|
||||
continue;
|
||||
} else {
|
||||
ch = msg->pch;
|
||||
|
||||
/* now we need to complete the request */
|
||||
sx = (sexp_t *)msg->payload; tb = buf;
|
||||
if(!(msg->flags & ESXMSG_PULSE)) { /* %s))) */
|
||||
@ -1577,8 +1702,15 @@ static void *__msg_queue_thread(void *ctx)
|
||||
/* ok, here we will write it and wait, destroying dialog while reply */
|
||||
|
||||
goto __ssl_write;
|
||||
} else snprintf(buf, 4096, "(ch-msg-repl (:chid %lu (:msgid %lu ", ch->cid,
|
||||
msg->mid);
|
||||
} else {
|
||||
if(msg->flags & ESXMSG_ISRAPID) {
|
||||
msg->flags |= ESXMSG_CLOSURE;
|
||||
snprintf(buf, 4096, "(ch-msg-rapid (:chid %lu (:msgid %lu ", ch->cid,
|
||||
msg->mid);
|
||||
} else
|
||||
snprintf(buf, 4096, "(ch-msg-repl (:chid %lu (:msgid %lu ", ch->cid,
|
||||
msg->mid);
|
||||
}
|
||||
}
|
||||
|
||||
len = strlen(buf);
|
||||
@ -1629,13 +1761,15 @@ static void *__msg_queue_thread(void *ctx)
|
||||
|
||||
__ssl_write:
|
||||
if(msg->flags & ESXMSG_CLOSURE) {
|
||||
/* wake up it */
|
||||
pthread_mutex_unlock(&(msg->wait));
|
||||
/* first remove the message from tree */
|
||||
pthread_rwlock_wrlock(&(ch->msglock));
|
||||
usrtc_delete(ch->msgs_tree, &(msg->pendingq_node));
|
||||
pthread_rwlock_unlock(&(ch->msglock));
|
||||
/* destroy */
|
||||
destroy_sexp(msg->initial_sx);
|
||||
if(msg->flags & ESXMSG_ISREPLY && msg->payload)
|
||||
if(msg->flags & ESXMSG_ISREPLY && msg->payload)
|
||||
destroy_sexp((sexp_t *)msg->payload);
|
||||
__destroy_msg(msg);
|
||||
}
|
||||
@ -1646,6 +1780,7 @@ static void *__msg_queue_thread(void *ctx)
|
||||
co->flags |= CXCONN_BROKEN;
|
||||
__wake_up_waiters(co, ESXNOCONNECT);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
len = 0;
|
||||
@ -2024,12 +2159,14 @@ int connection_initiate(conn_t *co, const char *host, int port,
|
||||
/* now we will create an SSL connection */
|
||||
co->ssl = SSL_new(co->ctx);
|
||||
SSL_set_fd(co->ssl, sd); /* attach connected socket */
|
||||
BIO_set_nbio(SSL_get_rbio(co->ssl), 1);
|
||||
if(SSL_connect(co->ssl) == -1) {
|
||||
r = EBADE;
|
||||
free(buf);
|
||||
/* shutdown connection */
|
||||
goto __fail_3;
|
||||
} /* if success we're ready to use established SSL channel */
|
||||
BIO_set_nbio(SSL_get_rbio(co->ssl), 1);
|
||||
/* auth and RPC contexts sync */
|
||||
co->pctx = pctx;
|
||||
snprintf(buf, __TMPBUFLEN, "(auth-set-context ((:user \"%s\")(:passwd \"%s\")))",
|
||||
@ -2201,6 +2338,7 @@ int connection_create(conn_t *co, int sck)
|
||||
SSL_set_fd(co->ssl, sck); /* attach connected socket */
|
||||
/* set the context to verify ssl connection */
|
||||
SSL_set_ex_data(co->ssl, conn_sys->ex_ssldata_index, (void *)co);
|
||||
BIO_set_nbio(SSL_get_rbio(co->ssl), 1);
|
||||
if(SSL_accept(co->ssl) == -1) {
|
||||
r = EBADE;
|
||||
free(buf);
|
||||
@ -2217,10 +2355,8 @@ int connection_create(conn_t *co, int sck)
|
||||
if(bytes > 0) {
|
||||
buf[bytes] = 0;
|
||||
r = __eval_cstr(buf, conn_sys->system_rpc, (void *)co);
|
||||
printf("%s return %d (bytes %d)\n", buf, r, bytes);
|
||||
if(r) goto __fail_3;
|
||||
} else {
|
||||
printf("bytes = %d\n", bytes);
|
||||
if(bytes < 0) {
|
||||
printf("Terminate SSL connection, the other end is lost.\n");
|
||||
co->flags &= ~CXCONN_ESTABL;
|
||||
|
@ -62,8 +62,6 @@ sxmsg_t *__allocate_msg(int *res)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
usrtc_node_init(&(msg->chnl_node), msg);
|
||||
usrtc_node_init(&(msg->poll_node), msg);
|
||||
usrtc_node_init(&(msg->pendingq_node), msg);
|
||||
}
|
||||
|
||||
@ -161,7 +159,7 @@ static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec
|
||||
else pthread_mutex_timedlock(&(m->wait), tio);
|
||||
}
|
||||
if(tio && (m->flags & ESXMSG_PENDING))
|
||||
return SXOTIMEDOUT;
|
||||
return ESXOTIMEDOUT;
|
||||
if(!m->payload) {
|
||||
r = m->opcode;
|
||||
/* first remove the message from tree */
|
||||
@ -174,9 +172,16 @@ static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec
|
||||
__destroy_msg(m);
|
||||
} else {
|
||||
*msg = m;
|
||||
if(m->opcode == ESXNOCONNECT)
|
||||
if(m->opcode == ESXNOCONNECT || m->opcode == ESXRAPIDREPLY)
|
||||
r = m->opcode;
|
||||
else r = SXOREPLYREQ;
|
||||
else r = ESXOREPLYREQ;
|
||||
/* FIXME: remove ugly code */
|
||||
if(m->opcode == ESXRAPIDREPLY) {
|
||||
/* first remove the message from tree */
|
||||
pthread_rwlock_wrlock(&(ch->msglock));
|
||||
usrtc_delete(ch->msgs_tree, &(m->pendingq_node));
|
||||
pthread_rwlock_unlock(&(ch->msglock));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -193,7 +198,8 @@ int msg_send_timed(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec *tio)
|
||||
return __message_send(ch, sx, msg, tio);
|
||||
}
|
||||
|
||||
static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcode)
|
||||
static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcode,
|
||||
int israpid)
|
||||
{
|
||||
int r = 0;
|
||||
chnl_t *ch = msg->pch;
|
||||
@ -211,14 +217,19 @@ static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcod
|
||||
msg->opcode = opcode;
|
||||
msg->flags |= ESXMSG_PENDING; /* pending */
|
||||
msg->flags |= ESXMSG_ISREPLY; /* this is a reply */
|
||||
if(israpid) msg->flags |= ESXMSG_ISRAPID; /* message is a rapid message */
|
||||
|
||||
if(!sx) msg->flags &= ~ESXMSG_PENDING;
|
||||
if(!sx || israpid) msg->flags &= ~ESXMSG_PENDING;
|
||||
else msg->flags |= ESXMSG_RMONRETR;
|
||||
|
||||
/* put the message to the queue */
|
||||
r = pth_queue_add(co->mqueue, (void *)msg, USR_MSG);
|
||||
if(r) return r; /* FIXME: better give up */
|
||||
if(!sx) return 0;
|
||||
if(!sx || israpid) {
|
||||
/* wait for write */
|
||||
pthread_mutex_lock(&(msg->wait));
|
||||
return 0;
|
||||
}
|
||||
|
||||
if(msg->flags & ESXMSG_PENDING) {
|
||||
if(!tio) pthread_mutex_lock(&(msg->wait));
|
||||
@ -227,7 +238,7 @@ static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcod
|
||||
|
||||
if(tio && (msg->flags & ESXMSG_PENDING)) {
|
||||
msg->flags &= ~ESXMSG_PENDING; /* we will not wait for it */
|
||||
return SXOTIMEDOUT;
|
||||
return ESXOTIMEDOUT;
|
||||
}
|
||||
|
||||
r = msg->opcode;
|
||||
@ -243,17 +254,22 @@ static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcod
|
||||
|
||||
int msg_return(sxmsg_t *msg, int opcode)
|
||||
{
|
||||
return __msg_reply(msg, NULL, NULL, opcode);
|
||||
return __msg_reply(msg, NULL, NULL, opcode, 0);
|
||||
}
|
||||
|
||||
int msg_reply(sxmsg_t *msg, sexp_t *sx)
|
||||
{
|
||||
return __msg_reply(msg, sx, NULL, 0);
|
||||
return __msg_reply(msg, sx, NULL, 0, 0);
|
||||
}
|
||||
|
||||
int msg_reply_timed(sxmsg_t *msg, sexp_t *sx, struct timespec *tio)
|
||||
{
|
||||
return __msg_reply(msg, sx, tio, 0);
|
||||
return __msg_reply(msg, sx, tio, 0, 0);
|
||||
}
|
||||
|
||||
int msg_reply_rapid(sxmsg_t *msg, sexp_t *sx)
|
||||
{
|
||||
return __msg_reply(msg, sx, NULL, 0, 1);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -300,7 +316,7 @@ static int __message_send_pulse(chnl_t *ch, sexp_t *sx, struct timespec *tio,
|
||||
else pthread_mutex_timedlock(&(m->wait), tio);
|
||||
}
|
||||
if(tio && (m->flags & ESXMSG_PENDING))
|
||||
return SXOTIMEDOUT;
|
||||
return ESXOTIMEDOUT;
|
||||
}
|
||||
|
||||
if(!m->payload) {
|
||||
@ -314,7 +330,7 @@ static int __message_send_pulse(chnl_t *ch, sexp_t *sx, struct timespec *tio,
|
||||
/* destroy */
|
||||
__destroy_msg(m);
|
||||
} else {
|
||||
r = SXOREPLYREQ;
|
||||
r = ESXOREPLYREQ;
|
||||
}
|
||||
}
|
||||
|
||||
@ -335,3 +351,12 @@ int msg_send_pulse_nowait(chnl_t *ch, sexp_t *sx)
|
||||
{
|
||||
return __message_send_pulse(ch, sx, NULL, 1);
|
||||
}
|
||||
|
||||
int msg_rapid_clean(sxmsg_t *msg)
|
||||
{
|
||||
destroy_sexp(msg->initial_sx);
|
||||
if(msg->payload) destroy_sexp(msg->payload);
|
||||
__destroy_msg(msg);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user