[core] Added named entry operations;

v0.5.xx
Alexander Vdolainen 9 years ago
parent 868b7f41f6
commit 350aa0321f

@ -185,6 +185,7 @@ struct sxstream_description {
uint16_t type; /** < type: 0||SXE_O_NAMED||SXE_O_BINARY */
uint16_t flags; /** < possible flags */
struct sxstream_ops *ops; /** < operations */
list_head_t *named; /** < named entries order */
usrtc_node_t node; /** < internal node struct */
};

@ -48,6 +48,7 @@
#include "internal.h"
static void __free_listentry(struct _stream_list_node *listentry);
static void __free_listentrynamed(struct _stream_list_node *listentry);
static int __flag_check(int s_flag, int d_flag)
{
@ -59,12 +60,63 @@ static int __flag_check(int s_flag, int d_flag)
return 0;
}
static char *__stream_namedlist_list2sxstr(list_head_t *named)
{
list_node_t *iter, *siter;
struct _stream_named_order_node *entry;
char *out = NULL;
size_t out_len = sizeof(char)*4, lc = 0;
if(!named) return NULL;
list_for_each_safe(named, iter, siter) {
entry = list_entry(iter, struct _stream_named_order_node, node);
out_len += sizeof(char)*9 + strlen(entry->name);
}
if(!(out = malloc(out_len))) return NULL;
lc += snprintf(out + lc, out_len - lc, "(");
list_for_each_safe(named, iter, siter) {
entry = list_entry(iter, struct _stream_named_order_node, node);
lc += snprintf(out + lc, out_len - lc, "(%d %s)", entry->order, entry->name + sizeof(char));
}
lc += snprintf(out + lc, out_len - lc, ")");
return out;
}
static int __stream_namedlist_sexp2list(list_head_t *named, sexp_t *sx)
{
sexp_t *isx, *iisx;
char *name = NULL;
int idx, iidx, order = 0;
SEXP_ITERATE_LIST(sx, isx, idx) {
if(isx->ty != SEXP_LIST) return SXE_BADPROTO;
SEXP_ITERATE_LIST(isx, iisx, iidx) {
if(iisx->ty == SEXP_LIST) return SXE_BADPROTO;
switch(iidx) {
case 0: order = atoi(iisx->val); break;
case 1: name = iisx->val; break;
default: return SXE_BADPROTO;
}
}
if(sxstream_generic_nmlist_additem(named, name, order) < 0) return SXE_BADPROTO;
}
return 0;
}
static sxstream_t *__stream_open(sxlink_t *link, sxchnl_t *channel, const char *opt,
struct sxstream_description *desc, int stid, int flags)
{
char *msgbuf = NULL;
size_t mlen = 0;
sxstream_t *ss = NULL;
list_head_t *named = NULL;
sxmsg_t *msg = NULL;
char *buf = NULL;
sexp_t *sx, *isx;
@ -105,23 +157,37 @@ static sxstream_t *__stream_open(sxlink_t *link, sxchnl_t *channel, const char *
/* time to get the special id, put it to the link and clean up all */
buf = sxmsg_rapidbuf(msg);
if(!(sx = parse_sexp(buf, strlen(buf)))) {
errno = SXE_ENOMEM;
__enomem:
r = errno = SXE_ENOMEM;
sxmsg_clean(msg);
goto __fini;
} else sxmsg_clean(msg);
if(desc->type == SXE_O_NAMED) {
named = sxstream_generic_named_alloc();
if(!named) goto __enomem;
}
SEXP_ITERATE_LIST(sx, isx, idx) {
if(isx->ty == SEXP_LIST) r = SXE_BADPROTO;
if(isx->ty == SEXP_LIST && desc->type != SXE_O_NAMED) r = SXE_BADPROTO;
if(isx->ty == SEXP_LIST && idx < 2) r = SXE_BADPROTO;
if(!idx && strcmp(isx->val, _SXSTREAMOPEN_CMD_R)) r = SXE_BADPROTO;
if(idx && idx < 2) sid = strtoul(isx->val, NULL, 0);
if(idx >= 2) r = SXE_BADPROTO;
switch(idx) {
case 0: if(strcmp(isx->val, _SXSTREAMOPEN_CMD_R)) r = SXE_BADPROTO; break;
case 1: sid = strtoul(isx->val, NULL, 0); break;
case 2:
if(desc->type != SXE_O_NAMED) r = SXE_BADPROTO;
else if(isx->ty == SEXP_LIST) r = __stream_namedlist_sexp2list(named, isx);
else r = SXE_BADPROTO;
break;
default: r = SXE_BADPROTO; break;
}
}
destroy_sexp(sx);
if(!r) {
if(!(ss = malloc(sizeof(sxstream_t)))) {
errno = SXE_ENOMEM;
r = errno = SXE_ENOMEM;
goto __fini;
}
@ -130,9 +196,11 @@ static sxstream_t *__stream_open(sxlink_t *link, sxchnl_t *channel, const char *
ss->sid = sid;
ss->link = link;
ss->channel = channel;
ss->named = named;
} else errno = r;
__fini:
if(r && named) sxstream_generic_named_free(named);
if(msgbuf) free(msgbuf);
return ss;
@ -231,7 +299,7 @@ int sxstream_close(sxstream_t *stream)
list_node_t *iter, *s_iter;
struct _stream_list_node *listentry;
size_t mbuf_len = strlen(_SXSTREAMCLOSE_CMD) + _SXSTREAMCLOSE_ADDLEN;
int r = 0;
int r = 0, type = 0;
if(!stream) return SXE_FAILED;
if(!stream->link || !stream->channel) return SXE_FAILED;
@ -242,16 +310,27 @@ int sxstream_close(sxstream_t *stream)
if(r == SXE_RAPIDMSG) sxmsg_clean(msg);
if(r == SXE_SUCCESS) {
/* TODO: free all caches, buffers etc */
if(!(stream->flags & SXE_O_BINARY)) {
if(!(stream->flags & SXE_O_NAMED) && stream->entries) {
if(stream->flags & SXE_O_BINARY) type = SXE_O_BINARY;
else if(stream->flags & SXE_O_NAMED) type = SXE_O_NAMED;
else type = 0;
switch(type) {
case 0:
case SXE_O_NAMED:
if(stream->entries) {
list_for_each_safe(stream->entries, iter, s_iter) {
listentry = list_entry(iter, struct _stream_list_node, node);
list_del(iter);
__free_listentry(listentry);
if(!type) __free_listentry(listentry);
else __free_listentrynamed(listentry);
}
}
break;
case SXE_O_BINARY:
/* TODO: free all caches, buffers etc */
break;
}
if(stream->flags & SXE_O_OCHANNELED)
r = sxchannel_close(stream->channel);
@ -351,8 +430,96 @@ static void __free_listentry(struct _stream_list_node *listentry)
return;
}
/* entry nonamed stream ops */
list_head_t *sxstream_read(sxstream_t *stream)
static void __free_listentrynamed(struct _stream_list_node *listentry)
{
struct _stream_entry_node *entry;
list_node_t *iter, *s_iter;
list_for_each_safe(listentry->list, iter, s_iter) {
entry = list_entry(iter, struct _stream_entry_node, node);
list_del(iter);
free(entry->value); /* only value, name is a pointer to named */
free(entry);
}
free(listentry->list);
free(listentry);
return;
}
/* we have something like that ((:order 1 2) "value1" "value2") */
static int __stream_namedentry_sexp2list(sexp_t *sx, list_head_t *list, list_head_t *named)
{
struct _stream_entry_node *entry;
struct _stream_named_order_node *onode;
sexp_t *isx, *iisx;
list_node_t *iter, *siter;
char *name = NULL;
list_head_t olist;
int idx, iidx, order, r = 0;
SEXP_ITERATE_LIST(sx, isx, idx) {
if(!idx && isx->ty != SEXP_LIST) return SXE_BADPROTO;
if(!idx) {
list_init_head(&olist);
SEXP_ITERATE_LIST(isx, iisx, iidx) {
if(iisx->ty == SEXP_LIST) goto __badproto;
switch(iidx) {
case 0: if(strcmp(iisx->val, ":order")) return SXE_BADPROTO; break;
default:
order = atoi(iisx->val);
name = (char *)sxstream_generic_named_lookupname(named, order);
if(!name) goto __badproto;
else if(!(onode = malloc(sizeof(struct _stream_named_order_node)))) goto __enomem;
list_init_node(&onode->node);
onode->name = name - sizeof(char);
onode->order = order;
list_add2tail(&olist, &onode->node);
break;
}
}
} else {
if(isx->ty == SEXP_LIST) goto __badproto;
if(idx == 1) iter = list_node_first(&olist);
else {
if(iter == list_head(&olist)) goto __badproto;
iter = iter->next;
}
onode = list_entry(iter, struct _stream_named_order_node, node);
name = onode->name;
if(!(entry = malloc(sizeof(struct _stream_entry_node)))) goto __enomem;
else if(!(entry->value = strdup(isx->val))) {
free(entry);
goto __enomem;
}
list_init_node(&entry->node);
list_add2tail(list, &entry->node);
}
}
return 0;
__badproto:
r = SXE_BADPROTO;
__enomem:
if(!r) r = SXE_ENOMEM;
list_for_each_safe(&olist, iter, siter) {
onode = list_entry(iter, struct _stream_named_order_node, node);
list_del(iter);
free(onode);
}
return r;
}
/* entries read ops */
static list_head_t *__sxstream_entryread(sxstream_t *stream, int _named)
{
sxmsg_t *msg;
size_t mbuf_len;
@ -366,6 +533,7 @@ list_head_t *sxstream_read(sxstream_t *stream)
errno = SXE_FAILED;
if(!stream) goto __fail;
if(!stream->link || !stream->channel) goto __fail;
if(_named < 0 || _named > 1) goto __fail;
if(stream->entries) {
if(stream->cur == list_node_last(stream->entries)) {
@ -393,7 +561,8 @@ list_head_t *sxstream_read(sxstream_t *stream)
list_for_each_safe(stream->entries, iter, s_iter) {
listentry = list_entry(iter, struct _stream_list_node, node);
list_del(iter);
__free_listentry(listentry);
if(!_named) __free_listentry(listentry);
else __free_listentrynamed(listentry);
}
} else { /* allocate it first time */
if(!(stream->entries = malloc(sizeof(list_head_t)))) goto __fail;
@ -430,10 +599,19 @@ list_head_t *sxstream_read(sxstream_t *stream)
goto __fail;
} else list_add2tail(stream->entries, &listentry->node);
SEXP_ITERATE_LIST(isx, iisx, iidx) {
if(iisx->ty == SEXP_LIST) goto __badproto;
if(!(entry = __alloc_entry((const char *)iisx->val))) goto __enomem;
else list_add2tail(listentry->list, &entry->node);
if(!_named) {
SEXP_ITERATE_LIST(isx, iisx, iidx) {
if(iisx->ty == SEXP_LIST) goto __badproto;
if(!(entry = __alloc_entry((const char *)iisx->val))) goto __enomem;
else list_add2tail(listentry->list, &entry->node);
}
} else { /* named stream */
r = __stream_namedentry_sexp2list(isx, listentry->list, stream->named);
switch(r) {
case SXE_BADPROTO: goto __badproto; break;
case SXE_ENOMEM: goto __enomem; break;
default: r = 0; break;
}
}
}
@ -472,10 +650,16 @@ list_head_t *sxstream_read(sxstream_t *stream)
return NULL;
}
/* entry nonamed stream ops */
list_head_t *sxstream_read(sxstream_t *stream)
{
return __sxstream_entryread(stream, 0);
}
/* entry named stream ops */
list_head_t *sxstream_readnamed(sxstream_t *stream)
{
return NULL;
return __sxstream_entryread(stream, 1);
}
/* builtin functions for streams */
@ -525,7 +709,7 @@ int _builtin_stream_open(void *m, sexp_t *sx)
sxlink_t *link = NULL;
sxhub_t *hub;
sexp_t *isx, *iisx;
char *rbuf = NULL, *opt = NULL, *s_opt = NULL;
char *rbuf = NULL, *opt = NULL, *s_opt = NULL, *nmliststr = NULL;
usrtc_node_t *node;
struct sxstream_description *s_desc;
struct sxstream_opened *strea;
@ -591,9 +775,6 @@ int _builtin_stream_open(void *m, sexp_t *sx)
goto __fail;
}
/* rbuf length */
rbuf_len = _SXSTREAMCLOSE_ADDLEN + strlen(_SXSTREAMOPEN_CMD_R);
/* set stream details */
strea->desc = s_desc;
strea->flags = flags;
@ -605,9 +786,24 @@ int _builtin_stream_open(void *m, sexp_t *sx)
__free_stream(link, strea);
goto __fail;
}
/* rbuf length */
rbuf_len = _SXSTREAMCLOSE_ADDLEN + strlen(_SXSTREAMOPEN_CMD_R);
if(s_desc->type == SXE_O_NAMED) {
nmliststr = __stream_namedlist_list2sxstr(s_desc->named);
if(!nmliststr) { r = SXE_FAILED; __free_stream(link, strea); goto __fail; }
rbuf_len += strlen(nmliststr) + sizeof(char)*4;
}
rbuf = sxmsg_rapidbuf(msg);
rbuf_len = snprintf(rbuf, rbuf_len, "(%s %lu)", _SXSTREAMOPEN_CMD_R,
strea->sto_id);
if(s_desc->type != SXE_O_NAMED)
rbuf_len = snprintf(rbuf, rbuf_len, "(%s %lu)", _SXSTREAMOPEN_CMD_R,
strea->sto_id);
else {
rbuf_len = snprintf(rbuf, rbuf_len, "(%s %lu %s)", _SXSTREAMOPEN_CMD_R,
strea->sto_id, nmliststr);
free(nmliststr);
}
return sxmsg_rreply(msg, rbuf_len + sizeof(char));
@ -659,6 +855,66 @@ int _builtin_stream_close(void *m, sexp_t *sx)
return sxmsg_return(msg, r);
}
/*
* this simple function returns length only, i.e.
* it's going thru the list and lookup for all the
* required stuff.
* also it will check up the values.
*/
static size_t __entry_order_sexplen(list_head_t *list, struct sxstream_description *desc)
{
size_t len = 4*sizeof(char) + strlen(":order");
list_node_t *iter, *siter;
struct _stream_entry_node *entry;
int order;
if(!desc->named) goto __fail; /* avoid segfaults */
list_for_each_safe(list, iter, siter) {
entry = container_of(iter, struct _stream_entry_node, node);
if(!entry->name) goto __fail;
else order = sxstream_generic_named_lookuporder(desc->named, entry->name);
if(order < 0) goto __fail;
else len += 4*sizeof(char);
if(iter != list_node_last(list)) len += sizeof(char);
}
__fail:
return 0;
}
/*
* this function hasn't checks for validity of the given list
* use it with care.
* if __entry_order_sexplen() was success it's safe to call it and
* expect a normal data in the given buffer.
* btw, it will stop if max_len reached to avoid out of the buffer writes.
*/
static size_t __entry_order_tosexpstr(list_head_t *list, struct sxstream_description *desc,
char *buf, size_t max_len)
{
size_t dw = 0;
list_node_t *iter, *siter;
struct _stream_entry_node *entry;
int order;
dw += snprintf(buf + dw, max_len - dw, "(:order ");
list_for_each_safe(list, iter, siter) {
entry = container_of(iter, struct _stream_entry_node, node);
order = sxstream_generic_named_lookuporder(desc->named, entry->name);
if(iter != list_node_last(list))
dw += snprintf(buf + dw, max_len - dw, "%d ", order);
else dw += snprintf(buf + dw, max_len - dw, "%d", order);
}
dw += snprintf(buf + dw, max_len - dw, ")");
return dw;
}
int _builtin_stream_eread(void *m, sexp_t *sx)
{
sxmsg_t *msg = (sxmsg_t *)m;
@ -668,7 +924,8 @@ int _builtin_stream_eread(void *m, sexp_t *sx)
sexp_t *isx;
list_node_t *iter, *siter;
struct _nn_stream_entry_node *entry;
char *rbuf;
struct _stream_entry_node *nentry;
char *rbuf, *tentry;
uint64_t sid = 0;
size_t rbuf_len = 0, list_len = 0;
int r = SXE_FAILED, idx, n;
@ -698,7 +955,7 @@ int _builtin_stream_eread(void *m, sexp_t *sx)
if(stream) {
if(stream->pin_channelid == channel->cid) {
/* we shouldn't fail here, check for the stream type first */
if((stream->desc->flags & SXE_O_BINARY) || (stream->desc->flags & SXE_O_NAMED)) {
if(stream->desc->flags & SXE_O_BINARY) {
r = SXE_FAILED;
goto __fail;
}
@ -714,29 +971,66 @@ int _builtin_stream_eread(void *m, sexp_t *sx)
if(stream->ent_buf) {
/* first we need to determine if this list
* of values will fit to our rapid buffer */
list_len = 4*sizeof(char);
list_for_each_safe(stream->ent_buf, iter, siter) {
entry = container_of(iter, struct _nn_stream_entry_node, node);
list_len += strlen(entry->value);
if(iter != list_node_last(stream->ent_buf))
list_len += sizeof(char);
if(!stream->desc->type) { /* the case for nonamed streams */
list_len = 4*sizeof(char);
list_for_each_safe(stream->ent_buf, iter, siter) {
entry = container_of(iter, struct _nn_stream_entry_node, node);
list_len += strlen(entry->value);
if(iter != list_node_last(stream->ent_buf))
list_len += sizeof(char);
}
} else if(stream->desc->type == SXE_O_NAMED) { /* named entry streams */
/* each entry contains a small list of value order to keep
* space for data i.e. instead of send a value names,
* send order.
*/
list_len = 8*sizeof(char) + __entry_order_sexplen(stream->ent_buf, stream->desc);
if(list_len <= 8*sizeof(char)) {
r = SXE_FAILED; goto __fail;
}
list_for_each_safe(stream->ent_buf, iter, siter) {
nentry = container_of(iter, struct _stream_entry_node, node);
list_len += strlen(nentry->value);
if(iter != list_node_last(stream->ent_buf)) list_len += sizeof(char);
}
} else {
r = SXE_FAILED; goto __fail;
}
/* there are important to check up the buffer capability
* if it's possible to fit entry - we will fit it, otherwise
* leave this entry for the further read operation, and try to
* send what we has already.
*/
if((rbuf_len + list_len) >= MAX_RBBUF_LEN) { /* reach the end */
if(!idx) {
r = SXE_TOOLONG;
goto __fail;
} else break; /* buffer full, ready to send */
}
/* write to rapid buffer */
rbuf_len += snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, "(");
if(stream->desc->type == SXE_O_NAMED) /* write order list info */
rbuf_len += __entry_order_tosexpstr(stream->ent_buf, stream->desc, rbuf + rbuf_len,
MAX_RBBUF_LEN - rbuf_len);
/* write entry data */
list_for_each_safe(stream->ent_buf, iter, siter) {
entry = container_of(iter, struct _nn_stream_entry_node, node);
if(stream->desc->type == SXE_O_NAMED) {
nentry = container_of(iter, struct _stream_entry_node, node);
tentry = nentry->value;
} else {
entry = container_of(iter, struct _nn_stream_entry_node, node);
tentry = entry->value;
}
if(iter == list_node_last(stream->ent_buf))
rbuf_len += snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, "%s",
entry->value);
tentry);
else
rbuf_len += snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, "%s ",
entry->value);
tentry);
}
rbuf_len += snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, ")");
idx++;

Loading…
Cancel
Save