/* * Secure X Message Passing Library v2 implementation. * (sxmplv2) it superseed all versions before due to the: * - memory consumption * - new features such as pulse emitting * - performance optimization * * (c) Askele Group 2013-2015 * (c) Alexander Vdolainen 2013-2015,2016 * * libsxmp is free software: you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as published * by the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libsxmp is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * See the GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with this program. If not, see ."; * */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "internal.h" static void __free_listentry(struct _stream_list_node *listentry); static void __free_listentrynamed(struct _stream_list_node *listentry); static int __flag_check(int s_flag, int d_flag) { int i; for(i = 1; i < 6; i++) if((d_flag & (1 << i)) && !(s_flag & (1 << i))) return 1; return 0; } static char *__stream_namedlist_list2sxstr(list_head_t *named) { list_node_t *iter, *siter; struct _stream_named_order_node *entry; char *out = NULL; size_t out_len = sizeof(char)*4, lc = 0; if(!named) return NULL; list_for_each_safe(named, iter, siter) { entry = list_entry(iter, struct _stream_named_order_node, node); out_len += sizeof(char)*9 + strlen(entry->name); } if(!(out = malloc(out_len))) return NULL; lc += snprintf(out + lc, out_len - lc, "("); list_for_each_safe(named, iter, siter) { entry = list_entry(iter, struct _stream_named_order_node, node); lc += snprintf(out + lc, out_len - lc, "(%d %s)", entry->order, entry->name + sizeof(char)); } lc += snprintf(out + lc, out_len - lc, ")"); return out; } static int __stream_namedlist_sexp2list(list_head_t *named, sexp_t *sx) { sexp_t *isx, *iisx; char *name = NULL; int idx, iidx, order = 0; SEXP_ITERATE_LIST(sx, isx, idx) { if(isx->ty != SEXP_LIST) return SXE_BADPROTO; SEXP_ITERATE_LIST(isx, iisx, iidx) { if(iisx->ty == SEXP_LIST) return SXE_BADPROTO; switch(iidx) { case 0: order = atoi(iisx->val); break; case 1: name = iisx->val; break; default: return SXE_BADPROTO; } } if(sxstream_generic_nmlist_additem(named, name, order) < 0) return SXE_BADPROTO; } return 0; } static sxstream_t *__stream_open(sxlink_t *link, sxchnl_t *channel, const char *opt, struct sxstream_description *desc, int stid, int flags) { char *msgbuf = NULL; size_t mlen = 0; sxstream_t *ss = NULL; list_head_t *named = NULL; sxmsg_t *msg = NULL; char *buf = NULL; sexp_t *sx, *isx; uint64_t sid = 0; int r = 0, idx; /* check flags */ if(__flag_check(desc->flags, flags)) { errno = SXE_EPERM; goto __fini; } /* time to alloc all stuff */ mlen = strlen(_SXSTREAMOPEN_CMD) + strlen(_SXSTREAMTYPE_PREFIX) + strlen(_SXSTREAMFLAGS_PREFIX); if(!opt) mlen += (_SXSTREAMOPEN_ADDMINLEN + sizeof(char)); else mlen += (_SXSTREAMOPEN_ADDMAXLEN + sizeof(char)) + strlen(opt); if(!(msgbuf = malloc(mlen*sizeof(char)))) { errno = SXE_ENOMEM; goto __fini; } if(!opt) mlen = snprintf(msgbuf, mlen, "(%s (%s %d)(%s %d))", _SXSTREAMOPEN_CMD, _SXSTREAMTYPE_PREFIX, stid, _SXSTREAMFLAGS_PREFIX, flags); else mlen = snprintf(msgbuf, mlen, "(%s (%s %d)(%s %d)(%s \"%s\"))", _SXSTREAMOPEN_CMD, _SXSTREAMTYPE_PREFIX, stid, _SXSTREAMFLAGS_PREFIX, flags, _SXSTREAMOPT_PREFIX, opt); r = sxmsg_send(channel, msgbuf, mlen + sizeof(char), &msg); if(r != SXE_RAPIDMSG) { errno = r; goto __fini; } else r = 0; /* time to get the special id, put it to the link and clean up all */ buf = sxmsg_rapidbuf(msg); if(!(sx = parse_sexp(buf, strlen(buf)))) { __enomem: r = errno = SXE_ENOMEM; sxmsg_clean(msg); goto __fini; } else sxmsg_clean(msg); if(desc->type == SXE_O_NAMED) { named = sxstream_generic_named_alloc(); if(!named) goto __enomem; } SEXP_ITERATE_LIST(sx, isx, idx) { if(isx->ty == SEXP_LIST && desc->type != SXE_O_NAMED) r = SXE_BADPROTO; if(isx->ty == SEXP_LIST && idx < 2) r = SXE_BADPROTO; switch(idx) { case 0: if(strcmp(isx->val, _SXSTREAMOPEN_CMD_R)) r = SXE_BADPROTO; break; case 1: sid = strtoul(isx->val, NULL, 0); break; case 2: if(desc->type != SXE_O_NAMED) r = SXE_BADPROTO; else if(isx->ty == SEXP_LIST) r = __stream_namedlist_sexp2list(named, isx); else r = SXE_BADPROTO; break; default: r = SXE_BADPROTO; break; } } destroy_sexp(sx); if(!r) { if(!(ss = malloc(sizeof(sxstream_t)))) { r = errno = SXE_ENOMEM; goto __fini; } memset(ss, 0, sizeof(sxstream_t)); ss->flags = flags; ss->sid = sid; ss->link = link; ss->channel = channel; ss->named = named; } else errno = r; __fini: if(r && named) sxstream_generic_named_free(named); if(msgbuf) free(msgbuf); return ss; } static struct sxstream_description *__get_stream_description(sxlink_t *link, int stid) { usrtc_node_t *node = NULL; struct sxstream_description *s_desc = NULL; if(!link->remote_streams) return NULL; else node = usrtc_lookup(link->remote_streams, &stid); if(node) s_desc = (struct sxstream_description *)usrtc_node_getdata(node); return s_desc; } /* * NOTE: sxstream operations (except opening process i.e. few threads * can open different streams) are NOT _thread-safe_, please * care about that in your own application. */ sxstream_t *sxstream_open(sxlink_t *link, const char *opt, int stid, int flags) { sxchnl_t *channel = NULL; struct sxstream_description *s_desc; sxstream_t *ss = NULL; int r = 0; if(!link) { r = SXE_FAILED; goto __fini; } if(!link->remote_streams) { r = SXE_NOSUCHSTREAMTYPE; goto __fini; } /* ok, let's a deal with channel */ if(!(s_desc = __get_stream_description(link, stid))) { r = SXE_NOSUCHSTREAMTYPE; goto __fini; } else channel = sxchannel_open(link, s_desc->pcid); if(!channel) { r = errno; goto __fini; } flags |= SXE_O_OCHANNELED; ss = __stream_open(link, channel, opt, s_desc, stid, flags); r = errno; __fini: if(r) errno = r; if(!ss && channel) sxchannel_close(channel); errno = r; return ss; } sxstream_t *sxstream_openwch(sxlink_t *link, sxchnl_t *channel, const char *opt, int stid, int flags) { struct sxstream_description *s_desc = NULL; sxstream_t *ss = NULL; int r = 0; if(!link || !channel) { r = SXE_FAILED; goto __fini; } if(!link->remote_streams) { r = SXE_NOSUCHSTREAMTYPE; goto __fini; } /* get description */ if(!(s_desc = __get_stream_description(link, stid))) { r = SXE_NOSUCHSTREAMTYPE; goto __fini; } ss = __stream_open(link, channel, opt, s_desc, stid, flags); __fini: if(r) errno = r; return ss; } int sxstream_close(sxstream_t *stream) { sxmsg_t *msg = NULL; char *mbuf; list_node_t *iter, *s_iter; struct _stream_list_node *listentry; size_t mbuf_len = strlen(_SXSTREAMCLOSE_CMD) + _SXSTREAMCLOSE_ADDLEN; int r = 0, type = 0; if(!stream) return SXE_FAILED; if(!stream->link || !stream->channel) return SXE_FAILED; if(!(mbuf = malloc(mbuf_len))) return SXE_ENOMEM; else mbuf_len = snprintf(mbuf, mbuf_len, "(%s %lu)", _SXSTREAMCLOSE_CMD, stream->sid); r = sxmsg_send(stream->channel, mbuf, mbuf_len + sizeof(char), &msg); if(r == SXE_RAPIDMSG) sxmsg_clean(msg); if(r == SXE_SUCCESS) { if(stream->flags & SXE_O_BINARY) type = SXE_O_BINARY; else if(stream->flags & SXE_O_NAMED) type = SXE_O_NAMED; else type = 0; switch(type) { case 0: case SXE_O_NAMED: if(stream->entries) { list_for_each_safe(stream->entries, iter, s_iter) { listentry = list_entry(iter, struct _stream_list_node, node); list_del(iter); if(!type) __free_listentry(listentry); else __free_listentrynamed(listentry); } } break; case SXE_O_BINARY: /* TODO: free all caches, buffers etc */ break; } if(stream->flags & SXE_O_OCHANNELED) r = sxchannel_close(stream->channel); free(stream); } free(mbuf); return r; } /* binary stream operations */ /* * NOTE: it will be done with with a copy paste, * there are todo - implement it with iolite style. */ size_t sxstream_bread(sxstream_t *stream, void *buf, size_t buf_len) { off_t offset = 0; size_t rd = 0; errno = SXE_FAILED; if(!stream || !buf) goto __fail; offset = (off_t)stream->cur_offset; rd = sxstream_breadat(stream, buf, buf_len, offset); if(rd > 0) stream->cur_offset += rd; return rd; __fail: return -1; } size_t sxstream_breadat(sxstream_t *stream, void *buf, size_t buf_len, off_t off) { sxmsg_t *msg; char *mbuf = NULL; size_t msglen = 0, r = -1; int n = 0; errno = SXE_ENOMEM; msglen = strlen(_SXSTREAMBREAD_CMD) + sizeof(char)*6 + 32*sizeof(char); if(!(mbuf = malloc(msglen))) return -1; errno = SXE_FAILED; if(!stream || !buf) goto __fail; if(!stream->link || !stream->channel) goto __fail; if(!buf_len || off < 0) goto __fail; msglen = snprintf(mbuf, msglen, "(%s %lu %lu %lu)", _SXSTREAMBREAD_CMD, stream->sid, off, buf_len); n = sxmsg_send(stream->channel, mbuf, msglen, &msg); if(n == SXE_RAPIDMSG) { errno = 0; r = sxmsg_datalen(msg); memcpy(buf, sxmsg_payload(msg), r); sxmsg_clean(msg); } else if(n == SXE_EOS) { r = 0; errno = SXE_EOS; } else { r = -1; errno = n; } __fail: if(mbuf) free(mbuf); return r; } size_t sxstream_bwriteat(sxstream_t *stream, const void *buf, size_t buf_len, off_t off) { sxmsg_t *msg; char *mbuf = NULL; sexp_t *sx = NULL, *isx; size_t msglen = 0, wr = -1; int idx, r; msglen = strlen(_SXSTREAMBWRITE_CMD) + sizeof(char)*8 + 32*sizeof(char); if(MAX_RBBUF_LEN - msglen < sxt_rawlen2b64len(buf_len)) /* cut the data */ buf_len = ((MAX_RBBUF_LEN - msglen)/4)*3 - 2; msglen += sxt_rawlen2b64len(buf_len); if(!(mbuf = malloc(msglen))) { errno = SXE_ENOMEM; goto __fail; } msglen = snprintf(mbuf, msglen, "(%s %lu %lu %lu \"", _SXSTREAMBWRITE_CMD, stream->sid, off, buf_len); wr = sxt_b64encode_in(buf, mbuf + msglen, buf_len); if(wr < 0) goto __fail; else msglen += wr; msglen += snprintf(mbuf + msglen, MAX_RBBUF_LEN - msglen, "\")"); r = sxmsg_send(stream->channel, mbuf, msglen, &msg); if(r == SXE_RAPIDMSG) { sx = parse_sexp(sxmsg_rapidbuf(msg), strlen(sxmsg_rapidbuf(msg))); sxmsg_clean(msg); if(!sx) { __badproto: wr = -1; errno = SXE_BADPROTO; goto __fail; } SEXP_ITERATE_LIST(sx, isx, idx) { if(isx->ty == SEXP_LIST) goto __badproto; if(!idx) wr = strtoul(isx->val, NULL, 0); else goto __badproto; } } else { wr = -1; errno = r; } __fail: if(sx) destroy_sexp(sx); if(mbuf) free(mbuf); return wr; } size_t sxstream_bsyncat(sxstream_t *stream, void *buf, size_t buf_len, off_t off) { /* TODO: for 0.4.4 */ return SXE_IGNORED; } /* * NOTE: returning value for reading from entry streams * shouldn't be managed, it's done by close operation. * That means you should care about ridden values, since * it might be freed on the next read operation. */ static struct _nn_stream_entry_node *__alloc_entry(const char *value) { struct _nn_stream_entry_node *o = malloc(sizeof(struct _nn_stream_entry_node)); if(!o) return NULL; if(!(o->value = strdup(value))) { free(o); return NULL; } list_init_node(&o->node); return o; } static void __free_entry(struct _nn_stream_entry_node *entry) { free(entry->value); free(entry); return; } static struct _stream_list_node *__alloc_listentry(void) { struct _stream_list_node *o = malloc(sizeof(struct _stream_list_node)); if(!o) return NULL; if(!(o->list = malloc(sizeof(list_head_t)))) { free(o); return NULL; } list_init_head(o->list); list_init_node(&o->node); return o; } static void __free_listentry(struct _stream_list_node *listentry) { struct _nn_stream_entry_node *entry; list_node_t *iter, *s_iter; list_for_each_safe(listentry->list, iter, s_iter) { entry = list_entry(iter, struct _nn_stream_entry_node, node); list_del(iter); __free_entry(entry); } free(listentry->list); free(listentry); return; } static void __free_listentrynamed(struct _stream_list_node *listentry) { struct _stream_entry_node *entry; list_node_t *iter, *s_iter; list_for_each_safe(listentry->list, iter, s_iter) { entry = list_entry(iter, struct _stream_entry_node, node); list_del(iter); free(entry->value); /* only value, name is a pointer to named */ free(entry); } free(listentry->list); free(listentry); return; } /* we have something like that ((:order 1 2) "value1" "value2") */ static int __stream_namedentry_sexp2list(sexp_t *sx, list_head_t *list, list_head_t *named) { struct _stream_entry_node *entry; struct _stream_named_order_node *onode; sexp_t *isx, *iisx; list_node_t *iter, *siter; char *name = NULL; list_head_t olist; int idx, iidx, order, r = 0; SEXP_ITERATE_LIST(sx, isx, idx) { if(!idx && isx->ty != SEXP_LIST) return SXE_BADPROTO; if(!idx) { list_init_head(&olist); SEXP_ITERATE_LIST(isx, iisx, iidx) { if(iisx->ty == SEXP_LIST) goto __badproto; switch(iidx) { case 0: if(strcmp(iisx->val, ":order")) return SXE_BADPROTO; break; default: order = atoi(iisx->val); name = (char *)sxstream_generic_named_lookupname(named, order); if(!name) goto __badproto; else if(!(onode = malloc(sizeof(struct _stream_named_order_node)))) goto __enomem; list_init_node(&onode->node); onode->name = name - sizeof(char); onode->order = order; list_add2tail(&olist, &onode->node); break; } } } else { if(isx->ty == SEXP_LIST) goto __badproto; if(idx == 1) iter = list_node_first(&olist); else { if(iter == list_head(&olist)) goto __badproto; iter = iter->next; } onode = list_entry(iter, struct _stream_named_order_node, node); name = onode->name; if(!(entry = malloc(sizeof(struct _stream_entry_node)))) goto __enomem; else if(!(entry->value = strdup(isx->val))) { free(entry); goto __enomem; } list_init_node(&entry->node); list_add2tail(list, &entry->node); } } return 0; __badproto: r = SXE_BADPROTO; __enomem: if(!r) r = SXE_ENOMEM; list_for_each_safe(&olist, iter, siter) { onode = list_entry(iter, struct _stream_named_order_node, node); list_del(iter); free(onode); } return r; } /* entries read ops */ static list_head_t *__sxstream_entryread(sxstream_t *stream, int _named) { sxmsg_t *msg; size_t mbuf_len; char *mbuf = NULL; list_node_t *cur = NULL, *iter, *s_iter; struct _stream_list_node *listentry = NULL; struct _nn_stream_entry_node *entry = NULL; sexp_t *sx, *isx, *iisx; int r = 0, idx, iidx; errno = SXE_FAILED; if(!stream) goto __fail; if(!stream->link || !stream->channel) goto __fail; if(_named < 0 || _named > 1) goto __fail; if(stream->entries) { if(stream->cur == list_node_last(stream->entries)) { cur = stream->cur; stream->cur = NULL; /* we need to read from remote again on the next operation */ } else if(stream->cur && stream->cur != list_node_last(stream->entries)) { cur = stream->cur; if(!list_node_next_isbound(cur)) stream->cur = stream->cur->next; else stream->cur = NULL; } else goto __read_seq; __success: if(cur) { errno = 0; listentry = list_entry(cur, struct _stream_list_node, node); return listentry->list; } else return NULL; } __read_seq: errno = SXE_ENOMEM; if(stream->entries) { /* ok, clean up all stuff */ list_for_each_safe(stream->entries, iter, s_iter) { listentry = list_entry(iter, struct _stream_list_node, node); list_del(iter); if(!_named) __free_listentry(listentry); else __free_listentrynamed(listentry); } } else { /* allocate it first time */ if(!(stream->entries = malloc(sizeof(list_head_t)))) goto __fail; list_init_head(stream->entries); } /* prepare a message */ mbuf_len = strlen(_SXSTREAMEREAD_CMD) + _SXSTREAMEREAD_ADDLEN; if(!(mbuf = malloc(mbuf_len))) goto __fail; mbuf_len = snprintf(mbuf, mbuf_len, "(%s %lu)", _SXSTREAMEREAD_CMD, stream->sid); r = sxmsg_send(stream->channel, mbuf, mbuf_len, &msg); switch(r) { case SXE_RAPIDMSG: /* get our data */ sx = parse_sexp(sxmsg_payload(msg), strlen(sxmsg_payload(msg))); sxmsg_clean(msg); if(!sx) { __badproto: if(sx) destroy_sexp(sx); errno = SXE_BADPROTO; goto __fail; } SEXP_ITERATE_LIST(sx, isx, idx) { if(isx->ty != SEXP_LIST) goto __badproto; /* allocate list entry and add it */ if(!(listentry = __alloc_listentry())) { __enomem: errno = SXE_ENOMEM; destroy_sexp(sx); goto __fail; } else list_add2tail(stream->entries, &listentry->node); if(!_named) { SEXP_ITERATE_LIST(isx, iisx, iidx) { if(iisx->ty == SEXP_LIST) goto __badproto; if(!(entry = __alloc_entry((const char *)iisx->val))) goto __enomem; else list_add2tail(listentry->list, &entry->node); } } else { /* named stream */ r = __stream_namedentry_sexp2list(isx, listentry->list, stream->named); switch(r) { case SXE_BADPROTO: goto __badproto; break; case SXE_ENOMEM: goto __enomem; break; default: r = 0; break; } } } /* ok let's see what we got */ if(idx) { stream->cur = list_node_first(stream->entries); cur = stream->cur; /* point to the next one if exists */ if(!list_node_next_isbound(cur)) stream->cur = stream->cur->next; else stream->cur = NULL; } else { errno = SXE_NILREPLY; stream->cur = NULL; cur = NULL; } destroy_sexp(sx); break; case SXE_EOS: /* stream is over */ errno = SXE_EOS; cur = NULL; break; default: errno = r; goto __fail; break; } free(mbuf); goto __success; __fail: if(mbuf) free(mbuf); return NULL; } /* entry nonamed stream ops */ list_head_t *sxstream_read(sxstream_t *stream) { return __sxstream_entryread(stream, 0); } /* entry named stream ops */ list_head_t *sxstream_readnamed(sxstream_t *stream) { return __sxstream_entryread(stream, 1); } /* builtin functions for streams */ static struct sxstream_opened *__alloc_stream(sxlink_t *link) { int idx; struct sxstream_opened *stream; pthread_mutex_lock(&link->idx_streams_lock); idx = idx_allocate(&link->idx_streams); pthread_mutex_unlock(&link->idx_streams_lock); if(idx == IDX_INVAL) return NULL; else stream = link->streams[idx]; if(!link->streams[idx] && !(stream = malloc(sizeof(struct sxstream_opened)))) { __inval: pthread_mutex_lock(&link->idx_streams_lock); idx_free(&link->idx_streams, idx); pthread_mutex_unlock(&link->idx_streams_lock); return NULL; } else if(link->streams[idx]) goto __inval; memset(stream, 0, sizeof(struct sxstream_opened)); stream->sto_id = idx; link->streams[idx] = stream; return stream; } static void __free_stream(sxlink_t *link, struct sxstream_opened *stream) { pthread_mutex_lock(&link->idx_streams_lock); idx_free(&link->idx_streams, stream->sto_id); link->streams[stream->sto_id] = NULL; pthread_mutex_unlock(&link->idx_streams_lock); free(stream); return; } int _builtin_stream_open(void *m, sexp_t *sx) { sxmsg_t *msg = (sxmsg_t *)m; sxchnl_t *channel = msg->pch; sxlink_t *link = NULL; sxhub_t *hub; sexp_t *isx, *iisx; char *rbuf = NULL, *opt = NULL, *s_opt = NULL, *nmliststr = NULL; usrtc_node_t *node; struct sxstream_description *s_desc; struct sxstream_opened *strea; size_t rbuf_len; int chan_type, idx, r = SXE_FAILED; int flags = 0, des_type, iidx; if(!channel) goto __fail; link = channel->link; chan_type = channel->type_id; if(!link || !channel->rpc_list) goto __fail; /* paranoid test */ /* parse incoming builtin function */ SEXP_ITERATE_LIST(sx, isx, idx) { if(!idx && isx->ty == SEXP_LIST) { __badproto: r = SXE_BADPROTO; goto __fail; } else if(idx && isx->ty != SEXP_LIST) goto __badproto; if(isx->ty == SEXP_LIST) { SEXP_ITERATE_LIST(isx, iisx, iidx) { if(iisx->ty == SEXP_LIST) goto __badproto; switch(iidx) { case 0: opt = iisx->val; if(*opt != ':') goto __badproto; break; case 1: if(!strcmp(opt, _SXSTREAMTYPE_PREFIX)) des_type = atoi(iisx->val); else if(!strcmp(opt, _SXSTREAMFLAGS_PREFIX)) flags = atoi(iisx->val); else if(!strcmp(opt, _SXSTREAMOPT_PREFIX)) s_opt = iisx->val; else goto __badproto; break; default: goto __badproto; break; } } } else if(idx) goto __badproto; } /* check availability */ hub = link->hub; if(!hub->streams) { __nostream: r = SXE_NOSUCHSTREAMTYPE; goto __fail; } if(!(node = usrtc_lookup(hub->streams, &des_type))) goto __nostream; else s_desc = (struct sxstream_description *)usrtc_node_getdata(node); if(s_desc->pcid != chan_type) { /* check permission by channel */ __eperm: r = SXE_EPERM; goto __fail; } if(__flag_check(s_desc->flags, flags)) goto __eperm; /* by flags */ /* create a new remote side channel */ if(!(strea = __alloc_stream(link))) { r = SXE_ENOMEM; goto __fail; } /* set stream details */ strea->desc = s_desc; strea->flags = flags; strea->pin_channelid = channel->cid; r = s_desc->ops->s_open(link, strea, s_opt); if(r != SXE_SUCCESS) { /* deallocate stream */ __free_stream(link, strea); goto __fail; } /* rbuf length */ rbuf_len = _SXSTREAMCLOSE_ADDLEN + strlen(_SXSTREAMOPEN_CMD_R); if(s_desc->type == SXE_O_NAMED) { nmliststr = __stream_namedlist_list2sxstr(s_desc->named); if(!nmliststr) { r = SXE_FAILED; __free_stream(link, strea); goto __fail; } rbuf_len += strlen(nmliststr) + sizeof(char)*4; } rbuf = sxmsg_rapidbuf(msg); if(s_desc->type != SXE_O_NAMED) rbuf_len = snprintf(rbuf, rbuf_len, "(%s %lu)", _SXSTREAMOPEN_CMD_R, strea->sto_id); else { rbuf_len = snprintf(rbuf, rbuf_len, "(%s %lu %s)", _SXSTREAMOPEN_CMD_R, strea->sto_id, nmliststr); free(nmliststr); } return sxmsg_rreply(msg, rbuf_len + sizeof(char)); __fail: return sxmsg_return(msg, r); } int _builtin_stream_close(void *m, sexp_t *sx) { sxmsg_t *msg = (sxmsg_t *)m; sxchnl_t *channel = msg->pch; sxlink_t *link = NULL; struct sxstream_opened *stream = NULL; sexp_t *isx; uint64_t sid = 0; int r = SXE_FAILED, idx; if(!channel) goto __fini; link = channel->link; if(!link || !channel->rpc_list) goto __fini; /* paranoid test */ /* parse input */ SEXP_ITERATE_LIST(sx, isx, idx) { if(isx->ty == SEXP_LIST) { __badproto: r = SXE_BADPROTO; goto __fini; } switch(idx) { case 0: r = SXE_BADPROTO; break; case 1: sid = strtoul(isx->val, NULL, 0); r = SXE_SUCCESS; break; default: goto __badproto; break; } } if(r != SXE_SUCCESS) goto __fini; /* get this */ stream = link->streams[(int)sid]; if(stream) { if(stream->pin_channelid == channel->cid) { r = SXE_SUCCESS; r = stream->desc->ops->s_close(stream); if(r == SXE_SUCCESS) __free_stream(link, stream); } else r = SXE_EPERM; } else r = SXE_NOSUCHSTREAMTYPE; __fini: return sxmsg_return(msg, r); } /* * this simple function returns length only, i.e. * it's going thru the list and lookup for all the * required stuff. * also it will check up the values. */ static size_t __entry_order_sexplen(list_head_t *list, struct sxstream_description *desc) { size_t len = 4*sizeof(char) + strlen(":order"); list_node_t *iter, *siter; struct _stream_entry_node *entry; int order; if(!desc->named) goto __fail; /* avoid segfaults */ list_for_each_safe(list, iter, siter) { entry = container_of(iter, struct _stream_entry_node, node); if(!entry->name) goto __fail; else order = sxstream_generic_named_lookuporder(desc->named, entry->name); if(order < 0) goto __fail; else len += 4*sizeof(char); if(iter != list_node_last(list)) len += sizeof(char); } __fail: return 0; } /* * this function hasn't checks for validity of the given list * use it with care. * if __entry_order_sexplen() was success it's safe to call it and * expect a normal data in the given buffer. * btw, it will stop if max_len reached to avoid out of the buffer writes. */ static size_t __entry_order_tosexpstr(list_head_t *list, struct sxstream_description *desc, char *buf, size_t max_len) { size_t dw = 0; list_node_t *iter, *siter; struct _stream_entry_node *entry; int order; dw += snprintf(buf + dw, max_len - dw, "(:order "); list_for_each_safe(list, iter, siter) { entry = container_of(iter, struct _stream_entry_node, node); order = sxstream_generic_named_lookuporder(desc->named, entry->name); if(iter != list_node_last(list)) dw += snprintf(buf + dw, max_len - dw, "%d ", order); else dw += snprintf(buf + dw, max_len - dw, "%d", order); } dw += snprintf(buf + dw, max_len - dw, ")"); return dw; } int _builtin_stream_eread(void *m, sexp_t *sx) { sxmsg_t *msg = (sxmsg_t *)m; sxchnl_t *channel = msg->pch; sxlink_t *link = NULL; struct sxstream_opened *stream = NULL; sexp_t *isx; list_node_t *iter, *siter; struct _nn_stream_entry_node *entry; struct _stream_entry_node *nentry; char *rbuf, *tentry; uint64_t sid = 0; size_t rbuf_len = 0, list_len = 0; int r = SXE_FAILED, idx, n; if(!channel) goto __fail; link = channel->link; if(!link || !channel->rpc_list) goto __fail; /* paranoid test */ /* parse input */ SEXP_ITERATE_LIST(sx, isx, idx) { if(isx->ty == SEXP_LIST) { __badproto: r = SXE_BADPROTO; goto __fail; } switch(idx) { case 0: r = SXE_BADPROTO; break; case 1: sid = strtoul(isx->val, NULL, 0); r = SXE_SUCCESS; break; default: goto __badproto; break; } } if(r != SXE_SUCCESS) goto __fail; /* get this */ stream = link->streams[(int)sid]; if(stream) { if(stream->pin_channelid == channel->cid) { /* we shouldn't fail here, check for the stream type first */ if(stream->desc->flags & SXE_O_BINARY) { r = SXE_FAILED; goto __fail; } /* last check: permission for read check */ if(!(stream->flags & SXE_O_READ)) { r = SXE_EPERM; goto __fail; } else rbuf = sxmsg_rapidbuf(msg); idx = 0; /* here we go */ rbuf_len = snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, "("); while(1) { if(stream->ent_buf) { /* first we need to determine if this list * of values will fit to our rapid buffer */ if(!stream->desc->type) { /* the case for nonamed streams */ list_len = 4*sizeof(char); list_for_each_safe(stream->ent_buf, iter, siter) { entry = container_of(iter, struct _nn_stream_entry_node, node); list_len += strlen(entry->value); if(iter != list_node_last(stream->ent_buf)) list_len += sizeof(char); } } else if(stream->desc->type == SXE_O_NAMED) { /* named entry streams */ /* each entry contains a small list of value order to keep * space for data i.e. instead of send a value names, * send order. */ list_len = 8*sizeof(char) + __entry_order_sexplen(stream->ent_buf, stream->desc); if(list_len <= 8*sizeof(char)) { r = SXE_FAILED; goto __fail; } list_for_each_safe(stream->ent_buf, iter, siter) { nentry = container_of(iter, struct _stream_entry_node, node); list_len += strlen(nentry->value); if(iter != list_node_last(stream->ent_buf)) list_len += sizeof(char); } } else { r = SXE_FAILED; goto __fail; } /* there are important to check up the buffer capability * if it's possible to fit entry - we will fit it, otherwise * leave this entry for the further read operation, and try to * send what we has already. */ if((rbuf_len + list_len) >= MAX_RBBUF_LEN) { /* reach the end */ if(!idx) { r = SXE_TOOLONG; goto __fail; } else break; /* buffer full, ready to send */ } /* write to rapid buffer */ rbuf_len += snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, "("); if(stream->desc->type == SXE_O_NAMED) /* write order list info */ rbuf_len += __entry_order_tosexpstr(stream->ent_buf, stream->desc, rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len); /* write entry data */ list_for_each_safe(stream->ent_buf, iter, siter) { if(stream->desc->type == SXE_O_NAMED) { nentry = container_of(iter, struct _stream_entry_node, node); tentry = nentry->value; } else { entry = container_of(iter, struct _nn_stream_entry_node, node); tentry = entry->value; } if(iter == list_node_last(stream->ent_buf)) rbuf_len += snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, "%s", tentry); else rbuf_len += snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, "%s ", tentry); } rbuf_len += snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, ")"); idx++; } /* read next */ n = stream->desc->ops->s_eread(stream, 1, 0, &stream->ent_buf); if(n <= 0) { if(!idx) { r = SXE_EOS; goto __fail; } else break; } } rbuf_len += snprintf(rbuf + rbuf_len, MAX_RBBUF_LEN - rbuf_len, ")"); return sxmsg_rreply(msg, rbuf_len + sizeof(char)); } else r = SXE_EPERM; } else r = SXE_NOSUCHSTREAMTYPE; __fail: return sxmsg_return(msg, r); } int _builtin_stream_bread(void *m, sexp_t *sx) { sxmsg_t *msg = (sxmsg_t *)m; sxchnl_t *channel = msg->pch; sxlink_t *link = NULL; struct sxstream_opened *stream = NULL; sexp_t *isx; void *rbuf; uint64_t sid = 0, offset; size_t rdlen; int r = SXE_FAILED, idx; if(!channel) goto __fail; link = channel->link; if(!link || !channel->rpc_list) goto __fail; /* paranoid test */ /* parse input */ SEXP_ITERATE_LIST(sx, isx, idx) { if(isx->ty == SEXP_LIST) { __badproto: r = SXE_BADPROTO; goto __fail; } switch(idx) { case 0: r = SXE_BADPROTO; break; case 1: sid = strtoul(isx->val, NULL, 0); break; case 2: offset = strtoul(isx->val, NULL, 0); break; case 3: rdlen = strtoul(isx->val, NULL, 0); r = SXE_SUCCESS; break; default: goto __badproto; break; } } if(r != SXE_SUCCESS) goto __fail; if(!rdlen) { /* in case of reading zero bytes - we will send nilreply error */ r = SXE_NILREPLY; goto __fail; } if(rdlen >= MAX_RBBUF_LEN) rdlen = MAX_RBBUF_LEN - 1; /* check up for out of buffer reading */ /* check out if stream exists */ if(!(stream = link->streams[(int)sid])) { r = SXE_NOSUCHSTREAMTYPE; goto __fail; } /* check up if we have permission to do it */ if((stream->pin_channelid != channel->cid) || !(stream->flags & SXE_O_READ)) { r = SXE_EPERM; goto __fail; } /* check if the givn stream has binary capability */ if(stream->desc->type != SXE_O_BINARY) { r = SXE_FAILED; goto __fail; } rbuf = sxmsg_rapidbuf(msg); rdlen = stream->desc->ops->s_bread(stream, rdlen, offset, rbuf); if(!rdlen) { r = SXE_EOS; goto __fail; } if(rdlen < 0) { /* nil reply */ r = SXE_NILREPLY; goto __fail; } else return sxmsg_rreply(msg, rdlen); __fail: return sxmsg_return(msg, r); } int _builtin_stream_bwrite(void *m, sexp_t *sx) { int r = SXE_FAILED; sxmsg_t *msg = (sxmsg_t *)m; return sxmsg_return(msg, r); }