|
|
|
@ -1239,16 +1239,17 @@ static int __default_msg_return(void *cctx, sexp_t *sx)
|
|
|
|
|
destroy_sexp((sexp_t *)smsg->payload);
|
|
|
|
|
smsg->payload = NULL;
|
|
|
|
|
smsg->flags |= ESXMSG_CLOSURE;
|
|
|
|
|
|
|
|
|
|
/* TODO: can we remove the message from the tree there??? */
|
|
|
|
|
|
|
|
|
|
/* Q: can we remove the message from the tree there??? */
|
|
|
|
|
/* A: yep */
|
|
|
|
|
/* first remove the message from tree */
|
|
|
|
|
pthread_rwlock_wrlock(&(chan->msglock));
|
|
|
|
|
usrtc_delete(chan->msgs_tree, &(smsg->pendingq_node));
|
|
|
|
|
pthread_rwlock_unlock(&(chan->msglock));
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pthread_mutex_unlock(&(smsg->wait));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
__finish:
|
|
|
|
|
destroy_sexp(sx);
|
|
|
|
|
return r;
|
|
|
|
@ -1385,12 +1386,10 @@ static int __eval_cstr(char *cstr, cx_rpc_list_t *rpc_list, void *ctx)
|
|
|
|
|
else rpcf = sx->val;
|
|
|
|
|
|
|
|
|
|
/* find an appropriate function */
|
|
|
|
|
//printf("rpcf = %s (sx = %p)\n", rpcf, sx);
|
|
|
|
|
node = usrtc_lookup(rpc_list->rpc_tree, rpcf);
|
|
|
|
|
if(!node) return ENOENT;
|
|
|
|
|
else rentry = (cx_rpc_t *)usrtc_node_getdata(node);
|
|
|
|
|
/* call it */
|
|
|
|
|
//printf("rentry->rpcf = %p\n", rentry->rpcf);
|
|
|
|
|
r = rentry->rpcf(ctx, sx);
|
|
|
|
|
|
|
|
|
|
return r;
|
|
|
|
@ -1404,7 +1403,7 @@ static void *__cxslave_thread_listener(void *wctx)
|
|
|
|
|
|
|
|
|
|
while((r = __conn_read(co, buf, 4096)) != -1) {
|
|
|
|
|
buf[r] = '\0';
|
|
|
|
|
if(r) printf("Got the message %s (%d bytes)\n", buf, r);
|
|
|
|
|
//if(r) printf("Got the message %s (%d bytes)\n", buf, r);
|
|
|
|
|
r = __eval_cstr(buf, conn_sys->system_rpc, co);
|
|
|
|
|
}
|
|
|
|
|
co->flags &= ~CXCONN_ESTABL;
|
|
|
|
@ -1423,7 +1422,7 @@ static void *__cxmaster_thread_listener(void *wctx)
|
|
|
|
|
|
|
|
|
|
while((r = __conn_read(co, buf, 4096)) != -1) {
|
|
|
|
|
buf[r] = '\0';
|
|
|
|
|
if(r) printf("Got the message %s (%d bytes)\n", buf, r);
|
|
|
|
|
// if(r) printf("Got the message %s (%d bytes)\n", buf, r);
|
|
|
|
|
r = __eval_cstr(buf, conn_sys->system_rpc, co);
|
|
|
|
|
}
|
|
|
|
|
co->flags &= ~CXCONN_ESTABL;
|
|
|
|
@ -1462,7 +1461,7 @@ static void *__rmsg_queue_thread(void *ctx)
|
|
|
|
|
if(!(msg->flags & ESXMSG_USR)) { /* not a regular message */
|
|
|
|
|
msg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */
|
|
|
|
|
msg->flags &= ~ESXMSG_PENDING;
|
|
|
|
|
;
|
|
|
|
|
|
|
|
|
|
pthread_mutex_unlock(&(msg->wait)); /* wake up the waitee */
|
|
|
|
|
continue;
|
|
|
|
|
} else {
|
|
|
|
@ -1472,11 +1471,9 @@ static void *__rmsg_queue_thread(void *ctx)
|
|
|
|
|
/* get the function name */
|
|
|
|
|
if(sx->ty == SEXP_LIST) rpcf = sx->list->val;
|
|
|
|
|
else rpcf = sx->val;
|
|
|
|
|
//printf("Inbound queue RPC call = '%s'\n", rpcf);
|
|
|
|
|
|
|
|
|
|
node = usrtc_lookup(ch->rpc_list->rpc_tree, rpcf);
|
|
|
|
|
if(!node) {
|
|
|
|
|
printf("RPC call illegal!\n");
|
|
|
|
|
r = ENOENT;
|
|
|
|
|
msg_return(msg, r);
|
|
|
|
|
} else {
|
|
|
|
@ -1487,8 +1484,6 @@ static void *__rmsg_queue_thread(void *ctx)
|
|
|
|
|
rjob->sx = sx;
|
|
|
|
|
rjob->rpcf = rpccall->rpcf;
|
|
|
|
|
pth_dqtpoll_add(co->tpoll, (void *)rjob, USR_MSG); // TODO: check it
|
|
|
|
|
//rpccall->rpcf((void *)msg, sx);
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -1511,7 +1506,7 @@ static void *__msg_queue_thread(void *ctx)
|
|
|
|
|
if(buf) free(buf);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
while(1) {
|
|
|
|
|
r = pth_queue_get(co->mqueue, NULL, tmp);
|
|
|
|
|
if(r) {
|
|
|
|
@ -1594,7 +1589,7 @@ static void *__msg_queue_thread(void *ctx)
|
|
|
|
|
pthread_mutex_unlock(&(msg->wait));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
len = strlen(tb);
|
|
|
|
|
tb += len*sizeof(char);
|
|
|
|
|
strcat(tb, ")))");
|
|
|
|
@ -1605,7 +1600,7 @@ static void *__msg_queue_thread(void *ctx)
|
|
|
|
|
co->flags &= ~CXCONN_ESTABL;
|
|
|
|
|
__wake_up_waiters(co, ESXNOCONNECT);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if(msg->flags & ESXMSG_CLOSURE) {
|
|
|
|
|
/* first remove the message from tree */
|
|
|
|
|
pthread_rwlock_wrlock(&(ch->msglock));
|
|
|
|
@ -1617,8 +1612,6 @@ static void *__msg_queue_thread(void *ctx)
|
|
|
|
|
destroy_sexp((sexp_t *)msg->payload);
|
|
|
|
|
__destroy_msg(msg);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fprintf(stderr, "\t-->%s wrote %s\n", __FUNCTION__, buf);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
len = 0;
|
|
|
|
@ -1681,7 +1674,6 @@ static int __verify_certcall(int preverify_ok, X509_STORE_CTX *ctx)
|
|
|
|
|
/* ok, now we're on top of SSL (depth == 0) certs chain,
|
|
|
|
|
* and we can validate client certificate */
|
|
|
|
|
if(!depth) {
|
|
|
|
|
/* TODO: check serial number and other stuff */
|
|
|
|
|
co->pctx = malloc(sizeof(perm_ctx_t));
|
|
|
|
|
co->pctx->certid =
|
|
|
|
|
ASN1_INTEGER_get((const ASN1_INTEGER *)X509_get_serialNumber(ctx->current_cert));
|
|
|
|
@ -1974,7 +1966,6 @@ int connection_initiate(conn_t *co, const char *host, int port,
|
|
|
|
|
/* try to connect it */
|
|
|
|
|
addr.sin_family = AF_INET;
|
|
|
|
|
addr.sin_port = htons(port);
|
|
|
|
|
//printf("addr.sin_addr.s_addr = %p, host_ = %p\n", &addr.sin_addr.s_addr, host_);
|
|
|
|
|
addr.sin_addr.s_addr = *(uint32_t*)(host_->h_addr);
|
|
|
|
|
free(host_);
|
|
|
|
|
if (connect(sd, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
|
|
|
|
|