diff --git a/include/sxmp/limits.h b/include/sxmp/limits.h index 35c2859..639c3b0 100644 --- a/include/sxmp/limits.h +++ b/include/sxmp/limits.h @@ -33,6 +33,8 @@ #define MAX_SXMPLTHREADS 8 +#define MAX_STREAMS_TYPES 32767 + #define MAX_LINKS 32768 #endif /* __SXMP_LIMITS_H__ */ diff --git a/include/sxmp/sxmp.h b/include/sxmp/sxmp.h index 2ca3519..54a7948 100644 --- a/include/sxmp/sxmp.h +++ b/include/sxmp/sxmp.h @@ -99,7 +99,12 @@ typedef struct __sxlink_t { list_head_t write_pending; /** < list of messages waiting for write */ pthread_mutex_t write_pending_lock; volatile uint8_t pending_messages; /** < pending message count */ + /* stream part */ + pthread_rwlock_t stream_rwlock; + usrtc_t *stream_list; + usrtc_t *remote_streams; /* Other stuff */ + sxmp_proto_version_t cp_version; pthread_t thrd_poll[8]; volatile uint8_t flags; /** < flags of the connection */ volatile uint8_t usecount; /** < use count for the connection link */ @@ -164,6 +169,43 @@ typedef struct __link_rpc_list_type { char *opt_version; /** < reserved for future implementations */ } sxl_rpclist_t; +struct sxstream_ops; + +struct sxstream_description { + uint16_t stid; + uint16_t pcid; + uint16_t type; + uint16_t flags; + struct sxstream_ops *ops; + usrtc_node_t node; +}; + +struct sxstream_opened { + uint64_t sto_id; + struct sxstream_description *desc; + union { + void *map_data; + char *ent_buf; + }; + void *priv; + uint64_t off; + uint16_t flags; + usrtc_node_t node; +}; + +struct sxstream_ops { + int (*s_open)(struct sxstream_opened *, const char *); + int (*s_close)(struct sxstream_opened *); + union { + size_t (*s_bread)(struct sxstream_opened *, size_t, uint64_t, void *); + size_t (*s_eread)(struct sxstream_opened *, size_t, uint64_t, char *); + }; + union { + size_t (*s_bwrite)(struct sxstream_opened *, size_t, uint64_t, const void *); + size_t (*s_ewrite)(struct sxstream_opened *, uint64_t, list_head_t *); + }; +}; + /** * \brief Hub subsystem structure. * @@ -174,6 +216,7 @@ typedef struct __link_rpc_list_type { */ typedef struct __sxhub_type { usrtc_t *links; + usrtc_t *streams; pthread_rwlock_t rwlock; char *rootca, *certpem, *certkey; /* path name to the certificates */ sxl_rpclist_t *system_rpc; @@ -206,6 +249,55 @@ typedef struct __rpc_typed_list_type { usrtc_node_t lnode; } rpc_typed_list_t; +/* streams */ + +/* flags for streams */ +#define SXE_O_BINARY (1 << 1) +#define SXE_O_READ (1 << 2) +#define SXE_O_WRITE (1 << 3) +#define SXE_O_ASYNC (1 << 4) +#define SXE_O_TRUNC (1 << 5) +#define SXE_O_NAMED (1 << 6) +#define SXE_O_OCHANNELED (1 << 7) + +/* sxstream_t used to access stream i.e. this is something returned on + * open operations. + * you should care about closing it, because this ones doesn't tracked. + */ +typedef struct __sxtream_type { + sxlink_t *link; + sxchnl_t *channel; + int flags; + uint64_t sid; + union { + sexp_t *sx; + void *ebuf; + }; + /* for reading */ + union { + struct { + void *data; + uintptr_t cur_offset; + }; + struct { + list_head_t *entries; + list_node_t *cur; + }; + }; + list_head_t *named; +} sxstream_t; + +struct _nn_steam_entry_node { + char *value; + list_node_t node; +}; + +struct _steam_entry_node { + char *name; + char *value; + list_node_t node; +}; + #ifdef __cplusplus extern "C" { #endif @@ -254,6 +346,37 @@ int sxmsg_return(sxmsg_t *msg, int opcode); int sxmsg_return_pp(sxmsg_t *msg, int opcode); void sxmsg_clean(sxmsg_t *msg); +/* streams */ + /* + * NOTE: sxstream operations (except opening process i.e. few threads + * can open different streams) are NOT _thread-safe_, please + * care about that in your own application. + */ + sxstream_t *sxstream_open(sxlink_t *link, const char *opt, int stid, int flags); + sxstream_t *sxstream_openwch(sxlink_t *link, sxchnl_t *channel, const char *opt, int stid, int flags); + int sxstream_close(sxstream_t *stream); + + /* binary stream operations */ + size_t sxstream_bread(sxstream_t *stream, void *buf, size_t buf_len); + size_t sxstream_breadat(sxstream_t *stream, void *buf, size_t buf_len, off_t off); + size_t sxstream_bwriteat(sxstream_t *stream, const void *buf, size_t buf_len, off_t off); + size_t sxstream_bsyncat(sxstream_t *stream, void *buf, size_t buf_len, off_t off); + + /* + * NOTE: returning value for reading from entry streams + * shouldn't be managed, it's done by close operation. + * That means you should care about ridden values, since + * it might be freed on the next read operation. + */ + /* entry nonamed stream ops */ + list_head_t *sxstream_read(sxstream_t *stream); + + /* entry named stream ops */ + list_head_t *sxstream_readnamed(sxstream_t *stream); + + /* provider-side functions for streams */ + int sxhub_stream_register(sxhub_t *hub, const struct sxstream_description *s_desc); + #ifdef __cplusplus } #endif diff --git a/include/sxmp/version.h b/include/sxmp/version.h index a37619a..f1147e0 100644 --- a/include/sxmp/version.h +++ b/include/sxmp/version.h @@ -30,5 +30,11 @@ #define SXMP_MAJOR 4 #define SXMP_MINOR 2 +typedef enum { + V2 = 0, + V2_1, + V_UNKNOWN = 200, +} sxmp_proto_version_t; + #endif /* __SXMP_VERSION_H__ */ diff --git a/lib/link.c b/lib/link.c index b796c7e..3df9647 100644 --- a/lib/link.c +++ b/lib/link.c @@ -137,7 +137,10 @@ static int __get_channels_list(void *cctx, sexp_t *sx) size_t maxlen = 65535 - sizeof(sxmplv2_head_t); size_t ulen = 0; - /* we will avoid S-exp scanning here */ + /* determine how this function was called */ + if(!strcmp(sx->list->val, "get-channels-list")) + co->cp_version = V2; + else co->cp_version = V2_1; /* last supported in this version */ /* call the function */ if(ssys->get_rpc_typed_list_tree) @@ -154,12 +157,235 @@ static int __get_channels_list(void *cctx, sexp_t *sx) ulen += snprintf(buf + ulen, maxlen - ulen, ")"); msg->mhead.payload_length = ulen + 1; - /* we're ready for messaging mode */ - co->flags |= SXMP_MESSAGINGMODE; + if(co->cp_version == V2) { + /* we're ready for messaging mode */ + co->flags |= SXMP_MESSAGINGMODE; + } return SXE_SUCCESS; } +static int __get_streams(void *cctx, sexp_t *sx) +{ + sxlink_t *link = (sxlink_t *)cctx; + sxhub_t *hub = link->ssys; + sxmsg_t *msg = link->messages[0]; + char *buf = msg->payload; + usrtc_node_t *node, *rpc_node; + struct sxstream_description *s_desc; + size_t maxlen = 65535 - sizeof(sxmplv2_head_t); + size_t ulen = 0, sts = 0; + int tcid = 0; + + if(link->cp_version == V_UNKNOWN) /* oops, version doesn't a happy one */ + return SXE_FAILED; + + if(!hub->streams) { /* no streams provided */ + __return_nil: + ulen = snprintf(buf, maxlen, "(!@s< nil)"); + msg->mhead.payload_length = ulen + 1; + /* and now we're ready to exit from batch mode */ + link->flags |= SXMP_MESSAGINGMODE; + + return SXE_SUCCESS; + } else { /* set all streams available */ + ulen += snprintf(buf + ulen, maxlen - ulen, "(!@s< "); + + for(node = usrtc_first(hub->streams); node != NULL; node = usrtc_next(hub->streams, node)) { + s_desc = (struct sxstream_description *)usrtc_node_getdata(node); + tcid = s_desc->pcid; + if((rpc_node = usrtc_lookup(link->rpc_list, &tcid))) { /* channel allowed */ + /* ids */ + ulen += snprintf(buf + ulen, maxlen - ulen, "(:tcid %d :stid %d :t ", s_desc->pcid, + s_desc->stid); + /* type */ + switch(s_desc->type) { + case 0: + ulen += snprintf(buf + ulen, maxlen - ulen, "e "); + break; + case SXE_O_NAMED: + ulen += snprintf(buf + ulen, maxlen - ulen, "n "); + break; + case SXE_O_BINARY: + ulen += snprintf(buf + ulen, maxlen - ulen, "b "); + break; + } + /* flags */ + ulen += snprintf(buf + ulen, maxlen - ulen, ":a "); + if(s_desc->flags & SXE_O_READ) + ulen += snprintf(buf + ulen, maxlen - ulen, "r"); + else ulen += snprintf(buf + ulen, maxlen - ulen, "-"); + if(s_desc->flags & SXE_O_WRITE) + ulen += snprintf(buf + ulen, maxlen - ulen, "w"); + else ulen += snprintf(buf + ulen, maxlen - ulen, "-"); + if(s_desc->flags & SXE_O_TRUNC) + ulen += snprintf(buf + ulen, maxlen - ulen, "t"); + else ulen += snprintf(buf + ulen, maxlen - ulen, "-"); + if(s_desc->flags & SXE_O_ASYNC) + ulen += snprintf(buf + ulen, maxlen - ulen, "a"); + else ulen += snprintf(buf + ulen, maxlen - ulen, "-"); + + ulen += snprintf(buf + ulen, maxlen - ulen, ")"); + + sts++; + } + } + + if(!sts) goto __return_nil; + + ulen += snprintf(buf + ulen, maxlen - ulen, ")"); + msg->mhead.payload_length = ulen + 1; + } + + link->flags |= SXMP_MESSAGINGMODE; + + return SXE_SUCCESS; +} + +static long __cmp_uint16(const void *a, const void *b) +{ + return (long) *(uint16_t*)a - *(uint16_t*)b; +} + +#define _MOD_STID 0xa +#define _MOD_TCID 0xb +#define _MOD_TYPE 0xc +#define _MOD_FLAGS 0xd +#define _MOD_UNKWN 0x0 + +static int __set_streams(void *cctx, sexp_t *sx) +{ + register int idx, iidx; + usrtc_t *tree = NULL; + sxlink_t *link = (sxlink_t *)cctx; + sexp_t *isx, *iisx; + struct sxstream_description *s_desc; + usrtc_node_t *node = NULL; + int modp = 0, r, value_mod = 0, mod = _MOD_UNKWN; + int flags, type, stid, pcid; + + if(!link->remote_streams) { + tree = malloc(sizeof(usrtc_t)); + if(!tree) return SXE_ENOMEM; + usrtc_init(tree, USRTC_SPLAY, MAX_STREAMS_TYPES, __cmp_uint16); + link->remote_streams = tree; + } + + SEXP_ITERATE_LIST(sx, isx, idx) { + if(!idx) continue; + + if(isx->ty != SEXP_LIST) { + free(tree); + link->remote_streams = NULL; + if(!strcmp(isx->val, "nil")) goto __fini; + else return SXE_BADPROTO; + } else { + SEXP_ITERATE_LIST(isx, iisx, iidx) { + if(iisx->ty == SEXP_LIST) { + r = SXE_BADPROTO; + __clean_up_on_error: + /* clean up all stuff */ + for(node = usrtc_first(tree); node != NULL; node = usrtc_first(tree)) { + s_desc = (struct sxstream_description *)usrtc_node_getdata(node); + usrtc_delete(tree, node); + free(s_desc); + } + free(tree); + link->remote_streams = NULL; + + return r; + } + + if(iisx->val[0] == ':' && value_mod) { + r = SXE_BADPROTO; + goto __clean_up_on_error; + } else if(iisx->val[0] == ':' && !value_mod) { + value_mod = 1; + if(!strcmp(iisx->val, ":stid")) mod = _MOD_STID; + else if(!strcmp(iisx->val, ":tcid")) mod = _MOD_TCID; + else if(!strcmp(iisx->val, ":t")) mod = _MOD_TYPE; + else if(!strcmp(iisx->val, ":a")) mod = _MOD_FLAGS; + else { + r = SXE_BADPROTO; + goto __clean_up_on_error; + } + } else if(iisx->val[0] != ':' && value_mod) { + switch(mod) { + case _MOD_STID: + stid = atoi(iisx->val); + modp++; + break; + case _MOD_TCID: + pcid = atoi(iisx->val); + modp++; + break; + case _MOD_TYPE: + switch(iisx->val[0]) { + case 'e': type = 0; break; + case 'b': type = SXE_O_BINARY; break; + case 'n': type = SXE_O_NAMED; break; + default: + r = SXE_BADPROTO; + goto __clean_up_on_error; + } + modp++; + break; + case _MOD_FLAGS: + flags = 0; + if(iisx->val[0] == 'r') flags |= SXE_O_READ; + else if(iisx->val[1] == 'w') flags |= SXE_O_WRITE; + else if(iisx->val[2] == 't') flags |= SXE_O_TRUNC; + else if(iisx->val[3] == 'a') flags |= SXE_O_ASYNC; + else { + r = SXE_BADPROTO; + goto __clean_up_on_error; + } + modp++; + break; + } + value_mod = 0; + } else { + r = SXE_BADPROTO; + goto __clean_up_on_error; + } + } + /* here we go */ + if(modp < 4) { + r = SXE_BADPROTO; + goto __clean_up_on_error; + } else if(!(s_desc = malloc(sizeof(struct sxstream_description)))) { + r = SXE_ENOMEM; + goto __clean_up_on_error; + } + + usrtc_node_init(&s_desc->node, s_desc); + s_desc->ops = NULL; + + s_desc->flags = (uint16_t)flags; + s_desc->type = (uint16_t)type; + s_desc->pcid = (uint16_t)pcid; + s_desc->stid = (uint16_t)stid; + + usrtc_insert(tree, &s_desc->node, &s_desc->stid); + + /* finish */ + modp = 0; + } + } + + __fini: + link->flags |= SXMP_MESSAGINGMODE; + link->flags &= ~SXMP_BATCHMODE; + + return SXE_SUCCESS; +} + +#undef _MOD_STID +#undef _MOD_TCID +#undef _MOD_TYPE +#undef _MOD_FLAGS +#undef _MOD_UNKWN + static int __set_channels_list(void *cctx, sexp_t *sx) { register int idx; @@ -193,9 +419,45 @@ static int __set_channels_list(void *cctx, sexp_t *sx) } } - /* we're ready for messaging mode */ - co->flags |= SXMP_MESSAGINGMODE; - co->flags &= ~SXMP_BATCHMODE; + if(co->cp_version == V2) { + /* we're ready for messaging mode */ + co->flags |= SXMP_MESSAGINGMODE; + co->flags &= ~SXMP_BATCHMODE; + } + + return SXE_SUCCESS; +} + +static int __my_version_ack(void *cctx, sexp_t *sx) +{ + register int idx; + sxlink_t *link = (sxlink_t *)cctx; + sexp_t *isx; + + SEXP_ITERATE_LIST(sx, isx, idx) { + if(!idx) continue; + if(idx > 2) return SXE_BADPROTO; + if(isx->ty == SEXP_LIST) return SXE_BADPROTO; + if(!strcmp(isx->val, "v2_1")) link->cp_version = V2_1; + else link->cp_version = V_UNKNOWN; + } + + return SXE_SUCCESS; +} + +static int __my_version_set(void *cctx, sexp_t *sx) +{ + register int idx; + sxlink_t *link = (sxlink_t *)cctx; + sexp_t *isx; + + SEXP_ITERATE_LIST(sx, isx, idx) { + if(!idx) continue; + if(idx > 2) return SXE_BADPROTO; + if(isx->ty == SEXP_LIST) return SXE_BADPROTO; + if(!strcmp(isx->val, "v2_1")) link->cp_version = V2_1; + else return SXE_FAILED; /* failed to set another version */ + } return SXE_SUCCESS; } @@ -204,9 +466,21 @@ static int __init_systemrpc_tree(usrtc_t *rtree) { /* batch mode negotiation context functions */ if(__insert_rpc_function(rtree, "auth-set-credentials", __set_credentials)) goto __fail; - if(__insert_rpc_function(rtree, "get-channels-list", __get_channels_list)) goto __fail; + if(__insert_rpc_function(rtree, "get-channels-list", __get_channels_list)) goto __fail; /* old V2 (v1 also) */ if(__insert_rpc_function(rtree, "set-channels-list", __set_channels_list)) goto __fail; + /* sync functions */ + /* channels */ + if(__insert_rpc_function(rtree, "!@c>", __get_channels_list)) goto __fail; + if(__insert_rpc_function(rtree, "!@c<", __set_channels_list)) goto __fail; + /* version */ + if(__insert_rpc_function(rtree, "!@v>", __my_version_ack)) goto __fail; + if(__insert_rpc_function(rtree, "!@v<", __my_version_ack)) goto __fail; + if(__insert_rpc_function(rtree, "!@V>", __my_version_set)) goto __fail; + /* streams */ + if(__insert_rpc_function(rtree, "!@s>", __get_streams)) goto __fail; + if(__insert_rpc_function(rtree, "!@s<", __set_streams)) goto __fail; + return 0; __fail: @@ -264,9 +538,19 @@ int sxhub_init(sxhub_t *ssys) int sxhub_free(sxhub_t *ssys) { + usrtc_node_t *node; + __destroy_rpc_list_tree(ssys->system_rpc->rpc_tree); free(ssys->system_rpc->rpc_tree); free(ssys->system_rpc); + + /* free streams description tree */ + if(ssys->streams) { + for(node = usrtc_first(ssys->streams); node != NULL; node = usrtc_first(ssys->streams)) + usrtc_delete(ssys->streams, node); + free(ssys->streams); + } + free(ssys->links); pthread_rwlock_destroy(&(ssys->rwlock)); SSL_CTX_free(ssys->ctx); @@ -323,6 +607,31 @@ int sxhub_setsslserts(sxhub_t *ssys, const char *rootca, return r; } +int sxhub_stream_register(sxhub_t *hub, const struct sxstream_description *s_desc) +{ + usrtc_t *tree = NULL; + usrtc_node_t *node = NULL; + + if(!hub->streams) { + tree = malloc(sizeof(usrtc_t)); + if(!tree) return ENOMEM; + usrtc_init(tree, USRTC_SPLAY, MAX_STREAMS_TYPES, __cmp_uint16); + hub->streams = tree; + } else { + tree = hub->streams; + node = usrtc_lookup(tree, &s_desc->stid); + if(node) return EEXIST; + } + + node = (usrtc_node_t *)&s_desc->node; + usrtc_node_init(node, (void *)s_desc); + usrtc_insert(tree, node, &s_desc->stid); + + return 0; +} + +/* errors */ + struct __scerrcode { int code; const char *desc; diff --git a/lib/sxmplv2.c b/lib/sxmplv2.c index 5e53d8a..8423d98 100644 --- a/lib/sxmplv2.c +++ b/lib/sxmplv2.c @@ -516,6 +516,8 @@ static int __eval_syssexp(sxlink_t *co, sexp_t *sx) rpcf = sx->list->val; else return SXE_BADPROTO; + /* TODO: add builtin functions processing */ + /* find an appropriate function */ node = usrtc_lookup(rpc_list->rpc_tree, rpcf);