From 350aa0321f79398b5d147f3634ddb65fd1782f58 Mon Sep 17 00:00:00 2001 From: Alexander Vdolainen Date: Tue, 1 Mar 2016 22:43:43 +0200 Subject: [PATCH] [core] Added named entry operations; --- include/sxmp/sxmp.h | 1 + lib/stream.c | 366 +++++++++++++++++++++++++++++++++++++++----- 2 files changed, 331 insertions(+), 36 deletions(-) diff --git a/include/sxmp/sxmp.h b/include/sxmp/sxmp.h index b45738b..ea63a03 100644 --- a/include/sxmp/sxmp.h +++ b/include/sxmp/sxmp.h @@ -185,6 +185,7 @@ struct sxstream_description { uint16_t type; /** < type: 0||SXE_O_NAMED||SXE_O_BINARY */ uint16_t flags; /** < possible flags */ struct sxstream_ops *ops; /** < operations */ + list_head_t *named; /** < named entries order */ usrtc_node_t node; /** < internal node struct */ }; diff --git a/lib/stream.c b/lib/stream.c index 21a1e32..89c30fb 100644 --- a/lib/stream.c +++ b/lib/stream.c @@ -48,6 +48,7 @@ #include "internal.h" static void __free_listentry(struct _stream_list_node *listentry); +static void __free_listentrynamed(struct _stream_list_node *listentry); static int __flag_check(int s_flag, int d_flag) { @@ -59,12 +60,63 @@ static int __flag_check(int s_flag, int d_flag) return 0; } +static char *__stream_namedlist_list2sxstr(list_head_t *named) +{ + list_node_t *iter, *siter; + struct _stream_named_order_node *entry; + char *out = NULL; + size_t out_len = sizeof(char)*4, lc = 0; + + if(!named) return NULL; + + list_for_each_safe(named, iter, siter) { + entry = list_entry(iter, struct _stream_named_order_node, node); + out_len += sizeof(char)*9 + strlen(entry->name); + } + + if(!(out = malloc(out_len))) return NULL; + + lc += snprintf(out + lc, out_len - lc, "("); + list_for_each_safe(named, iter, siter) { + entry = list_entry(iter, struct _stream_named_order_node, node); + lc += snprintf(out + lc, out_len - lc, "(%d %s)", entry->order, entry->name + sizeof(char)); + } + lc += snprintf(out + lc, out_len - lc, ")"); + + return out; +} + +static int __stream_namedlist_sexp2list(list_head_t *named, sexp_t *sx) +{ + sexp_t *isx, *iisx; + char *name = NULL; + int idx, iidx, order = 0; + + SEXP_ITERATE_LIST(sx, isx, idx) { + if(isx->ty != SEXP_LIST) return SXE_BADPROTO; + + SEXP_ITERATE_LIST(isx, iisx, iidx) { + if(iisx->ty == SEXP_LIST) return SXE_BADPROTO; + + switch(iidx) { + case 0: order = atoi(iisx->val); break; + case 1: name = iisx->val; break; + default: return SXE_BADPROTO; + } + } + if(sxstream_generic_nmlist_additem(named, name, order) < 0) return SXE_BADPROTO; + } + + return 0; +} + static sxstream_t *__stream_open(sxlink_t *link, sxchnl_t *channel, const char *opt, struct sxstream_description *desc, int stid, int flags) { char *msgbuf = NULL; size_t mlen = 0; sxstream_t *ss = NULL; + list_head_t *named = NULL; sxmsg_t *msg = NULL; char *buf = NULL; sexp_t *sx, *isx; @@ -105,23 +157,37 @@ static sxstream_t *__stream_open(sxlink_t *link, sxchnl_t *channel, const char * /* time to get the special id, put it to the link and clean up all */ buf = sxmsg_rapidbuf(msg); if(!(sx = parse_sexp(buf, strlen(buf)))) { - errno = SXE_ENOMEM; + __enomem: + r = errno = SXE_ENOMEM; sxmsg_clean(msg); goto __fini; } else sxmsg_clean(msg); + if(desc->type == SXE_O_NAMED) { + named = sxstream_generic_named_alloc(); + if(!named) goto __enomem; + } + SEXP_ITERATE_LIST(sx, isx, idx) { - if(isx->ty == SEXP_LIST) r = SXE_BADPROTO; + if(isx->ty == SEXP_LIST && desc->type != SXE_O_NAMED) r = SXE_BADPROTO; + if(isx->ty == SEXP_LIST && idx < 2) r = SXE_BADPROTO; - if(!idx && strcmp(isx->val, _SXSTREAMOPEN_CMD_R)) r = SXE_BADPROTO; - if(idx && idx < 2) sid = strtoul(isx->val, NULL, 0); - if(idx >= 2) r = SXE_BADPROTO; + switch(idx) { + case 0: if(strcmp(isx->val, _SXSTREAMOPEN_CMD_R)) r = SXE_BADPROTO; break; + case 1: sid = strtoul(isx->val, NULL, 0); break; + case 2: + if(desc->type != SXE_O_NAMED) r = SXE_BADPROTO; + else if(isx->ty == SEXP_LIST) r = __stream_namedlist_sexp2list(named, isx); + else r = SXE_BADPROTO; + break; + default: r = SXE_BADPROTO; break; + } } destroy_sexp(sx); if(!r) { if(!(ss = malloc(sizeof(sxstream_t)))) { - errno = SXE_ENOMEM; + r = errno = SXE_ENOMEM; goto __fini; } @@ -130,9 +196,11 @@ static sxstream_t *__stream_open(sxlink_t *link, sxchnl_t *channel, const char * ss->sid = sid; ss->link = link; ss->channel = channel; + ss->named = named; } else errno = r; __fini: + if(r && named) sxstream_generic_named_free(named); if(msgbuf) free(msgbuf); return ss; @@ -231,7 +299,7 @@ int sxstream_close(sxstream_t *stream) list_node_t *iter, *s_iter; struct _stream_list_node *listentry; size_t mbuf_len = strlen(_SXSTREAMCLOSE_CMD) + _SXSTREAMCLOSE_ADDLEN; - int r = 0; + int r = 0, type = 0; if(!stream) return SXE_FAILED; if(!stream->link || !stream->channel) return SXE_FAILED; @@ -242,16 +310,27 @@ int sxstream_close(sxstream_t *stream) if(r == SXE_RAPIDMSG) sxmsg_clean(msg); if(r == SXE_SUCCESS) { - /* TODO: free all caches, buffers etc */ - if(!(stream->flags & SXE_O_BINARY)) { - if(!(stream->flags & SXE_O_NAMED) && stream->entries) { + if(stream->flags & SXE_O_BINARY) type = SXE_O_BINARY; + else if(stream->flags & SXE_O_NAMED) type = SXE_O_NAMED; + else type = 0; + + switch(type) { + case 0: + case SXE_O_NAMED: + if(stream->entries) { list_for_each_safe(stream->entries, iter, s_iter) { listentry = list_entry(iter, struct _stream_list_node, node); list_del(iter); - __free_listentry(listentry); + if(!type) __free_listentry(listentry); + else __free_listentrynamed(listentry); } } + break; + case SXE_O_BINARY: + /* TODO: free all caches, buffers etc */ + break; } + if(stream->flags & SXE_O_OCHANNELED) r = sxchannel_close(stream->channel); @@ -351,8 +430,96 @@ static void __free_listentry(struct _stream_list_node *listentry) return; } -/* entry nonamed stream ops */ -list_head_t *sxstream_read(sxstream_t *stream) +static void __free_listentrynamed(struct _stream_list_node *listentry) +{ + struct _stream_entry_node *entry; + list_node_t *iter, *s_iter; + + list_for_each_safe(listentry->list, iter, s_iter) { + entry = list_entry(iter, struct _stream_entry_node, node); + list_del(iter); + free(entry->value); /* only value, name is a pointer to named */ + free(entry); + } + + free(listentry->list); + free(listentry); + + return; +} + +/* we have something like that ((:order 1 2) "value1" "value2") */ +static int __stream_namedentry_sexp2list(sexp_t *sx, list_head_t *list, list_head_t *named) +{ + struct _stream_entry_node *entry; + struct _stream_named_order_node *onode; + sexp_t *isx, *iisx; + list_node_t *iter, *siter; + char *name = NULL; + list_head_t olist; + int idx, iidx, order, r = 0; + + SEXP_ITERATE_LIST(sx, isx, idx) { + if(!idx && isx->ty != SEXP_LIST) return SXE_BADPROTO; + + if(!idx) { + list_init_head(&olist); + + SEXP_ITERATE_LIST(isx, iisx, iidx) { + if(iisx->ty == SEXP_LIST) goto __badproto; + switch(iidx) { + case 0: if(strcmp(iisx->val, ":order")) return SXE_BADPROTO; break; + default: + order = atoi(iisx->val); + name = (char *)sxstream_generic_named_lookupname(named, order); + if(!name) goto __badproto; + else if(!(onode = malloc(sizeof(struct _stream_named_order_node)))) goto __enomem; + + list_init_node(&onode->node); + onode->name = name - sizeof(char); + onode->order = order; + + list_add2tail(&olist, &onode->node); + break; + } + } + } else { + if(isx->ty == SEXP_LIST) goto __badproto; + if(idx == 1) iter = list_node_first(&olist); + else { + if(iter == list_head(&olist)) goto __badproto; + iter = iter->next; + } + onode = list_entry(iter, struct _stream_named_order_node, node); + name = onode->name; + if(!(entry = malloc(sizeof(struct _stream_entry_node)))) goto __enomem; + else if(!(entry->value = strdup(isx->val))) { + free(entry); + goto __enomem; + } + + list_init_node(&entry->node); + list_add2tail(list, &entry->node); + } + } + + return 0; + + __badproto: + r = SXE_BADPROTO; + __enomem: + if(!r) r = SXE_ENOMEM; + list_for_each_safe(&olist, iter, siter) { + onode = list_entry(iter, struct _stream_named_order_node, node); + list_del(iter); + free(onode); + } + + return r; +} + +/* entries read ops */ +static list_head_t *__sxstream_entryread(sxstream_t *stream, int _named) { sxmsg_t *msg; size_t mbuf_len; @@ -366,6 +533,7 @@ list_head_t *sxstream_read(sxstream_t *stream) errno = SXE_FAILED; if(!stream) goto __fail; if(!stream->link || !stream->channel) goto __fail; + if(_named < 0 || _named > 1) goto __fail; if(stream->entries) { if(stream->cur == list_node_last(stream->entries)) { @@ -393,7 +561,8 @@ list_head_t *sxstream_read(sxstream_t *stream) list_for_each_safe(stream->entries, iter, s_iter) { listentry = list_entry(iter, struct _stream_list_node, node); list_del(iter); - __free_listentry(listentry); + if(!_named) __free_listentry(listentry); + else __free_listentrynamed(listentry); } } else { /* allocate it first time */ if(!(stream->entries = malloc(sizeof(list_head_t)))) goto __fail; @@ -430,10 +599,19 @@ list_head_t *sxstream_read(sxstream_t *stream) goto __fail; } else list_add2tail(stream->entries, &listentry->node); - SEXP_ITERATE_LIST(isx, iisx, iidx) { - if(iisx->ty == SEXP_LIST) goto __badproto; - if(!(entry = __alloc_entry((const char *)iisx->val))) goto __enomem; - else list_add2tail(listentry->list, &entry->node); + if(!_named) { + SEXP_ITERATE_LIST(isx, iisx, iidx) { + if(iisx->ty == SEXP_LIST) goto __badproto; + if(!(entry = __alloc_entry((const char *)iisx->val))) goto __enomem; + else list_add2tail(listentry->list, &entry->node); + } + } else { /* named stream */ + r = __stream_namedentry_sexp2list(isx, listentry->list, stream->named); + switch(r) { + case SXE_BADPROTO: goto __badproto; break; + case SXE_ENOMEM: goto __enomem; break; + default: r = 0; break; + } } } @@ -472,10 +650,16 @@ list_head_t *sxstream_read(sxstream_t *stream) return NULL; } +/* entry nonamed stream ops */ +list_head_t *sxstream_read(sxstream_t *stream) +{ + return __sxstream_entryread(stream, 0); +} + /* entry named stream ops */ list_head_t *sxstream_readnamed(sxstream_t *stream) { - return NULL; + return __sxstream_entryread(stream, 1); } /* builtin functions for streams */ @@ -525,7 +709,7 @@ int _builtin_stream_open(void *m, sexp_t *sx) sxlink_t *link = NULL; sxhub_t *hub; sexp_t *isx, *iisx; - char *rbuf = NULL, *opt = NULL, *s_opt = NULL; + char *rbuf = NULL, *opt = NULL, *s_opt = NULL, *nmliststr = NULL; usrtc_node_t *node; struct sxstream_description *s_desc; struct sxstream_opened *strea; @@ -591,9 +775,6 @@ int _builtin_stream_open(void *m, sexp_t *sx) goto __fail; } - /* rbuf length */ - rbuf_len = _SXSTREAMCLOSE_ADDLEN + strlen(_SXSTREAMOPEN_CMD_R); - /* set stream details */ strea->desc = s_desc; strea->flags = flags; @@ -605,9 +786,24 @@ int _builtin_stream_open(void *m, sexp_t *sx) __free_stream(link, strea); goto __fail; } + /* rbuf length */ + rbuf_len = _SXSTREAMCLOSE_ADDLEN + strlen(_SXSTREAMOPEN_CMD_R); + if(s_desc->type == SXE_O_NAMED) { + nmliststr = __stream_namedlist_list2sxstr(s_desc->named); + if(!nmliststr) { r = SXE_FAILED; __free_stream(link, strea); goto __fail; } + + rbuf_len += strlen(nmliststr) + sizeof(char)*4; + } + rbuf = sxmsg_rapidbuf(msg); - rbuf_len = snprintf(rbuf, rbuf_len, "(%s %lu)", _SXSTREAMOPEN_CMD_R, - strea->sto_id); + if(s_desc->type != SXE_O_NAMED) + rbuf_len = snprintf(rbuf, rbuf_len, "(%s %lu)", _SXSTREAMOPEN_CMD_R, + strea->sto_id); + else { + rbuf_len = snprintf(rbuf, rbuf_len, "(%s %lu %s)", _SXSTREAMOPEN_CMD_R, + strea->sto_id, nmliststr); + free(nmliststr); + } return sxmsg_rreply(msg, rbuf_len + sizeof(char)); @@ -659,6 +855,66 @@ int _builtin_stream_close(void *m, sexp_t *sx) return sxmsg_return(msg, r); } +/* + * this simple function returns length only, i.e. + * it's going thru the list and lookup for all the + * required stuff. + * also it will check up the values. + */ +static size_t __entry_order_sexplen(list_head_t *list, struct sxstream_description *desc) +{ + size_t len = 4*sizeof(char) + strlen(":order"); + list_node_t *iter, *siter; + struct _stream_entry_node *entry; + int order; + + if(!desc->named) goto __fail; /* avoid segfaults */ + + list_for_each_safe(list, iter, siter) { + entry = container_of(iter, struct _stream_entry_node, node); + if(!entry->name) goto __fail; + else order = sxstream_generic_named_lookuporder(desc->named, entry->name); + + if(order < 0) goto __fail; + else len += 4*sizeof(char); + + if(iter != list_node_last(list)) len += sizeof(char); + } + + __fail: + return 0; +} + +/* + * this function hasn't checks for validity of the given list + * use it with care. + * if __entry_order_sexplen() was success it's safe to call it and + * expect a normal data in the given buffer. + * btw, it will stop if max_len reached to avoid out of the buffer writes. + */ +static size_t __entry_order_tosexpstr(list_head_t *list, struct sxstream_description *desc, + char *buf, size_t max_len) +{ + size_t dw = 0; + list_node_t *iter, *siter; + struct _stream_entry_node *entry; + int order; + + dw += snprintf(buf + dw, max_len - dw, "(:order "); + list_for_each_safe(list, iter, siter) { + entry = container_of(iter, struct _stream_entry_node, node); + order = sxstream_generic_named_lookuporder(desc->named, entry->name); + + if(iter != list_node_last(list)) + dw += snprintf(buf + dw, max_len - dw, "%d ", order); + else dw += snprintf(buf + dw, max_len - dw, "%d", order); + } + + dw += snprintf(buf + dw, max_len - dw, ")"); + + return dw; +} + int _builtin_stream_eread(void *m, sexp_t *sx) { sxmsg_t *msg = (sxmsg_t *)m; @@ -668,7 +924,8 @@ int _builtin_stream_eread(void *m, sexp_t *sx) sexp_t *isx; list_node_t *iter, *siter; struct _nn_stream_entry_node *entry; - char *rbuf; + struct _stream_entry_node *nentry; + char *rbuf, *tentry; uint64_t sid = 0; size_t rbuf_len = 0, list_len = 0; int r = SXE_FAILED, idx, n; @@ -698,7 +955,7 @@ int _builtin_stream_eread(void *m, sexp_t *sx) if(stream) { if(stream->pin_channelid == channel->cid) { /* we shouldn't fail here, check for the stream type first */ - if((stream->desc->flags & SXE_O_BINARY) || (stream->desc->flags & SXE_O_NAMED)) { + if(stream->desc->flags & SXE_O_BINARY) { r = SXE_FAILED; goto __fail; } @@ -714,29 +971,66 @@ int _builtin_stream_eread(void *m, sexp_t *sx) if(stream->ent_buf) { /* first we need to determine if this list * of values will fit to our rapid buffer */ - list_len = 4*sizeof(char); - list_for_each_safe(stream->ent_buf, iter, siter) { - entry = container_of(iter, struct _nn_stream_entry_node, node); - list_len += strlen(entry->value); - if(iter != list_node_last(stream->ent_buf)) - list_len += sizeof(char); + + if(!stream->desc->type) { /* the case for nonamed streams */ + list_len = 4*sizeof(char); + list_for_each_safe(stream->ent_buf, iter, siter) { + entry = container_of(iter, struct _nn_stream_entry_node, node); + list_len += strlen(entry->value); + if(iter != list_node_last(stream->ent_buf)) + list_len += sizeof(char); + } + } else if(stream->desc->type == SXE_O_NAMED) { /* named entry streams */ + /* each entry contains a small list of value order to keep + * space for data i.e. instead of send a value names, + * send order. + */ + list_len = 8*sizeof(char) + __entry_order_sexplen(stream->ent_buf, stream->desc); + if(list_len <= 8*sizeof(char)) { + r = SXE_FAILED; goto __fail; + } + list_for_each_safe(stream->ent_buf, iter, siter) { + nentry = container_of(iter, struct _stream_entry_node, node); + list_len += strlen(nentry->value); + if(iter != list_node_last(stream->ent_buf)) list_len += sizeof(char); + } + } else { + r = SXE_FAILED; goto __fail; } + + /* there are important to check up the buffer capability + * if it's possible to fit entry - we will fit it, otherwise + * leave this entry for the further read operation, and try to + * send what we has already. + */ if((rbuf_len + list_len) >= MAX_RBBUF_LEN) { /* reach the end */ if(!idx) { r = SXE_TOOLONG; goto __fail; } else break; /* buffer full, ready to send */ } + /* write to rapid buffer */ rbuf_len += snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, "("); + if(stream->desc->type == SXE_O_NAMED) /* write order list info */ + rbuf_len += __entry_order_tosexpstr(stream->ent_buf, stream->desc, rbuf + rbuf_len, + MAX_RBBUF_LEN - rbuf_len); + + /* write entry data */ list_for_each_safe(stream->ent_buf, iter, siter) { - entry = container_of(iter, struct _nn_stream_entry_node, node); + if(stream->desc->type == SXE_O_NAMED) { + nentry = container_of(iter, struct _stream_entry_node, node); + tentry = nentry->value; + } else { + entry = container_of(iter, struct _nn_stream_entry_node, node); + tentry = entry->value; + } if(iter == list_node_last(stream->ent_buf)) rbuf_len += snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, "%s", - entry->value); + tentry); else rbuf_len += snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, "%s ", - entry->value); + tentry); } rbuf_len += snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, ")"); idx++;