|
|
|
@ -31,6 +31,10 @@ conn_sys_t *conn_sys = NULL;
|
|
|
|
|
|
|
|
|
|
static long __cmp_ulong(const void *a, const void *b);
|
|
|
|
|
|
|
|
|
|
/* message alloc and destroy */
|
|
|
|
|
static sxmsg_t *__allocate_msg(int *res);
|
|
|
|
|
static void __destroy_msg(sxmsg_t *msg);
|
|
|
|
|
|
|
|
|
|
int __alloc_channel(ulong_t cid, conn_t *co, rpc_typed_list_t *rlist, chnl_t **channel)
|
|
|
|
|
{
|
|
|
|
|
int r = 0;
|
|
|
|
@ -631,6 +635,30 @@ static int __default_ch_close(void *cctx, sexp_t *sx)
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* create a nould of the message */
|
|
|
|
|
static int __create_reg_msg_mould(sxmsg_t **msg, chnl_t *ch, ulong_t mid)
|
|
|
|
|
{
|
|
|
|
|
int r = 0;
|
|
|
|
|
sxmsg_t *sm = __allocate_msg(&r);
|
|
|
|
|
|
|
|
|
|
if(r) return r;
|
|
|
|
|
else {
|
|
|
|
|
sm->pch = ch;
|
|
|
|
|
sm->flags = (ESXMSG_USR | ESXMSG_PENDING);
|
|
|
|
|
sm->mid = mid;
|
|
|
|
|
|
|
|
|
|
/* ok reserve message ID */
|
|
|
|
|
pthread_mutex_lock(&(ch->oplock));
|
|
|
|
|
idx_reserve(ch->idx_msg, mid);
|
|
|
|
|
pthread_mutex_unlock(&(ch->oplock));
|
|
|
|
|
|
|
|
|
|
pthread_mutex_lock(&(sm->wait));
|
|
|
|
|
*msg = sm;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int __default_msg_pulse(void *cctx, sexp_t *sx)
|
|
|
|
|
{
|
|
|
|
|
return 0;
|
|
|
|
@ -643,7 +671,74 @@ static int __default_msg_pulse_ret(void *cctx, sexp_t *sx)
|
|
|
|
|
|
|
|
|
|
static int __default_msg(void *cctx, sexp_t *sx)
|
|
|
|
|
{
|
|
|
|
|
return 0;
|
|
|
|
|
conn_t *co = (conn_t *)cctx;
|
|
|
|
|
usrtc_node_t *node = NULL;
|
|
|
|
|
chnl_t *chan = NULL;
|
|
|
|
|
char *key; int r = 0;
|
|
|
|
|
sexp_t *lsx = NULL;
|
|
|
|
|
ulong_t chnl_id = 0;
|
|
|
|
|
ulong_t msg_id = 0x00;
|
|
|
|
|
sexp_t *msg = NULL;
|
|
|
|
|
sxmsg_t *smsg = NULL;
|
|
|
|
|
|
|
|
|
|
/* get parameters from the message */
|
|
|
|
|
if(sexp_list_cdr(sx, &lsx)) return EINVAL;
|
|
|
|
|
if(!SEXP_IS_LIST(lsx)) return EINVAL;
|
|
|
|
|
|
|
|
|
|
/* FIXME: make it via iteraction, to cover the case with different arguments placement */
|
|
|
|
|
key = lsx->list->val;
|
|
|
|
|
if(strcmp(key, ":chid")) return EINVAL;
|
|
|
|
|
lsx = lsx->list->next;
|
|
|
|
|
if(!lsx) return EINVAL;
|
|
|
|
|
if(!SEXP_IS_TYPE(lsx, SEXP_BASIC)) return EINVAL;
|
|
|
|
|
chnl_id = atol(lsx->val);
|
|
|
|
|
|
|
|
|
|
lsx = lsx->next;
|
|
|
|
|
if(!SEXP_IS_LIST(lsx)) return EINVAL;
|
|
|
|
|
key = lsx->list->val;
|
|
|
|
|
if(strcmp(key, ":msgid")) return EINVAL;
|
|
|
|
|
lsx = lsx->list->next;
|
|
|
|
|
if(!lsx) return EINVAL;
|
|
|
|
|
if(!SEXP_IS_TYPE(lsx, SEXP_BASIC)) return EINVAL;
|
|
|
|
|
msg_id = atol(lsx->val); /* message id */
|
|
|
|
|
|
|
|
|
|
lsx = lsx->next;
|
|
|
|
|
if(!SEXP_IS_LIST(lsx)) return EINVAL;
|
|
|
|
|
msg = lsx;
|
|
|
|
|
|
|
|
|
|
/* find channel */
|
|
|
|
|
printf("chnl_id = %ld\n", chnl_id);
|
|
|
|
|
if(!(node = usrtc_lookup(co->chnl_tree, &chnl_id))) return ENOENT;
|
|
|
|
|
else chan = (chnl_t *)usrtc_node_getdata(node);
|
|
|
|
|
/* lookup for the message */
|
|
|
|
|
if(!(node = usrtc_lookup(chan->msgs_tree, &msg_id))) {
|
|
|
|
|
/* here, rpc lookup has no sense, just put this job to the queue */
|
|
|
|
|
/* btw, we're need to create a message first */
|
|
|
|
|
r = __create_reg_msg_mould(&smsg, chan, msg_id);
|
|
|
|
|
if(r) return r; /* TODO: reply with right error code */
|
|
|
|
|
|
|
|
|
|
/* assign the message */
|
|
|
|
|
smsg->opcode = 0;
|
|
|
|
|
smsg->payload = (void *)msg;
|
|
|
|
|
|
|
|
|
|
/* put the message to the search tree */
|
|
|
|
|
pthread_rwlock_wrlock(&(chan->msglock));
|
|
|
|
|
usrtc_insert(chan->msgs_tree, &(smsg->pendingq_node), &(smsg->mid));
|
|
|
|
|
pthread_rwlock_unlock(&(chan->msglock));
|
|
|
|
|
} else {
|
|
|
|
|
/* TODO: reply fdr this message with an error EEXIST */
|
|
|
|
|
return EEXIST;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* put job to the queue and give up */
|
|
|
|
|
r = pth_queue_add(co->rqueue, (void *)msg, USR_MSG);
|
|
|
|
|
if(r) { /* cannot put job to the queue */
|
|
|
|
|
/* TODO: remove message and reply with error code */
|
|
|
|
|
return r;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* put to the IN queue */
|
|
|
|
|
return r;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int __default_msg_return(void *cctx, sexp_t *sx)
|
|
|
|
@ -705,7 +800,7 @@ static int __eval_cstr(char *cstr, cx_rpc_list_t *rpc_list, void *ctx)
|
|
|
|
|
printf("rentry->rpcf = %p\n", rentry->rpcf);
|
|
|
|
|
r = rentry->rpcf(ctx, sx);
|
|
|
|
|
/* free s-expression */
|
|
|
|
|
destroy_sexp(sx);
|
|
|
|
|
//destroy_sexp(sx); FIXME: i guess rpc call should take care of sx
|
|
|
|
|
|
|
|
|
|
return r;
|
|
|
|
|
}
|
|
|
|
@ -757,7 +852,7 @@ static void *__msg_queue_thread(void *ctx)
|
|
|
|
|
if(buf) free(buf);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
__DBGLINE;
|
|
|
|
|
while(1) {
|
|
|
|
|
r = pth_queue_get(co->mqueue, NULL, tmp);
|
|
|
|
|
if(r) {
|
|
|
|
@ -765,6 +860,7 @@ static void *__msg_queue_thread(void *ctx)
|
|
|
|
|
free(tmp);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
__DBGLINE;
|
|
|
|
|
|
|
|
|
|
/* message workout */
|
|
|
|
|
msg = tmp->data;
|
|
|
|
@ -773,6 +869,7 @@ static void *__msg_queue_thread(void *ctx)
|
|
|
|
|
if(!(msg->flags & ESXMSG_USR)) { /* not a regular message */
|
|
|
|
|
msg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */
|
|
|
|
|
msg->flags &= ~ESXMSG_PENDING;
|
|
|
|
|
__DBGLINE;
|
|
|
|
|
pthread_mutex_unlock(&(msg->wait)); /* wake up the waitee */
|
|
|
|
|
continue;
|
|
|
|
|
} else {
|
|
|
|
@ -790,14 +887,15 @@ static void *__msg_queue_thread(void *ctx)
|
|
|
|
|
/* mark it to close */
|
|
|
|
|
msg->flags |= ESXMSG_CLOSURE;
|
|
|
|
|
/* ok, here we will write it and wait, destroying dialog while reply */
|
|
|
|
|
__DBGLINE;
|
|
|
|
|
goto __ssl_write;
|
|
|
|
|
} else snprintf(buf, 4096, "(ch-msg-repl (:chid %lu (:msgid %lu ", ch->cid,
|
|
|
|
|
msg->mid);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
__DBGLINE;
|
|
|
|
|
len = strlen(buf);
|
|
|
|
|
tb += len*sizeof(char);
|
|
|
|
|
if(print_sexp(tb, 4096 - (len + 4*sizeof(char)), sx)) {
|
|
|
|
|
if(print_sexp(tb, 4096 - (len + 4*sizeof(char)), sx) == -1) {
|
|
|
|
|
msg->opcode = ENOMEM;
|
|
|
|
|
/* we don't need to wake up anybody */
|
|
|
|
|
if(msg->flags & ESXMSG_TIMEDOUT) {
|
|
|
|
@ -806,19 +904,22 @@ static void *__msg_queue_thread(void *ctx)
|
|
|
|
|
* 2. destroy message itself
|
|
|
|
|
*/
|
|
|
|
|
/* TODO: destroy the message */
|
|
|
|
|
} else
|
|
|
|
|
} else {
|
|
|
|
|
__DBGLINE;
|
|
|
|
|
pthread_mutex_unlock(&(msg->wait));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
len = strlen(tb);
|
|
|
|
|
tb += len*sizeof(char);
|
|
|
|
|
strcat(tb, ")))");
|
|
|
|
|
__DBGLINE;
|
|
|
|
|
} else { /* pulse messages */
|
|
|
|
|
/* here we're shouldn't process reply procedure */
|
|
|
|
|
snprintf(buf, 4096, "(ch-msg-pulse (:chid %lu (:msgid %lu ", ch->cid,
|
|
|
|
|
msg->mid);
|
|
|
|
|
len = strlen(buf); /* FIXME: code double shit ! */
|
|
|
|
|
tb += len*sizeof(char);
|
|
|
|
|
if(print_sexp(tb, 4096 - (len + 4*sizeof(char)), sx)) {
|
|
|
|
|
if(print_sexp(tb, 4096 - (len + 4*sizeof(char)), sx) == -1) {
|
|
|
|
|
msg->opcode = ENOMEM;
|
|
|
|
|
/* we don't need to wake up anybody */
|
|
|
|
|
if(msg->flags & ESXMSG_TIMEDOUT) {
|
|
|
|
@ -836,6 +937,7 @@ static void *__msg_queue_thread(void *ctx)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
__ssl_write:
|
|
|
|
|
__DBGLINE;
|
|
|
|
|
/* write it */
|
|
|
|
|
pthread_mutex_lock(&(co->oplock)); /* exclusive write */
|
|
|
|
|
SSL_write(co->ssl, (void *)buf, strlen(buf) + sizeof(char)); /* TODO: SSL*/
|
|
|
|
@ -1094,8 +1196,10 @@ int connection_initiate(conn_t *co, const char *host, int port,
|
|
|
|
|
struct hostent *host_;
|
|
|
|
|
struct sockaddr_in addr;
|
|
|
|
|
usrtc_t *ch_tree, *rpc_tree;
|
|
|
|
|
pth_queue_t *mqueue = malloc(sizeof(pth_queue_t));
|
|
|
|
|
idx_allocator_t *idx_ch = malloc(sizeof(idx_allocator_t));
|
|
|
|
|
|
|
|
|
|
/* TODO: check for mqueue allocation */
|
|
|
|
|
if(!co) return EINVAL;
|
|
|
|
|
if(!host) return EINVAL;
|
|
|
|
|
if(!SSL_cert) return EINVAL;
|
|
|
|
@ -1124,6 +1228,10 @@ int connection_initiate(conn_t *co, const char *host, int port,
|
|
|
|
|
|
|
|
|
|
co->idx_ch = idx_ch;
|
|
|
|
|
|
|
|
|
|
/* assign message queue */
|
|
|
|
|
pth_queue_init(mqueue); /* TODO: check for initialization */
|
|
|
|
|
co->mqueue = mqueue;
|
|
|
|
|
|
|
|
|
|
/* init SSL certificates and context */
|
|
|
|
|
co->ctx = SSL_CTX_new(SSLv3_client_method());
|
|
|
|
|
if(!co->ctx) { ERR_print_errors_fp(stderr);
|
|
|
|
@ -1224,8 +1332,11 @@ int connection_initiate(conn_t *co, const char *host, int port,
|
|
|
|
|
pthread_rwlock_wrlock(&conn_sys->rwlock);
|
|
|
|
|
usrtc_insert(conn_sys->connections, &co->csnode, (void *)co->uuid);
|
|
|
|
|
pthread_rwlock_unlock(&conn_sys->rwlock);
|
|
|
|
|
return 0;
|
|
|
|
|
//return 0; /* FIXME: */
|
|
|
|
|
}
|
|
|
|
|
r = pthread_create(&co->msgthread, NULL, __msg_queue_thread, (void *)co);
|
|
|
|
|
/* TODO: check for thread creation */
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
__fail_3:
|
|
|
|
@ -1594,9 +1705,11 @@ static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec
|
|
|
|
|
|
|
|
|
|
*msg = NULL;
|
|
|
|
|
|
|
|
|
|
__DBGLINE;
|
|
|
|
|
r = __create_reg_msg(&m, ch);
|
|
|
|
|
if(r) return r;
|
|
|
|
|
else {
|
|
|
|
|
__DBGLINE;
|
|
|
|
|
/* put the message to the search tree */
|
|
|
|
|
pthread_rwlock_wrlock(&(ch->msglock));
|
|
|
|
|
usrtc_insert(ch->msgs_tree, &(m->pendingq_node), &(m->mid));
|
|
|
|
@ -1608,16 +1721,19 @@ static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec
|
|
|
|
|
|
|
|
|
|
/* put the message to the run queue */
|
|
|
|
|
r = pth_queue_add(co->mqueue, (void *)m, USR_MSG);
|
|
|
|
|
__DBGLINE;
|
|
|
|
|
if(r) return r; /* FIXME: better give up */
|
|
|
|
|
|
|
|
|
|
if(m->flags & ESXMSG_PENDING) {
|
|
|
|
|
__DBGLINE;
|
|
|
|
|
if(!tio) pthread_mutex_lock(&(m->wait));
|
|
|
|
|
else pthread_mutex_timedlock(&(m->wait), tio);
|
|
|
|
|
__DBGLINE;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
__DBGLINE;
|
|
|
|
|
if(tio && (m->flags & ESXMSG_PENDING))
|
|
|
|
|
return SXOTIMEDOUT;
|
|
|
|
|
|
|
|
|
|
__DBGLINE;
|
|
|
|
|
if(!m->payload) {
|
|
|
|
|
/* TODO: destroy the message */
|
|
|
|
|
r = m->opcode;
|
|
|
|
|