extern C added, minor fixes;

v0.5.xx
Alexander Vdolainen 10 years ago
parent ebfadad3e9
commit 109d70a716

@ -171,7 +171,7 @@ typedef struct __connections_subsys_type {
int (*set_typed_list_callback)(conn_t *, int, char *); /** < this function is a callback
* during setting up a typed channel */
void (*on_destroy)(conn_t *); /** < callback on connection destroy */
void (*on_pulse)(conn_t *, sxmsg_t *); /** < callback on pulse emit */
void (*on_pulse)(conn_t *, sexp_t *); /** < callback on pulse emit */
void *priv;
} conn_sys_t;
@ -189,6 +189,10 @@ typedef struct __rpc_typed_list_type {
usrtc_node_t lnode;
} rpc_typed_list_t;
#ifdef __cplusplus
extern "C" {
#endif
/* API */
int connections_init(conn_sys_t *ssys);
@ -219,20 +223,39 @@ int sxmsg_send(chnl_t *channel, const char *data, size_t datalen, sxmsg_t **msg)
int sxmsg_send_pp(chnl_t *channel, const char *data, size_t datalen, sxmsg_t **msg);
/* send a pulse message */
int sxmsg_pulse(conn_t *co, const char *data, size_t datalen);
/* the same but will be postponed */
int sxmsg_pulse_pp(conn_t *co, const char *data, size_t datalen);
int sxmsg_reply(sxmsg_t *msg, const char *data, size_t datalen);
int sxmsg_reply_pp(sxmsg_t *msg, const char *data, size_t datalen);
int sxmsg_rreply(sxmsg_t *msg, const char *data, size_t datalen);
int sxmsg_rreply_pp(sxmsg_t *msg, const char *data, size_t datalen);
int sxmsg_rreply(sxmsg_t *msg, size_t datalen);
int sxmsg_return(sxmsg_t *msg, int opcode);
int sxmsg_return_pp(sxmsg_t *msg, int opcode);
#ifdef __cplusplus
}
#endif
/* RPC List API */
#define SNTL_FILTER_INC 0xa
#define SNTL_FILTER_EXC 0xb
#define SNTL_FILTER_END -1
#ifdef __cplusplus
extern "C" {
#endif
int sntl_rpclist_init(usrtc_t *tree);
int sntl_rpclist_add(usrtc_t *tree, int type, const char *description,
const char *version);
int sntl_rpclist_add_function(usrtc_t *tree, int type, const char *fu_name,
int (*rpcf)(void *, sexp_t *));
int sntl_rpclist_filter(usrtc_t *source, usrtc_t **dest, int flag, int *filter);
#ifdef __cplusplus
}
#endif
#endif /* __SNTL_SNTLLV2_H__ */

@ -145,33 +145,124 @@ int sxmsg_send_pp(chnl_t *channel, const char *data, size_t datalen, sxmsg_t **o
/* send a pulse message */
int sxmsg_pulse(conn_t *co, const char *data, size_t datalen)
{
return SNE_FAILED;
sxmsg_t *msg = malloc(sizeof(sxmsg_t));
sntllv2_head_t *head;
int r;
/* a little bit of paranoid tests */
if(!msg) return SNE_ENOMEM;
else memset(msg, 0, sizeof(sxmsg_t));
/* prepare it */
head = &msg->mhead;
head->attr = 0;
head->attr = SXMSG_PULSE | SXMSG_LINK;
head->opcode = SNE_RAPIDMSG;
head->payload_length = datalen;
msg->payload = (void *)data;
r = _sntll_writemsg(co, msg);
free(msg);
return r;
}
/* the same but will be postponed */
int sxmsg_pulse_pp(conn_t *co, const char *data, size_t datalen)
static inline int __sxmsg_reply(sxmsg_t *msg, const char *data,
size_t datalen, int pp)
{
return SNE_FAILED;
chnl_t *ch;
conn_t *co;
sntllv2_head_t *head;
ppmsg_t *ppm;
int r, i;
pthread_t self = pthread_self();
/* a little bit of paranoid tests */
if(!msg) return SNE_FAILED;
if(!(ch = msg->pch)) return SNE_FAILED;
if(!(co = ch->connection)) return SNE_FAILED;
/* test for blocking */
for(i = 0; i < 8; i++)
if(pthread_equal(self, co->thrd_poll[i])) return SNE_WOULDBLOCK;
/* prepare it */
head = &msg->mhead;
head->attr = 0;
head->attr |= SXMSG_REPLYREQ;
head->opcode = SNE_REPLYREQ;
head->payload_length = datalen;
msg->payload = (void *)data;
if(!pp) {
r = _sntll_writemsg(co, msg);
if(r != SNE_SUCCESS) return r;
} else {
if(!(ppm = malloc(sizeof(ppmsg_t)))) return SNE_ENOMEM;
list_init_node(&ppm->node);
ppm->msg = msg;
/* under locking here */
pthread_mutex_lock(&co->write_pending_lock);
list_add2tail(&co->write_pending, &ppm->node); /* push it to the FIFO */
pthread_mutex_unlock(&co->write_pending_lock);
}
pthread_mutex_lock(&msg->wait); /* wait */
r = head->opcode;
if((head->attr & SXMSG_CLOSED) && !head->payload_length) { /* dialog closed and no data exists */
pthread_mutex_unlock(&msg->wait); /* we able to invalidate it */
pthread_mutex_destroy(&msg->wait);
free(msg);
}
return r;
}
int sxmsg_reply(sxmsg_t *msg, const char *data, size_t datalen)
{
return SNE_FAILED;
return __sxmsg_reply(msg, data, datalen, 0);
}
int sxmsg_reply_pp(sxmsg_t *msg, const char *data, size_t datalen)
{
return SNE_FAILED;
return __sxmsg_reply(msg, data, datalen, 1);
}
int sxmsg_rreply(sxmsg_t *msg, const char *data, size_t datalen)
int sxmsg_rreply(sxmsg_t *msg, size_t datalen)
{
return SNE_FAILED;
}
chnl_t *ch;
conn_t *co;
sntllv2_head_t *head;
int r;
int sxmsg_rreply_pp(sxmsg_t *msg, const char *data, size_t datalen)
{
return SNE_FAILED;
/* a little bit of paranoid tests */
if(!msg) return SNE_FAILED;
if(!(ch = msg->pch)) return SNE_FAILED;
if(!(co = ch->connection)) return SNE_FAILED;
/* prepare it */
head = &msg->mhead;
head->attr = 0;
head->attr |= SXMSG_CLOSED;
head->opcode = SNE_RAPIDMSG;
head->payload_length = datalen;
pthread_mutex_lock(&co->idx_msg_lock);
idx_free(&co->idx_msg, head->msgid);
co->messages[head->msgid] = NULL;
pthread_mutex_unlock(&co->idx_msg_lock);
r = _sntll_writemsg(co, msg);
pthread_mutex_unlock(&msg->wait); /* we able to invalidate it */
pthread_mutex_destroy(&msg->wait);
free(msg);
return r;
}
static inline int __sxmsg_return(sxmsg_t *msg, int opcode, int pp)

@ -442,6 +442,7 @@ static void *__sntll_thread(void *b)
char *bbuf = (char*)buf;
sntllv2_head_t *mhead = (sntllv2_head_t *)buf;
sxmsg_t *msg;
sexp_t *sx;
chnl_t *channel;
pthread_t self = pthread_self();
int dispatch = 0;
@ -540,6 +541,11 @@ static void *__sntll_thread(void *b)
if(mhead->attr & SXMSG_CLOSED) goto __finish; /* close the link */
if(mhead->attr & SXMSG_PULSE) { /* it's a link pulse messages */
/* TODO: syncronization and so on */
if(mhead->opcode == SNE_RAPIDMSG) { /* custom pulse */
sx = parse_sexp(bbuf, mhead->payload_length);
if(sx && co->ssys->on_pulse) co->ssys->on_pulse(co, sx);
if(sx) destroy_sexp(sx);
}
}
} else { /* regular messages */
if((mhead->attr & SXMSG_OPEN) && (mhead->attr & SXMSG_REPLYREQ)) { /* dialog initiation */
@ -567,6 +573,7 @@ static void *__sntll_thread(void *b)
} else {
/* set mutex and channel */
pthread_mutex_init(&msg->wait, NULL);
pthread_mutex_lock(&msg->wait);
msg->pch = channel;
/* copy header only */
memcpy(&msg->mhead, mhead, sizeof(sntllv2_head_t));
@ -586,13 +593,15 @@ static void *__sntll_thread(void *b)
msg = co->messages[mhead->msgid];
if(!msg) goto __inval_idx_nor;
if(msg->mhead.attr & SXMSG_TIMEDOUT) { /* nobody wait for it */
pthread_mutex_lock(&co->idx_ch_lock);
idx_free(&co->idx_ch, mhead->msgid);
co->messages[mhead->msgid] = NULL;
pthread_mutex_unlock(&co->idx_ch_lock);
/* message dialog is closed - remove this right now */
pthread_mutex_lock(&co->idx_ch_lock);
idx_free(&co->idx_ch, mhead->msgid);
co->messages[mhead->msgid] = NULL;
pthread_mutex_unlock(&co->idx_ch_lock);
if(msg->mhead.attr & SXMSG_TIMEDOUT) { /* nobody wait for it */
/* now just free it */
pthread_mutex_unlock(&msg->wait);
pthread_mutex_destroy(&msg->wait);
free(msg);
} else {
@ -602,7 +611,7 @@ static void *__sntll_thread(void *b)
if(msg->payload) memcpy(msg->payload, bbuf, mhead->payload_length);
else msg->mhead.opcode = SNE_ENOMEM;
}
/* TODO: remove this message from list */
pthread_mutex_unlock(&msg->wait); /* wake up thread waiting for */
}
} else if((!(mhead->attr & SXMSG_CLOSED) && !(mhead->attr & SXMSG_OPEN)) &&

Loading…
Cancel
Save