diff --git a/include/sntl/sntllv2.h b/include/sntl/sntllv2.h index d25e7eb..0b83ffc 100644 --- a/include/sntl/sntllv2.h +++ b/include/sntl/sntllv2.h @@ -96,6 +96,11 @@ typedef struct __connection_t { struct __connection_rpc_list_type; struct __message_t; +typedef struct __pp_msg_type { + struct __message_t *msg; + list_node_t node; +} ppmsg_t; + typedef struct __channel_t { uint16_t cid; /** < ID of the channel */ conn_t *connection; /** < pointer to the parent connection */ diff --git a/lib/messagesx.c b/lib/messagesx.c index 03ee245..c0faca0 100644 --- a/lib/messagesx.c +++ b/lib/messagesx.c @@ -27,6 +27,8 @@ #include +#include "internal.h" + void _message_process(sxmsg_t *msg) { chnl_t *chan = msg->pch; @@ -58,15 +60,86 @@ void _message_process(sxmsg_t *msg) return; } -int sxmsg_send(chnl_t *channel, const char *data, size_t datalen, sxmsg_t **msg) +static inline int __sxmsg_send(chnl_t *channel, const char *data, size_t datalen, + sxmsg_t **omsg, int pp) { - return SNE_FAILED; + conn_t *co; + sxmsg_t *msg; + sntllv2_head_t *head; + ppmsg_t *ppm; + int msgidx, r; + + if(!channel) return SNE_FAILED; + if(!data || !datalen) return SNE_FAILED; + + if(!(msg = malloc(sizeof(sxmsg_t)))) return SNE_ENOMEM; + else memset(msg, 0, sizeof(sxmsg_t)); + + co = channel->connection; + head = &msg->mhead; + /* form a head */ + head->attr = SXMSG_OPEN | SXMSG_REPLYREQ; + head->reserve = channel->cid; + head->payload_length = datalen; + /* init message itself */ + pthread_mutex_init(&msg->wait, NULL); + pthread_mutex_lock(&msg->wait); + msg->pch = channel; + msg->payload = (void *)data; + + pthread_mutex_lock(&co->idx_msg_lock); + msgidx = idx_allocate(&co->idx_msg); + if(msgidx != IDX_INVAL) co->messages[msgidx] = msg; + pthread_mutex_unlock(&co->idx_msg_lock); + + if(msgidx == IDX_INVAL) { r = SNE_MMESSAGES; goto __freemsg; } + else head->msgid = (uint16_t)msgidx; + + /* ready to send it */ + if(!pp) { + r = _sntll_writemsg(co, msg); + if(r != SNE_SUCCESS) goto __closemsg; + } else { /* postponed */ + if(!(ppm = malloc(sizeof(ppmsg_t)))) { r = SNE_ENOMEM; goto __closemsg; } + list_init_node(&ppm->node); + ppm->msg = msg; + + /* under locking here */ + pthread_mutex_lock(&co->write_pending_lock); + list_add2tail(&co->write_pending, &ppm->node); /* push it to the FIFO */ + pthread_mutex_unlock(&co->write_pending_lock); + } + + pthread_mutex_lock(&msg->wait); /* we will sleep here */ + + if(head->payload_length) { + *omsg = msg; + return head->opcode; + } else r = head->opcode; + + __closemsg: + pthread_mutex_lock(&co->idx_msg_lock); + idx_free(&co->idx_msg, msgidx); + co->messages[msgidx] = NULL; + pthread_mutex_unlock(&co->idx_msg_lock); + __freemsg: + /* free resources for message */ + pthread_mutex_unlock(&msg->wait); + pthread_mutex_destroy(&msg->wait); + free(msg); + + return r; +} + +int sxmsg_send(chnl_t *channel, const char *data, size_t datalen, sxmsg_t **omsg) +{ + return __sxmsg_send(channel, data, datalen, omsg, 0); } /* the same - postponed message i.e. will be written to the queue - not to write immendatly */ -int sxmsg_send_pp(chnl_t *channel, const char *data, size_t datalen, sxmsg_t **msg) +int sxmsg_send_pp(chnl_t *channel, const char *data, size_t datalen, sxmsg_t **omsg) { - return SNE_FAILED; + return __sxmsg_send(channel, data, datalen, omsg, 1); } /* send a pulse message */ @@ -101,13 +174,63 @@ int sxmsg_rreply_pp(sxmsg_t *msg, const char *data, size_t datalen) return SNE_FAILED; } +static inline int __sxmsg_return(sxmsg_t *msg, int opcode, int pp) +{ + chnl_t *ch; + conn_t *co; + sntllv2_head_t *head; + ppmsg_t *ppm; + int r; + + /* a little bit of paranoid tests */ + if(!msg) return SNE_FAILED; + if(!(ch = msg->pch)) return SNE_FAILED; + if(!(co = ch->connection)) return SNE_FAILED; + + head = &msg->mhead; + head->attr = 0; + head->attr |= SXMSG_CLOSED; + head->opcode = opcode; + head->payload_length = 0; + + if(!pp) { + /* free index */ + pthread_mutex_lock(&co->idx_msg_lock); + idx_free(&co->idx_msg, head->msgid); + co->messages[head->msgid] = NULL; + pthread_mutex_unlock(&co->idx_msg_lock); + + r = _sntll_writemsg(co, msg); + } else { + if(!(ppm = malloc(sizeof(ppmsg_t)))) return SNE_ENOMEM; + else { /* remove it */ + pthread_mutex_lock(&co->idx_msg_lock); + idx_free(&co->idx_msg, head->msgid); + co->messages[head->msgid] = NULL; + pthread_mutex_unlock(&co->idx_msg_lock); + } + + list_init_node(&ppm->node); + ppm->msg = msg; + + /* under locking here */ + pthread_mutex_lock(&co->write_pending_lock); + list_add2tail(&co->write_pending, &ppm->node); /* push it to the FIFO */ + pthread_mutex_unlock(&co->write_pending_lock); + + r = SNE_SUCCESS; + } + + return r; +} + int sxmsg_return(sxmsg_t *msg, int opcode) { - return SNE_FAILED; + return __sxmsg_return(msg, opcode, 0); } int sxmsg_return_pp(sxmsg_t *msg, int opcode) { - return SNE_FAILED; + return __sxmsg_return(msg, opcode, 1); } diff --git a/lib/sntllv2.c b/lib/sntllv2.c index 050c85d..35961d7 100644 --- a/lib/sntllv2.c +++ b/lib/sntllv2.c @@ -570,7 +570,7 @@ static void *__sntll_thread(void *b) msg->pch = channel; /* copy header only */ memcpy(&msg->mhead, mhead, sizeof(sntllv2_head_t)); - msg->payload = bbuf; + if(mhead->payload_length) msg->payload = bbuf; } pthread_mutex_lock(&co->idx_ch_lock); @@ -602,6 +602,7 @@ static void *__sntll_thread(void *b) if(msg->payload) memcpy(msg->payload, bbuf, mhead->payload_length); else msg->mhead.opcode = SNE_ENOMEM; } + /* TODO: remove this message from list */ pthread_mutex_unlock(&msg->wait); /* wake up thread waiting for */ } } else if((!(mhead->attr & SXMSG_CLOSED) && !(mhead->attr & SXMSG_OPEN)) &&