You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
libsxmp/lib/stream.c

763 lines
20 KiB
C

/*
* Secure X Message Passing Library v2 implementation.
* (sxmplv2) it supersedes all previous versions due to:
* - memory consumption
* - new features such as pulse emitting
* - performance optimization
*
* (c) Askele Group 2013-2015 <http://askele.com>
* (c) Alexander Vdolainen 2013-2015,2016 <avdolainen@zoho.com>
*
* libsxmp is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* libsxmp is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <openssl/ssl.h>
#include <openssl/err.h>
#include <openssl/engine.h>
#include <tdata/usrtc.h>
#include <tdata/list.h>
#include <sexpr/sexp.h>
#include <sxmp/limits.h>
#include <sxmp/sxmp.h>
#include "internal.h"
static void __free_listentry(struct _stream_list_node *listentry);
/*
 * Check requested flags against the flags a stream type allows.
 *
 * Only bits 1..6 are taken into account. Returns 1 when d_flag has at
 * least one of those bits set that is not set in s_flag, 0 otherwise.
 */
static int __flag_check(int s_flag, int d_flag)
{
  /* bits 1..6 -> 0b01111110 */
  const int checked_bits = 0x7e;
  int missing = d_flag & ~s_flag & checked_bits;

  return missing ? 1 : 0;
}
/*
 * Ask the remote side to open a stream of type `stid` over `channel`.
 *
 * Builds the open request (type, flags and, when given, the option string),
 * sends it and expects a rapid reply of the form "(<reply-cmd> <sid>)".
 *
 * link    - link stored in the new stream
 * channel - channel used for the request and stored in the new stream
 * opt     - optional option string, may be NULL
 * desc    - stream type description; its flags limit `flags` (must not be NULL)
 * stid    - stream type id
 * flags   - requested flags
 *
 * Returns a newly allocated stream or NULL with errno set to an SXE_* code.
 */
static sxstream_t *__stream_open(sxlink_t *link, sxchnl_t *channel, const char *opt,
                                 struct sxstream_description *desc, int stid, int flags)
{
  char *msgbuf = NULL;
  size_t mlen = 0;
  sxstream_t *ss = NULL;
  sxmsg_t *msg = NULL;
  char *buf = NULL;
  sexp_t *sx, *isx;
  uint64_t sid = 0;
  int r = 0, idx, n, got_sid = 0;

  /* requested flags must be a subset of what the stream type allows */
  if(__flag_check(desc->flags, flags)) {
    errno = SXE_EPERM;
    goto __fini;
  }

  /* time to alloc all stuff */
  mlen = strlen(_SXSTREAMOPEN_CMD) + strlen(_SXSTREAMTYPE_PREFIX) + strlen(_SXSTREAMFLAGS_PREFIX);
  if(!opt)
    mlen += (_SXSTREAMOPEN_ADDMINLEN + sizeof(char));
  else mlen += (_SXSTREAMOPEN_ADDMAXLEN + sizeof(char)) + strlen(opt);

  if(!(msgbuf = malloc(mlen*sizeof(char)))) {
    errno = SXE_ENOMEM;
    goto __fini;
  }

  if(!opt)
    n = snprintf(msgbuf, mlen, "(%s (%s %d)(%s %d))", _SXSTREAMOPEN_CMD,
                 _SXSTREAMTYPE_PREFIX, stid, _SXSTREAMFLAGS_PREFIX, flags);
  else
    n = snprintf(msgbuf, mlen, "(%s (%s %d)(%s %d)(%s \"%s\"))", _SXSTREAMOPEN_CMD,
                 _SXSTREAMTYPE_PREFIX, stid, _SXSTREAMFLAGS_PREFIX, flags,
                 _SXSTREAMOPT_PREFIX, opt);
  /* snprintf reports the untruncated length: never send more than we own */
  if(n < 0 || (size_t)n >= mlen) {
    errno = SXE_TOOLONG;
    goto __fini;
  }

  r = sxmsg_send(channel, msgbuf, (size_t)n + sizeof(char), &msg);
  if(r != SXE_RAPIDMSG) {
    errno = r;
    goto __fini;
  }
  /*
   * From here on `r` only tracks reply validation. Without this reset it
   * would still hold SXE_RAPIDMSG and the stream would never be created.
   */
  r = 0;

  /* time to get the special id, put it to the link and clean up all */
  buf = sxmsg_rapidbuf(msg);
  sx = parse_sexp(buf, strlen(buf));
  sxmsg_clean(msg);
  if(!sx) {
    errno = SXE_ENOMEM;
    goto __fini;
  }

  SEXP_ITERATE_LIST(sx, isx, idx) {
    if(isx->ty == SEXP_LIST || idx >= 2) r = SXE_BADPROTO; /* only two atoms expected */
    else if(!idx) {
      if(strcmp(isx->val, _SXSTREAMOPEN_CMD_R)) r = SXE_BADPROTO;
    } else {
      sid = strtoul(isx->val, NULL, 0);
      got_sid = 1;
    }
  }
  destroy_sexp(sx);

  /* a reply without the stream id is malformed as well */
  if(!r && !got_sid) r = SXE_BADPROTO;
  if(r) {
    errno = r;
    goto __fini;
  }

  if(!(ss = malloc(sizeof(sxstream_t)))) {
    errno = SXE_ENOMEM;
    goto __fini;
  }
  memset(ss, 0, sizeof(sxstream_t));
  ss->flags = flags;
  ss->sid = sid;
  ss->link = link;
  ss->channel = channel;

 __fini:
  free(msgbuf);
  return ss;
}
/*
 * Look up the description of stream type `stid` among the remote streams
 * known to `link`.
 *
 * Returns the data stored in the matching node, or NULL when the link has
 * no remote streams tree or the lookup finds nothing.
 */
static struct sxstream_description *__get_stream_description(sxlink_t *link, int stid)
{
  usrtc_node_t *found;

  if(link->remote_streams == NULL) return NULL;

  found = usrtc_lookup(link->remote_streams, &stid);
  if(found == NULL) return NULL;

  return (struct sxstream_description *)usrtc_node_getdata(found);
}
/*
* NOTE: sxstream operations (except the opening process, i.e. several
* threads may open different streams) are NOT _thread-safe_; please
* take care of that in your own application.
*/
/*
 * Open a stream of type `stid` on `link` using a channel opened just for it.
 *
 * The channel type is taken from the remote stream description; the stream
 * is flagged SXE_O_OCHANNELED and sxstream_close() closes the channel for
 * streams carrying that flag.
 *
 * link  - link to open the stream on
 * opt   - optional option string passed to the remote side, may be NULL
 * stid  - stream type id
 * flags - requested flags
 *
 * Returns the stream, or NULL with errno set to an SXE_* code. On success
 * errno is set to 0.
 */
sxstream_t *sxstream_open(sxlink_t *link, const char *opt, int stid, int flags)
{
  sxchnl_t *channel = NULL;
  struct sxstream_description *s_desc;
  sxstream_t *ss = NULL;
  int r = 0;

  if(!link) {
    r = SXE_FAILED;
    goto __fini;
  }
  if(!link->remote_streams) {
    r = SXE_NOSUCHSTREAMTYPE;
    goto __fini;
  }

  /* ok, let's deal with the channel */
  if(!(s_desc = __get_stream_description(link, stid))) {
    r = SXE_NOSUCHSTREAMTYPE;
    goto __fini;
  }
  if(!(channel = sxchannel_open(link, s_desc->pcid))) {
    r = errno;
    goto __fini;
  }

  flags |= SXE_O_OCHANNELED;
  errno = 0;
  ss = __stream_open(link, channel, opt, s_desc, stid, flags);
  /* keep the reason reported by __stream_open instead of resetting it to 0 */
  if(!ss) r = errno ? errno : SXE_FAILED;

 __fini:
  /* close first: the call may touch errno, which is set right after */
  if(!ss && channel) sxchannel_close(channel);
  errno = r;
  return ss;
}
/*
 * Open a stream of type `stid` on `link` over an already opened `channel`.
 *
 * Unlike sxstream_open() no channel is opened here and no flag is added
 * to `flags`.
 *
 * link    - link to open the stream on
 * channel - channel used for the stream
 * opt     - optional option string passed to the remote side, may be NULL
 * stid    - stream type id
 * flags   - requested flags
 *
 * Returns the stream, or NULL with errno set to an SXE_* code. On success
 * errno is set to 0.
 */
sxstream_t *sxstream_openwch(sxlink_t *link, sxchnl_t *channel, const char *opt,
                             int stid, int flags)
{
  struct sxstream_description *s_desc = NULL;
  sxstream_t *ss = NULL;
  int r = 0;

  if(!link || !channel) {
    r = SXE_FAILED;
    goto __fini;
  }
  if(!link->remote_streams) {
    r = SXE_NOSUCHSTREAMTYPE;
    goto __fini;
  }

  /* get description */
  if(!(s_desc = __get_stream_description(link, stid))) {
    r = SXE_NOSUCHSTREAMTYPE;
    goto __fini;
  }

  errno = 0;
  ss = __stream_open(link, channel, opt, s_desc, stid, flags);
  /* keep the reason reported by __stream_open instead of resetting it to 0 */
  if(!ss) r = errno ? errno : SXE_FAILED;

 __fini:
  errno = r;
  return ss;
}
/*
 * Close a stream: notify the remote side and release local resources.
 *
 * The local stream is released only when the remote side answers with
 * SXE_SUCCESS; in any other case the stream is left untouched and the
 * code is returned to the caller.
 *
 * For non-binary, non-named streams every cached entry list (the lists
 * handed out by sxstream_read()) is freed together with the list head.
 * If the stream carries SXE_O_OCHANNELED its channel is closed as well
 * and the result of that close becomes the return value.
 *
 * Returns SXE_SUCCESS or an SXE_* error code.
 */
int sxstream_close(sxstream_t *stream)
{
  sxmsg_t *msg = NULL;
  char *mbuf;
  list_node_t *iter, *s_iter;
  struct _stream_list_node *listentry;
  size_t mbuf_len = strlen(_SXSTREAMCLOSE_CMD) + _SXSTREAMCLOSE_ADDLEN;
  int r = 0;

  if(!stream) return SXE_FAILED;
  if(!stream->link || !stream->channel) return SXE_FAILED;

  if(!(mbuf = malloc(mbuf_len))) return SXE_ENOMEM;
  mbuf_len = snprintf(mbuf, mbuf_len, "(%s %lu)", _SXSTREAMCLOSE_CMD, stream->sid);

  r = sxmsg_send(stream->channel, mbuf, mbuf_len + sizeof(char), &msg);
  if(r == SXE_RAPIDMSG) sxmsg_clean(msg);

  if(r == SXE_SUCCESS) {
    /* TODO: free all caches, buffers etc */
    if(!(stream->flags & SXE_O_BINARY) && !(stream->flags & SXE_O_NAMED) &&
       stream->entries) {
      list_for_each_safe(stream->entries, iter, s_iter) {
        listentry = list_entry(iter, struct _stream_list_node, node);
        list_del(iter);
        __free_listentry(listentry);
      }
      /* the head itself was malloc'ed by sxstream_read() */
      free(stream->entries);
      stream->entries = NULL;
    }

    if(stream->flags & SXE_O_OCHANNELED)
      r = sxchannel_close(stream->channel);

    free(stream);
  }

  free(mbuf);
  return r;
}
/* binary stream operations */
/*
* NOTE: it will be done with a copy paste,
* there is a TODO - implement it in oilite style.
*/
/*
 * Binary stream read. Not implemented: the arguments are ignored and
 * (size_t)-1 is always returned.
 */
size_t sxstream_bread(sxstream_t *stream, void *buf, size_t buf_len)
{
  (void)stream;
  (void)buf;
  (void)buf_len;

  return (size_t)-1;
}
/*
 * Binary stream read at an offset. Not implemented: the arguments are
 * ignored and (size_t)-1 is always returned.
 */
size_t sxstream_breadat(sxstream_t *stream, void *buf, size_t buf_len, off_t off)
{
  (void)stream;
  (void)buf;
  (void)buf_len;
  (void)off;

  return (size_t)-1;
}
/*
 * Binary stream write at an offset. Not implemented: the arguments are
 * ignored and (size_t)-1 is always returned.
 */
size_t sxstream_bwriteat(sxstream_t *stream, const void *buf,
                         size_t buf_len, off_t off)
{
  (void)stream;
  (void)buf;
  (void)buf_len;
  (void)off;

  return (size_t)-1;
}
/*
 * Binary stream sync at an offset. Not implemented: the arguments are
 * ignored and (size_t)-1 is always returned.
 */
size_t sxstream_bsyncat(sxstream_t *stream, void *buf, size_t buf_len, off_t off)
{
  (void)stream;
  (void)buf;
  (void)buf_len;
  (void)off;

  return (size_t)-1;
}
/*
* NOTE: the value returned when reading from entry streams
* must not be freed by the caller; that is done by the close operation.
* This means you should take care with values already read, since
* they may be freed by the next read operation.
*/
/*
 * Allocate a stream entry node holding a private copy of `value`.
 *
 * Returns the node with its list node initialized, or NULL when either
 * allocation fails (nothing is leaked in that case).
 */
static struct _nn_stream_entry_node *__alloc_entry(const char *value)
{
  struct _nn_stream_entry_node *entry;

  entry = malloc(sizeof(struct _nn_stream_entry_node));
  if(entry == NULL) return NULL;

  entry->value = strdup(value);
  if(entry->value == NULL) {
    free(entry);
    return NULL;
  }

  list_init_node(&entry->node);

  return entry;
}
/*
 * Release a stream entry node together with the value string it owns.
 */
static void __free_entry(struct _nn_stream_entry_node *entry)
{
  free(entry->value);
  free(entry);
}
static struct _stream_list_node *__alloc_listentry(void)
{
struct _stream_list_node *o = malloc(sizeof(struct _stream_list_node));
if(!o) return NULL;
if(!(o->list = malloc(sizeof(list_head_t)))) {
free(o);
return NULL;
}
list_init_head(o->list);
list_init_node(&o->node);
return o;
}
/*
 * Release a list entry: every value node on its list is unlinked and
 * freed, then the list head and the entry itself are freed.
 */
static void __free_listentry(struct _stream_list_node *listentry)
{
  list_node_t *pos, *next;

  list_for_each_safe(listentry->list, pos, next) {
    struct _nn_stream_entry_node *value_node =
      list_entry(pos, struct _nn_stream_entry_node, node);

    list_del(pos);
    __free_entry(value_node);
  }

  free(listentry->list);
  free(listentry);
}
/* entry nonamed stream ops */
/*
 * Read the next entry from a (non-named) entry stream.
 *
 * Entries are fetched from the remote side in batches and cached on the
 * stream. While the cache has unread entries they are served from it;
 * otherwise the cache is dropped (which frees every list returned
 * earlier) and a new batch is requested.
 *
 * Returns the list of values of the next entry with errno set to 0, or
 * NULL with errno set to:
 *   SXE_FAILED   - invalid stream
 *   SXE_ENOMEM   - allocation failure
 *   SXE_BADPROTO - reply could not be parsed or has an unexpected shape
 *   SXE_NILREPLY - reply contained no entries
 *   SXE_EOS      - stream is over
 *   otherwise    - the code returned by sxmsg_send()
 *
 * The returned list is owned by the stream; do not free it.
 */
list_head_t *sxstream_read(sxstream_t *stream)
{
  sxmsg_t *msg = NULL;
  size_t mbuf_len;
  char *mbuf = NULL;
  list_node_t *cur = NULL, *iter, *s_iter;
  struct _stream_list_node *listentry = NULL;
  struct _nn_stream_entry_node *entry = NULL;
  sexp_t *sx = NULL, *isx, *iisx;
  int r = 0, idx, iidx;
  int err = SXE_FAILED;

  if(!stream) goto __fail;
  if(!stream->link || !stream->channel) goto __fail;

  /* serve from the cache while there is an unread entry */
  if(stream->entries && stream->cur) {
    cur = stream->cur;
    /* advance; NULL means "read from remote again on the next operation" */
    if(cur == list_node_last(stream->entries) || list_node_next_isbound(cur))
      stream->cur = NULL;
    else stream->cur = cur->next;
    goto __success;
  }

  /* cache exhausted: fetch the next batch */
  err = SXE_ENOMEM;
  if(stream->entries) { /* ok, clean up all stuff */
    list_for_each_safe(stream->entries, iter, s_iter) {
      listentry = list_entry(iter, struct _stream_list_node, node);
      list_del(iter);
      __free_listentry(listentry);
    }
  } else { /* allocate it first time */
    if(!(stream->entries = malloc(sizeof(list_head_t)))) goto __fail;
    list_init_head(stream->entries);
  }

  /* prepare a message */
  mbuf_len = strlen(_SXSTREAMEREAD_CMD) + _SXSTREAMEREAD_ADDLEN;
  if(!(mbuf = malloc(mbuf_len))) goto __fail;
  mbuf_len = snprintf(mbuf, mbuf_len, "(%s %lu)", _SXSTREAMEREAD_CMD, stream->sid);
  /* NOTE(review): open/close send mbuf_len + 1 (with the terminator),
   * this call does not - confirm which one the receiver expects. */
  r = sxmsg_send(stream->channel, mbuf, mbuf_len, &msg);

  switch(r) {
  case SXE_RAPIDMSG: /* get our data */
    sx = parse_sexp(sxmsg_payload(msg), strlen(sxmsg_payload(msg)));
    sxmsg_clean(msg);
    if(!sx) goto __badproto;

    /* expected shape: a list of lists of atoms */
    SEXP_ITERATE_LIST(sx, isx, idx) {
      if(isx->ty != SEXP_LIST) goto __badproto;

      /* allocate list entry and add it */
      if(!(listentry = __alloc_listentry())) goto __enomem;
      list_add2tail(stream->entries, &listentry->node);

      SEXP_ITERATE_LIST(isx, iisx, iidx) {
        /* check the inner element (the outer one is always a list here) */
        if(iisx->ty == SEXP_LIST) goto __badproto;
        if(!(entry = __alloc_entry((const char *)iisx->val))) goto __enomem;
        list_add2tail(listentry->list, &entry->node);
      }
    }

    /* ok let's see what we got */
    if(idx) {
      cur = list_node_first(stream->entries);
      /* point to the next one if exists */
      if(!list_node_next_isbound(cur)) stream->cur = cur->next;
      else stream->cur = NULL;
    } else {
      err = SXE_NILREPLY;
      stream->cur = NULL;
      cur = NULL;
    }
    destroy_sexp(sx);
    sx = NULL;
    break;
  case SXE_EOS: /* stream is over */
    err = SXE_EOS;
    cur = NULL;
    break;
  default:
    err = r;
    goto __fail;
  }

  free(mbuf);
  mbuf = NULL;
  if(!cur) {
    errno = err;
    return NULL;
  }

 __success:
  errno = 0;
  listentry = list_entry(cur, struct _stream_list_node, node);
  return listentry->list;

 __badproto:
  err = SXE_BADPROTO;
  goto __fail;

 __enomem:
  err = SXE_ENOMEM;

 __fail:
  if(sx) destroy_sexp(sx);
  free(mbuf);
  /* set errno last so the cleanup calls above cannot overwrite it */
  errno = err;
  return NULL;
}
/* entry named stream ops */
/*
 * Read from a named entry stream. Not implemented: the argument is
 * ignored and NULL is always returned.
 */
list_head_t *sxstream_readnamed(sxstream_t *stream)
{
  (void)stream;

  return NULL;
}
/* builtin functions for streams */
/*
 * Allocate an opened-stream slot on `link`.
 *
 * An index is taken from the link's stream index allocator (under
 * idx_streams_lock); a zeroed sxstream_opened is then stored in
 * link->streams[] at that index with sto_id set to it.
 *
 * Returns the new stream, or NULL when no index is available, the slot
 * for the index is already occupied, or memory cannot be allocated. In
 * the latter two cases the index is given back to the allocator.
 */
static struct sxstream_opened *__alloc_stream(sxlink_t *link)
{
  struct sxstream_opened *stream = NULL;
  int idx;

  pthread_mutex_lock(&link->idx_streams_lock);
  idx = idx_allocate(&link->idx_streams);
  pthread_mutex_unlock(&link->idx_streams_lock);

  if(idx == IDX_INVAL) return NULL;

  /* allocate only when the slot is free */
  if(link->streams[idx] == NULL)
    stream = malloc(sizeof(struct sxstream_opened));

  if(stream == NULL) { /* slot occupied or out of memory */
    pthread_mutex_lock(&link->idx_streams_lock);
    idx_free(&link->idx_streams, idx);
    pthread_mutex_unlock(&link->idx_streams_lock);
    return NULL;
  }

  memset(stream, 0, sizeof(struct sxstream_opened));
  stream->sto_id = idx;
  link->streams[idx] = stream;

  return stream;
}
/*
 * Release an opened stream: under idx_streams_lock its index is returned
 * to the allocator and its slot in link->streams[] is cleared, then the
 * structure itself is freed.
 */
static void __free_stream(sxlink_t *link, struct sxstream_opened *stream)
{
  pthread_mutex_t *lock = &link->idx_streams_lock;

  pthread_mutex_lock(lock);
  idx_free(&link->idx_streams, stream->sto_id);
  link->streams[stream->sto_id] = NULL;
  pthread_mutex_unlock(lock);

  free(stream);
}
/*
 * Builtin handler for the stream open request.
 *
 * Expected input: the function name atom followed by (<prefix> <value>)
 * pairs, where <prefix> starts with ':' and is one of the type, flags or
 * option prefixes. The type pair is mandatory.
 *
 * The stream type must exist on the hub, be bound to the type of the
 * channel the request came through, and allow the requested flags. On
 * success a stream slot is allocated, the type's s_open() is called and
 * the stream id is sent back in a rapid reply.
 *
 * m  - the incoming message (sxmsg_t *)
 * sx - parsed request
 *
 * Returns the result of sxmsg_rreply() on success or of sxmsg_return()
 * carrying an SXE_* code on failure.
 */
int _builtin_stream_open(void *m, sexp_t *sx)
{
  sxmsg_t *msg = (sxmsg_t *)m;
  sxchnl_t *channel = msg->pch;
  sxlink_t *link = NULL;
  sxhub_t *hub;
  sexp_t *isx, *iisx;
  char *rbuf = NULL, *opt = NULL, *s_opt = NULL;
  usrtc_node_t *node;
  struct sxstream_description *s_desc;
  struct sxstream_opened *strea;
  size_t rbuf_len;
  int chan_type, idx, r = SXE_FAILED;
  int flags = 0, des_type = 0, have_type = 0, iidx;

  if(!channel) goto __fail;
  link = channel->link;
  chan_type = channel->type_id;
  if(!link || !channel->rpc_list) goto __fail; /* paranoid test */

  /* parse incoming builtin function */
  SEXP_ITERATE_LIST(sx, isx, idx) {
    if(!idx) {
      /* head element is the function name: it must be an atom */
      if(isx->ty == SEXP_LIST) goto __badproto;
    } else {
      /* everything after the head is a (prefix value) pair */
      if(isx->ty != SEXP_LIST) goto __badproto;
      opt = NULL;
      SEXP_ITERATE_LIST(isx, iisx, iidx) {
        if(iisx->ty == SEXP_LIST) goto __badproto;
        switch(iidx) {
        case 0:
          opt = iisx->val;
          if(!opt || *opt != ':') goto __badproto;
          break;
        case 1:
          if(!strcmp(opt, _SXSTREAMTYPE_PREFIX)) {
            des_type = atoi(iisx->val);
            have_type = 1;
          } else if(!strcmp(opt, _SXSTREAMFLAGS_PREFIX)) flags = atoi(iisx->val);
          else if(!strcmp(opt, _SXSTREAMOPT_PREFIX)) s_opt = iisx->val;
          else goto __badproto;
          break;
        default:
          goto __badproto;
        }
      }
    }
  }
  /* without a type there is nothing to look up */
  if(!have_type) goto __badproto;

  /* check availability */
  hub = link->ssys;
  if(!hub->streams) goto __nostream;
  if(!(node = usrtc_lookup(hub->streams, &des_type))) goto __nostream;
  s_desc = (struct sxstream_description *)usrtc_node_getdata(node);

  if(s_desc->pcid != chan_type) goto __eperm; /* check permission by channel */
  if(__flag_check(s_desc->flags, flags)) goto __eperm; /* by flags */

  /* create a new remote side stream */
  if(!(strea = __alloc_stream(link))) {
    r = SXE_ENOMEM;
    goto __fail;
  }

  /* set stream details */
  strea->desc = s_desc;
  strea->flags = flags;
  strea->pin_channelid = channel->cid;

  r = s_desc->ops->s_open(link, strea, s_opt);
  if(r != SXE_SUCCESS) {
    /* deallocate stream */
    __free_stream(link, strea);
    goto __fail;
  }

  /* reply with the stream id */
  rbuf_len = _SXSTREAMCLOSE_ADDLEN + strlen(_SXSTREAMOPEN_CMD_R);
  rbuf = sxmsg_rapidbuf(msg);
  /* cast keeps the argument in line with %lu whatever sto_id's type is */
  rbuf_len = snprintf(rbuf, rbuf_len, "(%s %lu)", _SXSTREAMOPEN_CMD_R,
                      (unsigned long)strea->sto_id);

  return sxmsg_rreply(msg, rbuf_len + sizeof(char));

 __badproto:
  r = SXE_BADPROTO;
  goto __fail;

 __nostream:
  r = SXE_NOSUCHSTREAMTYPE;
  goto __fail;

 __eperm:
  r = SXE_EPERM;

 __fail:
  return sxmsg_return(msg, r);
}
/*
 * Builtin handler for the stream close request.
 *
 * Expected input: the function name atom followed by exactly one atom,
 * the stream id.
 *
 * The stream must exist and be pinned to the channel the request came
 * through. The type's s_close() is called and, when it succeeds, the
 * stream slot is released so its index can be reused.
 *
 * m  - the incoming message (sxmsg_t *)
 * sx - parsed request
 *
 * Returns the result of sxmsg_return() carrying SXE_SUCCESS or an
 * SXE_* error code.
 */
int _builtin_stream_close(void *m, sexp_t *sx)
{
  sxmsg_t *msg = (sxmsg_t *)m;
  sxchnl_t *channel = msg->pch;
  sxlink_t *link = NULL;
  struct sxstream_opened *stream = NULL;
  sexp_t *isx;
  uint64_t sid = 0;
  int r = SXE_FAILED, idx, sidx;

  if(!channel) goto __fini;
  link = channel->link;
  if(!link || !channel->rpc_list) goto __fini; /* paranoid test */

  /* parse input */
  SEXP_ITERATE_LIST(sx, isx, idx) {
    if(isx->ty == SEXP_LIST) {
      r = SXE_BADPROTO;
      goto __fini;
    }
    switch(idx) {
    case 0: /* function name only so far: the id is still missing */
      r = SXE_BADPROTO;
      break;
    case 1:
      sid = strtoul(isx->val, NULL, 0);
      r = SXE_SUCCESS;
      break;
    default:
      r = SXE_BADPROTO;
      goto __fini;
    }
  }
  if(r != SXE_SUCCESS) goto __fini;

  /* the id comes from the peer: reject values that do not fit an index */
  sidx = (int)sid;
  if(sidx < 0 || (uint64_t)sidx != sid) {
    r = SXE_NOSUCHSTREAMTYPE;
    goto __fini;
  }
  /* NOTE(review): the upper bound of link->streams[] is not visible in
   * this file - add a check against it here. */

  /* get this */
  stream = link->streams[sidx];
  if(!stream) {
    r = SXE_NOSUCHSTREAMTYPE;
    goto __fini;
  }
  if(stream->pin_channelid != channel->cid) {
    r = SXE_EPERM;
    goto __fini;
  }

  r = stream->desc->ops->s_close(stream);
  /* release the slot, otherwise the index and the structure leak */
  if(r == SXE_SUCCESS) __free_stream(link, stream);

 __fini:
  return sxmsg_return(msg, r);
}
/*
 * Builtin handler for the entry stream read request.
 *
 * Expected input: the function name atom followed by exactly one atom,
 * the stream id.
 *
 * The stream must exist, be pinned to the channel the request came
 * through, be neither binary nor named and be opened with SXE_O_READ.
 * Entries are packed as "((v v ..)(v v ..)..)" into the message's rapid
 * buffer until the next one would not fit (MAX_RBBUF_LEN) or s_eread()
 * reports nothing more; an entry that did not fit stays in
 * stream->ent_buf and is not re-read here before the next request.
 *
 * Returns the result of sxmsg_rreply() when at least one entry was
 * packed, otherwise the result of sxmsg_return() with:
 *   SXE_BADPROTO         - malformed request
 *   SXE_NOSUCHSTREAMTYPE - no stream with this id
 *   SXE_EPERM            - wrong channel or stream not opened for reading
 *   SXE_FAILED           - binary/named stream or invalid channel/link
 *   SXE_TOOLONG          - a single entry does not fit into the buffer
 *   SXE_EOS              - nothing to read
 */
int _builtin_stream_eread(void *m, sexp_t *sx)
{
sxmsg_t *msg = (sxmsg_t *)m;
sxchnl_t *channel = msg->pch;
sxlink_t *link = NULL;
struct sxstream_opened *stream = NULL;
sexp_t *isx;
list_node_t *iter, *siter;
struct _nn_stream_entry_node *entry;
char *rbuf;
uint64_t sid = 0;
size_t rbuf_len = 0, list_len = 0;
int r = SXE_FAILED, idx, n;
if(!channel) goto __fail;
link = channel->link;
if(!link || !channel->rpc_list) goto __fail; /* paranoid test */
/* parse input */
SEXP_ITERATE_LIST(sx, isx, idx) {
if(isx->ty == SEXP_LIST) {
__badproto:
r = SXE_BADPROTO;
goto __fail;
}
switch(idx) {
/* function name only so far: the id is still missing */
case 0: r = SXE_BADPROTO; break;
case 1: sid = strtoul(isx->val, NULL, 0); r = SXE_SUCCESS; break;
default: goto __badproto; break;
}
}
if(r != SXE_SUCCESS) goto __fail;
/* get this */
/* NOTE(review): sid comes from the peer and is used as an index without
 * any range check - confirm the size of link->streams[] and validate. */
stream = link->streams[(int)sid];
if(stream) {
if(stream->pin_channelid == channel->cid) {
/* we shouldn't fail here, check for the stream type first */
if((stream->desc->flags & SXE_O_BINARY) || (stream->desc->flags & SXE_O_NAMED)) {
r = SXE_FAILED;
goto __fail;
}
/* last check: permission for read check */
if(!(stream->flags & SXE_O_READ)) {
r = SXE_EPERM;
goto __fail;
/* note: only the rbuf assignment belongs to the else; "idx = 0" is a
 * separate statement (from here on idx counts the packed entries) */
} else rbuf = sxmsg_rapidbuf(msg); idx = 0;
/* here we go */
/* rbuf_len is 0 here, so this writes the opening "(" at the start */
rbuf_len = snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, "(");
while(1) {
/* an entry is pending: either left over from the previous request
 * or produced by the s_eread() call at the end of the last pass */
if(stream->ent_buf) {
/* first we need to determine if this list
* of values will fit to our rapid buffer */
list_len = 4*sizeof(char);
list_for_each_safe(stream->ent_buf, iter, siter) {
entry = container_of(iter, struct _nn_stream_entry_node, node);
list_len += strlen(entry->value);
/* one separating space after every value but the last */
if(iter != list_node_last(stream->ent_buf))
list_len += sizeof(char);
}
if((rbuf_len + list_len) >= MAX_RBBUF_LEN) { /* reach the end */
if(!idx) {
/* not even the first entry fits */
r = SXE_TOOLONG;
goto __fail;
} else break; /* buffer full, ready to send */
}
/* write to rapid buffer */
rbuf_len += snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, "(");
list_for_each_safe(stream->ent_buf, iter, siter) {
entry = container_of(iter, struct _nn_stream_entry_node, node);
if(iter == list_node_last(stream->ent_buf))
rbuf_len += snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, "%s",
entry->value);
else
rbuf_len += snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, "%s ",
entry->value);
}
rbuf_len += snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, ")");
idx++;
}
/* read next */
/* NOTE(review): presumably s_eread() replaces stream->ent_buf with the
 * next entry and returns <= 0 when there is none - confirm, including
 * what ent_buf holds after such a return. */
n = stream->desc->ops->s_eread(stream, 1, 0, &stream->ent_buf);
if(n <= 0) {
if(!idx) {
/* nothing was packed at all */
r = SXE_EOS;
goto __fail;
} else break;
}
}
/* close the outer list and send it, terminator included */
rbuf_len += snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, ")");
return sxmsg_rreply(msg, rbuf_len + sizeof(char));
} else r = SXE_EPERM;
} else r = SXE_NOSUCHSTREAMTYPE;
__fail:
return sxmsg_return(msg, r);
}