diff --git a/include/sxmp/limits.h b/include/sxmp/limits.h index e532fb0..79f7cba 100644 --- a/include/sxmp/limits.h +++ b/include/sxmp/limits.h @@ -33,9 +33,11 @@ #define MAX_SXMPLTHREADS 8 -#define MAX_MSGINPROCESS 1024 +#define MAX_MSGINPROCESS 1024 +#define MAX_CHANNELSOPENED 512 +#define MAX_STREAMSOPENED 256 -#define MAX_STREAMS_TYPES 32767 +#define MAX_STREAMS_TYPES 32767 #define MAX_LINKS 32768 diff --git a/include/sxmp/sxmp.h b/include/sxmp/sxmp.h index 44a2324..60d5bf7 100644 --- a/include/sxmp/sxmp.h +++ b/include/sxmp/sxmp.h @@ -41,6 +41,7 @@ #include #include +#include #define VERIFY_DEPTH 1 /* FIXME: */ @@ -100,12 +101,13 @@ typedef struct __sxlink_t { pthread_mutex_t write_pending_lock; volatile uint8_t pending_messages; /** < pending message count */ /* stream part */ - pthread_rwlock_t stream_rwlock; - usrtc_t *stream_list; - usrtc_t *remote_streams; + struct sxstream_opened **streams; + idx_allocator_t idx_streams; + pthread_mutex_t idx_streams_lock; + usrtc_t *remote_streams; /** < streams from remote host */ /* Other stuff */ sxmp_proto_version_t cp_version; - pthread_t thrd_poll[8]; + pthread_t thrd_poll[MAX_SXMPLTHREADS]; volatile uint8_t flags; /** < flags of the connection */ volatile uint8_t usecount; /** < use count for the connection link */ usrtc_node_t csnode; /** < node to store the link within list */ @@ -191,11 +193,11 @@ struct sxstream_opened { void *priv; uint64_t off; uint16_t flags; - usrtc_node_t node; + uint16_t pin_channelid; }; struct sxstream_ops { - int (*s_open)(struct sxstream_opened *, const char *); + int (*s_open)(sxlink_t *, struct sxstream_opened *, const char *); int (*s_close)(struct sxstream_opened *); union { size_t (*s_bread)(struct sxstream_opened *, size_t, uint64_t, void *); diff --git a/lib/stream.c b/lib/stream.c index 2e42b8e..20eff30 100644 --- a/lib/stream.c +++ b/lib/stream.c @@ -478,18 +478,184 @@ list_head_t *sxstream_readnamed(sxstream_t *stream) } /* builtin functions for streams */ +static struct sxstream_opened *__alloc_stream(sxlink_t *link) +{ + int idx; + struct sxstream_opened *stream; + + pthread_mutex_lock(&link->idx_streams_lock); + idx = idx_allocate(&link->idx_streams); + pthread_mutex_unlock(&link->idx_streams_lock); + if(idx == IDX_INVAL) return NULL; + else stream = link->streams[idx]; + + if(!link->streams[idx] && !(stream = malloc(sizeof(struct sxstream_opened)))) { + __inval: + pthread_mutex_lock(&link->idx_streams_lock); + idx_free(&link->idx_streams, idx); + pthread_mutex_unlock(&link->idx_streams_lock); + + return NULL; + } else if(link->streams[idx]) goto __inval; + + memset(stream, 0, sizeof(struct sxstream_opened)); + stream->sto_id = idx; + link->streams[idx] = stream; + + return stream; +} + +static void __free_stream(sxlink_t *link, struct sxstream_opened *stream) +{ + pthread_mutex_lock(&link->idx_streams_lock); + idx_free(&link->idx_streams, stream->sto_id); + link->streams[stream->sto_id] = NULL; + pthread_mutex_unlock(&link->idx_streams_lock); + + free(stream); + + return; +} + int _builtin_stream_open(void *m, sexp_t *sx) { sxmsg_t *msg = (sxmsg_t *)m; + sxchnl_t *channel = msg->pch; + sxlink_t *link = NULL; + sxhub_t *hub; + sexp_t *isx, *iisx; + char *rbuf = NULL, *opt = NULL, *s_opt = NULL; + usrtc_node_t *node; + struct sxstream_description *s_desc; + struct sxstream_opened *strea; + size_t rbuf_len; + int chan_type, idx, r = SXE_FAILED; + int flags = 0, des_type, iidx; - return sxmsg_return(msg, SXE_FAILED); + if(!channel) goto __fail; + link = channel->link; + chan_type = channel->type_id; + + if(!link || !channel->rpc_list) goto __fail; /* paranoid test */ + + /* parse incoming builtin function */ + SEXP_ITERATE_LIST(sx, isx, idx) { + if(!idx && isx->ty == SEXP_LIST) { + __badproto: + r = SXE_BADPROTO; + goto __fail; + } else if(idx && isx->ty != SEXP_LIST) goto __badproto; + + if(isx->ty == SEXP_LIST) { + SEXP_ITERATE_LIST(isx, iisx, iidx) { + if(iisx->ty == SEXP_LIST) goto __badproto; + switch(iidx) { + case 0: + opt = iisx->val; + + if(*opt != ':') goto __badproto; + break; + case 1: + if(!strcmp(opt, _SXSTREAMTYPE_PREFIX)) des_type = atoi(iisx->val); + else if(!strcmp(opt, _SXSTREAMFLAGS_PREFIX)) flags = atoi(iisx->val); + else if(!strcmp(opt, _SXSTREAMOPT_PREFIX)) s_opt = iisx->val; + else goto __badproto; + break; + default: goto __badproto; break; + } + } + } else goto __badproto; + } + + /* check availability */ + hub = link->ssys; + if(!hub->streams) { + __nostream: + r = SXE_NOSUCHSTREAMTYPE; + goto __fail; + } + if(!(node = usrtc_lookup(hub->streams, &des_type))) goto __nostream; + else s_desc = (struct sxstream_description *)usrtc_node_getdata(node); + + if(s_desc->pcid != chan_type) { /* check permission by channel */ + __eperm: + r = SXE_EPERM; + goto __fail; + } + if(__flag_check(s_desc->flags, flags)) goto __eperm; /* by flags */ + + /* create a new remote side channel */ + if(!(strea = __alloc_stream(link))) { + r = SXE_ENOMEM; + goto __fail; + } + + /* rbuf length */ + rbuf_len = _SXSTREAMCLOSE_ADDLEN + strlen(_SXSTREAMOPEN_CMD_R); + + /* set stream details */ + strea->desc = s_desc; + strea->flags = flags; + strea->pin_channelid = channel->cid; + + r = s_desc->ops->s_open(link, strea, s_opt); + if(r != SXE_SUCCESS) { + /* deallocate stream */ + __free_stream(link, strea); + goto __fail; + } + rbuf = sxmsg_rapidbuf(msg); + rbuf_len = snprintf(rbuf, rbuf_len, "(%s %lu)", _SXSTREAMOPEN_CMD_R, + strea->sto_id); + + return sxmsg_rreply(msg, rbuf_len + sizeof(char)); + + __fail: + + return sxmsg_return(msg, r); } int _builtin_stream_close(void *m, sexp_t *sx) { sxmsg_t *msg = (sxmsg_t *)m; + sxchnl_t *channel = msg->pch; + sxlink_t *link = NULL; + struct sxstream_opened *stream = NULL; + sexp_t *isx; + uint64_t sid = 0; + int r = SXE_FAILED, idx; - return sxmsg_return(msg, SXE_FAILED); + if(!channel) goto __fini; + link = channel->link; + + if(!link || !channel->rpc_list) goto __fini; /* paranoid test */ + + /* parse input */ + SEXP_ITERATE_LIST(sx, isx, idx) { + if(isx->ty == SEXP_LIST) { + __badproto: + r = SXE_BADPROTO; + goto __fini; + } + switch(idx) { + case 0: r = SXE_BADPROTO; break; + case 1: sid = strtoul(isx->val, NULL, 0); r = SXE_SUCCESS; break; + default: goto __badproto; break; + } + } + if(r != SXE_SUCCESS) goto __fini; + + /* get this */ + stream = link->streams[(int)sid]; + if(stream) { + if(stream->pin_channelid == channel->cid) { + r = SXE_SUCCESS; + r = stream->desc->ops->s_close(stream); + } else r = SXE_EPERM; + } else r = SXE_NOSUCHSTREAMTYPE; + + __fini: + return sxmsg_return(msg, r); } int _builtin_stream_eread(void *m, sexp_t *sx) diff --git a/lib/sxmplv2.c b/lib/sxmplv2.c index 35c0556..79d708d 100644 --- a/lib/sxmplv2.c +++ b/lib/sxmplv2.c @@ -445,17 +445,28 @@ sxlink_t *__link_minimal_alloc(struct in_addr *addr) return NULL; } +static long __cmp_u64(const void *a, const void *b) +{ + return *(uint64_t *)a - *(uint64_t *)b; +} + static int __link_second_alloc(sxlink_t *co) { usrtc_node_init(&co->csnode, co); + /* initialize index allocators */ memset(&co->idx_ch, 0, sizeof(idx_allocator_t)); memset(&co->idx_msg, 0, sizeof(idx_allocator_t)); - if((idx_allocator_init(&co->idx_ch, 512, 0))) goto __fail; + memset(&co->idx_streams, 0, sizeof(idx_allocator_t)); + if((idx_allocator_init(&co->idx_ch, MAX_CHANNELSOPENED, 0))) goto __fail; if((idx_allocator_init(&co->idx_msg, MAX_MSGINPROCESS, 0))) goto __fail; + if((idx_allocator_init(&co->idx_streams, MAX_STREAMSOPENED, 0))) goto __fail; - if(!(co->channels = malloc(sizeof(uintptr_t)*512))) goto __fail; - else memset(co->channels, 0, sizeof(uintptr_t)*512); + if(!(co->channels = malloc(sizeof(uintptr_t)*MAX_CHANNELSOPENED))) goto __fail; + else memset(co->channels, 0, sizeof(uintptr_t)*MAX_CHANNELSOPENED); + + if(!(co->streams = malloc(sizeof(uintptr_t)*MAX_STREAMSOPENED))) goto __fail; + else memset(co->streams, 0, sizeof(uintptr_t)*MAX_STREAMSOPENED); /* init mutexes */ pthread_mutex_init(&co->idx_ch_lock, NULL); @@ -463,6 +474,7 @@ static int __link_second_alloc(sxlink_t *co) pthread_mutex_init(&co->write_pending_lock, NULL); pthread_mutex_init(&co->sslinout[0], NULL); pthread_mutex_init(&co->sslinout[1], NULL); + pthread_mutex_init(&co->idx_streams_lock, NULL); /* init list */ list_init_head(&co->write_pending); @@ -470,22 +482,29 @@ static int __link_second_alloc(sxlink_t *co) return SXE_SUCCESS; __fail: + if(co->channels) free(co->channels); + if(co->streams) free(co->streams); idx_allocator_destroy(&co->idx_msg); idx_allocator_destroy(&co->idx_ch); + idx_allocator_destroy(&co->idx_streams); + return SXE_ENOMEM; } static void __link_second_free(sxlink_t *co) { if(co->channels) free(co->channels); + if(co->streams) free(co->streams); idx_allocator_destroy(&co->idx_msg); idx_allocator_destroy(&co->idx_ch); + idx_allocator_destroy(&co->idx_streams); pthread_mutex_destroy(&co->idx_ch_lock); pthread_mutex_destroy(&co->idx_msg_lock); pthread_mutex_destroy(&co->write_pending_lock); pthread_mutex_destroy(&co->sslinout[0]); pthread_mutex_destroy(&co->sslinout[1]); + pthread_mutex_destroy(&co->idx_streams_lock); return; }