[core] builtin: stream updates, some internal structure updates;

v0.5.xx
Alexander Vdolainen 9 years ago
parent ab242783d8
commit faf342548a

@ -33,9 +33,11 @@
#define MAX_SXMPLTHREADS 8
#define MAX_MSGINPROCESS 1024
#define MAX_MSGINPROCESS 1024
#define MAX_CHANNELSOPENED 512
#define MAX_STREAMSOPENED 256
#define MAX_STREAMS_TYPES 32767
#define MAX_STREAMS_TYPES 32767
#define MAX_LINKS 32768

@ -41,6 +41,7 @@
#include <sxmp/errno.h>
#include <sxmp/version.h>
#include <sxmp/limits.h>
#define VERIFY_DEPTH 1 /* FIXME: */
@ -100,12 +101,13 @@ typedef struct __sxlink_t {
pthread_mutex_t write_pending_lock;
volatile uint8_t pending_messages; /** < pending message count */
/* stream part */
pthread_rwlock_t stream_rwlock;
usrtc_t *stream_list;
usrtc_t *remote_streams;
struct sxstream_opened **streams;
idx_allocator_t idx_streams;
pthread_mutex_t idx_streams_lock;
usrtc_t *remote_streams; /** < streams from remote host */
/* Other stuff */
sxmp_proto_version_t cp_version;
pthread_t thrd_poll[8];
pthread_t thrd_poll[MAX_SXMPLTHREADS];
volatile uint8_t flags; /** < flags of the connection */
volatile uint8_t usecount; /** < use count for the connection link */
usrtc_node_t csnode; /** < node to store the link within list */
@ -191,11 +193,11 @@ struct sxstream_opened {
void *priv;
uint64_t off;
uint16_t flags;
usrtc_node_t node;
uint16_t pin_channelid;
};
struct sxstream_ops {
int (*s_open)(struct sxstream_opened *, const char *);
int (*s_open)(sxlink_t *, struct sxstream_opened *, const char *);
int (*s_close)(struct sxstream_opened *);
union {
size_t (*s_bread)(struct sxstream_opened *, size_t, uint64_t, void *);

@ -478,18 +478,184 @@ list_head_t *sxstream_readnamed(sxstream_t *stream)
}
/* builtin functions for streams */
static struct sxstream_opened *__alloc_stream(sxlink_t *link)
{
int idx;
struct sxstream_opened *stream;
pthread_mutex_lock(&link->idx_streams_lock);
idx = idx_allocate(&link->idx_streams);
pthread_mutex_unlock(&link->idx_streams_lock);
if(idx == IDX_INVAL) return NULL;
else stream = link->streams[idx];
if(!link->streams[idx] && !(stream = malloc(sizeof(struct sxstream_opened)))) {
__inval:
pthread_mutex_lock(&link->idx_streams_lock);
idx_free(&link->idx_streams, idx);
pthread_mutex_unlock(&link->idx_streams_lock);
return NULL;
} else if(link->streams[idx]) goto __inval;
memset(stream, 0, sizeof(struct sxstream_opened));
stream->sto_id = idx;
link->streams[idx] = stream;
return stream;
}
static void __free_stream(sxlink_t *link, struct sxstream_opened *stream)
{
pthread_mutex_lock(&link->idx_streams_lock);
idx_free(&link->idx_streams, stream->sto_id);
link->streams[stream->sto_id] = NULL;
pthread_mutex_unlock(&link->idx_streams_lock);
free(stream);
return;
}
int _builtin_stream_open(void *m, sexp_t *sx)
{
sxmsg_t *msg = (sxmsg_t *)m;
sxchnl_t *channel = msg->pch;
sxlink_t *link = NULL;
sxhub_t *hub;
sexp_t *isx, *iisx;
char *rbuf = NULL, *opt = NULL, *s_opt = NULL;
usrtc_node_t *node;
struct sxstream_description *s_desc;
struct sxstream_opened *strea;
size_t rbuf_len;
int chan_type, idx, r = SXE_FAILED;
int flags = 0, des_type, iidx;
return sxmsg_return(msg, SXE_FAILED);
if(!channel) goto __fail;
link = channel->link;
chan_type = channel->type_id;
if(!link || !channel->rpc_list) goto __fail; /* paranoid test */
/* parse incoming builtin function */
SEXP_ITERATE_LIST(sx, isx, idx) {
if(!idx && isx->ty == SEXP_LIST) {
__badproto:
r = SXE_BADPROTO;
goto __fail;
} else if(idx && isx->ty != SEXP_LIST) goto __badproto;
if(isx->ty == SEXP_LIST) {
SEXP_ITERATE_LIST(isx, iisx, iidx) {
if(iisx->ty == SEXP_LIST) goto __badproto;
switch(iidx) {
case 0:
opt = iisx->val;
if(*opt != ':') goto __badproto;
break;
case 1:
if(!strcmp(opt, _SXSTREAMTYPE_PREFIX)) des_type = atoi(iisx->val);
else if(!strcmp(opt, _SXSTREAMFLAGS_PREFIX)) flags = atoi(iisx->val);
else if(!strcmp(opt, _SXSTREAMOPT_PREFIX)) s_opt = iisx->val;
else goto __badproto;
break;
default: goto __badproto; break;
}
}
} else goto __badproto;
}
/* check availability */
hub = link->ssys;
if(!hub->streams) {
__nostream:
r = SXE_NOSUCHSTREAMTYPE;
goto __fail;
}
if(!(node = usrtc_lookup(hub->streams, &des_type))) goto __nostream;
else s_desc = (struct sxstream_description *)usrtc_node_getdata(node);
if(s_desc->pcid != chan_type) { /* check permission by channel */
__eperm:
r = SXE_EPERM;
goto __fail;
}
if(__flag_check(s_desc->flags, flags)) goto __eperm; /* by flags */
/* create a new remote side channel */
if(!(strea = __alloc_stream(link))) {
r = SXE_ENOMEM;
goto __fail;
}
/* rbuf length */
rbuf_len = _SXSTREAMCLOSE_ADDLEN + strlen(_SXSTREAMOPEN_CMD_R);
/* set stream details */
strea->desc = s_desc;
strea->flags = flags;
strea->pin_channelid = channel->cid;
r = s_desc->ops->s_open(link, strea, s_opt);
if(r != SXE_SUCCESS) {
/* deallocate stream */
__free_stream(link, strea);
goto __fail;
}
rbuf = sxmsg_rapidbuf(msg);
rbuf_len = snprintf(rbuf, rbuf_len, "(%s %lu)", _SXSTREAMOPEN_CMD_R,
strea->sto_id);
return sxmsg_rreply(msg, rbuf_len + sizeof(char));
__fail:
return sxmsg_return(msg, r);
}
int _builtin_stream_close(void *m, sexp_t *sx)
{
sxmsg_t *msg = (sxmsg_t *)m;
sxchnl_t *channel = msg->pch;
sxlink_t *link = NULL;
struct sxstream_opened *stream = NULL;
sexp_t *isx;
uint64_t sid = 0;
int r = SXE_FAILED, idx;
return sxmsg_return(msg, SXE_FAILED);
if(!channel) goto __fini;
link = channel->link;
if(!link || !channel->rpc_list) goto __fini; /* paranoid test */
/* parse input */
SEXP_ITERATE_LIST(sx, isx, idx) {
if(isx->ty == SEXP_LIST) {
__badproto:
r = SXE_BADPROTO;
goto __fini;
}
switch(idx) {
case 0: r = SXE_BADPROTO; break;
case 1: sid = strtoul(isx->val, NULL, 0); r = SXE_SUCCESS; break;
default: goto __badproto; break;
}
}
if(r != SXE_SUCCESS) goto __fini;
/* get this */
stream = link->streams[(int)sid];
if(stream) {
if(stream->pin_channelid == channel->cid) {
r = SXE_SUCCESS;
r = stream->desc->ops->s_close(stream);
} else r = SXE_EPERM;
} else r = SXE_NOSUCHSTREAMTYPE;
__fini:
return sxmsg_return(msg, r);
}
int _builtin_stream_eread(void *m, sexp_t *sx)

@ -445,17 +445,28 @@ sxlink_t *__link_minimal_alloc(struct in_addr *addr)
return NULL;
}
static long __cmp_u64(const void *a, const void *b)
{
return *(uint64_t *)a - *(uint64_t *)b;
}
static int __link_second_alloc(sxlink_t *co)
{
usrtc_node_init(&co->csnode, co);
/* initialize index allocators */
memset(&co->idx_ch, 0, sizeof(idx_allocator_t));
memset(&co->idx_msg, 0, sizeof(idx_allocator_t));
if((idx_allocator_init(&co->idx_ch, 512, 0))) goto __fail;
memset(&co->idx_streams, 0, sizeof(idx_allocator_t));
if((idx_allocator_init(&co->idx_ch, MAX_CHANNELSOPENED, 0))) goto __fail;
if((idx_allocator_init(&co->idx_msg, MAX_MSGINPROCESS, 0))) goto __fail;
if((idx_allocator_init(&co->idx_streams, MAX_STREAMSOPENED, 0))) goto __fail;
if(!(co->channels = malloc(sizeof(uintptr_t)*512))) goto __fail;
else memset(co->channels, 0, sizeof(uintptr_t)*512);
if(!(co->channels = malloc(sizeof(uintptr_t)*MAX_CHANNELSOPENED))) goto __fail;
else memset(co->channels, 0, sizeof(uintptr_t)*MAX_CHANNELSOPENED);
if(!(co->streams = malloc(sizeof(uintptr_t)*MAX_STREAMSOPENED))) goto __fail;
else memset(co->streams, 0, sizeof(uintptr_t)*MAX_STREAMSOPENED);
/* init mutexes */
pthread_mutex_init(&co->idx_ch_lock, NULL);
@ -463,6 +474,7 @@ static int __link_second_alloc(sxlink_t *co)
pthread_mutex_init(&co->write_pending_lock, NULL);
pthread_mutex_init(&co->sslinout[0], NULL);
pthread_mutex_init(&co->sslinout[1], NULL);
pthread_mutex_init(&co->idx_streams_lock, NULL);
/* init list */
list_init_head(&co->write_pending);
@ -470,22 +482,29 @@ static int __link_second_alloc(sxlink_t *co)
return SXE_SUCCESS;
__fail:
if(co->channels) free(co->channels);
if(co->streams) free(co->streams);
idx_allocator_destroy(&co->idx_msg);
idx_allocator_destroy(&co->idx_ch);
idx_allocator_destroy(&co->idx_streams);
return SXE_ENOMEM;
}
static void __link_second_free(sxlink_t *co)
{
if(co->channels) free(co->channels);
if(co->streams) free(co->streams);
idx_allocator_destroy(&co->idx_msg);
idx_allocator_destroy(&co->idx_ch);
idx_allocator_destroy(&co->idx_streams);
pthread_mutex_destroy(&co->idx_ch_lock);
pthread_mutex_destroy(&co->idx_msg_lock);
pthread_mutex_destroy(&co->write_pending_lock);
pthread_mutex_destroy(&co->sslinout[0]);
pthread_mutex_destroy(&co->sslinout[1]);
pthread_mutex_destroy(&co->idx_streams_lock);
return;
}

Loading…
Cancel
Save