added messages passing, not all;
This commit is contained in:
parent
1a7b18d749
commit
d56d8bd14d
@ -96,6 +96,11 @@ typedef struct __connection_t {
|
||||
struct __connection_rpc_list_type;
|
||||
struct __message_t;
|
||||
|
||||
typedef struct __pp_msg_type {
|
||||
struct __message_t *msg;
|
||||
list_node_t node;
|
||||
} ppmsg_t;
|
||||
|
||||
typedef struct __channel_t {
|
||||
uint16_t cid; /** < ID of the channel */
|
||||
conn_t *connection; /** < pointer to the parent connection */
|
||||
|
135
lib/messagesx.c
135
lib/messagesx.c
@ -27,6 +27,8 @@
|
||||
|
||||
#include <sntl/sntllv2.h>
|
||||
|
||||
#include "internal.h"
|
||||
|
||||
void _message_process(sxmsg_t *msg)
|
||||
{
|
||||
chnl_t *chan = msg->pch;
|
||||
@ -58,15 +60,86 @@ void _message_process(sxmsg_t *msg)
|
||||
return;
|
||||
}
|
||||
|
||||
int sxmsg_send(chnl_t *channel, const char *data, size_t datalen, sxmsg_t **msg)
|
||||
static inline int __sxmsg_send(chnl_t *channel, const char *data, size_t datalen,
|
||||
sxmsg_t **omsg, int pp)
|
||||
{
|
||||
return SNE_FAILED;
|
||||
conn_t *co;
|
||||
sxmsg_t *msg;
|
||||
sntllv2_head_t *head;
|
||||
ppmsg_t *ppm;
|
||||
int msgidx, r;
|
||||
|
||||
if(!channel) return SNE_FAILED;
|
||||
if(!data || !datalen) return SNE_FAILED;
|
||||
|
||||
if(!(msg = malloc(sizeof(sxmsg_t)))) return SNE_ENOMEM;
|
||||
else memset(msg, 0, sizeof(sxmsg_t));
|
||||
|
||||
co = channel->connection;
|
||||
head = &msg->mhead;
|
||||
/* form a head */
|
||||
head->attr = SXMSG_OPEN | SXMSG_REPLYREQ;
|
||||
head->reserve = channel->cid;
|
||||
head->payload_length = datalen;
|
||||
/* init message itself */
|
||||
pthread_mutex_init(&msg->wait, NULL);
|
||||
pthread_mutex_lock(&msg->wait);
|
||||
msg->pch = channel;
|
||||
msg->payload = (void *)data;
|
||||
|
||||
pthread_mutex_lock(&co->idx_msg_lock);
|
||||
msgidx = idx_allocate(&co->idx_msg);
|
||||
if(msgidx != IDX_INVAL) co->messages[msgidx] = msg;
|
||||
pthread_mutex_unlock(&co->idx_msg_lock);
|
||||
|
||||
if(msgidx == IDX_INVAL) { r = SNE_MMESSAGES; goto __freemsg; }
|
||||
else head->msgid = (uint16_t)msgidx;
|
||||
|
||||
/* ready to send it */
|
||||
if(!pp) {
|
||||
r = _sntll_writemsg(co, msg);
|
||||
if(r != SNE_SUCCESS) goto __closemsg;
|
||||
} else { /* postponed */
|
||||
if(!(ppm = malloc(sizeof(ppmsg_t)))) { r = SNE_ENOMEM; goto __closemsg; }
|
||||
list_init_node(&ppm->node);
|
||||
ppm->msg = msg;
|
||||
|
||||
/* under locking here */
|
||||
pthread_mutex_lock(&co->write_pending_lock);
|
||||
list_add2tail(&co->write_pending, &ppm->node); /* push it to the FIFO */
|
||||
pthread_mutex_unlock(&co->write_pending_lock);
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&msg->wait); /* we will sleep here */
|
||||
|
||||
if(head->payload_length) {
|
||||
*omsg = msg;
|
||||
return head->opcode;
|
||||
} else r = head->opcode;
|
||||
|
||||
__closemsg:
|
||||
pthread_mutex_lock(&co->idx_msg_lock);
|
||||
idx_free(&co->idx_msg, msgidx);
|
||||
co->messages[msgidx] = NULL;
|
||||
pthread_mutex_unlock(&co->idx_msg_lock);
|
||||
__freemsg:
|
||||
/* free resources for message */
|
||||
pthread_mutex_unlock(&msg->wait);
|
||||
pthread_mutex_destroy(&msg->wait);
|
||||
free(msg);
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
int sxmsg_send(chnl_t *channel, const char *data, size_t datalen, sxmsg_t **omsg)
|
||||
{
|
||||
return __sxmsg_send(channel, data, datalen, omsg, 0);
|
||||
}
|
||||
|
||||
/* the same - postponed message i.e. will be written to the queue - not to write immendatly */
|
||||
int sxmsg_send_pp(chnl_t *channel, const char *data, size_t datalen, sxmsg_t **msg)
|
||||
int sxmsg_send_pp(chnl_t *channel, const char *data, size_t datalen, sxmsg_t **omsg)
|
||||
{
|
||||
return SNE_FAILED;
|
||||
return __sxmsg_send(channel, data, datalen, omsg, 1);
|
||||
}
|
||||
|
||||
/* send a pulse message */
|
||||
@ -101,13 +174,63 @@ int sxmsg_rreply_pp(sxmsg_t *msg, const char *data, size_t datalen)
|
||||
return SNE_FAILED;
|
||||
}
|
||||
|
||||
static inline int __sxmsg_return(sxmsg_t *msg, int opcode, int pp)
|
||||
{
|
||||
chnl_t *ch;
|
||||
conn_t *co;
|
||||
sntllv2_head_t *head;
|
||||
ppmsg_t *ppm;
|
||||
int r;
|
||||
|
||||
/* a little bit of paranoid tests */
|
||||
if(!msg) return SNE_FAILED;
|
||||
if(!(ch = msg->pch)) return SNE_FAILED;
|
||||
if(!(co = ch->connection)) return SNE_FAILED;
|
||||
|
||||
head = &msg->mhead;
|
||||
head->attr = 0;
|
||||
head->attr |= SXMSG_CLOSED;
|
||||
head->opcode = opcode;
|
||||
head->payload_length = 0;
|
||||
|
||||
if(!pp) {
|
||||
/* free index */
|
||||
pthread_mutex_lock(&co->idx_msg_lock);
|
||||
idx_free(&co->idx_msg, head->msgid);
|
||||
co->messages[head->msgid] = NULL;
|
||||
pthread_mutex_unlock(&co->idx_msg_lock);
|
||||
|
||||
r = _sntll_writemsg(co, msg);
|
||||
} else {
|
||||
if(!(ppm = malloc(sizeof(ppmsg_t)))) return SNE_ENOMEM;
|
||||
else { /* remove it */
|
||||
pthread_mutex_lock(&co->idx_msg_lock);
|
||||
idx_free(&co->idx_msg, head->msgid);
|
||||
co->messages[head->msgid] = NULL;
|
||||
pthread_mutex_unlock(&co->idx_msg_lock);
|
||||
}
|
||||
|
||||
list_init_node(&ppm->node);
|
||||
ppm->msg = msg;
|
||||
|
||||
/* under locking here */
|
||||
pthread_mutex_lock(&co->write_pending_lock);
|
||||
list_add2tail(&co->write_pending, &ppm->node); /* push it to the FIFO */
|
||||
pthread_mutex_unlock(&co->write_pending_lock);
|
||||
|
||||
r = SNE_SUCCESS;
|
||||
}
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
int sxmsg_return(sxmsg_t *msg, int opcode)
|
||||
{
|
||||
return SNE_FAILED;
|
||||
return __sxmsg_return(msg, opcode, 0);
|
||||
}
|
||||
|
||||
int sxmsg_return_pp(sxmsg_t *msg, int opcode)
|
||||
{
|
||||
return SNE_FAILED;
|
||||
return __sxmsg_return(msg, opcode, 1);
|
||||
}
|
||||
|
||||
|
@ -570,7 +570,7 @@ static void *__sntll_thread(void *b)
|
||||
msg->pch = channel;
|
||||
/* copy header only */
|
||||
memcpy(&msg->mhead, mhead, sizeof(sntllv2_head_t));
|
||||
msg->payload = bbuf;
|
||||
if(mhead->payload_length) msg->payload = bbuf;
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&co->idx_ch_lock);
|
||||
@ -602,6 +602,7 @@ static void *__sntll_thread(void *b)
|
||||
if(msg->payload) memcpy(msg->payload, bbuf, mhead->payload_length);
|
||||
else msg->mhead.opcode = SNE_ENOMEM;
|
||||
}
|
||||
/* TODO: remove this message from list */
|
||||
pthread_mutex_unlock(&msg->wait); /* wake up thread waiting for */
|
||||
}
|
||||
} else if((!(mhead->attr & SXMSG_CLOSED) && !(mhead->attr & SXMSG_OPEN)) &&
|
||||
|
Loading…
x
Reference in New Issue
Block a user