added numerious fixes, TODOs, and some implementation;

v0.5.xx
Alexander Vdolainen 10 years ago
parent 733ee9f904
commit 36bd01716e

@ -139,6 +139,7 @@ typedef struct __message_t {
usrtc_node_t pendingq_node; /** < node for the pending queue */
pthread_mutex_t wait; /** < special wait mutex, used for sync */
void *payload; /** < payload */
sexp_t *initial_sx;
int opcode; /** < opcode for system and pulse messages */
int flags; /** < flags of the message (type, state etc ...)*/
int use_count; /** < use count */

@ -847,7 +847,8 @@ static int __default_msg(void *cctx, sexp_t *sx)
/* get parameters from the message */
if(sexp_list_cdr(sx, &lsx)) return EINVAL;
if(!SEXP_IS_LIST(lsx)) return EINVAL;
// find channel id
/* TODO: optimize it, i.e. do it with one pass iteraction !! */
/* find channel id */
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
sx_sublist = sx_iter;
@ -863,10 +864,10 @@ static int __default_msg(void *cctx, sexp_t *sx)
}
chnl_id = atol(sx_value->val);
} else continue; // ignore it
}
}
}
lsx = sx_sublist;
// find message id
/* find message id */
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
msg = sx_iter;
@ -882,7 +883,7 @@ static int __default_msg(void *cctx, sexp_t *sx)
}
msg_id = atol(sx_value->val);
} else continue; // ignore it
}
}
}
if(msg_id < 0 || chnl_id < 0) {
@ -903,6 +904,9 @@ static int __default_msg(void *cctx, sexp_t *sx)
smsg->opcode = 0;
smsg->payload = (void *)msg;
/* assign initial S-expression structure */
smsg->initial_sx = sx;
/* put the message to the search tree */
pthread_rwlock_wrlock(&(chan->msglock));
usrtc_insert(chan->msgs_tree, &(smsg->pendingq_node), &(smsg->mid));
@ -915,7 +919,7 @@ static int __default_msg(void *cctx, sexp_t *sx)
}
/* put job to the queue and give up */
r = pth_queue_add(co->rqueue, (void *)msg, USR_MSG);
r = pth_queue_add(co->rqueue, (void *)smsg, USR_MSG);
if(r) { /* cannot put job to the queue */
/* TODO: remove message and reply with error code */
return r;
@ -925,15 +929,165 @@ static int __default_msg(void *cctx, sexp_t *sx)
return r;
}
/* TODO: optimize amount of code: (must be first)
* __default_msg_return
* __default_msg_reply
* there are many copy-n-paste code!!!!!
*/
static int __default_msg_return(void *cctx, sexp_t *sx)
{
conn_t *co = (conn_t *)cctx;
usrtc_node_t *node = NULL;
chnl_t *chan = NULL;
int r = 0;
sexp_t *lsx = NULL, *sx_iter = NULL;
sexp_t *sx_sublist = NULL, *sx_value = NULL;
ulong_t chnl_id = -1;
ulong_t msg_id = -1;
sexp_t *msg = NULL;
sxmsg_t *smsg = NULL;
int idx, opcode;
__DBGLINE;
/* get parameters from the message */
if(sexp_list_cdr(sx, &lsx)) return EINVAL;
if(!SEXP_IS_LIST(lsx)) return EINVAL;
__DBGLINE;
/* get parameters */
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
sx_sublist = sx_iter;
continue;
} else {
if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) {
if(strcmp(sx_iter->val, ":chid")) {
continue; // ignore it
}
sx_value = sx_iter->next;
if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) {
continue;
}
chnl_id = atol(sx_value->val);
} else continue; // ignore it
}
}
lsx = sx_sublist;
/* find message id */
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
msg = sx_iter;
continue;
} else {
if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) {
if(strcmp(sx_iter->val, ":msgid")) {
continue; // ignore
}
sx_value = sx_iter->next;
if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) {
continue;
}
msg_id = atol(sx_value->val);
} else continue; // ignore it
}
}
/* get opcode */
sexp_list_car(msg, &lsx);
opcode = atoi(lsx->val);
if(msg_id < 0 || chnl_id < 0) {
return EINVAL;
}
printf("chnl_id = %ld\n", chnl_id);
if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT;
else chan = (chnl_t *)usrtc_node_getdata(node);
/* lookup for the message */
if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) {
/* TODO: return gently an opcode about absent message */
} else {
smsg = (sxmsg_t *)usrtc_node_getdata(node);
smsg->opcode = opcode;
//destroy_sexp((sexp_t *)smsg->payload);
smsg->payload = NULL;
pthread_mutex_unlock(&(smsg->wait));
}
return 0;
}
static int __default_msg_reply(void *cctx, sexp_t *sx)
{
__DBGLINE;
conn_t *co = (conn_t *)cctx;
usrtc_node_t *node = NULL;
chnl_t *chan = NULL;
int r = 0;
sexp_t *lsx = NULL, *sx_iter = NULL;
sexp_t *sx_sublist = NULL, *sx_value = NULL;
ulong_t chnl_id = -1;
ulong_t msg_id = -1;
sexp_t *msg = NULL;
sxmsg_t *smsg = NULL;
int idx;
/* get parameters from the message */
if(sexp_list_cdr(sx, &lsx)) return EINVAL;
if(!SEXP_IS_LIST(lsx)) return EINVAL;
/* get parameters */
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
sx_sublist = sx_iter;
continue;
} else {
if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) {
if(strcmp(sx_iter->val, ":chid")) {
continue; // ignore it
}
sx_value = sx_iter->next;
if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) {
continue;
}
chnl_id = atol(sx_value->val);
} else continue; // ignore it
}
}
lsx = sx_sublist;
/* find message id */
SEXP_ITERATE_LIST(lsx, sx_iter, idx) {
if(SEXP_IS_LIST(sx_iter)) {
msg = sx_iter;
continue;
} else {
if(SEXP_IS_TYPE(sx_iter, SEXP_BASIC)) {
if(strcmp(sx_iter->val, ":msgid")) {
continue; // ignore
}
sx_value = sx_iter->next;
if(!sx_value || !SEXP_IS_TYPE(sx_value, SEXP_BASIC)) {
continue;
}
msg_id = atol(sx_value->val);
} else continue; // ignore it
}
}
if(msg_id < 0 || chnl_id < 0) {
/* FIXME: ?? */
return EINVAL;
}
if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT;
else chan = (chnl_t *)usrtc_node_getdata(node);
/* lookup for the message */
if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) {
/* TODO: return gently an opcode about absent message */
} else {
smsg = (sxmsg_t *)usrtc_node_getdata(node);
smsg->opcode = SXOREPLYREQ;
//destroy_sexp((sexp_t *)smsg->payload);
smsg->payload = msg;
pthread_mutex_unlock(&(smsg->wait));
}
return 0;
}
@ -1041,7 +1195,7 @@ static void *__rmsg_queue_thread(void *ctx)
if(!tmp) return NULL;
while(1) {
r = pth_queue_get(co->mqueue, NULL, tmp);
r = pth_queue_get(co->rqueue, NULL, tmp);
if(r) {
free(tmp);
return NULL;
@ -1070,9 +1224,11 @@ static void *__rmsg_queue_thread(void *ctx)
if(!node) {
printf("RPC call illegal!\n");
/* TODO: correct reply with an error code */
} else rpccall = (cx_rpc_t *)usrtc_node_getdata(node);
/* call this ! */
rpccall->rpcf((void *)msg, sx);
} else {
rpccall = (cx_rpc_t *)usrtc_node_getdata(node);
/* call this ! */ /* TODO: move this job to the queue under dynamic thread poll */
rpccall->rpcf((void *)msg, sx);
}
}
}
@ -1740,12 +1896,12 @@ int connection_create(conn_t *co, int sck)
return r;
}
int connection_close(conn_t *co)
int connection_close(conn_t *co) /* TODO: */
{
return 0;
}
int connection_reinit(conn_t *co)
int connection_reinit(conn_t *co) /* TODO: */
{
return 0;
}
@ -2040,6 +2196,8 @@ static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec
/* message assign */
m->opcode = 0;
m->payload = (void *)sx;
/* assign initial sx */
m->initial_sx = sx;
/* put the message to the run queue */
r = pth_queue_add(co->mqueue, (void *)m, USR_MSG);
@ -2093,6 +2251,8 @@ static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcod
r = pth_queue_add(co->mqueue, (void *)msg, USR_MSG);
if(r) return r; /* FIXME: better give up */
if(!sx) return 0; /* TODO: destroy a message */
if(msg->flags & ESXMSG_PENDING) {
if(!tio) pthread_mutex_lock(&(msg->wait));
else pthread_mutex_timedlock(&(msg->wait), tio);

Loading…
Cancel
Save