From 109d70a7162fa2e6ba9f543b5e0766e30760a4f9 Mon Sep 17 00:00:00 2001 From: Alexander Vdolainen Date: Tue, 21 Jul 2015 04:06:06 +0300 Subject: [PATCH] extern C added, minor fixes; --- include/sntl/sntllv2.h | 33 ++++++++++-- lib/messagesx.c | 115 ++++++++++++++++++++++++++++++++++++----- lib/sntllv2.c | 21 +++++--- 3 files changed, 146 insertions(+), 23 deletions(-) diff --git a/include/sntl/sntllv2.h b/include/sntl/sntllv2.h index 0b83ffc..cf5ec77 100644 --- a/include/sntl/sntllv2.h +++ b/include/sntl/sntllv2.h @@ -171,7 +171,7 @@ typedef struct __connections_subsys_type { int (*set_typed_list_callback)(conn_t *, int, char *); /** < this function is a callback * during setting up a typed channel */ void (*on_destroy)(conn_t *); /** < callback on connection destroy */ - void (*on_pulse)(conn_t *, sxmsg_t *); /** < callback on pulse emit */ + void (*on_pulse)(conn_t *, sexp_t *); /** < callback on pulse emit */ void *priv; } conn_sys_t; @@ -189,6 +189,10 @@ typedef struct __rpc_typed_list_type { usrtc_node_t lnode; } rpc_typed_list_t; +#ifdef __cplusplus +extern "C" { +#endif + /* API */ int connections_init(conn_sys_t *ssys); @@ -219,20 +223,39 @@ int sxmsg_send(chnl_t *channel, const char *data, size_t datalen, sxmsg_t **msg) int sxmsg_send_pp(chnl_t *channel, const char *data, size_t datalen, sxmsg_t **msg); /* send a pulse message */ int sxmsg_pulse(conn_t *co, const char *data, size_t datalen); -/* the same but will be postponed */ -int sxmsg_pulse_pp(conn_t *co, const char *data, size_t datalen); int sxmsg_reply(sxmsg_t *msg, const char *data, size_t datalen); int sxmsg_reply_pp(sxmsg_t *msg, const char *data, size_t datalen); -int sxmsg_rreply(sxmsg_t *msg, const char *data, size_t datalen); -int sxmsg_rreply_pp(sxmsg_t *msg, const char *data, size_t datalen); +int sxmsg_rreply(sxmsg_t *msg, size_t datalen); int sxmsg_return(sxmsg_t *msg, int opcode); int sxmsg_return_pp(sxmsg_t *msg, int opcode); +#ifdef __cplusplus +} +#endif + /* RPC List API */ #define SNTL_FILTER_INC 0xa #define SNTL_FILTER_EXC 0xb #define SNTL_FILTER_END -1 +#ifdef __cplusplus +extern "C" { +#endif + +int sntl_rpclist_init(usrtc_t *tree); + +int sntl_rpclist_add(usrtc_t *tree, int type, const char *description, + const char *version); + +int sntl_rpclist_add_function(usrtc_t *tree, int type, const char *fu_name, + int (*rpcf)(void *, sexp_t *)); + +int sntl_rpclist_filter(usrtc_t *source, usrtc_t **dest, int flag, int *filter); + +#ifdef __cplusplus +} +#endif + #endif /* __SNTL_SNTLLV2_H__ */ diff --git a/lib/messagesx.c b/lib/messagesx.c index c0faca0..213af6b 100644 --- a/lib/messagesx.c +++ b/lib/messagesx.c @@ -145,33 +145,124 @@ int sxmsg_send_pp(chnl_t *channel, const char *data, size_t datalen, sxmsg_t **o /* send a pulse message */ int sxmsg_pulse(conn_t *co, const char *data, size_t datalen) { - return SNE_FAILED; + sxmsg_t *msg = malloc(sizeof(sxmsg_t)); + sntllv2_head_t *head; + int r; + + /* a little bit of paranoid tests */ + if(!msg) return SNE_ENOMEM; + else memset(msg, 0, sizeof(sxmsg_t)); + + /* prepare it */ + head = &msg->mhead; + head->attr = 0; + head->attr = SXMSG_PULSE | SXMSG_LINK; + head->opcode = SNE_RAPIDMSG; + head->payload_length = datalen; + msg->payload = (void *)data; + + r = _sntll_writemsg(co, msg); + + free(msg); + + return r; } -/* the same but will be postponed */ -int sxmsg_pulse_pp(conn_t *co, const char *data, size_t datalen) +static inline int __sxmsg_reply(sxmsg_t *msg, const char *data, + size_t datalen, int pp) { - return SNE_FAILED; + chnl_t *ch; + conn_t *co; + sntllv2_head_t *head; + ppmsg_t *ppm; + int r, i; + pthread_t self = pthread_self(); + + /* a little bit of paranoid tests */ + if(!msg) return SNE_FAILED; + if(!(ch = msg->pch)) return SNE_FAILED; + if(!(co = ch->connection)) return SNE_FAILED; + + /* test for blocking */ + for(i = 0; i < 8; i++) + if(pthread_equal(self, co->thrd_poll[i])) return SNE_WOULDBLOCK; + + /* prepare it */ + head = &msg->mhead; + head->attr = 0; + head->attr |= SXMSG_REPLYREQ; + head->opcode = SNE_REPLYREQ; + head->payload_length = datalen; + msg->payload = (void *)data; + + if(!pp) { + r = _sntll_writemsg(co, msg); + if(r != SNE_SUCCESS) return r; + } else { + if(!(ppm = malloc(sizeof(ppmsg_t)))) return SNE_ENOMEM; + list_init_node(&ppm->node); + ppm->msg = msg; + + /* under locking here */ + pthread_mutex_lock(&co->write_pending_lock); + list_add2tail(&co->write_pending, &ppm->node); /* push it to the FIFO */ + pthread_mutex_unlock(&co->write_pending_lock); + } + + pthread_mutex_lock(&msg->wait); /* wait */ + + r = head->opcode; + + if((head->attr & SXMSG_CLOSED) && !head->payload_length) { /* dialog closed and no data exists */ + pthread_mutex_unlock(&msg->wait); /* we able to invalidate it */ + pthread_mutex_destroy(&msg->wait); + free(msg); + } + + return r; } int sxmsg_reply(sxmsg_t *msg, const char *data, size_t datalen) { - return SNE_FAILED; + return __sxmsg_reply(msg, data, datalen, 0); } int sxmsg_reply_pp(sxmsg_t *msg, const char *data, size_t datalen) { - return SNE_FAILED; + return __sxmsg_reply(msg, data, datalen, 1); } -int sxmsg_rreply(sxmsg_t *msg, const char *data, size_t datalen) +int sxmsg_rreply(sxmsg_t *msg, size_t datalen) { - return SNE_FAILED; -} + chnl_t *ch; + conn_t *co; + sntllv2_head_t *head; + int r; -int sxmsg_rreply_pp(sxmsg_t *msg, const char *data, size_t datalen) -{ - return SNE_FAILED; + /* a little bit of paranoid tests */ + if(!msg) return SNE_FAILED; + if(!(ch = msg->pch)) return SNE_FAILED; + if(!(co = ch->connection)) return SNE_FAILED; + + /* prepare it */ + head = &msg->mhead; + head->attr = 0; + head->attr |= SXMSG_CLOSED; + head->opcode = SNE_RAPIDMSG; + head->payload_length = datalen; + + pthread_mutex_lock(&co->idx_msg_lock); + idx_free(&co->idx_msg, head->msgid); + co->messages[head->msgid] = NULL; + pthread_mutex_unlock(&co->idx_msg_lock); + + r = _sntll_writemsg(co, msg); + + pthread_mutex_unlock(&msg->wait); /* we able to invalidate it */ + pthread_mutex_destroy(&msg->wait); + free(msg); + + return r; } static inline int __sxmsg_return(sxmsg_t *msg, int opcode, int pp) diff --git a/lib/sntllv2.c b/lib/sntllv2.c index 35961d7..512d102 100644 --- a/lib/sntllv2.c +++ b/lib/sntllv2.c @@ -442,6 +442,7 @@ static void *__sntll_thread(void *b) char *bbuf = (char*)buf; sntllv2_head_t *mhead = (sntllv2_head_t *)buf; sxmsg_t *msg; + sexp_t *sx; chnl_t *channel; pthread_t self = pthread_self(); int dispatch = 0; @@ -540,6 +541,11 @@ static void *__sntll_thread(void *b) if(mhead->attr & SXMSG_CLOSED) goto __finish; /* close the link */ if(mhead->attr & SXMSG_PULSE) { /* it's a link pulse messages */ /* TODO: syncronization and so on */ + if(mhead->opcode == SNE_RAPIDMSG) { /* custom pulse */ + sx = parse_sexp(bbuf, mhead->payload_length); + if(sx && co->ssys->on_pulse) co->ssys->on_pulse(co, sx); + if(sx) destroy_sexp(sx); + } } } else { /* regular messages */ if((mhead->attr & SXMSG_OPEN) && (mhead->attr & SXMSG_REPLYREQ)) { /* dialog initiation */ @@ -567,6 +573,7 @@ static void *__sntll_thread(void *b) } else { /* set mutex and channel */ pthread_mutex_init(&msg->wait, NULL); + pthread_mutex_lock(&msg->wait); msg->pch = channel; /* copy header only */ memcpy(&msg->mhead, mhead, sizeof(sntllv2_head_t)); @@ -586,13 +593,15 @@ static void *__sntll_thread(void *b) msg = co->messages[mhead->msgid]; if(!msg) goto __inval_idx_nor; - if(msg->mhead.attr & SXMSG_TIMEDOUT) { /* nobody wait for it */ - pthread_mutex_lock(&co->idx_ch_lock); - idx_free(&co->idx_ch, mhead->msgid); - co->messages[mhead->msgid] = NULL; - pthread_mutex_unlock(&co->idx_ch_lock); + /* message dialog is closed - remove this right now */ + pthread_mutex_lock(&co->idx_ch_lock); + idx_free(&co->idx_ch, mhead->msgid); + co->messages[mhead->msgid] = NULL; + pthread_mutex_unlock(&co->idx_ch_lock); + if(msg->mhead.attr & SXMSG_TIMEDOUT) { /* nobody wait for it */ /* now just free it */ + pthread_mutex_unlock(&msg->wait); pthread_mutex_destroy(&msg->wait); free(msg); } else { @@ -602,7 +611,7 @@ static void *__sntll_thread(void *b) if(msg->payload) memcpy(msg->payload, bbuf, mhead->payload_length); else msg->mhead.opcode = SNE_ENOMEM; } - /* TODO: remove this message from list */ + pthread_mutex_unlock(&msg->wait); /* wake up thread waiting for */ } } else if((!(mhead->attr & SXMSG_CLOSED) && !(mhead->attr & SXMSG_OPEN)) &&