diff --git a/lib/connection.c b/lib/connection.c index f369160..c871030 100644 --- a/lib/connection.c +++ b/lib/connection.c @@ -1026,6 +1026,59 @@ static void *__cxmaster_thread_listener(void *wctx) return NULL; } +static void *__rmsg_queue_thread(void *ctx) +{ + conn_t *co = (conn_t *)ctx; + pth_msg_t *tmp = malloc(sizeof(pth_msg_t)); + sxmsg_t *msg; + chnl_t *ch; + int r = 0; + char *rpcf; + sexp_t *sx; + usrtc_node_t *node = NULL; + cx_rpc_t *rpccall; + + if(!tmp) return NULL; + + while(1) { + r = pth_queue_get(co->mqueue, NULL, tmp); + if(r) { + free(tmp); + return NULL; + } + + msg = tmp->data; + if(!msg) continue; /* spurious !! */ + + /* check to right job */ + if(!(msg->flags & ESXMSG_USR)) { /* not a regular message */ + msg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */ + msg->flags &= ~ESXMSG_PENDING; + __DBGLINE; + pthread_mutex_unlock(&(msg->wait)); /* wake up the waitee */ + continue; + } else { + /* now we're need to have a deal with the rpc calling, other - we don't care */ + ch = msg->pch; + sx = (sexp_t *)msg->payload; + /* get the function name */ + if(sx->ty == SEXP_LIST) rpcf = sx->list->val; + else rpcf = sx->val; + printf("Inbound queue RPC call = '%s'\n", rpcf); + + node = usrtc_lookup(ch->rpc_list->rpc_tree, rpcf); + if(!node) { + printf("RPC call illegal!\n"); + /* TODO: correct reply with an error code */ + } else rpccall = (cx_rpc_t *)usrtc_node_getdata(node); + /* call this ! */ + rpccall->rpcf((void *)msg, sx); + } + } + + return NULL; +} + static void *__msg_queue_thread(void *ctx) { conn_t *co = (conn_t *)ctx; @@ -1547,6 +1600,7 @@ int connection_create(conn_t *co, int sck) char *buf = NULL; usrtc_t *ch_tree, *rpc_tree; pth_queue_t *rqueue = malloc(sizeof(pth_queue_t)); + pth_queue_t *mqueue = malloc(sizeof(pth_queue_t)); idx_allocator_t *idx_ch = malloc(sizeof(idx_allocator_t)); if(!co) return EINVAL; @@ -1572,10 +1626,13 @@ int connection_create(conn_t *co, int sck) usrtc_init(ch_tree, USRTC_REDBLACK, MAX_CHANNELS, __cmp_ulong); co->idx_ch = idx_ch; - + /* assign message queue */ pth_queue_init(rqueue); /* TODO: check for initialization */ co->rqueue = rqueue; + /* assigned outbone message queue master also has this one */ + pth_queue_init(mqueue); /* TODO: check for initialization */ + co->mqueue = mqueue; /* init SSL certificates and context */ co->ctx = SSL_CTX_new(SSLv3_server_method()); @@ -1665,6 +1722,9 @@ int connection_create(conn_t *co, int sck) pthread_rwlock_wrlock(&conn_sys->rwlock); usrtc_insert(conn_sys->connections, &co->csnode, (void *)co->uuid); pthread_rwlock_unlock(&conn_sys->rwlock); + /* threads poll --- */ + r = pthread_create(&co->msgthread, NULL, __msg_queue_thread, (void *)co); /* TODO: check for thread creation */ + r = pthread_create(&co->rmsgthread, NULL, __rmsg_queue_thread, (void *)co); /* TODO: check for thread creation */ } return r;