removed obsolete functions - planned reinit, pulse messages;

v0.5.xx
Alexander Vdolainen 10 years ago
parent b60bca13b4
commit 87c09c307e

@ -137,7 +137,7 @@ typedef struct __sexp_payload_t {
#define ESXMSG_PENDING (1 << 3)
#define ESXMSG_NOWAY (1 << 4)
#define ESXMSG_TIMEDOUT (1 << 5)
#define ESXMSG_PULSE (1 << 6)
#define ESXMSG_PULSE (1 << 6) /* obsolete flag */
#define ESXMSG_NOWAIT (1 << 7)
#define ESXMSG_ISREPLY (1 << 8)
#define ESXMSG_CLOSURE (1 << 9)
@ -252,9 +252,6 @@ int connection_create_fapi(conn_t *co, int sck, struct in_addr *addr);
int connection_close(conn_t *co);
/* FIXME: for the next versions */
int connection_reinit(conn_t *co);
/* channels */
int channel_open(conn_t *co, chnl_t **ch, int type);
@ -277,12 +274,6 @@ int msg_reply_rapid(sxmsg_t *msg, sexp_t *sx);
/* this is required to clean the message in case if it's a rapid message */
int msg_rapid_clean(sxmsg_t *msg);
int msg_send_pulse(chnl_t *ch, sexp_t *sx);
int msg_send_pulse_timed(chnl_t *ch, sexp_t *sx, struct timespec *tio);
int msg_send_pulse_nowait(chnl_t *ch, sexp_t *sx);
#ifdef __cplusplus
}
#endif

@ -997,117 +997,6 @@ static int __create_reg_msg_mould(sxmsg_t **msg, chnl_t *ch, ulong_t mid)
return 0;
}
// TODO: check and continue
static int __default_msg_pulse(void *cctx, sexp_t *sx)
{
conn_t *co = (conn_t *)cctx;
usrtc_node_t *node = NULL;
chnl_t *chan = NULL;
int r = 0;
sexp_t *lsx = NULL, *sx_iter = NULL;
sexp_t *sx_sublist = NULL, *sx_value = NULL;
ulong_t chnl_id = -1;
ulong_t msg_id = -1;
sexp_t *msg = NULL;
sxmsg_t *smsg = NULL;
int idx;
/* get parameters from the message */
if(sexp_list_cdr(sx, &lsx)) return ESXRCBADPROT;
if(!SEXP_IS_LIST(lsx)) return ESXRCBADPROT;
/* find channel id */
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
sx_sublist = sx_iter;
continue;
} else {
if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) {
if(strcmp(sx_iter->val, ":chid")) {
continue; // ignore it
}
sx_value = sx_iter->next;
if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) {
continue;
}
chnl_id = atol(sx_value->val);
} else continue; // ignore it
}
}
lsx = sx_sublist;
/* find message id */
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
msg = sx_iter;
continue;
} else {
if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) {
if(strcmp(sx_iter->val, ":msgid")) {
continue; // ignore
}
sx_value = sx_iter->next;
if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) {
continue;
}
msg_id = atol(sx_value->val);
} else continue; // ignore it
}
}
if(msg_id < 0 || chnl_id < 0) {
return ESXRCBADPROT;
}
/* find channel */
if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT;
else chan = (chnl_t *)usrtc_node_getdata(node);
/* lookup for the message */
pthread_rwlock_rdlock(&(chan->msglock));
if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) {
pthread_rwlock_unlock(&(chan->msglock));
/* here, rpc lookup has no sense, just put this job to the queue */
/* btw, we're need to create a message first */
r = __create_reg_msg_mould(&smsg, chan, msg_id);
if(r) return r;
/* assign the message */
smsg->opcode = 0;
smsg->payload = (void *)msg;
/* assign initial S-expression structure */
smsg->initial_sx = sx;
/* put the message to the search tree */
pthread_rwlock_wrlock(&(chan->msglock));
usrtc_insert(chan->msgs_tree, &(smsg->pendingq_node), &(smsg->mid));
pthread_rwlock_unlock(&(chan->msglock));
} else {
//printf(">>>>>>>>>>>>>>>>>>>msg_id = %lu\n", msg_id);
pthread_rwlock_unlock(&(chan->msglock));
smsg = (sxmsg_t *)usrtc_node_getdata(node);
msg_return(smsg, EEXIST);
return EEXIST;
}
/* put job to the queue and give up */
r = pth_queue_add(co->rqueue, (void *)smsg, USR_MSG);
if(r) { /* cannot put job to the queue */
pthread_rwlock_wrlock(&(chan->msglock));
usrtc_delete(chan->msgs_tree, &(smsg->pendingq_node));
pthread_rwlock_unlock(&(chan->msglock));
__destroy_msg(smsg);
return r;
}
//msg_return(smsg, r);
/* put to the IN queue */
return r;
}
static int __default_msg_pulse_ret(void *cctx, sexp_t *sx)
{
return 0;
}
static int __default_msg(void *cctx, sexp_t *sx)
{
conn_t *co = (conn_t *)cctx;
@ -1535,8 +1424,6 @@ static int __init_systemrpc_tree(usrtc_t *rtree)
if(__insert_rpc_function(rtree, "ch-close", __default_ch_close)) goto __fail;
if(__insert_rpc_function(rtree, "ch-close-ret", __default_ch_close_ret)) goto __fail;
/* messaging functions */
if(__insert_rpc_function(rtree, "ch-msg-pulse", __default_msg_pulse)) goto __fail;
if(__insert_rpc_function(rtree, "ch-msg-pulse-ret", __default_msg_pulse_ret)) goto __fail;
if(__insert_rpc_function(rtree, "ch-msg", __default_msg)) goto __fail;
if(__insert_rpc_function(rtree, "ch-msg-rete", __default_msg_return)) goto __fail;
if(__insert_rpc_function(rtree, "ch-msg-rapid", __default_msg_rapid)) goto __fail;
@ -1785,27 +1672,6 @@ static void *__msg_queue_thread(void *ctx)
pthread_mutex_unlock(&(msg->wait));
}
}
} else { /* pulse messages */
/* here we're shouldn't process reply procedure */
snprintf(buf, 4096, "(ch-msg-pulse (:chid %lu (:msgid %lu ", ch->cid,
msg->mid);
len = strlen(buf); /* FIXME: code double shit ! */
tb += len*sizeof(char);
if(print_sexp(tb, 4096 - (len + 4*sizeof(char)), sx) == -1) {
msg->opcode = ENOMEM;
/* we don't need to wake up anybody */
if(msg->flags & ESXMSG_TIMEDOUT) {
/* clean up all the shit:
* 1. remove from message tree
* 2. destroy message itself
*/
destroy_sexp(msg->initial_sx);
msg->initial_sx = NULL;
msg->payload = NULL;
__destroy_msg(msg);
} else
pthread_mutex_unlock(&(msg->wait));
}
}
len = strlen(tb);

@ -276,86 +276,6 @@ int msg_reply_rapid(sxmsg_t *msg, sexp_t *sx)
return __msg_reply(msg, sx, NULL, 0, 1);
}
/*
* How message sending works:
* 1. Create a message structure assigned to the channel,
* 2. Put S-expression context to it
* 3. Put the message to the queue
* 4. Wait for job execution
*/
static int __message_send_pulse(chnl_t *ch, sexp_t *sx, struct timespec *tio,
int nowait)
{
int r = 0;
sxmsg_t *m = NULL;
conn_t *co = ch->connection;
if(!(co->flags & CXCONN_ESTABL)) {
destroy_sexp(sx);
return ESXNOCONNECT;
}
r = __create_reg_msg(&m, ch);
if(r) return r;
else {
/* put the message to the search tree */
pthread_rwlock_wrlock(&(ch->msglock));
usrtc_insert(ch->msgs_tree, &(m->pendingq_node), &(m->mid));
pthread_rwlock_unlock(&(ch->msglock));
/* message assign */
m->opcode = 0;
m->payload = (void *)sx;
/* assign initial sx */
m->initial_sx = sx;
m->flags |= ESXMSG_PULSE;
/* put the message to the run queue */
r = pth_queue_add(co->mqueue, (void *)m, USR_MSG);
if(r) return r; /* FIXME: better give up */
if(!nowait) {
if(m->flags & ESXMSG_PENDING) {
if(!tio) pthread_mutex_lock(&(m->wait));
else pthread_mutex_timedlock(&(m->wait), tio);
}
if(tio && (m->flags & ESXMSG_PENDING))
return ESXOTIMEDOUT;
}
if(!m->payload) {
r = m->opcode;
/* first remove the message from tree */
pthread_rwlock_wrlock(&(ch->msglock));
usrtc_delete(ch->msgs_tree, &(m->pendingq_node));
pthread_rwlock_unlock(&(ch->msglock));
/* destroy s expression */
destroy_sexp(m->initial_sx);
/* destroy */
__destroy_msg(m);
} else {
r = ESXOREPLYREQ;
}
}
return r;
}
int msg_send_pulse(chnl_t *ch, sexp_t *sx)
{
return __message_send_pulse(ch, sx, NULL, 0);
}
int msg_send_pulse_timed(chnl_t *ch, sexp_t *sx, struct timespec *tio)
{
return __message_send_pulse(ch, sx, tio, 0);
}
int msg_send_pulse_nowait(chnl_t *ch, sexp_t *sx)
{
return __message_send_pulse(ch, sx, NULL, 1);
}
int msg_rapid_clean(sxmsg_t *msg)
{
destroy_sexp(msg->initial_sx);

Loading…
Cancel
Save