You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1231 lines
33 KiB
C
1231 lines
33 KiB
C
/*
|
|
* Secure X Message Passing Library v2 implementation.
|
|
 * (sxmplv2) it supersedes all previous versions due to:
|
|
* - memory consumption
|
|
* - new features such as pulse emitting
|
|
* - performance optimization
|
|
*
|
|
* (c) Askele Group 2013-2015 <http://askele.com>
|
|
* (c) Alexander Vdolainen 2013-2015,2016 <avdolainen@zoho.com>
|
|
*
|
|
* libsxmp is free software: you can redistribute it and/or modify it
|
|
* under the terms of the GNU Lesser General Public License as published
|
|
* by the Free Software Foundation, either version 2.1 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* libsxmp is distributed in the hope that it will be useful, but
|
|
* WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
|
|
* See the GNU Lesser General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Lesser General Public License
|
|
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
*
|
|
*/
|
|
|
|
#include <stdlib.h>
|
|
#include <stdio.h>
|
|
#include <errno.h>
|
|
#include <string.h>
|
|
#include <pthread.h>
|
|
#include <sys/stat.h>
|
|
#include <sys/time.h>
|
|
#include <sys/types.h>
|
|
#include <sys/mman.h>
|
|
#include <fcntl.h>
|
|
|
|
#include <openssl/ssl.h>
|
|
#include <openssl/err.h>
|
|
#include <openssl/engine.h>
|
|
|
|
#include <tdata/usrtc.h>
|
|
#include <tdata/list.h>
|
|
#include <sxt/base64.h>
|
|
#include <sexpr/sexp.h>
|
|
|
|
#include <sxmp/limits.h>
|
|
#include <sxmp/sxmp.h>
|
|
|
|
#include "internal.h"
|
|
|
|
static void __free_listentry(struct _stream_list_node *listentry);
|
|
static void __free_listentrynamed(struct _stream_list_node *listentry);
|
|
|
|
/*
 * Compare requested flags against granted ones.
 * Only bits 1..5 are examined: returns 1 as soon as d_flag carries a bit
 * that is missing in s_flag, otherwise returns 0.
 */
static int __flag_check(int s_flag, int d_flag)
{
  int bit = 1;

  while(bit < 6) {
    int mask = 1 << bit;

    if((d_flag & mask) != 0 && (s_flag & mask) == 0) return 1;
    bit++;
  }

  return 0;
}
|
|
|
|
/*
 * Serialize a list of named-order nodes into an s-expression string of
 * the form "((<order> <name>)(<order> <name>)...)".
 * The first character of every stored name is skipped on output.
 *
 * Returns a malloc()'ed, NUL-terminated string that the caller must
 * free(), or NULL when named is NULL, memory is exhausted or the text
 * could not be formatted into the computed buffer.
 */
static char *__stream_namedlist_list2sxstr(list_head_t *named)
{
  list_node_t *iter, *siter;
  struct _stream_named_order_node *entry;
  const char *nm;
  char *out = NULL;
  size_t out_len = sizeof(char)*4, lc = 0;
  int n;

  if(!named) return NULL;

  /* per entry: "(" + "%d" (up to 11 chars) + " " + name + ")" */
  list_for_each_safe(named, iter, siter) {
    entry = list_entry(iter, struct _stream_named_order_node, node);
    out_len += sizeof(char)*14 + strlen(entry->name);
  }

  if(!(out = malloc(out_len))) return NULL;

  n = snprintf(out, out_len, "(");
  if(n < 0 || (size_t)n >= out_len) goto __fail;
  lc = (size_t)n;

  list_for_each_safe(named, iter, siter) {
    entry = list_entry(iter, struct _stream_named_order_node, node);
    /* skip the leading character, but never step past an empty name */
    nm = entry->name[0] ? entry->name + sizeof(char) : entry->name;

    n = snprintf(out + lc, out_len - lc, "(%d %s)", entry->order, nm);
    /* a truncated write must not advance lc beyond the buffer */
    if(n < 0 || (size_t)n >= out_len - lc) goto __fail;
    lc += (size_t)n;
  }

  n = snprintf(out + lc, out_len - lc, ")");
  if(n < 0 || (size_t)n >= out_len - lc) goto __fail;

  return out;

 __fail:
  free(out);
  return NULL;
}
|
|
|
|
/*
 * Fill the named list from an s-expression of the form
 * ((<order> <name>) (<order> <name>) ...).
 *
 * Every element must be a two-atom list; anything else, or a failure
 * to add the item, yields SXE_BADPROTO. Returns 0 on success.
 */
static int __stream_namedlist_sexp2list(list_head_t *named, sexp_t *sx)
{
  sexp_t *isx, *iisx;
  char *name;
  int idx, iidx, order;

  SEXP_ITERATE_LIST(sx, isx, idx) {
    if(isx->ty != SEXP_LIST) return SXE_BADPROTO;

    /* start every pair from scratch, so a short pair like "(5)"
     * cannot silently reuse the name of the previous one */
    name = NULL;
    order = 0;

    SEXP_ITERATE_LIST(isx, iisx, iidx) {
      if(iisx->ty == SEXP_LIST) return SXE_BADPROTO;

      switch(iidx) {
      case 0: order = atoi(iisx->val); break;
      case 1: name = iisx->val; break;
      default: return SXE_BADPROTO;
      }
    }

    if(!name) return SXE_BADPROTO; /* incomplete pair */
    if(sxstream_generic_nmlist_additem(named, name, order) < 0) return SXE_BADPROTO;
  }

  return 0;
}
|
|
|
|
/*
 * Send the stream-open request over the given channel and build the
 * local stream object from the reply.
 *
 *  link, channel - link and already opened channel to use
 *  opt           - optional option string (may be NULL)
 *  desc          - description of the stream type being opened
 *  stid, flags   - stream type id and open flags sent to the remote side
 *
 * Returns a newly allocated stream or NULL; on failure errno carries
 * the SXE_* error code.
 */
static sxstream_t *__stream_open(sxlink_t *link, sxchnl_t *channel, const char *opt,
                                 struct sxstream_description *desc, int stid, int flags)
{
  char *msgbuf = NULL;
  size_t mlen = 0;
  sxstream_t *ss = NULL;
  list_head_t *named = NULL;
  sxmsg_t *msg = NULL;
  char *buf = NULL;
  sexp_t *sx = NULL, *isx;
  uint64_t sid = 0;
  int r = 0, idx, n, got_sid = 0;

  /* check flags */
  if(__flag_check(desc->flags, flags)) {
    errno = SXE_EPERM;
    goto __fini;
  }

  /* time to alloc all stuff */
  mlen = strlen(_SXSTREAMOPEN_CMD) + strlen(_SXSTREAMTYPE_PREFIX) + strlen(_SXSTREAMFLAGS_PREFIX);
  mlen += 2*11*sizeof(char); /* worst case width of the two "%d" fields */
  if(!opt)
    mlen += (_SXSTREAMOPEN_ADDMINLEN + sizeof(char));
  else mlen += (_SXSTREAMOPEN_ADDMAXLEN + sizeof(char)) + strlen(opt) +
         strlen(_SXSTREAMOPT_PREFIX);

  if(!(msgbuf = malloc(mlen*sizeof(char)))) {
    errno = SXE_ENOMEM;
    goto __fini;
  }

  if(!opt)
    n = snprintf(msgbuf, mlen, "(%s (%s %d)(%s %d))", _SXSTREAMOPEN_CMD,
                 _SXSTREAMTYPE_PREFIX, stid, _SXSTREAMFLAGS_PREFIX, flags);
  else
    n = snprintf(msgbuf, mlen, "(%s (%s %d)(%s %d)(%s \"%s\"))", _SXSTREAMOPEN_CMD,
                 _SXSTREAMTYPE_PREFIX, stid, _SXSTREAMFLAGS_PREFIX, flags,
                 _SXSTREAMOPT_PREFIX, opt);
  if(n < 0 || (size_t)n >= mlen) { /* never send more than was written */
    errno = SXE_TOOLONG;
    goto __fini;
  }
  mlen = (size_t)n;

  r = sxmsg_send(channel, msgbuf, mlen + sizeof(char), &msg);
  if(r != SXE_RAPIDMSG) {
    errno = r;
    goto __fini;
  }
  r = 0;

  /* time to get the special id, put it to the link and clean up all */
  buf = sxmsg_rapidbuf(msg);
  sx = parse_sexp(buf, strlen(buf));
  sxmsg_clean(msg); /* the reply is released exactly once, right here */
  if(!sx) {
    r = errno = SXE_ENOMEM;
    goto __fini;
  }

  if(desc->type == SXE_O_NAMED) {
    named = sxstream_generic_named_alloc();
    if(!named) {
      r = SXE_ENOMEM;
      goto __parsed;
    }
  }

  /* expected reply: (<open-reply-cmd> <sid> [<named list>]) */
  SEXP_ITERATE_LIST(sx, isx, idx) {
    /* a nested list is valid only as the 3rd item of a named stream */
    if(isx->ty == SEXP_LIST && (desc->type != SXE_O_NAMED || idx < 2)) {
      r = SXE_BADPROTO;
      goto __parsed;
    }

    switch(idx) {
    case 0: if(strcmp(isx->val, _SXSTREAMOPEN_CMD_R)) r = SXE_BADPROTO; break;
    case 1: sid = strtoul(isx->val, NULL, 0); got_sid = 1; break;
    case 2:
      if(desc->type != SXE_O_NAMED || isx->ty != SEXP_LIST) r = SXE_BADPROTO;
      else r = __stream_namedlist_sexp2list(named, isx);
      break;
    default: r = SXE_BADPROTO; break;
    }

    /* stop on the first error, later items must not overwrite it */
    if(r) goto __parsed;
  }
  if(!got_sid) r = SXE_BADPROTO; /* reply carries no stream id */

 __parsed:
  destroy_sexp(sx);

  if(r) {
    errno = r;
    goto __fini;
  }

  if(!(ss = malloc(sizeof(sxstream_t)))) {
    r = errno = SXE_ENOMEM;
    goto __fini;
  }

  memset(ss, 0, sizeof(sxstream_t));
  ss->flags = flags;
  ss->sid = sid;
  ss->link = link;
  ss->channel = channel;
  ss->named = named;

 __fini:
  if(r && named) sxstream_generic_named_free(named);
  free(msgbuf);

  return ss;
}
|
|
|
|
/*
 * Look up the description of the remote stream type stid on the link.
 * Returns NULL when the link has no remote stream table or the id is
 * not found there.
 */
static struct sxstream_description *__get_stream_description(sxlink_t *link, int stid)
{
  usrtc_node_t *found;

  if(link->remote_streams == NULL) return NULL;

  found = usrtc_lookup(link->remote_streams, &stid);
  if(found == NULL) return NULL;

  return (struct sxstream_description *)usrtc_node_getdata(found);
}
|
|
|
|
/*
|
|
* NOTE: sxstream operations (except opening process i.e. few threads
|
|
* can open different streams) are NOT _thread-safe_, please
|
|
* care about that in your own application.
|
|
*/
|
|
sxstream_t *sxstream_open(sxlink_t *link, const char *opt, int stid, int flags)
|
|
{
|
|
sxchnl_t *channel = NULL;
|
|
struct sxstream_description *s_desc;
|
|
sxstream_t *ss = NULL;
|
|
int r = 0;
|
|
|
|
if(!link) {
|
|
r = SXE_FAILED;
|
|
goto __fini;
|
|
}
|
|
if(!link->remote_streams) {
|
|
r = SXE_NOSUCHSTREAMTYPE;
|
|
goto __fini;
|
|
}
|
|
|
|
/* ok, let's a deal with channel */
|
|
if(!(s_desc = __get_stream_description(link, stid))) {
|
|
r = SXE_NOSUCHSTREAMTYPE;
|
|
goto __fini;
|
|
} else channel = sxchannel_open(link, s_desc->pcid);
|
|
|
|
if(!channel) {
|
|
r = errno;
|
|
goto __fini;
|
|
}
|
|
|
|
flags |= SXE_O_OCHANNELED;
|
|
ss = __stream_open(link, channel, opt, s_desc, stid, flags);
|
|
r = errno;
|
|
|
|
__fini:
|
|
if(r) errno = r;
|
|
if(!ss && channel) sxchannel_close(channel);
|
|
errno = r;
|
|
|
|
return ss;
|
|
}
|
|
|
|
/*
 * Open a stream of type stid on an already opened channel.
 * Returns the stream or NULL; when the arguments or the stream type are
 * rejected here, errno is set to the matching SXE_* code.
 */
sxstream_t *sxstream_openwch(sxlink_t *link, sxchnl_t *channel, const char *opt,
                             int stid, int flags)
{
  struct sxstream_description *description = NULL;
  int failure = 0;

  if(link == NULL || channel == NULL)
    failure = SXE_FAILED;
  else if(link->remote_streams == NULL)
    failure = SXE_NOSUCHSTREAMTYPE;
  else {
    /* get description */
    description = __get_stream_description(link, stid);
    if(description == NULL) failure = SXE_NOSUCHSTREAMTYPE;
  }

  if(failure) {
    errno = failure;
    return NULL;
  }
  if(description == NULL) return NULL;

  return __stream_open(link, channel, opt, description, stid, flags);
}
|
|
|
|
/*
 * Ask the remote side to close the stream and, if it agrees
 * (SXE_SUCCESS), release everything owned by the local stream object:
 * cached entries, the entries list head, the named list and the stream
 * itself. A channel opened by sxstream_open() (SXE_O_OCHANNELED) is
 * closed as well and its result is returned.
 *
 * Returns SXE_FAILED on bad arguments, SXE_ENOMEM if the request cannot
 * be allocated, otherwise the result of the send / channel close. The
 * stream is left untouched when the result of the send is not
 * SXE_SUCCESS.
 */
int sxstream_close(sxstream_t *stream)
{
  sxmsg_t *msg = NULL;
  char *mbuf;
  list_node_t *iter, *s_iter;
  struct _stream_list_node *listentry;
  size_t mbuf_len = strlen(_SXSTREAMCLOSE_CMD) + _SXSTREAMCLOSE_ADDLEN;
  int r = 0, type = 0;

  if(!stream) return SXE_FAILED;
  if(!stream->link || !stream->channel) return SXE_FAILED;
  if(!(mbuf = malloc(mbuf_len))) return SXE_ENOMEM;

  /* the cast keeps "%lu" correct whatever the width of the sid type */
  mbuf_len = snprintf(mbuf, mbuf_len, "(%s %lu)", _SXSTREAMCLOSE_CMD,
                      (unsigned long)stream->sid);

  r = sxmsg_send(stream->channel, mbuf, mbuf_len + sizeof(char), &msg);
  if(r == SXE_RAPIDMSG) sxmsg_clean(msg);

  if(r == SXE_SUCCESS) {
    if(stream->flags & SXE_O_BINARY) type = SXE_O_BINARY;
    else if(stream->flags & SXE_O_NAMED) type = SXE_O_NAMED;
    else type = 0;

    switch(type) {
    case 0:
    case SXE_O_NAMED:
      if(stream->entries) {
        list_for_each_safe(stream->entries, iter, s_iter) {
          listentry = list_entry(iter, struct _stream_list_node, node);
          list_del(iter);
          if(!type) __free_listentry(listentry);
          else __free_listentrynamed(listentry);
        }
        /* the list head itself was malloc()'ed by the read path */
        free(stream->entries);
        stream->entries = NULL;
      }
      break;
    case SXE_O_BINARY:
      /* TODO: free all caches, buffers etc */
      break;
    }

    /* named entries only point into this list, so it goes last */
    if(stream->named) {
      sxstream_generic_named_free(stream->named);
      stream->named = NULL;
    }

    if(stream->flags & SXE_O_OCHANNELED)
      r = sxchannel_close(stream->channel);

    free(stream);
  }

  free(mbuf);

  return r;
}
|
|
|
|
/* binary stream operations */
|
|
/*
|
|
 * NOTE: for now this is done with plain copying of the data,
|
|
 * TODO: implement it in iolite style.
|
|
*/
|
|
/*
 * Read up to buf_len bytes from a binary stream at its current offset
 * and advance the offset by the amount read.
 *
 * Returns the number of bytes read, 0 at the end of the stream, or
 * (size_t)-1 on error with errno set.
 */
size_t sxstream_bread(sxstream_t *stream, void *buf, size_t buf_len)
{
  size_t rd;

  errno = SXE_FAILED;
  if(!stream || !buf) return (size_t)-1;

  rd = sxstream_breadat(stream, buf, buf_len, (off_t)stream->cur_offset);
  /* the error marker (size_t)-1 is "greater than zero" for an unsigned
   * type, so it has to be excluded explicitly before moving the offset */
  if(rd != (size_t)-1) stream->cur_offset += rd;

  return rd;
}
|
|
|
|
/*
 * Read up to buf_len bytes from a binary stream at offset off.
 *
 * Returns the number of bytes stored in buf (never more than buf_len),
 * 0 with errno == SXE_EOS at the end of the stream, or (size_t)-1 on
 * error with errno set to the SXE_* code.
 */
size_t sxstream_breadat(sxstream_t *stream, void *buf, size_t buf_len, off_t off)
{
  sxmsg_t *msg = NULL;
  char *mbuf = NULL;
  size_t msglen = 0, r = (size_t)-1;
  int n = 0;

  errno = SXE_FAILED;
  if(!stream || !buf) return r;
  if(!stream->link || !stream->channel) return r;
  if(!buf_len || off < 0) return r;

  /* "(" cmd " " sid " " off " " len ")" + NUL,
   * each number may take up to 20 decimal digits */
  msglen = strlen(_SXSTREAMBREAD_CMD) + sizeof(char)*6 + 3*20*sizeof(char);
  if(!(mbuf = malloc(msglen))) {
    errno = SXE_ENOMEM;
    return r;
  }

  /* casts keep the "%lu" conversions matching their arguments */
  n = snprintf(mbuf, msglen, "(%s %lu %lu %lu)", _SXSTREAMBREAD_CMD,
               (unsigned long)stream->sid, (unsigned long)off,
               (unsigned long)buf_len);
  if(n < 0 || (size_t)n >= msglen) goto __fail;
  msglen = (size_t)n;

  n = sxmsg_send(stream->channel, mbuf, msglen, &msg);
  if(n == SXE_RAPIDMSG) {
    r = sxmsg_datalen(msg);
    /* the remote side decides the reply length: never trust it
     * to fit into the caller's buffer */
    if(r > buf_len) r = buf_len;
    memcpy(buf, sxmsg_payload(msg), r);
    sxmsg_clean(msg);
    errno = 0;
  } else if(n == SXE_EOS) {
    r = 0;
    errno = SXE_EOS;
  } else {
    r = (size_t)-1;
    errno = n;
  }

 __fail:
  free(mbuf);
  return r;
}
|
|
|
|
/*
 * Write buf_len bytes to a binary stream at offset off. The data is
 * sent base64-encoded in one message; if it does not fit, only the
 * leading part that fits is sent.
 *
 * Returns the number reported by the remote side, or (size_t)-1 on
 * error with errno set to the SXE_* code.
 */
size_t sxstream_bwriteat(sxstream_t *stream, const void *buf,
                         size_t buf_len, off_t off)
{
  sxmsg_t *msg = NULL;
  char *mbuf = NULL;
  sexp_t *sx = NULL, *isx;
  size_t hdr_len, alloc_len, msglen = 0, wr = (size_t)-1;
  long enc;
  int idx, r, n;

  if(!stream || !buf || off < 0) {
    errno = SXE_FAILED;
    goto __fail;
  }
  if(!stream->link || !stream->channel) {
    errno = SXE_FAILED;
    goto __fail;
  }

  /* room reserved for everything but the payload */
  hdr_len = strlen(_SXSTREAMBWRITE_CMD) + sizeof(char)*8 + 32*sizeof(char);
  if(MAX_RBBUF_LEN - hdr_len < sxt_rawlen2b64len(buf_len)) /* cut the data */
    buf_len = ((MAX_RBBUF_LEN - hdr_len)/4)*3 - 2;

  alloc_len = hdr_len + sxt_rawlen2b64len(buf_len);
  if(!(mbuf = malloc(alloc_len))) {
    errno = SXE_ENOMEM;
    goto __fail;
  }

  n = snprintf(mbuf, alloc_len, "(%s %lu %lu %lu \"", _SXSTREAMBWRITE_CMD,
               (unsigned long)stream->sid, (unsigned long)off,
               (unsigned long)buf_len);
  /* the header plus the closing "\")" and NUL must stay inside the
   * reserved part, the rest of the buffer belongs to the payload */
  if(n < 0 || (size_t)n + 3 > hdr_len) {
    errno = SXE_TOOLONG;
    goto __fail;
  }
  msglen = (size_t)n;

  /* signed temporary: a negative error result must stay detectable */
  enc = (long)sxt_b64encode_in(buf, mbuf + msglen, buf_len);
  if(enc < 0 || msglen + (size_t)enc + 3 > alloc_len) {
    errno = SXE_FAILED;
    goto __fail;
  }
  msglen += (size_t)enc;

  /* bounded by the real allocation */
  n = snprintf(mbuf + msglen, alloc_len - msglen, "\")");
  if(n < 0 || (size_t)n >= alloc_len - msglen) {
    errno = SXE_FAILED;
    goto __fail;
  }
  msglen += (size_t)n;

  r = sxmsg_send(stream->channel, mbuf, msglen, &msg);
  if(r != SXE_RAPIDMSG) {
    errno = r;
    goto __fail;
  }

  sx = parse_sexp(sxmsg_rapidbuf(msg), strlen(sxmsg_rapidbuf(msg)));
  sxmsg_clean(msg);
  if(!sx) goto __badproto;

  /* the reply is a list holding exactly one atom: the written length */
  SEXP_ITERATE_LIST(sx, isx, idx) {
    if(isx->ty == SEXP_LIST) goto __badproto;
    if(!idx) wr = strtoul(isx->val, NULL, 0);
    else goto __badproto;
  }
  if(!idx) goto __badproto; /* empty reply */

  goto __fail; /* common cleanup, wr holds the result */

 __badproto:
  wr = (size_t)-1;
  errno = SXE_BADPROTO;

 __fail:
  if(sx) destroy_sexp(sx);
  free(mbuf);
  return wr;
}
|
|
|
|
/*
 * Placeholder: synchronisation is not implemented, the arguments are
 * not used and SXE_IGNORED is always returned.
 */
size_t sxstream_bsyncat(sxstream_t *stream, void *buf, size_t buf_len, off_t off)
{
  /* TODO: for 0.4.4 */
  const size_t not_implemented = SXE_IGNORED;

  return not_implemented;
}
|
|
|
|
/*
|
|
* NOTE: returning value for reading from entry streams
|
|
* shouldn't be managed, it's done by close operation.
|
|
 * That means you should take care with the values you have read, since
|
|
* it might be freed on the next read operation.
|
|
*/
|
|
static struct _nn_stream_entry_node *__alloc_entry(const char *value)
|
|
{
|
|
struct _nn_stream_entry_node *o = malloc(sizeof(struct _nn_stream_entry_node));
|
|
|
|
if(!o) return NULL;
|
|
if(!(o->value = strdup(value))) {
|
|
free(o);
|
|
return NULL;
|
|
}
|
|
|
|
list_init_node(&o->node);
|
|
|
|
return o;
|
|
}
|
|
|
|
static void __free_entry(struct _nn_stream_entry_node *entry)
|
|
{
|
|
free(entry->value);
|
|
free(entry);
|
|
|
|
return;
|
|
}
|
|
|
|
static struct _stream_list_node *__alloc_listentry(void)
|
|
{
|
|
struct _stream_list_node *o = malloc(sizeof(struct _stream_list_node));
|
|
|
|
if(!o) return NULL;
|
|
if(!(o->list = malloc(sizeof(list_head_t)))) {
|
|
free(o);
|
|
return NULL;
|
|
}
|
|
|
|
list_init_head(o->list);
|
|
list_init_node(&o->node);
|
|
|
|
return o;
|
|
}
|
|
|
|
static void __free_listentry(struct _stream_list_node *listentry)
|
|
{
|
|
struct _nn_stream_entry_node *entry;
|
|
list_node_t *iter, *s_iter;
|
|
|
|
list_for_each_safe(listentry->list, iter, s_iter) {
|
|
entry = list_entry(iter, struct _nn_stream_entry_node, node);
|
|
list_del(iter);
|
|
__free_entry(entry);
|
|
}
|
|
|
|
free(listentry->list);
|
|
free(listentry);
|
|
|
|
return;
|
|
}
|
|
|
|
static void __free_listentrynamed(struct _stream_list_node *listentry)
|
|
{
|
|
struct _stream_entry_node *entry;
|
|
list_node_t *iter, *s_iter;
|
|
|
|
list_for_each_safe(listentry->list, iter, s_iter) {
|
|
entry = list_entry(iter, struct _stream_entry_node, node);
|
|
list_del(iter);
|
|
free(entry->value); /* only value, name is a pointer to named */
|
|
free(entry);
|
|
}
|
|
|
|
free(listentry->list);
|
|
free(listentry);
|
|
|
|
return;
|
|
}
|
|
|
|
/* we have something like that ((:order 1 2) "value1" "value2") */
|
|
static int __stream_namedentry_sexp2list(sexp_t *sx, list_head_t *list, list_head_t *named)
|
|
{
|
|
struct _stream_entry_node *entry;
|
|
struct _stream_named_order_node *onode;
|
|
sexp_t *isx, *iisx;
|
|
list_node_t *iter, *siter;
|
|
char *name = NULL;
|
|
list_head_t olist;
|
|
int idx, iidx, order, r = 0;
|
|
|
|
SEXP_ITERATE_LIST(sx, isx, idx) {
|
|
if(!idx && isx->ty != SEXP_LIST) return SXE_BADPROTO;
|
|
|
|
if(!idx) {
|
|
list_init_head(&olist);
|
|
|
|
SEXP_ITERATE_LIST(isx, iisx, iidx) {
|
|
if(iisx->ty == SEXP_LIST) goto __badproto;
|
|
switch(iidx) {
|
|
case 0: if(strcmp(iisx->val, ":order")) return SXE_BADPROTO; break;
|
|
default:
|
|
order = atoi(iisx->val);
|
|
name = (char *)sxstream_generic_named_lookupname(named, order);
|
|
if(!name) goto __badproto;
|
|
else if(!(onode = malloc(sizeof(struct _stream_named_order_node)))) goto __enomem;
|
|
|
|
list_init_node(&onode->node);
|
|
onode->name = name - sizeof(char);
|
|
onode->order = order;
|
|
|
|
list_add2tail(&olist, &onode->node);
|
|
break;
|
|
}
|
|
}
|
|
} else {
|
|
if(isx->ty == SEXP_LIST) goto __badproto;
|
|
if(idx == 1) iter = list_node_first(&olist);
|
|
else {
|
|
if(iter == list_head(&olist)) goto __badproto;
|
|
iter = iter->next;
|
|
}
|
|
onode = list_entry(iter, struct _stream_named_order_node, node);
|
|
name = onode->name;
|
|
if(!(entry = malloc(sizeof(struct _stream_entry_node)))) goto __enomem;
|
|
else if(!(entry->value = strdup(isx->val))) {
|
|
free(entry);
|
|
goto __enomem;
|
|
}
|
|
|
|
list_init_node(&entry->node);
|
|
list_add2tail(list, &entry->node);
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
|
|
__badproto:
|
|
r = SXE_BADPROTO;
|
|
__enomem:
|
|
if(!r) r = SXE_ENOMEM;
|
|
list_for_each_safe(&olist, iter, siter) {
|
|
onode = list_entry(iter, struct _stream_named_order_node, node);
|
|
list_del(iter);
|
|
free(onode);
|
|
}
|
|
|
|
return r;
|
|
}
|
|
|
|
/* entries read ops */
|
|
/*
 * Return the next entry (a list of values) of an entry stream.
 *
 * Entries are fetched from the remote side in batches and cached on
 * stream->entries; stream->cur points to the entry that will be handed
 * out by the next call, or is NULL when a new batch must be requested.
 * _named selects the parsing/freeing flavour: 0 - unnamed, 1 - named.
 *
 * Returns the list of the entry, or NULL with errno set (SXE_EOS at
 * the end of the stream, SXE_NILREPLY on an empty batch, or another
 * SXE_* error code).
 */
static list_head_t *__sxstream_entryread(sxstream_t *stream, int _named)
{
  sxmsg_t *reply;
  size_t req_len;
  char *req = NULL;
  list_node_t *picked = NULL, *pos, *tmp;
  struct _stream_list_node *row = NULL;
  struct _nn_stream_entry_node *cell = NULL;
  sexp_t *tree = NULL, *rowsx, *cellsx;
  int rc = 0, nrows, ncells;

  errno = SXE_FAILED;
  if(!stream) goto __fail;
  if(!stream->link || !stream->channel) goto __fail;
  if(_named < 0 || _named > 1) goto __fail;

  /* serve from the cached batch while it lasts */
  if(stream->entries) {
    if(stream->cur == list_node_last(stream->entries)) {
      picked = stream->cur;
      stream->cur = NULL; /* we need to read from remote again on the next operation */
      goto __success;
    }
    if(stream->cur && stream->cur != list_node_last(stream->entries)) {
      picked = stream->cur;
      if(list_node_next_isbound(picked)) stream->cur = NULL;
      else stream->cur = stream->cur->next;
      goto __success;
    }
  }

  /* a new batch has to be requested from the remote side */
  errno = SXE_ENOMEM;

  if(!stream->entries) { /* allocate it first time */
    if(!(stream->entries = malloc(sizeof(list_head_t)))) goto __fail;

    list_init_head(stream->entries);
  } else { /* ok, clean up all stuff */
    list_for_each_safe(stream->entries, pos, tmp) {
      row = list_entry(pos, struct _stream_list_node, node);
      list_del(pos);
      if(_named) __free_listentrynamed(row);
      else __free_listentry(row);
    }
  }

  /* prepare a message */
  req_len = strlen(_SXSTREAMEREAD_CMD) + _SXSTREAMEREAD_ADDLEN;
  if(!(req = malloc(req_len))) goto __fail;

  req_len = snprintf(req, req_len, "(%s %lu)", _SXSTREAMEREAD_CMD, stream->sid);
  rc = sxmsg_send(stream->channel, req, req_len, &reply);

  if(rc == SXE_RAPIDMSG) { /* get our data */
    tree = parse_sexp(sxmsg_payload(reply), strlen(sxmsg_payload(reply)));
    sxmsg_clean(reply);

    if(!tree) goto __badproto;

    SEXP_ITERATE_LIST(tree, rowsx, nrows) {
      if(rowsx->ty != SEXP_LIST) goto __badproto;

      /* allocate list entry and add it */
      row = __alloc_listentry();
      if(!row) goto __enomem;
      list_add2tail(stream->entries, &row->node);

      if(_named) { /* named stream */
        rc = __stream_namedentry_sexp2list(rowsx, row->list, stream->named);
        if(rc == SXE_BADPROTO) goto __badproto;
        if(rc == SXE_ENOMEM) goto __enomem;
        rc = 0;
      } else {
        SEXP_ITERATE_LIST(rowsx, cellsx, ncells) {
          if(cellsx->ty == SEXP_LIST) goto __badproto;

          cell = __alloc_entry((const char *)cellsx->val);
          if(!cell) goto __enomem;
          list_add2tail(row->list, &cell->node);
        }
      }
    }

    /* ok let's see what we got */
    if(nrows) {
      stream->cur = list_node_first(stream->entries);
      picked = stream->cur;

      /* point to the next one if exists */
      if(list_node_next_isbound(picked)) stream->cur = NULL;
      else stream->cur = stream->cur->next;
    } else {
      errno = SXE_NILREPLY;
      stream->cur = NULL;
      picked = NULL;
    }

    destroy_sexp(tree);
  } else if(rc == SXE_EOS) { /* stream is over */
    errno = SXE_EOS;
    picked = NULL;
  } else {
    errno = rc;
    goto __fail;
  }

  free(req);

 __success:
  if(!picked) return NULL;

  errno = 0;
  row = list_entry(picked, struct _stream_list_node, node);
  return row->list;

 __badproto:
  if(tree) destroy_sexp(tree);
  errno = SXE_BADPROTO;
  goto __fail;

 __enomem:
  errno = SXE_ENOMEM;
  destroy_sexp(tree);

 __fail:
  if(req) free(req);
  return NULL;
}
|
|
|
|
/* entry nonamed stream ops */
|
|
/* Read the next entry of an unnamed entry stream. */
list_head_t *sxstream_read(sxstream_t *stream)
{
  const int named = 0;

  return __sxstream_entryread(stream, named);
}
|
|
|
|
/* entry named stream ops */
|
|
/* Read the next entry of a named entry stream. */
list_head_t *sxstream_readnamed(sxstream_t *stream)
{
  const int named = 1;

  return __sxstream_entryread(stream, named);
}
|
|
|
|
/* builtin functions for streams */
|
|
/*
 * Reserve a stream index on the link and allocate a zeroed opened-stream
 * record for it. If the slot of the reserved index is already taken or
 * the allocation fails, the index is given back and NULL is returned.
 */
static struct sxstream_opened *__alloc_stream(sxlink_t *link)
{
  struct sxstream_opened *slot;
  int id;

  pthread_mutex_lock(&link->idx_streams_lock);
  id = idx_allocate(&link->idx_streams);
  pthread_mutex_unlock(&link->idx_streams_lock);

  if(id == IDX_INVAL) return NULL;

  /* an occupied slot is treated like an allocation failure */
  if(link->streams[id]) slot = NULL;
  else slot = malloc(sizeof(struct sxstream_opened));

  if(!slot) {
    pthread_mutex_lock(&link->idx_streams_lock);
    idx_free(&link->idx_streams, id);
    pthread_mutex_unlock(&link->idx_streams_lock);

    return NULL;
  }

  memset(slot, 0, sizeof(struct sxstream_opened));
  slot->sto_id = id;
  link->streams[id] = slot;

  return slot;
}
|
|
|
|
/*
 * Give the index of the opened stream back to the link, clear its slot
 * (both under the index lock) and free the record.
 */
static void __free_stream(sxlink_t *link, struct sxstream_opened *stream)
{
  pthread_mutex_t *guard = &link->idx_streams_lock;

  pthread_mutex_lock(guard);
  idx_free(&link->idx_streams, stream->sto_id);
  link->streams[stream->sto_id] = NULL;
  pthread_mutex_unlock(guard);

  free(stream);
}
|
|
|
|
/*
 * Builtin handler of the stream-open request:
 *   (<cmd> (<type-prefix> N) (<flags-prefix> N) [(<opt-prefix> "opt")])
 *
 * Checks that the stream type exists on the hub and may be opened on
 * this channel with the requested flags, allocates the opened-stream
 * record, calls the s_open operation of the type and replies with
 * (<reply-cmd> <stream id> [<named list>]).
 *
 * On any failure the SXE_* code is returned to the peer.
 */
int _builtin_stream_open(void *m, sexp_t *sx)
{
  sxmsg_t *msg = (sxmsg_t *)m;
  sxchnl_t *channel = msg->pch;
  sxlink_t *link = NULL;
  sxhub_t *hub;
  sexp_t *isx, *iisx;
  char *rbuf = NULL, *opt = NULL, *s_opt = NULL, *nmliststr = NULL;
  usrtc_node_t *node;
  struct sxstream_description *s_desc;
  struct sxstream_opened *strea;
  size_t rbuf_len;
  int chan_type, idx, r = SXE_FAILED;
  int flags = 0, des_type = 0, has_type = 0, iidx, n;

  if(!channel) goto __fail;
  link = channel->link;
  chan_type = channel->type_id;

  if(!link || !channel->rpc_list) goto __fail; /* paranoid test */

  /* parse incoming builtin function */
  SEXP_ITERATE_LIST(sx, isx, idx) {
    /* the command name is an atom, every argument is a (key value) list */
    if(!idx && isx->ty == SEXP_LIST) goto __badproto;
    if(idx && isx->ty != SEXP_LIST) goto __badproto;
    if(isx->ty != SEXP_LIST) continue_parse: ; else

    SEXP_ITERATE_LIST(isx, iisx, iidx) {
      if(iisx->ty == SEXP_LIST) goto __badproto;
      switch(iidx) {
      case 0:
        opt = iisx->val;
        if(!opt || *opt != ':') goto __badproto;
        break;
      case 1:
        if(!strcmp(opt, _SXSTREAMTYPE_PREFIX)) {
          des_type = atoi(iisx->val);
          has_type = 1;
        } else if(!strcmp(opt, _SXSTREAMFLAGS_PREFIX)) flags = atoi(iisx->val);
        else if(!strcmp(opt, _SXSTREAMOPT_PREFIX)) s_opt = iisx->val;
        else goto __badproto;
        break;
      default: goto __badproto; break;
      }
    }
  }
  /* without a type there is nothing to look up */
  if(!has_type) goto __badproto;

  /* check availability */
  hub = link->hub;
  if(!hub->streams) goto __nostream;
  if(!(node = usrtc_lookup(hub->streams, &des_type))) goto __nostream;
  s_desc = (struct sxstream_description *)usrtc_node_getdata(node);

  if(s_desc->pcid != chan_type) goto __eperm; /* check permission by channel */
  if(__flag_check(s_desc->flags, flags)) goto __eperm; /* by flags */

  /* create a new remote side stream record */
  if(!(strea = __alloc_stream(link))) {
    r = SXE_ENOMEM;
    goto __fail;
  }

  /* set stream details */
  strea->desc = s_desc;
  strea->flags = flags;
  strea->pin_channelid = channel->cid;

  r = s_desc->ops->s_open(link, strea, s_opt);
  if(r != SXE_SUCCESS) {
    /* deallocate stream */
    __free_stream(link, strea);
    goto __fail;
  }

  /* rbuf length */
  rbuf_len = _SXSTREAMCLOSE_ADDLEN + strlen(_SXSTREAMOPEN_CMD_R);
  if(s_desc->type == SXE_O_NAMED) {
    nmliststr = __stream_namedlist_list2sxstr(s_desc->named);
    if(!nmliststr) { r = SXE_FAILED; __free_stream(link, strea); goto __fail; }

    rbuf_len += strlen(nmliststr) + sizeof(char)*4;
  }
  /* never allow writing beyond the rapid buffer */
  if(rbuf_len > (size_t)MAX_RBBUF_LEN) rbuf_len = (size_t)MAX_RBBUF_LEN;

  rbuf = sxmsg_rapidbuf(msg);
  /* the cast keeps "%lu" correct whatever the width of sto_id */
  if(s_desc->type != SXE_O_NAMED)
    n = snprintf(rbuf, rbuf_len, "(%s %lu)", _SXSTREAMOPEN_CMD_R,
                 (unsigned long)strea->sto_id);
  else
    n = snprintf(rbuf, rbuf_len, "(%s %lu %s)", _SXSTREAMOPEN_CMD_R,
                 (unsigned long)strea->sto_id, nmliststr);
  free(nmliststr);

  if(n < 0 || (size_t)n >= rbuf_len) {
    /* a truncated reply is useless to the peer: refuse instead.
     * NOTE(review): s_close is not called here, same as on the
     * named list failure above - confirm whether it should be. */
    __free_stream(link, strea);
    r = SXE_TOOLONG;
    goto __fail;
  }

  return sxmsg_rreply(msg, (size_t)n + sizeof(char));

 __badproto:
  r = SXE_BADPROTO;
  goto __fail;

 __nostream:
  r = SXE_NOSUCHSTREAMTYPE;
  goto __fail;

 __eperm:
  r = SXE_EPERM;

 __fail:
  return sxmsg_return(msg, r);
}
|
|
|
|
/*
 * Builtin handler of the stream-close request: (<cmd> <stream id>).
 * The stream must be pinned to the requesting channel; it is closed via
 * the s_close operation of its type and its record is released when
 * that operation reports SXE_SUCCESS. The result is returned to the
 * peer.
 *
 * NOTE(review): the stream id received from the peer is used as an
 * index into link->streams without a range check - confirm the bound
 * of that array and validate the id against it.
 */
int _builtin_stream_close(void *m, sexp_t *sx)
{
  sxmsg_t *request = (sxmsg_t *)m;
  sxchnl_t *chan = request->pch;
  sxlink_t *lnk = NULL;
  struct sxstream_opened *opened = NULL;
  sexp_t *atom;
  uint64_t stream_id = 0;
  int rc = SXE_FAILED, pos;

  if(!chan) goto __fini;
  lnk = chan->link;

  if(!lnk || !chan->rpc_list) goto __fini; /* paranoid test */

  /* parse input */
  SEXP_ITERATE_LIST(sx, atom, pos) {
    if(atom->ty == SEXP_LIST) {
      rc = SXE_BADPROTO;
      goto __fini;
    }

    if(pos == 0) rc = SXE_BADPROTO; /* the command alone is not enough */
    else if(pos == 1) {
      stream_id = strtoul(atom->val, NULL, 0);
      rc = SXE_SUCCESS;
    } else {
      rc = SXE_BADPROTO;
      goto __fini;
    }
  }
  if(rc != SXE_SUCCESS) goto __fini;

  /* get this */
  opened = lnk->streams[(int)stream_id];
  if(!opened) rc = SXE_NOSUCHSTREAMTYPE;
  else if(opened->pin_channelid != chan->cid) rc = SXE_EPERM;
  else {
    rc = opened->desc->ops->s_close(opened);
    if(rc == SXE_SUCCESS) __free_stream(lnk, opened);
  }

 __fini:
  return sxmsg_return(request, rc);
}
|
|
|
|
/*
|
|
* this simple function returns length only, i.e.
|
|
* it's going thru the list and lookup for all the
|
|
* required stuff.
|
|
* also it will check up the values.
|
|
*/
|
|
static size_t __entry_order_sexplen(list_head_t *list, struct sxstream_description *desc)
|
|
{
|
|
size_t len = 4*sizeof(char) + strlen(":order");
|
|
list_node_t *iter, *siter;
|
|
struct _stream_entry_node *entry;
|
|
int order;
|
|
|
|
if(!desc->named) goto __fail; /* avoid segfaults */
|
|
|
|
list_for_each_safe(list, iter, siter) {
|
|
entry = container_of(iter, struct _stream_entry_node, node);
|
|
if(!entry->name) goto __fail;
|
|
else order = sxstream_generic_named_lookuporder(desc->named, entry->name);
|
|
|
|
if(order < 0) goto __fail;
|
|
else len += 4*sizeof(char);
|
|
|
|
if(iter != list_node_last(list)) len += sizeof(char);
|
|
}
|
|
|
|
__fail:
|
|
return 0;
|
|
}
|
|
|
|
/*
|
|
* this function hasn't checks for validity of the given list
|
|
* use it with care.
|
|
* if __entry_order_sexplen() was success it's safe to call it and
|
|
* expect a normal data in the given buffer.
|
|
* btw, it will stop if max_len reached to avoid out of the buffer writes.
|
|
*/
|
|
static size_t __entry_order_tosexpstr(list_head_t *list, struct sxstream_description *desc,
|
|
char *buf, size_t max_len)
|
|
{
|
|
size_t dw = 0;
|
|
list_node_t *iter, *siter;
|
|
struct _stream_entry_node *entry;
|
|
int order;
|
|
|
|
dw += snprintf(buf + dw, max_len - dw, "(:order ");
|
|
list_for_each_safe(list, iter, siter) {
|
|
entry = container_of(iter, struct _stream_entry_node, node);
|
|
order = sxstream_generic_named_lookuporder(desc->named, entry->name);
|
|
|
|
if(iter != list_node_last(list))
|
|
dw += snprintf(buf + dw, max_len - dw, "%d ", order);
|
|
else dw += snprintf(buf + dw, max_len - dw, "%d", order);
|
|
}
|
|
|
|
dw += snprintf(buf + dw, max_len - dw, ")");
|
|
|
|
return dw;
|
|
}
|
|
|
|
/*
 * Builtin handler of the entry-read request: (<cmd> <stream id>).
 *
 * Packs as many entries as fit into the rapid buffer as a list of
 * lists and sends it as the reply. An entry that does not fit stays in
 * stream->ent_buf and is sent first by the next request. Named streams
 * prefix every entry with its (:order ...) list.
 *
 * Errors are returned to the peer: SXE_BADPROTO, SXE_NOSUCHSTREAMTYPE,
 * SXE_EPERM, SXE_FAILED (binary or unknown stream type), SXE_TOOLONG
 * (a single entry is bigger than the buffer), SXE_EOS (nothing to send).
 *
 * NOTE(review): the stream id received from the peer is used as an
 * index into link->streams without a range check - confirm the bound
 * of that array and validate the id against it.
 */
int _builtin_stream_eread(void *m, sexp_t *sx)
{
  sxmsg_t *request = (sxmsg_t *)m;
  sxchnl_t *chan = request->pch;
  sxlink_t *lnk = NULL;
  struct sxstream_opened *opened = NULL;
  sexp_t *atom;
  list_node_t *pos, *tmp;
  struct _nn_stream_entry_node *plain;
  struct _stream_entry_node *named;
  char *out, *text;
  uint64_t stream_id = 0;
  size_t used = 0, need = 0;
  int rc = SXE_FAILED, argi, sent, got;

  if(!chan) goto __fail;
  lnk = chan->link;

  if(!lnk || !chan->rpc_list) goto __fail; /* paranoid test */

  /* parse input */
  SEXP_ITERATE_LIST(sx, atom, argi) {
    if(atom->ty == SEXP_LIST) goto __badproto;

    if(argi == 0) rc = SXE_BADPROTO; /* the command alone is not enough */
    else if(argi == 1) {
      stream_id = strtoul(atom->val, NULL, 0);
      rc = SXE_SUCCESS;
    } else goto __badproto;
  }
  if(rc != SXE_SUCCESS) goto __fail;

  /* get this */
  opened = lnk->streams[(int)stream_id];
  if(!opened) {
    rc = SXE_NOSUCHSTREAMTYPE;
    goto __fail;
  }
  if(opened->pin_channelid != chan->cid) {
    rc = SXE_EPERM;
    goto __fail;
  }
  /* we shouldn't fail here, check for the stream type first */
  if(opened->desc->flags & SXE_O_BINARY) {
    rc = SXE_FAILED;
    goto __fail;
  }
  /* last check: permission for read check */
  if(!(opened->flags & SXE_O_READ)) {
    rc = SXE_EPERM;
    goto __fail;
  }

  out = sxmsg_rapidbuf(request);
  sent = 0;

  /* here we go */
  used = snprintf(out, MAX_RBBUF_LEN, "(");
  for(;;) {
    if(opened->ent_buf) {
      /* first we need to determine if this list
       * of values will fit to our rapid buffer */
      if(!opened->desc->type) { /* the case for nonamed streams */
        need = 4*sizeof(char);
        list_for_each_safe(opened->ent_buf, pos, tmp) {
          plain = container_of(pos, struct _nn_stream_entry_node, node);
          need += strlen(plain->value);
          if(pos != list_node_last(opened->ent_buf))
            need += sizeof(char);
        }
      } else if(opened->desc->type == SXE_O_NAMED) { /* named entry streams */
        /* each entry carries a small list of value orders: sending
         * the order instead of the value name saves space for data */
        need = 8*sizeof(char) + __entry_order_sexplen(opened->ent_buf, opened->desc);
        if(need <= 8*sizeof(char)) {
          rc = SXE_FAILED;
          goto __fail;
        }
        list_for_each_safe(opened->ent_buf, pos, tmp) {
          named = container_of(pos, struct _stream_entry_node, node);
          need += strlen(named->value);
          if(pos != list_node_last(opened->ent_buf)) need += sizeof(char);
        }
      } else {
        rc = SXE_FAILED;
        goto __fail;
      }

      /* if the entry fits - it goes to the buffer, otherwise it is
       * left for the further read operation and what was collected
       * already is sent */
      if((used + need) >= MAX_RBBUF_LEN) { /* reach the end */
        if(!sent) {
          rc = SXE_TOOLONG;
          goto __fail;
        }
        break; /* buffer full, ready to send */
      }

      /* write to rapid buffer */
      used += snprintf(out + used, MAX_RBBUF_LEN - used, "(");
      if(opened->desc->type == SXE_O_NAMED) /* write order list info */
        used += __entry_order_tosexpstr(opened->ent_buf, opened->desc, out + used,
                                        MAX_RBBUF_LEN - used);

      /* write entry data */
      list_for_each_safe(opened->ent_buf, pos, tmp) {
        if(opened->desc->type == SXE_O_NAMED) {
          named = container_of(pos, struct _stream_entry_node, node);
          text = named->value;
        } else {
          plain = container_of(pos, struct _nn_stream_entry_node, node);
          text = plain->value;
        }

        if(pos == list_node_last(opened->ent_buf))
          used += snprintf(out + used, MAX_RBBUF_LEN - used, "%s",
                           text);
        else
          used += snprintf(out + used, MAX_RBBUF_LEN - used, "%s ",
                           text);
      }
      used += snprintf(out + used, MAX_RBBUF_LEN - used, ")");
      sent++;
    }

    /* read next */
    got = opened->desc->ops->s_eread(opened, 1, 0, &opened->ent_buf);
    if(got <= 0) {
      if(!sent) {
        rc = SXE_EOS;
        goto __fail;
      }
      break;
    }
  }
  used += snprintf(out + used, MAX_RBBUF_LEN - used, ")");

  return sxmsg_rreply(request, used + sizeof(char));

 __badproto:
  rc = SXE_BADPROTO;

 __fail:
  return sxmsg_return(request, rc);
}
|
|
|
|
/*
 * Builtin handler of the binary read request:
 *   (<cmd> <stream id> <offset> <length>)
 *
 * The stream must be pinned to the requesting channel, opened for
 * reading and be of binary type. The requested length is limited to
 * what the rapid buffer can carry. The data read by the s_bread
 * operation is sent as the reply.
 *
 * Errors are returned to the peer: SXE_BADPROTO, SXE_NILREPLY (zero
 * length requested or backend error), SXE_NOSUCHSTREAMTYPE, SXE_EPERM,
 * SXE_FAILED (not a binary stream), SXE_EOS (nothing read).
 *
 * NOTE(review): the stream id received from the peer is used as an
 * index into link->streams without a range check - confirm the bound
 * of that array and validate the id against it.
 */
int _builtin_stream_bread(void *m, sexp_t *sx)
{
  sxmsg_t *msg = (sxmsg_t *)m;
  sxchnl_t *channel = msg->pch;
  sxlink_t *link = NULL;
  struct sxstream_opened *stream = NULL;
  sexp_t *isx;
  void *rbuf;
  uint64_t sid = 0, offset = 0;
  size_t rdlen = 0;
  ssize_t got;
  int r = SXE_FAILED, idx;

  if(!channel) goto __fail;
  link = channel->link;

  if(!link || !channel->rpc_list) goto __fail; /* paranoid test */

  /* parse input */
  SEXP_ITERATE_LIST(sx, isx, idx) {
    if(isx->ty == SEXP_LIST) goto __badproto;

    switch(idx) {
    case 0: r = SXE_BADPROTO; break;
    case 1: sid = strtoul(isx->val, NULL, 0); break;
    case 2: offset = strtoul(isx->val, NULL, 0); break;
    case 3: rdlen = strtoul(isx->val, NULL, 0); r = SXE_SUCCESS; break;
    default: goto __badproto; break;
    }
  }
  if(r != SXE_SUCCESS) goto __fail;
  if(!rdlen) { /* in case of reading zero bytes - we will send nilreply error */
    r = SXE_NILREPLY;
    goto __fail;
  }
  if(rdlen >= MAX_RBBUF_LEN) rdlen = MAX_RBBUF_LEN - 1; /* check up for out of buffer reading */

  /* check out if stream exists */
  if(!(stream = link->streams[(int)sid])) {
    r = SXE_NOSUCHSTREAMTYPE;
    goto __fail;
  }

  /* check up if we have permission to do it */
  if((stream->pin_channelid != channel->cid) || !(stream->flags & SXE_O_READ)) {
    r = SXE_EPERM;
    goto __fail;
  }

  /* check if the given stream has binary capability */
  if(stream->desc->type != SXE_O_BINARY) {
    r = SXE_FAILED;
    goto __fail;
  }

  rbuf = sxmsg_rapidbuf(msg);

  /* signed result: a negative error value of the backend must not
   * turn into a huge reply length */
  got = stream->desc->ops->s_bread(stream, rdlen, offset, rbuf);
  if(got == 0) {
    r = SXE_EOS;
    goto __fail;
  }
  if(got < 0) { /* nil reply */
    r = SXE_NILREPLY;
    goto __fail;
  }

  return sxmsg_rreply(msg, (size_t)got);

 __badproto:
  r = SXE_BADPROTO;

 __fail:
  return sxmsg_return(msg, r);
}
|
|
|
|
/*
 * Builtin handler of the binary write request.
 * Not implemented: the request is not examined and SXE_FAILED is
 * always returned to the peer.
 */
int _builtin_stream_bwrite(void *m, sexp_t *sx)
{
  sxmsg_t *request = (sxmsg_t *)m;
  const int rc = SXE_FAILED;

  return sxmsg_return(request, rc);
}
|