diff --git a/include/sxmp/sxmp.h b/include/sxmp/sxmp.h index 60d5bf7..aa07186 100644 --- a/include/sxmp/sxmp.h +++ b/include/sxmp/sxmp.h @@ -188,7 +188,7 @@ struct sxstream_opened { struct sxstream_description *desc; union { void *map_data; - char *ent_buf; + list_head_t *ent_buf; }; void *priv; uint64_t off; @@ -201,7 +201,7 @@ struct sxstream_ops { int (*s_close)(struct sxstream_opened *); union { size_t (*s_bread)(struct sxstream_opened *, size_t, uint64_t, void *); - size_t (*s_eread)(struct sxstream_opened *, size_t, uint64_t, char *); + size_t (*s_eread)(struct sxstream_opened *, size_t, uint64_t, list_head_t **); }; union { size_t (*s_bwrite)(struct sxstream_opened *, size_t, uint64_t, const void *); diff --git a/lib/internal.h b/lib/internal.h index d981b1b..fa2565d 100644 --- a/lib/internal.h +++ b/lib/internal.h @@ -24,7 +24,7 @@ #define _SXSTREAMOPEN_ADDMINLEN 18 #define _SXSTREAMOPEN_ADDMAXLEN 24 #define _SXSTREAMCLOSE_ADDLEN 12 -#define _SXSTREAMEREAD_ADDLEN 4 +#define _SXSTREAMEREAD_ADDLEN 10 #define _SXSTREAMOPEN_CMD "!#s>" #define _SXSTREAMOPEN_CMD_R "!#s<" diff --git a/lib/stream.c b/lib/stream.c index 20eff30..146758a 100644 --- a/lib/stream.c +++ b/lib/stream.c @@ -404,7 +404,7 @@ list_head_t *sxstream_read(sxstream_t *stream) mbuf_len = strlen(_SXSTREAMEREAD_CMD) + _SXSTREAMEREAD_ADDLEN; if(!(mbuf = malloc(mbuf_len))) goto __fail; - mbuf_len = snprintf(mbuf, mbuf_len, "(%s)", _SXSTREAMEREAD_CMD); + mbuf_len = snprintf(mbuf, mbuf_len, "(%s %lu)", _SXSTREAMEREAD_CMD, stream->sid); r = sxmsg_send(stream->channel, mbuf, mbuf_len, &msg); switch(r) { case SXE_RAPIDMSG: /* get our data */ @@ -661,7 +661,102 @@ int _builtin_stream_close(void *m, sexp_t *sx) int _builtin_stream_eread(void *m, sexp_t *sx) { sxmsg_t *msg = (sxmsg_t *)m; + sxchnl_t *channel = msg->pch; + sxlink_t *link = NULL; + struct sxstream_opened *stream = NULL; + sexp_t *isx; + list_node_t *iter, *siter; + struct _nn_stream_entry_node *entry; + char *rbuf; + uint64_t sid = 0; + size_t rbuf_len = 0, list_len = 0; + int r = SXE_FAILED, idx, n; + + if(!channel) goto __fail; + link = channel->link; + + if(!link || !channel->rpc_list) goto __fail; /* paranoid test */ + + /* parse input */ + SEXP_ITERATE_LIST(sx, isx, idx) { + if(isx->ty == SEXP_LIST) { + __badproto: + r = SXE_BADPROTO; + goto __fail; + } + switch(idx) { + case 0: r = SXE_BADPROTO; break; + case 1: sid = strtoul(isx->val, NULL, 0); r = SXE_SUCCESS; break; + default: goto __badproto; break; + } + } + if(r != SXE_SUCCESS) goto __fail; - return sxmsg_return(msg, SXE_FAILED); + /* get this */ + stream = link->streams[(int)sid]; + if(stream) { + if(stream->pin_channelid == channel->cid) { + /* we shouldn't fail here, check for the stream type first */ + if((stream->desc->flags & SXE_O_BINARY) || (stream->desc->flags & SXE_O_NAMED)) { + r = SXE_FAILED; + goto __fail; + } + /* last check: permission for read check */ + if(!(stream->flags & SXE_O_READ)) { + r = SXE_EPERM; + goto __fail; + } else rbuf = sxmsg_rapidbuf(msg); idx = 0; + + /* here we go */ + rbuf_len = snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, "("); + while(1) { + if(stream->ent_buf) { + /* first we need to determine if this list + * of values will fit to our rapid buffer */ + list_len = 4*sizeof(char); + list_for_each_safe(stream->ent_buf, iter, siter) { + entry = container_of(iter, struct _nn_stream_entry_node, node); + list_len += strlen(entry->value); + if(iter != list_node_last(stream->ent_buf)) + list_len += sizeof(char); + } + if((rbuf_len + list_len) >= MAX_RBBUF_LEN) { /* reach the end */ + if(!idx) { + r = SXE_TOOLONG; + goto __fail; + } else break; /* buffer full, ready to send */ + } + /* write to rapid buffer */ + rbuf_len += snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, "("); + list_for_each_safe(stream->ent_buf, iter, siter) { + entry = container_of(iter, struct _nn_stream_entry_node, node); + if(iter == list_node_last(stream->ent_buf)) + rbuf_len += snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, "%s", + entry->value); + else + rbuf_len += snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, "%s ", + entry->value); + } + rbuf_len += snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, ")"); + idx++; + } + + /* read next */ + n = stream->desc->ops->s_eread(stream, 1, 0, &stream->ent_buf); + if(n <= 0) { + if(!idx) { + r = SXE_EOS; + goto __fail; + } else break; + } + } + rbuf_len += snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, ")"); + + return sxmsg_rreply(msg, rbuf_len + sizeof(char)); + } else r = SXE_EPERM; + } else r = SXE_NOSUCHSTREAMTYPE; + + __fail: + return sxmsg_return(msg, r); }