You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
libsxmp/lib/channel.c

274 lines
6.6 KiB
C

/*
* Secure X Message Passing Library v2 implementation.
* (sxmplv2) it superseed all versions before due to the:
* - memory consumption
* - new features such as pulse emitting
* - performance optimization
*
* (c) Askele Group 2013-2015 <http://askele.com>
* (c) Alexander Vdolainen 2013-2015 <avdolainen@gmail.com>
*
* libsxmp is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* libsxmp is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.";
*
*/
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <fcntl.h>
#ifdef WIN32
#include <Winsock2.h>
#define EBADE 1
#define NETDB_SUCCESS 0
#else
#include <sys/select.h>
#include <netdb.h>
#include <unistd.h>
#include <uuid/uuid.h>
#endif
#include <openssl/ssl.h>
#include <openssl/err.h>
#include <tdata/usrtc.h>
#include <sexpr/sexp.h>
#include <sxmp/sxmp.h>
10 years ago
#include "internal.h"
/* locally used functions */
uint8_t _channel_open(sxlink_t *co, uint16_t *chid)
{
sxchnl_t *chan;
10 years ago
int typeid = *chid; /* our type */
uint16_t chidx;
usrtc_t *rpc_list = co->rpc_list;
usrtc_node_t *node;
rpc_typed_list_t *rlist;
sxl_rpclist_t *rpclist;
10 years ago
node = usrtc_lookup(rpc_list, &typeid);
if(!node) return SXE_EPERM;
else rlist = (rpc_typed_list_t *)usrtc_node_getdata(node);
if(rlist) rpclist = rlist->rpc_list;
else return SXE_FAILED;
chan = malloc(sizeof(sxchnl_t));
if(!chan) return SXE_ENOMEM;
/* init channel */
chan->link = co;
chan->rpc_list = rpclist;
chan->flags = 0;
chan->type_id = typeid;
pthread_mutex_lock(&co->idx_ch_lock);
chidx = idx_allocate(&co->idx_ch);
pthread_mutex_unlock(&co->idx_ch_lock);
if(chidx == IDX_INVAL) {
free(chan);
return SXE_MCHANNELS;
}
chan->cid = chidx;
co->channels[chidx] = chan;
*chid = chidx;
return SXE_SUCCESS;
}
uint8_t _channel_close(sxlink_t *co, uint16_t chid)
{
sxchnl_t *chan;
10 years ago
ulong_t chidx = chid;
if(chid > 512) return SXE_INVALINDEX;
else chan = co->channels[chidx];
if(!chan) return SXE_NOSUCHCHAN;
pthread_mutex_lock(&co->idx_ch_lock);
10 years ago
idx_free(&co->idx_ch, chidx);
co->channels[chidx] = NULL;
pthread_mutex_unlock(&co->idx_ch_lock);
free(chan);
return SXE_SUCCESS;
}
10 years ago
sxchnl_t *sxchannel_open(sxlink_t *co, int type)
10 years ago
{
sxchnl_t *chan = NULL;
10 years ago
sxmsg_t *msg = NULL;
sxmplv2_head_t *head;
int msgidx, r, ccid;
10 years ago
if(!co) {
r = SXE_FAILED;
10 years ago
goto __reterr;
}
if(!(chan = malloc(sizeof(sxchnl_t)))) {
10 years ago
__enomem:
10 years ago
if(chan) free(chan);
r = SXE_ENOMEM;
10 years ago
goto __reterr;
}
if(!(msg = malloc(sizeof(sxmsg_t)))) goto __enomem;
/* early init for the channel */
chan->link = co;
10 years ago
chan->flags = 0;
/* early init for the message */
pthread_mutex_init(&msg->wait, NULL);
pthread_mutex_lock(&msg->wait);
msg->pch = chan;
msg->payload = NULL;
memset(&msg->mhead, 0, sizeof(sxmplv2_head_t));
10 years ago
head = &msg->mhead;
/* form a header */
head->attr = SXMSG_PROTO | SXMSG_REPLYREQ | SXMSG_OPEN;
head->payload_length = 0;
head->reserve = (uint16_t)type;
/* try to alloc a message */
pthread_mutex_lock(&co->idx_msg_lock);
msgidx = idx_allocate(&co->idx_msg);
if(msgidx != IDX_INVAL) co->messages[msgidx] = msg;
pthread_mutex_unlock(&co->idx_msg_lock);
if(msgidx == IDX_INVAL) { r = SXE_MMESSAGES; goto __reterr2; }
else head->msgid = (uint16_t)msgidx;
10 years ago
/* now we're ready to write it */
r = _sxmpl_writemsg(co, msg);
if(r == SXE_SUCCESS) pthread_mutex_lock(&msg->wait);
10 years ago
else goto __reterr3;
/* we will wakeup on return */
if(msg->mhead.opcode != SXE_SUCCESS) { r = msg->mhead.opcode; goto __reterr3; }
10 years ago
/* ok all is fine */
chan->cid = msg->mhead.reserve;
chan->type_id = type;
ccid = chan->cid;
10 years ago
pthread_mutex_lock(&co->idx_ch_lock);
idx_reserve(&co->idx_ch, ccid);
co->channels[ccid] = chan;
10 years ago
pthread_mutex_unlock(&co->idx_ch_lock);
/* destroy a message */
pthread_mutex_lock(&co->idx_msg_lock);
idx_free(&co->idx_msg, msgidx);
co->messages[msgidx] = NULL;
pthread_mutex_unlock(&co->idx_msg_lock);
/* free allocated resources */
pthread_mutex_unlock(&msg->wait);
pthread_mutex_destroy(&msg->wait);
free(msg);
return chan;
__reterr3:
pthread_mutex_lock(&co->idx_msg_lock);
idx_free(&co->idx_msg, msgidx);
co->messages[msgidx] = NULL;
pthread_mutex_unlock(&co->idx_msg_lock);
__reterr2:
pthread_mutex_unlock(&msg->wait);
pthread_mutex_destroy(&msg->wait);
__reterr:
if(chan) free(chan);
if(msg) free(msg);
errno = r;
return NULL;
}
int sxchannel_close(sxchnl_t *channel)
10 years ago
{
int r = SXE_SUCCESS;
10 years ago
sxmsg_t *msg;
sxmplv2_head_t *head;
sxlink_t *co;
int msgidx = 0, chidx = 0;
10 years ago
/* check channel validity */
if(!channel) return SXE_FAILED;
else if(!(co = channel->link)) return SXE_FAILED;
if(channel->cid > 512) return SXE_IGNORED;
else chidx = channel->cid;
if(channel != co->channels[chidx]) return SXE_IGNORED;
10 years ago
if(!(msg = malloc(sizeof(sxmsg_t)))) return SXE_ENOMEM;
10 years ago
head = &msg->mhead;
memset(head, 0, sizeof(sxmplv2_head_t));
10 years ago
/* setup head */
head->attr = SXMSG_PROTO | SXMSG_REPLYREQ; /* close channel */
head->reserve = channel->cid;
/* setup message */
pthread_mutex_init(&msg->wait, NULL);
pthread_mutex_lock(&msg->wait);
msg->pch = channel;
/* allocate it */
pthread_mutex_lock(&co->idx_msg_lock);
msgidx = idx_allocate(&co->idx_msg);
if(msgidx != IDX_INVAL) co->messages[msgidx] = msg;
pthread_mutex_unlock(&co->idx_msg_lock);
if(msgidx == IDX_INVAL) { r = SXE_MMESSAGES; goto __reterr2; }
else head->msgid = msgidx;
10 years ago
r = _sxmpl_writemsg(co, msg);
if(r == SXE_SUCCESS) {
10 years ago
pthread_mutex_lock(&msg->wait);
r = head->opcode;
/* we will free this anyway */
pthread_mutex_lock(&co->idx_ch_lock);
10 years ago
idx_free(&co->idx_ch, chidx);
co->channels[chidx] = NULL;
10 years ago
pthread_mutex_unlock(&co->idx_ch_lock);
free(channel);
}
pthread_mutex_lock(&co->idx_msg_lock);
idx_free(&co->idx_msg, msgidx);
co->messages[msgidx] = NULL;
pthread_mutex_unlock(&co->idx_msg_lock);
__reterr2:
pthread_mutex_unlock(&msg->wait);
pthread_mutex_destroy(&msg->wait);
free(msg);
return r;
}