[core] builtin: stream nonamed list read added, minor fixes;

v0.5.xx
Alexander Vdolainen 9 years ago
parent faf342548a
commit cf5399738f

@ -188,7 +188,7 @@ struct sxstream_opened {
struct sxstream_description *desc; struct sxstream_description *desc;
union { union {
void *map_data; void *map_data;
char *ent_buf; list_head_t *ent_buf;
}; };
void *priv; void *priv;
uint64_t off; uint64_t off;
@ -201,7 +201,7 @@ struct sxstream_ops {
int (*s_close)(struct sxstream_opened *); int (*s_close)(struct sxstream_opened *);
union { union {
size_t (*s_bread)(struct sxstream_opened *, size_t, uint64_t, void *); size_t (*s_bread)(struct sxstream_opened *, size_t, uint64_t, void *);
size_t (*s_eread)(struct sxstream_opened *, size_t, uint64_t, char *); size_t (*s_eread)(struct sxstream_opened *, size_t, uint64_t, list_head_t **);
}; };
union { union {
size_t (*s_bwrite)(struct sxstream_opened *, size_t, uint64_t, const void *); size_t (*s_bwrite)(struct sxstream_opened *, size_t, uint64_t, const void *);

@ -24,7 +24,7 @@
#define _SXSTREAMOPEN_ADDMINLEN 18 #define _SXSTREAMOPEN_ADDMINLEN 18
#define _SXSTREAMOPEN_ADDMAXLEN 24 #define _SXSTREAMOPEN_ADDMAXLEN 24
#define _SXSTREAMCLOSE_ADDLEN 12 #define _SXSTREAMCLOSE_ADDLEN 12
#define _SXSTREAMEREAD_ADDLEN 4 #define _SXSTREAMEREAD_ADDLEN 10
#define _SXSTREAMOPEN_CMD "!#s>" #define _SXSTREAMOPEN_CMD "!#s>"
#define _SXSTREAMOPEN_CMD_R "!#s<" #define _SXSTREAMOPEN_CMD_R "!#s<"

@ -404,7 +404,7 @@ list_head_t *sxstream_read(sxstream_t *stream)
mbuf_len = strlen(_SXSTREAMEREAD_CMD) + _SXSTREAMEREAD_ADDLEN; mbuf_len = strlen(_SXSTREAMEREAD_CMD) + _SXSTREAMEREAD_ADDLEN;
if(!(mbuf = malloc(mbuf_len))) goto __fail; if(!(mbuf = malloc(mbuf_len))) goto __fail;
mbuf_len = snprintf(mbuf, mbuf_len, "(%s)", _SXSTREAMEREAD_CMD); mbuf_len = snprintf(mbuf, mbuf_len, "(%s %lu)", _SXSTREAMEREAD_CMD, stream->sid);
r = sxmsg_send(stream->channel, mbuf, mbuf_len, &msg); r = sxmsg_send(stream->channel, mbuf, mbuf_len, &msg);
switch(r) { switch(r) {
case SXE_RAPIDMSG: /* get our data */ case SXE_RAPIDMSG: /* get our data */
@ -661,7 +661,102 @@ int _builtin_stream_close(void *m, sexp_t *sx)
int _builtin_stream_eread(void *m, sexp_t *sx) int _builtin_stream_eread(void *m, sexp_t *sx)
{ {
sxmsg_t *msg = (sxmsg_t *)m; sxmsg_t *msg = (sxmsg_t *)m;
sxchnl_t *channel = msg->pch;
sxlink_t *link = NULL;
struct sxstream_opened *stream = NULL;
sexp_t *isx;
list_node_t *iter, *siter;
struct _nn_stream_entry_node *entry;
char *rbuf;
uint64_t sid = 0;
size_t rbuf_len = 0, list_len = 0;
int r = SXE_FAILED, idx, n;
if(!channel) goto __fail;
link = channel->link;
if(!link || !channel->rpc_list) goto __fail; /* paranoid test */
/* parse input */
SEXP_ITERATE_LIST(sx, isx, idx) {
if(isx->ty == SEXP_LIST) {
__badproto:
r = SXE_BADPROTO;
goto __fail;
}
switch(idx) {
case 0: r = SXE_BADPROTO; break;
case 1: sid = strtoul(isx->val, NULL, 0); r = SXE_SUCCESS; break;
default: goto __badproto; break;
}
}
if(r != SXE_SUCCESS) goto __fail;
return sxmsg_return(msg, SXE_FAILED); /* get this */
stream = link->streams[(int)sid];
if(stream) {
if(stream->pin_channelid == channel->cid) {
/* we shouldn't fail here, check for the stream type first */
if((stream->desc->flags & SXE_O_BINARY) || (stream->desc->flags & SXE_O_NAMED)) {
r = SXE_FAILED;
goto __fail;
}
/* last check: permission for read check */
if(!(stream->flags & SXE_O_READ)) {
r = SXE_EPERM;
goto __fail;
} else rbuf = sxmsg_rapidbuf(msg); idx = 0;
/* here we go */
rbuf_len = snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, "(");
while(1) {
if(stream->ent_buf) {
/* first we need to determine if this list
* of values will fit to our rapid buffer */
list_len = 4*sizeof(char);
list_for_each_safe(stream->ent_buf, iter, siter) {
entry = container_of(iter, struct _nn_stream_entry_node, node);
list_len += strlen(entry->value);
if(iter != list_node_last(stream->ent_buf))
list_len += sizeof(char);
}
if((rbuf_len + list_len) >= MAX_RBBUF_LEN) { /* reach the end */
if(!idx) {
r = SXE_TOOLONG;
goto __fail;
} else break; /* buffer full, ready to send */
}
/* write to rapid buffer */
rbuf_len += snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, "(");
list_for_each_safe(stream->ent_buf, iter, siter) {
entry = container_of(iter, struct _nn_stream_entry_node, node);
if(iter == list_node_last(stream->ent_buf))
rbuf_len += snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, "%s",
entry->value);
else
rbuf_len += snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, "%s ",
entry->value);
}
rbuf_len += snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, ")");
idx++;
}
/* read next */
n = stream->desc->ops->s_eread(stream, 1, 0, &stream->ent_buf);
if(n <= 0) {
if(!idx) {
r = SXE_EOS;
goto __fail;
} else break;
}
}
rbuf_len += snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, ")");
return sxmsg_rreply(msg, rbuf_len + sizeof(char));
} else r = SXE_EPERM;
} else r = SXE_NOSUCHSTREAMTYPE;
__fail:
return sxmsg_return(msg, r);
} }

Loading…
Cancel
Save