|
|
|
@ -43,6 +43,10 @@ static long __cmp_ulong(const void *a, const void *b);
|
|
|
|
|
extern sxmsg_t *__allocate_msg(int *res);
|
|
|
|
|
extern void __destroy_msg(sxmsg_t *msg);
|
|
|
|
|
|
|
|
|
|
/* connections */
|
|
|
|
|
void __connections_subsystem_connection_remove(conn_t *);
|
|
|
|
|
static void __connection_free(conn_t *co);
|
|
|
|
|
|
|
|
|
|
/* examination */
|
|
|
|
|
static inline int __exam_connection(conn_t *co)
|
|
|
|
|
{
|
|
|
|
@ -1412,7 +1416,13 @@ static void *__cxslave_thread_listener(void *wctx)
|
|
|
|
|
}
|
|
|
|
|
co->flags &= ~CXCONN_ESTABL;
|
|
|
|
|
co->flags |= CXCONN_BROKEN;
|
|
|
|
|
|
|
|
|
|
/* ok the first of all we're need to wake up all */
|
|
|
|
|
__wake_up_waiters(co, ESXNOCONNECT);
|
|
|
|
|
/* now we need to end the poll */
|
|
|
|
|
pth_dqtpoll_destroy(co->tpoll, 1); /* force */
|
|
|
|
|
|
|
|
|
|
__connection_free(co);
|
|
|
|
|
|
|
|
|
|
free(buf);
|
|
|
|
|
|
|
|
|
@ -1432,7 +1442,13 @@ static void *__cxmaster_thread_listener(void *wctx)
|
|
|
|
|
}
|
|
|
|
|
co->flags &= ~CXCONN_ESTABL;
|
|
|
|
|
co->flags |= CXCONN_BROKEN;
|
|
|
|
|
|
|
|
|
|
/* ok the first of all we're need to wake up all */
|
|
|
|
|
__wake_up_waiters(co, ESXNOCONNECT);
|
|
|
|
|
/* now we need to end the poll */
|
|
|
|
|
pth_dqtpoll_destroy(co->tpoll, 1); /* force */
|
|
|
|
|
|
|
|
|
|
__connection_free(co);
|
|
|
|
|
|
|
|
|
|
free(buf);
|
|
|
|
|
|
|
|
|
@ -1457,9 +1473,10 @@ static void *__rmsg_queue_thread(void *ctx)
|
|
|
|
|
while(1) {
|
|
|
|
|
r = pth_queue_get(co->rqueue, NULL, tmp);
|
|
|
|
|
if(r) {
|
|
|
|
|
__fini:
|
|
|
|
|
free(tmp);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
} else if(tmp->msgtype == END_MSG) goto __fini;
|
|
|
|
|
msg = tmp->data;
|
|
|
|
|
if(!msg) continue; /* spurious !! */
|
|
|
|
|
|
|
|
|
@ -1521,12 +1538,14 @@ static void *__msg_queue_thread(void *ctx)
|
|
|
|
|
while(1) {
|
|
|
|
|
r = pth_queue_get(co->mqueue, NULL, tmp);
|
|
|
|
|
if(r) {
|
|
|
|
|
__fini:
|
|
|
|
|
free(buf);
|
|
|
|
|
free(tmp);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* message workout */
|
|
|
|
|
if(tmp->msgtype == END_MSG) goto __fini;
|
|
|
|
|
msg = tmp->data;
|
|
|
|
|
if(!msg) continue; /* spurious message */
|
|
|
|
|
|
|
|
|
@ -1551,12 +1570,12 @@ static void *__msg_queue_thread(void *ctx)
|
|
|
|
|
/* mark it to close */
|
|
|
|
|
msg->flags |= ESXMSG_CLOSURE;
|
|
|
|
|
/* ok, here we will write it and wait, destroying dialog while reply */
|
|
|
|
|
;
|
|
|
|
|
|
|
|
|
|
goto __ssl_write;
|
|
|
|
|
} else snprintf(buf, 4096, "(ch-msg-repl (:chid %lu (:msgid %lu ", ch->cid,
|
|
|
|
|
msg->mid);
|
|
|
|
|
}
|
|
|
|
|
;
|
|
|
|
|
|
|
|
|
|
len = strlen(buf);
|
|
|
|
|
tb += len*sizeof(char);
|
|
|
|
|
if(print_sexp(tb, 4096 - (len + 4*sizeof(char)), sx) == -1) {
|
|
|
|
@ -1573,7 +1592,6 @@ static void *__msg_queue_thread(void *ctx)
|
|
|
|
|
if(msg->flags & ESXMSG_ISREPLY)
|
|
|
|
|
destroy_sexp(msg->payload);
|
|
|
|
|
} else {
|
|
|
|
|
;
|
|
|
|
|
pthread_mutex_unlock(&(msg->wait));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -1854,6 +1872,15 @@ int connections_subsystem_setrpclist_function(usrtc_t* (*get_rpc_typed_list_tree
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void __connections_subsystem_connection_remove(conn_t *co)
|
|
|
|
|
{
|
|
|
|
|
pthread_rwlock_wrlock(&conn_sys->rwlock);
|
|
|
|
|
usrtc_delete(conn_sys->connections, &(co->csnode));
|
|
|
|
|
pthread_rwlock_unlock(&conn_sys->rwlock);
|
|
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#define __TMPBUFLEN 2048
|
|
|
|
|
|
|
|
|
|
/* connection_initiate: perform a connection thru the socket to the
|
|
|
|
@ -1870,6 +1897,7 @@ int connection_initiate(conn_t *co, const char *host, int port,
|
|
|
|
|
struct sockaddr_in addr;
|
|
|
|
|
usrtc_t *ch_tree, *rpc_tree;
|
|
|
|
|
pth_queue_t *mqueue = malloc(sizeof(pth_queue_t));
|
|
|
|
|
pth_queue_t *rqueue = malloc(sizeof(pth_queue_t));
|
|
|
|
|
pth_dqtpoll_t *tpoll = malloc(sizeof(pth_dqtpoll_t));
|
|
|
|
|
idx_allocator_t *idx_ch = malloc(sizeof(idx_allocator_t));
|
|
|
|
|
|
|
|
|
@ -1877,6 +1905,7 @@ int connection_initiate(conn_t *co, const char *host, int port,
|
|
|
|
|
__fallenomem:
|
|
|
|
|
r = ENOMEM;
|
|
|
|
|
__fall0:
|
|
|
|
|
if(rqueue) free(rqueue);
|
|
|
|
|
if(mqueue) free(mqueue);
|
|
|
|
|
if(tpoll) free(tpoll);
|
|
|
|
|
if(idx_ch) free(idx_ch);
|
|
|
|
@ -1922,6 +1951,9 @@ int connection_initiate(conn_t *co, const char *host, int port,
|
|
|
|
|
/* assign message queue */
|
|
|
|
|
if((r = pth_queue_init(mqueue))) goto __fail_3;
|
|
|
|
|
co->mqueue = mqueue;
|
|
|
|
|
/* assign repl message queue */
|
|
|
|
|
if((r = pth_queue_init(rqueue))) goto __fail_3;
|
|
|
|
|
co->rqueue = rqueue;
|
|
|
|
|
|
|
|
|
|
/* init SSL certificates and context */
|
|
|
|
|
co->ctx = SSL_CTX_new(TLSv1_2_client_method());
|
|
|
|
@ -2006,7 +2038,7 @@ int connection_initiate(conn_t *co, const char *host, int port,
|
|
|
|
|
co->flags &= ~CXCONN_ESTABL;
|
|
|
|
|
r = ESXNOCONNECT;
|
|
|
|
|
co->flags |= CXCONN_BROKEN;
|
|
|
|
|
//__wake_up_waiters(co, ESXNOCONNECT);
|
|
|
|
|
|
|
|
|
|
free(buf);
|
|
|
|
|
/* shutdown connection */
|
|
|
|
|
goto __fail_3;
|
|
|
|
@ -2020,11 +2052,11 @@ int connection_initiate(conn_t *co, const char *host, int port,
|
|
|
|
|
/* read the message reply */
|
|
|
|
|
bytes = __conn_read(co, buf, __TMPBUFLEN);
|
|
|
|
|
if(bytes == -1) {
|
|
|
|
|
// we've lost the connection
|
|
|
|
|
/* we've lost the connection */
|
|
|
|
|
co->flags &= ~CXCONN_ESTABL;
|
|
|
|
|
co->flags |= CXCONN_BROKEN;
|
|
|
|
|
r = ESXNOCONNECT;
|
|
|
|
|
//__wake_up_waiters(co, ESXNOCONNECT);
|
|
|
|
|
|
|
|
|
|
free(buf);
|
|
|
|
|
/* shutdown connection */
|
|
|
|
|
goto __fail_3;
|
|
|
|
@ -2048,6 +2080,8 @@ int connection_initiate(conn_t *co, const char *host, int port,
|
|
|
|
|
}
|
|
|
|
|
r = pthread_create(&co->msgthread, NULL, __msg_queue_thread, (void *)co);
|
|
|
|
|
if(r) goto __fail_3;
|
|
|
|
|
r = pthread_create(&co->rmsgthread, NULL, __rmsg_queue_thread, (void *)co);
|
|
|
|
|
if(r) goto __fail_3;
|
|
|
|
|
|
|
|
|
|
pth_dqtpoll_run(tpoll);
|
|
|
|
|
co->tpoll = tpoll;
|
|
|
|
@ -2186,7 +2220,7 @@ int connection_create(conn_t *co, int sck)
|
|
|
|
|
printf("Terminate SSL connection, the other end is lost.\n");
|
|
|
|
|
co->flags &= ~CXCONN_ESTABL;
|
|
|
|
|
co->flags |= CXCONN_BROKEN;
|
|
|
|
|
//__wake_up_waiters(co, ESXNOCONNECT);
|
|
|
|
|
|
|
|
|
|
r = ESXNOCONNECT;
|
|
|
|
|
goto __fail_3;
|
|
|
|
|
}
|
|
|
|
@ -2229,8 +2263,20 @@ int connection_create(conn_t *co, int sck)
|
|
|
|
|
return r;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int connection_close(conn_t *co) /* TODO: */
|
|
|
|
|
int connection_close(conn_t *co)
|
|
|
|
|
{
|
|
|
|
|
void *nil;
|
|
|
|
|
|
|
|
|
|
pthread_cancel(co->cthread);
|
|
|
|
|
/* wait for the main listener */
|
|
|
|
|
pthread_join(co->cthread, &nil);
|
|
|
|
|
/* ok the first of all we're need to wake up all */
|
|
|
|
|
__wake_up_waiters(co, ESXNOCONNECT);
|
|
|
|
|
/* now we need to end the poll */
|
|
|
|
|
pth_dqtpoll_destroy(co->tpoll, 1); /* force */
|
|
|
|
|
|
|
|
|
|
__connection_free(co);
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -2243,3 +2289,41 @@ extern int __create_reg_msg(sxmsg_t **msg, chnl_t *ch);
|
|
|
|
|
|
|
|
|
|
extern int __create_sys_msg(sxmsg_t **msg, char *uuid, chnl_t *ch, sxpayload_t *data);
|
|
|
|
|
|
|
|
|
|
static void __connection_free(conn_t *co)
|
|
|
|
|
{
|
|
|
|
|
pth_msg_t msg;
|
|
|
|
|
void *nil;
|
|
|
|
|
|
|
|
|
|
/* since we doesn't works with IN queue and job dispatching
|
|
|
|
|
* close the thread
|
|
|
|
|
*/
|
|
|
|
|
msg.msgtype = END_MSG;
|
|
|
|
|
msg.data = NULL;
|
|
|
|
|
pth_queue_add(co->rqueue, &msg, END_MSG); /* it will drop the thread */
|
|
|
|
|
pthread_join(co->rmsgthread, &nil); /* wait for it */
|
|
|
|
|
/* message sending dispatch thread must be finished too */
|
|
|
|
|
pth_queue_add(co->mqueue, &msg, END_MSG); /* it will drop the thread */
|
|
|
|
|
pthread_join(co->msgthread, &nil); /* wait for it */
|
|
|
|
|
/* since we don't have any threads working with channels destroy them */
|
|
|
|
|
__destroy_all_channels(co);
|
|
|
|
|
/* TODO: permission context and callbacks */
|
|
|
|
|
__connections_subsystem_connection_remove(co);
|
|
|
|
|
/* now we're ready to free other resources */
|
|
|
|
|
if(co->uuid) free(co->uuid);
|
|
|
|
|
/* idx allocator */
|
|
|
|
|
idx_allocator_destroy(co->idx_ch);
|
|
|
|
|
free(co->idx_ch);
|
|
|
|
|
free(co->chnl_tree);
|
|
|
|
|
/* kill SSL context */
|
|
|
|
|
close(SSL_get_fd(co->ssl));
|
|
|
|
|
SSL_free(co->ssl);
|
|
|
|
|
SSL_CTX_free(co->ctx);
|
|
|
|
|
/* destroy queues */
|
|
|
|
|
pth_queue_destroy(co->mqueue, 0, NULL);
|
|
|
|
|
pth_queue_destroy(co->rqueue, 0, NULL);
|
|
|
|
|
/* locks */
|
|
|
|
|
pthread_rwlock_destroy(&(co->chnl_lock));
|
|
|
|
|
pthread_mutex_destroy(&(co->oplock));
|
|
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|