From 1cb8d3ad1597a176644c00b8e6bc567faa80ae59 Mon Sep 17 00:00:00 2001 From: Alexander Vdolainen Date: Wed, 17 Feb 2016 02:11:59 +0200 Subject: [PATCH] A few changes: a) some cleanup and renaming for better read, b) added initial streams stuff, c) builtin functions extension workaround, d) minor fixes; --- include/sxmp/errno.h | 1 + include/sxmp/sxmp.h | 10 +- lib/Makefile.am | 4 +- lib/error.c | 73 ++++++++ lib/{link.c => hub.c} | 72 ++++---- lib/internal.h | 21 +++ lib/message.c | 7 +- lib/stream.c | 393 +++++++++++++++++++++++++++++++++++++++++- lib/sxmplv2.c | 17 +- 9 files changed, 550 insertions(+), 48 deletions(-) create mode 100644 lib/error.c rename lib/{link.c => hub.c} (93%) diff --git a/include/sxmp/errno.h b/include/sxmp/errno.h index 8bd61ea..b7796c6 100644 --- a/include/sxmp/errno.h +++ b/include/sxmp/errno.h @@ -52,6 +52,7 @@ #define SXE_INVALINDEX 221 #define SXE_NOSUCHSTREAMTYPE 222 #define SXE_EOS 223 +#define SXE_NILREPLY 224 const char *sxmpl_errno2cstr(int); diff --git a/include/sxmp/sxmp.h b/include/sxmp/sxmp.h index 54a7948..948908e 100644 --- a/include/sxmp/sxmp.h +++ b/include/sxmp/sxmp.h @@ -220,6 +220,7 @@ typedef struct __sxhub_type { pthread_rwlock_t rwlock; char *rootca, *certpem, *certkey; /* path name to the certificates */ sxl_rpclist_t *system_rpc; + sxl_rpclist_t *stream_rpc; /* special functions pointers */ int (*validate_sslpem)(sxlink_t *); /** < this function used to validate SSL certificate while SSL handshake */ int (*secure_check)(sxlink_t *); /** < this function authorize user to login, @@ -287,17 +288,22 @@ typedef struct __sxtream_type { list_head_t *named; } sxstream_t; -struct _nn_steam_entry_node { +struct _nn_stream_entry_node { char *value; list_node_t node; }; -struct _steam_entry_node { +struct _stream_entry_node { char *name; char *value; list_node_t node; }; +struct _stream_list_node { + list_head_t *list; + list_node_t node; +}; + #ifdef __cplusplus extern "C" { #endif diff --git a/lib/Makefile.am b/lib/Makefile.am index c0d2b0c..5424fdd 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -14,8 +14,8 @@ lib_LTLIBRARIES = libsxmp.la libsxmp_la_SOURCES = \ - sxmplv2.c link.c channel.c message.c rpc.c \ - uuid.c stream.c + sxmplv2.c hub.c channel.c message.c rpc.c \ + uuid.c stream.c error.c libsxmp_la_LDFLAGS = diff --git a/lib/error.c b/lib/error.c new file mode 100644 index 0000000..9f165b4 --- /dev/null +++ b/lib/error.c @@ -0,0 +1,73 @@ +/* + * Secure X Message Passing Library v2 implementation. + * (sxmplv2) it superseed all versions before due to the: + * - memory consumption + * - new features such as pulse emitting + * - performance optimization + * + * (c) Askele Group 2013-2015 + * (c) Alexander Vdolainen 2013-2015 + * + * libsxmp is free software: you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * libsxmp is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see ."; + * + */ + +#include +#include +#include +#include +#include + +#include +#include + +/* errors */ +struct __scerrcode { + int code; + const char *desc; +}; + +static struct __scerrcode __lerr[] = { + {SXE_SUCCESS, "Success"}, + {SXE_FAILED, "Failed, invalid parameters given"}, + {SXE_ENOMEM, "Not enough memory"}, + {SXE_BADPROTO, "Bad protocol"}, + {SXE_ENORPC, "No such RPC exists"}, + {SXE_EPERM, "Permission denied"}, + {SXE_TOOLONG, "Message data payload too long to be sent with one message pass"}, + {SXE_EBUSY, "Index or working threads are busy"}, + {SXE_WOULDBLOCK, "Call will block operation"}, + {SXE_LINKERROR, "Connection link error"}, + {SXE_NOSUCHMSG, "No such message"}, + {SXE_NOSUCHCHAN, "No such channel"}, + {SXE_ETIMEDOUT, "Timeout exceed"}, + {SXE_IGNORED, "Function call was ignored"}, + {SXE_REPLYREQ, "Reply required to the message"}, + {SXE_RAPIDMSG, "Message is a rapid reply and dialog closed"}, + {SXE_ESSL, "SSL error occurs on connection link"}, + {SXE_NOCHANNELS, "No channels available"}, + {SXE_MCHANNELS, "Active channels limit exceed"}, + {SXE_MMESSAGES, "Active messages limit exceed"}, + {SXE_LINKBROKEN, "Connection link was broken"}, + {SXE_INVALINDEX, "Invalid index given"}, + {SXE_NOSUCHSTREAMTYPE, "No such stream type exist"}, + {SXE_EOS, "End of stream reached"}, + {SXE_NILREPLY, "Reply contains no data"}, +}; + +const char *sxmpl_errno2cstr(int ec) +{ + return __lerr[ec - __SXMP_EPREFIX].desc; +} + diff --git a/lib/link.c b/lib/hub.c similarity index 93% rename from lib/link.c rename to lib/hub.c index 8f9d323..4f6b4f5 100644 --- a/lib/link.c +++ b/lib/hub.c @@ -53,6 +53,8 @@ #include #include +#include "internal.h" + static int __insert_rpc_function(usrtc_t *tree, const char *name, int (*rpcf)(void *, sexp_t *)) { sxl_rpc_t *ent = malloc(sizeof(sxl_rpc_t)); @@ -501,6 +503,20 @@ static int __init_systemrpc_tree(usrtc_t *rtree) return ENOMEM; } +static int __init_builtinrpc_tree(usrtc_t *rtree) +{ + /* streams */ + if(__insert_rpc_function(rtree, _SXSTREAMOPEN_CMD, _builtin_stream_open)) goto __fail; + if(__insert_rpc_function(rtree, _SXSTREAMCLOSE_CMD, _builtin_stream_close)) goto __fail; + if(__insert_rpc_function(rtree, _SXSTREAMEREAD_CMD, _builtin_stream_eread)) goto __fail; + + return 0; + + __fail: + __destroy_rpc_list_tree(rtree); + return ENOMEM; +} + static long __cmp_cstr(const void *a, const void *b) { return (long)strcmp((const char *)a, (const char *)b); @@ -538,10 +554,28 @@ int sxhub_init(sxhub_t *ssys) } } + /* init builtin functions list */ + if(!(ssys->stream_rpc = malloc(sizeof(sxl_rpclist_t)))) { + r = ENOMEM; + goto __lfini; + } else { + if(!(ssys->stream_rpc->rpc_tree = malloc(sizeof(usrtc_t)))) { + r = ENOMEM; + goto __lfini; + } + usrtc_init(ssys->stream_rpc->rpc_tree, USRTC_SPLAY, 256, __cmp_cstr); + r = __init_builtinrpc_tree(ssys->stream_rpc->rpc_tree); + if(r) { + free(ssys->stream_rpc->rpc_tree); + goto __lfini; + } + } + return 0; __lfini: if(ssys->system_rpc) free(ssys->system_rpc); + if(ssys->stream_rpc) free(ssys->stream_rpc); pthread_rwlock_destroy(&(ssys->rwlock)); __fini: if(ssys->links) free(ssys->links); @@ -643,41 +677,3 @@ int sxhub_stream_register(sxhub_t *hub, const struct sxstream_description *s_des return 0; } -/* errors */ - -struct __scerrcode { - int code; - const char *desc; -}; - -static struct __scerrcode __lerr[] = { - {SXE_SUCCESS, "Success"}, - {SXE_FAILED, "Failed, invalid parameters given"}, - {SXE_ENOMEM, "Not enough memory"}, - {SXE_BADPROTO, "Bad protocol"}, - {SXE_ENORPC, "No such RPC exists"}, - {SXE_EPERM, "Permission denied"}, - {SXE_TOOLONG, "Message data payload too long to be sent with one message pass"}, - {SXE_EBUSY, "Index or working threads are busy"}, - {SXE_WOULDBLOCK, "Call will block operation"}, - {SXE_LINKERROR, "Connection link error"}, - {SXE_NOSUCHMSG, "No such message"}, - {SXE_NOSUCHCHAN, "No such channel"}, - {SXE_ETIMEDOUT, "Timeout exceed"}, - {SXE_IGNORED, "Function call was ignored"}, - {SXE_REPLYREQ, "Reply required to the message"}, - {SXE_RAPIDMSG, "Message is a rapid reply and dialog closed"}, - {SXE_ESSL, "SSL error occurs on connection link"}, - {SXE_NOCHANNELS, "No channels available"}, - {SXE_MCHANNELS, "Active channels limit exceed"}, - {SXE_MMESSAGES, "Active messages limit exceed"}, - {SXE_LINKBROKEN, "Connection link was broken"}, - {SXE_INVALINDEX, "Invalid index given"}, - {SXE_NOSUCHSTREAMTYPE, "No such stream type exist"}, - {SXE_EOS, "End of stream reached"}, -}; - -const char *sxmpl_errno2cstr(int ec) -{ - return __lerr[ec - __SXMP_EPREFIX].desc; -} diff --git a/lib/internal.h b/lib/internal.h index 336b053..d981b1b 100644 --- a/lib/internal.h +++ b/lib/internal.h @@ -20,6 +20,22 @@ #ifndef __SXMPL_INTERNAL_H__ #define __SXMPL_INTERNAL_H__ +/* builtin functions protocol description */ +#define _SXSTREAMOPEN_ADDMINLEN 18 +#define _SXSTREAMOPEN_ADDMAXLEN 24 +#define _SXSTREAMCLOSE_ADDLEN 12 +#define _SXSTREAMEREAD_ADDLEN 4 + +#define _SXSTREAMOPEN_CMD "!#s>" +#define _SXSTREAMOPEN_CMD_R "!#s<" +#define _SXSTREAMTYPE_PREFIX ":stid" +#define _SXSTREAMFLAGS_PREFIX ":flags" +#define _SXSTREAMOPT_PREFIX ":opt" + +#define _SXSTREAMCLOSE_CMD "!#-s>" + +#define _SXSTREAMEREAD_CMD "!#ers>" + /* link related */ int _sxmpl_writemsg(sxlink_t *co, sxmsg_t *msg); @@ -30,4 +46,9 @@ uint8_t _channel_close(sxlink_t *co, uint16_t chid); /* messages */ void _message_process(sxmsg_t *msg); +/* builtin RPC functions */ +int _builtin_stream_open(void *m, sexp_t *sx); +int _builtin_stream_close(void *m, sexp_t *sx); +int _builtin_stream_eread(void *m, sexp_t *sx); + #endif /* __SXMPL_INTERNAL_H__ */ diff --git a/lib/message.c b/lib/message.c index c2fa2f7..4c96350 100644 --- a/lib/message.c +++ b/lib/message.c @@ -46,8 +46,9 @@ void _message_process(sxmsg_t *msg) { sxchnl_t *chan = msg->pch; + sxhub_t *hub = chan->link->ssys; sexp_t *sx, *isx; - usrtc_t *listrpc = chan->rpc_list->rpc_tree; + usrtc_t *listrpc; usrtc_node_t *node; sxl_rpc_t *rpcc; int r; @@ -60,6 +61,10 @@ void _message_process(sxmsg_t *msg) if(isx->ty == SEXP_LIST) { r = SXE_BADPROTO; goto __return_err; } if(isx->aty != SEXP_BASIC) { r = SXE_BADPROTO; goto __return_err; } + /* in case of builtin RPC change rpclist */ + if(*(isx->val) == '!') listrpc = hub->stream_rpc->rpc_tree; + else listrpc = chan->rpc_list->rpc_tree; /* non builtin */ + node = usrtc_lookup(listrpc, (void *)isx->val); if(!node) { r = SXE_ENORPC; goto __return_err; } else rpcc = (sxl_rpc_t *)usrtc_node_getdata(node); diff --git a/lib/stream.c b/lib/stream.c index b308ef0..2e42b8e 100644 --- a/lib/stream.c +++ b/lib/stream.c @@ -47,6 +47,111 @@ #include "internal.h" +static void __free_listentry(struct _stream_list_node *listentry); + +static int __flag_check(int s_flag, int d_flag) +{ + int i; + + for(i = 1; i < 7; i++) + if((d_flag & (1 << i)) && !(s_flag & (1 << i))) return 1; + + return 0; +} + +static sxstream_t *__stream_open(sxlink_t *link, sxchnl_t *channel, const char *opt, + struct sxstream_description *desc, int stid, int flags) +{ + char *msgbuf = NULL; + size_t mlen = 0; + sxstream_t *ss = NULL; + sxmsg_t *msg = NULL; + char *buf = NULL; + sexp_t *sx, *isx; + uint64_t sid = 0; + int r = 0, idx; + + /* check flags */ + if(__flag_check(desc->flags, flags)) { + errno = SXE_EPERM; + goto __fini; + } + + /* time to alloc all stuff */ + mlen = strlen(_SXSTREAMOPEN_CMD) + strlen(_SXSTREAMTYPE_PREFIX) + strlen(_SXSTREAMFLAGS_PREFIX); + if(!opt) + mlen += (_SXSTREAMOPEN_ADDMINLEN + sizeof(char)); + else mlen += (_SXSTREAMOPEN_ADDMAXLEN + sizeof(char)) + strlen(opt); + + if(!(msgbuf = malloc(mlen*sizeof(char)))) { + errno = SXE_ENOMEM; + goto __fini; + } + + if(!opt) + mlen = snprintf(msgbuf, mlen, "(%s (%s %d)(%s %d))", _SXSTREAMOPEN_CMD, + _SXSTREAMTYPE_PREFIX, stid, _SXSTREAMFLAGS_PREFIX, flags); + else + mlen = snprintf(msgbuf, mlen, "(%s (%s %d)(%s %d)(%s \"%s\"))", _SXSTREAMOPEN_CMD, + _SXSTREAMTYPE_PREFIX, stid, _SXSTREAMFLAGS_PREFIX, flags, + _SXSTREAMOPT_PREFIX, opt); + + r = sxmsg_send(channel, msgbuf, mlen + sizeof(char), &msg); + if(r != SXE_RAPIDMSG) { + errno = r; + goto __fini; + } + + /* time to get the special id, put it to the link and clean up all */ + buf = sxmsg_rapidbuf(msg); + if(!(sx = parse_sexp(buf, strlen(buf)))) { + errno = SXE_ENOMEM; + sxmsg_clean(msg); + goto __fini; + } else sxmsg_clean(msg); + + SEXP_ITERATE_LIST(sx, isx, idx) { + if(isx->ty == SEXP_LIST) r = SXE_BADPROTO; + + if(!idx && strcmp(isx->val, _SXSTREAMOPEN_CMD_R)) r = SXE_BADPROTO; + if(idx && idx < 2) sid = strtoul(isx->val, NULL, 0); + if(idx >= 2) r = SXE_BADPROTO; + } + + destroy_sexp(sx); + + if(!r) { + if(!(ss = malloc(sizeof(sxstream_t)))) { + errno = SXE_ENOMEM; + goto __fini; + } + + memset(ss, 0, sizeof(sxstream_t)); + ss->flags = flags; + ss->sid = sid; + ss->link = link; + ss->channel = channel; + } else errno = r; + + __fini: + if(msgbuf) free(msgbuf); + + return ss; +} + +static struct sxstream_description *__get_stream_description(sxlink_t *link, int stid) +{ + usrtc_node_t *node = NULL; + struct sxstream_description *s_desc = NULL; + + if(!link->remote_streams) return NULL; + else node = usrtc_lookup(link->remote_streams, &stid); + + if(node) s_desc = (struct sxstream_description *)usrtc_node_getdata(node); + + return s_desc; +} + /* * NOTE: sxstream operations (except opening process i.e. few threads * can open different streams) are NOT _thread-safe_, please @@ -54,18 +159,107 @@ */ sxstream_t *sxstream_open(sxlink_t *link, const char *opt, int stid, int flags) { - return NULL; + sxchnl_t *channel = NULL; + struct sxstream_description *s_desc; + sxstream_t *ss = NULL; + int r = 0; + + if(!link) { + r = SXE_FAILED; + goto __fini; + } + if(!link->remote_streams) { + r = SXE_NOSUCHSTREAMTYPE; + goto __fini; + } + + /* ok, let's a deal with channel */ + if(!(s_desc = __get_stream_description(link, stid))) { + r = SXE_NOSUCHSTREAMTYPE; + goto __fini; + } else channel = sxchannel_open(link, s_desc->pcid); + + if(!channel) { + r = errno; + goto __fini; + } + + flags |= SXE_O_OCHANNELED; + ss = __stream_open(link, channel, opt, s_desc, stid, flags); + + __fini: + errno = r; + if(!ss && channel) sxchannel_close(channel); + + return ss; } sxstream_t *sxstream_openwch(sxlink_t *link, sxchnl_t *channel, const char *opt, int stid, int flags) { - return NULL; + struct sxstream_description *s_desc = NULL; + sxstream_t *ss = NULL; + int r = 0; + + if(!link || !channel) { + r = SXE_FAILED; + goto __fini; + } + if(!link->remote_streams) { + r = SXE_NOSUCHSTREAMTYPE; + goto __fini; + } + + /* get description */ + if(!(s_desc = __get_stream_description(link, stid))) { + r = SXE_NOSUCHSTREAMTYPE; + goto __fini; + } + + ss = __stream_open(link, channel, opt, s_desc, stid, flags); + + __fini: + errno = r; + return ss; } int sxstream_close(sxstream_t *stream) { - return SXE_SUCCESS; + sxmsg_t *msg = NULL; + char *mbuf; + list_node_t *iter, *s_iter; + struct _stream_list_node *listentry; + size_t mbuf_len = strlen(_SXSTREAMCLOSE_CMD) + _SXSTREAMCLOSE_ADDLEN; + int r = 0; + + if(!stream) return SXE_FAILED; + if(!stream->link || !stream->channel) return SXE_FAILED; + if(!(mbuf = malloc(mbuf_len))) return SXE_ENOMEM; + else mbuf_len = snprintf(mbuf, mbuf_len, "(%s %lu)", _SXSTREAMCLOSE_CMD, stream->sid); + + r = sxmsg_send(stream->channel, mbuf, mbuf_len + sizeof(char), &msg); + if(r == SXE_RAPIDMSG) sxmsg_clean(msg); + + if(r == SXE_SUCCESS) { + /* TODO: free all caches, buffers etc */ + if(!(stream->flags & SXE_O_BINARY)) { + if(!(stream->flags & SXE_O_NAMED) && stream->entries) { + list_for_each_safe(stream->entries, iter, s_iter) { + listentry = list_entry(iter, struct _stream_list_node, node); + list_del(iter); + __free_listentry(listentry); + } + } + } + if(stream->flags & SXE_O_OCHANNELED) + r = sxchannel_close(stream->channel); + + free(stream); + } + + free(mbuf); + + return r; } /* binary stream operations */ @@ -100,9 +294,180 @@ size_t sxstream_bsyncat(sxstream_t *stream, void *buf, size_t buf_len, off_t off * That means you should care about ridden values, since * it might be freed on the next read operation. */ +static struct _nn_stream_entry_node *__alloc_entry(const char *value) +{ + struct _nn_stream_entry_node *o = malloc(sizeof(struct _nn_stream_entry_node)); + + if(!o) return NULL; + if(!(o->value = strdup(value))) { + free(o); + return NULL; + } + + list_init_node(&o->node); + + return o; +} + +static void __free_entry(struct _nn_stream_entry_node *entry) +{ + free(entry->value); + free(entry); + + return; +} + +static struct _stream_list_node *__alloc_listentry(void) +{ + struct _stream_list_node *o = malloc(sizeof(struct _stream_list_node)); + + if(!o) return NULL; + if(!(o->list = malloc(sizeof(list_head_t)))) { + free(o); + return NULL; + } + + list_init_head(o->list); + list_init_node(&o->node); + + return o; +} + +static void __free_listentry(struct _stream_list_node *listentry) +{ + struct _nn_stream_entry_node *entry; + list_node_t *iter, *s_iter; + + list_for_each_safe(listentry->list, iter, s_iter) { + entry = list_entry(iter, struct _nn_stream_entry_node, node); + list_del(iter); + __free_entry(entry); + } + + free(listentry->list); + free(listentry); + + return; +} + /* entry nonamed stream ops */ list_head_t *sxstream_read(sxstream_t *stream) { + sxmsg_t *msg; + size_t mbuf_len; + char *mbuf = NULL; + list_node_t *cur = NULL, *iter, *s_iter; + struct _stream_list_node *listentry = NULL; + struct _nn_stream_entry_node *entry = NULL; + sexp_t *sx, *isx, *iisx; + int r = 0, idx, iidx; + + errno = SXE_FAILED; + if(!stream) goto __fail; + if(!stream->link || !stream->channel) goto __fail; + + if(stream->entries) { + if(stream->cur == list_node_last(stream->entries)) { + cur = stream->cur; + stream->cur = NULL; /* we need to read from remote again on the next operation */ + } else if(stream->cur && stream->cur != list_node_last(stream->entries)) { + cur = stream->cur; + if(!list_node_next_isbound(cur)) + stream->cur = stream->cur->next; + else stream->cur = NULL; + } else goto __read_seq; + + __success: + if(cur) { + errno = 0; + listentry = list_entry(cur, struct _stream_list_node, node); + return listentry->list; + } else return NULL; + } + + __read_seq: + errno = SXE_ENOMEM; + + if(stream->entries) { /* ok, clean up all stuff */ + list_for_each_safe(stream->entries, iter, s_iter) { + listentry = list_entry(iter, struct _stream_list_node, node); + list_del(iter); + __free_listentry(listentry); + } + } else { /* allocate it first time */ + if(!(stream->entries = malloc(sizeof(list_head_t)))) goto __fail; + + list_init_head(stream->entries); + } + + /* prepare a message */ + mbuf_len = strlen(_SXSTREAMEREAD_CMD) + _SXSTREAMEREAD_ADDLEN; + if(!(mbuf = malloc(mbuf_len))) goto __fail; + + mbuf_len = snprintf(mbuf, mbuf_len, "(%s)", _SXSTREAMEREAD_CMD); + r = sxmsg_send(stream->channel, mbuf, mbuf_len, &msg); + switch(r) { + case SXE_RAPIDMSG: /* get our data */ + sx = parse_sexp(sxmsg_payload(msg), strlen(sxmsg_payload(msg))); + sxmsg_clean(msg); + + if(!sx) { + __badproto: + if(sx) destroy_sexp(sx); + errno = SXE_BADPROTO; + goto __fail; + } + + SEXP_ITERATE_LIST(sx, isx, idx) { + if(isx->ty != SEXP_LIST) goto __badproto; + + /* allocate list entry and add it */ + if(!(listentry = __alloc_listentry())) { + __enomem: + errno = SXE_ENOMEM; + destroy_sexp(sx); + goto __fail; + } else list_add2tail(stream->entries, &listentry->node); + + SEXP_ITERATE_LIST(isx, iisx, iidx) { + if(isx->ty == SEXP_LIST) goto __badproto; + if(!(entry = __alloc_entry((const char *)iisx->val))) goto __enomem; + else list_add2tail(listentry->list, &entry->node); + } + } + + /* ok let's see what we got */ + if(idx) { + stream->cur = list_node_first(stream->entries); + cur = stream->cur; + + /* point to the next one if exists */ + if(!list_node_next_isbound(cur)) stream->cur = stream->cur->next; + else stream->cur = NULL; + } else { + errno = SXE_NILREPLY; + stream->cur = NULL; + cur = NULL; + } + + destroy_sexp(sx); + break; + case SXE_EOS: /* stream is over */ + errno = SXE_EOS; + cur = NULL; + break; + default: + errno = r; + goto __fail; + break; + } + + free(mbuf); + + goto __success; + + __fail: + if(mbuf) free(mbuf); return NULL; } @@ -112,3 +477,25 @@ list_head_t *sxstream_readnamed(sxstream_t *stream) return NULL; } +/* builtin functions for streams */ +int _builtin_stream_open(void *m, sexp_t *sx) +{ + sxmsg_t *msg = (sxmsg_t *)m; + + return sxmsg_return(msg, SXE_FAILED); +} + +int _builtin_stream_close(void *m, sexp_t *sx) +{ + sxmsg_t *msg = (sxmsg_t *)m; + + return sxmsg_return(msg, SXE_FAILED); +} + +int _builtin_stream_eread(void *m, sexp_t *sx) +{ + sxmsg_t *msg = (sxmsg_t *)m; + + return sxmsg_return(msg, SXE_FAILED); +} + diff --git a/lib/sxmplv2.c b/lib/sxmplv2.c index 41c5cd9..35c0556 100644 --- a/lib/sxmplv2.c +++ b/lib/sxmplv2.c @@ -505,13 +505,16 @@ static void __link_minimal_free(sxlink_t *co) return; } -static int __eval_syssexp(sxlink_t *co, sexp_t *sx) +static int __eval_sysrpc(sxlink_t *co, sexp_t *sx, int builtin) { - sxl_rpclist_t *rpc_list = co->ssys->system_rpc; + sxl_rpclist_t *rpc_list; usrtc_node_t *node; sxl_rpc_t *rentry; char *rpcf; + if(builtin) rpc_list = co->ssys->stream_rpc; + else rpc_list = co->ssys->system_rpc; + if(sx->ty == SEXP_LIST) rpcf = sx->list->val; else return SXE_BADPROTO; @@ -528,6 +531,16 @@ static int __eval_syssexp(sxlink_t *co, sexp_t *sx) return rentry->rpcf((void *)co, sx); } +static inline int __eval_syssexp(sxlink_t *co, sexp_t *sx) +{ + return __eval_sysrpc(co, sx, 0); +} + +static inline int __eval_builtinsexp(sxlink_t *co, sexp_t *sx) +{ + return __eval_sysrpc(co, sx, 1); +} + #ifdef _NO_SXMPMP #define _CONN_INUSE(co) (co)->usecount++; #define _CONN_NOTINUSE(co) (co)->usecount--;