From e36f738f37444cc5a653acde474201ee7f2b1fb6 Mon Sep 17 00:00:00 2001 From: Alexander Vdolainen Date: Wed, 2 Mar 2016 03:21:32 +0200 Subject: [PATCH] [core] builtin blob stream read added, stub for write added also; --- include/Makefile.am | 3 +- include/sxmp/base64.h | 33 ++++++++ lib/Makefile.am | 3 +- lib/base64.c | 110 ++++++++++++++++++++++++++ lib/hub.c | 2 + lib/internal.h | 5 ++ lib/stream.c | 180 +++++++++++++++++++++++++++++++++++++++++- 7 files changed, 330 insertions(+), 6 deletions(-) create mode 100644 include/sxmp/base64.h create mode 100644 lib/base64.c diff --git a/include/Makefile.am b/include/Makefile.am index 77e2bad..2b23563 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -1 +1,2 @@ -nobase_include_HEADERS = sxmp/sxmp.h sxmp/errno.h sxmp/limits.h sxmp/version.h +nobase_include_HEADERS = sxmp/sxmp.h sxmp/errno.h sxmp/limits.h sxmp/version.h \ + sxmp/base64.h diff --git a/include/sxmp/base64.h b/include/sxmp/base64.h new file mode 100644 index 0000000..7a34d79 --- /dev/null +++ b/include/sxmp/base64.h @@ -0,0 +1,33 @@ +/* + * Base64 encode/decode functions used in sxmp + * openssl stuff wasn't used to avoid additional memleaks and allocation, + * deallocation of BIO. + * + * (c) Alexander Vdolainen 2016 + * + * libsxmp is free software: you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * libsxmp is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see ."; + */ + +#ifndef __SXMP_BASE64_H__ +#define __SXMP_BASE64_H__ + +size_t sx_rawlen2b64len(size_t raw_length); + +size_t sx_b64encode_in(const char *data, char *bdata, size_t data_len); + +char *sx_b64encode(const char *data, size_t data_len); + +size_t sx_b64decode_in(const char *idata, size_t idata_len, char *data, size_t data_len); + +#endif /* __SXMP_BASE64_H__ */ diff --git a/lib/Makefile.am b/lib/Makefile.am index f3fe7ea..6c1c204 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -15,7 +15,8 @@ lib_LTLIBRARIES = libsxmp.la libsxmp_la_SOURCES = \ sxmplv2.c hub.c channel.c message.c rpc.c \ - uuid.c stream_generic_listops.c stream.c error.c + uuid.c stream_generic_listops.c stream.c \ + base64.c error.c libsxmp_la_LDFLAGS = diff --git a/lib/base64.c b/lib/base64.c new file mode 100644 index 0000000..e29a765 --- /dev/null +++ b/lib/base64.c @@ -0,0 +1,110 @@ +/* + * Base64 encode/decode functions used in sxmp + * openssl stuff wasn't used to avoid additional memleaks and allocation, + * deallocation of BIO. + * + * (c) Alexander Vdolainen 2016 + * + * libsxmp is free software: you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * libsxmp is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see ."; + */ + +#include +#include +#include + +#include + +static const char b64[]="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + +static inline void encodeblock(unsigned char *in, unsigned char *out, int len) +{ + out[0] = (unsigned char) b64[ (int)(in[0] >> 2) ]; + out[1] = (unsigned char) b64[ (int)(((in[0] & 0x03) << 4) | ((in[1] & 0xf0) >> 4)) ]; + out[2] = (unsigned char) (len > 1 ? b64[ (int)(((in[1] & 0x0f) << 2) | ((in[2] & 0xc0) >> 6)) ] : '='); + out[3] = (unsigned char) (len > 2 ? b64[ (int)(in[2] & 0x3f) ] : '='); +} + +static inline void decodeblock(unsigned char *in, unsigned char *out) +{ + out[0] = (unsigned char) (in[0] << 2 | in[1] >> 4); + out[1] = (unsigned char) (in[1] << 4 | in[2] >> 2); + out[2] = (unsigned char) (((in[2] << 6) & 0xc0) | in[3]); +} + +size_t sx_rawlen2b64len(size_t raw_length) +{ + return 4 * ((raw_length + 2) / 3); +} + +size_t sx_b64encode_in(const char *data, char *bdata, size_t data_len) +{ + size_t bdata_len = 0, c = 0, n = 0; + int len = 0, nil = 0, i; + unsigned char ib[4]; + + if(!bdata) return -1; + if(!data || !data_len) return -1; + else bdata_len = sx_rawlen2b64len(data_len); + + while(n != bdata_len) { + if((c + 3) <= data_len) { + len = 3; + encodeblock((unsigned char *)data + c, (unsigned char *)bdata + n, len); + n += 4; c += 3; + } else { + nil = (c + 3) - data_len; + len = 0; + for(i = 0; i < 3; i++) { + if(i < nil) { + ib[i] = *((unsigned char *)data + (c + i)); + len++; + } else ib[i] = (unsigned char)0; + } + encodeblock(ib, (unsigned char *)bdata + n, len); + } + } + + return bdata_len; +} + +char *sx_b64encode(const char *data, size_t data_len) +{ + char *bdata = NULL; + size_t bdata_len = 0; + + if(!data || !data_len) return NULL; + else bdata_len = sx_rawlen2b64len(data_len); + + if(!(bdata = malloc(bdata_len))) return NULL; + + sx_b64encode_in(data, bdata, data_len); + + return bdata; +} + +size_t sx_b64decode_in(const char *idata, size_t idata_len, char *data, size_t data_len) +{ + size_t enc = 0, dec = 0; + + if(!idata || !idata_len) return -1; + if(!data || !data_len) return -1; + + for(enc = 0; enc != idata_len; enc += 4) { + if(dec >= data_len) return dec; + decodeblock((unsigned char *)idata + enc, (unsigned char *)data + dec); + dec += 3; + } + + return dec; +} diff --git a/lib/hub.c b/lib/hub.c index 4f6b4f5..5e18a32 100644 --- a/lib/hub.c +++ b/lib/hub.c @@ -509,6 +509,8 @@ static int __init_builtinrpc_tree(usrtc_t *rtree) if(__insert_rpc_function(rtree, _SXSTREAMOPEN_CMD, _builtin_stream_open)) goto __fail; if(__insert_rpc_function(rtree, _SXSTREAMCLOSE_CMD, _builtin_stream_close)) goto __fail; if(__insert_rpc_function(rtree, _SXSTREAMEREAD_CMD, _builtin_stream_eread)) goto __fail; + if(__insert_rpc_function(rtree, _SXSTREAMBREAD_CMD, _builtin_stream_bread)) goto __fail; + if(__insert_rpc_function(rtree, _SXSTREAMBWRITE_CMD, _builtin_stream_bwrite)) goto __fail; return 0; diff --git a/lib/internal.h b/lib/internal.h index 0a4794f..39f60b6 100644 --- a/lib/internal.h +++ b/lib/internal.h @@ -2,6 +2,7 @@ * * (c) Askele Group 2013-2015 * (c) Alexander Vdolainen 2013-2015 + * (c) Alexander Vdolainen 2016 * * libsxmp is free software: you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as published @@ -35,6 +36,8 @@ #define _SXSTREAMCLOSE_CMD "!#-s>" #define _SXSTREAMEREAD_CMD "!#ers>" +#define _SXSTREAMBREAD_CMD "!#brs>" +#define _SXSTREAMBWRITE_CMD "!#bws>" /* link related */ int _sxmpl_writemsg(sxlink_t *co, sxmsg_t *msg); @@ -51,5 +54,7 @@ void _message_process(sxmsg_t *msg); int _builtin_stream_open(void *m, sexp_t *sx); int _builtin_stream_close(void *m, sexp_t *sx); int _builtin_stream_eread(void *m, sexp_t *sx); +int _builtin_stream_bread(void *m, sexp_t *sx); +int _builtin_stream_bwrite(void *m, sexp_t *sx); #endif /* __SXMPL_INTERNAL_H__ */ diff --git a/lib/stream.c b/lib/stream.c index 89c30fb..07e7194 100644 --- a/lib/stream.c +++ b/lib/stream.c @@ -44,6 +44,7 @@ #include #include +#include #include "internal.h" @@ -345,27 +346,119 @@ int sxstream_close(sxstream_t *stream) /* binary stream operations */ /* * NOTE: it will be done with with a copy paste, - * there are todo - implement it with oilite style. + * there are todo - implement it with iolite style. */ size_t sxstream_bread(sxstream_t *stream, void *buf, size_t buf_len) { + off_t offset = 0; + size_t rd = 0; + + errno = SXE_FAILED; + if(!stream || !buf) goto __fail; + offset = (off_t)stream->cur_offset; + + rd = sxstream_breadat(stream, buf, buf_len, offset); + if(rd > 0) stream->cur_offset += rd; + + return rd; + + __fail: return -1; } size_t sxstream_breadat(sxstream_t *stream, void *buf, size_t buf_len, off_t off) { - return -1; + sxmsg_t *msg; + char *mbuf = NULL; + size_t msglen = 0, r = -1; + int n = 0; + + errno = SXE_ENOMEM; + + msglen = strlen(_SXSTREAMBREAD_CMD) + sizeof(char)*6 + 32*sizeof(char); + if(!(mbuf = malloc(msglen))) return -1; + + errno = SXE_FAILED; + if(!stream || !buf) goto __fail; + if(!stream->link || !stream->channel) goto __fail; + if(!buf_len || off < 0) goto __fail; + + msglen = snprintf(mbuf, msglen, "(%s %lu %lu %lu)", _SXSTREAMBREAD_CMD, stream->sid, off, buf_len); + n = sxmsg_send(stream->channel, mbuf, msglen, &msg); + if(n == SXE_RAPIDMSG) { + errno = 0; + r = sxmsg_datalen(msg); + memcpy(buf, sxmsg_payload(msg), r); + sxmsg_clean(msg); + } else if(n == SXE_EOS) { + r = 0; + errno = SXE_EOS; + } else { + r = -1; + errno = n; + } + + __fail: + if(mbuf) free(mbuf); + return r; } size_t sxstream_bwriteat(sxstream_t *stream, const void *buf, size_t buf_len, off_t off) { - return -1; + sxmsg_t *msg; + char *mbuf = NULL; + sexp_t *sx = NULL, *isx; + size_t msglen = 0, wr = -1; + int idx, r; + + msglen = strlen(_SXSTREAMBWRITE_CMD) + sizeof(char)*8 + 32*sizeof(char); + if(MAX_RBBUF_LEN - msglen < sx_rawlen2b64len(buf_len)) /* cut the data */ + buf_len = ((MAX_RBBUF_LEN - msglen)/4)*3 - 2; + + msglen += sx_rawlen2b64len(buf_len); + if(!(mbuf = malloc(msglen))) { + errno = SXE_ENOMEM; + goto __fail; + } + msglen = snprintf(mbuf, msglen, "(%s %lu %lu %lu \"", _SXSTREAMBWRITE_CMD, stream->sid, + off, buf_len); + wr = sx_b64encode_in(buf, mbuf + msglen, buf_len); + if(wr < 0) goto __fail; + else msglen += wr; + msglen += snprintf(mbuf + msglen, MAX_RBBUF_LEN - msglen, "\")"); + + r = sxmsg_send(stream->channel, mbuf, msglen, &msg); + if(r == SXE_RAPIDMSG) { + sx = parse_sexp(sxmsg_rapidbuf(msg), strlen(sxmsg_rapidbuf(msg))); + sxmsg_clean(msg); + if(!sx) { + __badproto: + wr = -1; + errno = SXE_BADPROTO; + goto __fail; + } + + SEXP_ITERATE_LIST(sx, isx, idx) { + if(isx->ty == SEXP_LIST) goto __badproto; + if(!idx) wr = strtoul(isx->val, NULL, 0); + else goto __badproto; + } + } else { + wr = -1; + errno = r; + } + + __fail: + if(sx) destroy_sexp(sx); + if(mbuf) free(mbuf); + return wr; } size_t sxstream_bsyncat(sxstream_t *stream, void *buf, size_t buf_len, off_t off) { - return -1; + /* TODO: for 0.4.4 */ + return SXE_IGNORED; } /* @@ -1055,3 +1148,82 @@ int _builtin_stream_eread(void *m, sexp_t *sx) return sxmsg_return(msg, r); } +int _builtin_stream_bread(void *m, sexp_t *sx) +{ + sxmsg_t *msg = (sxmsg_t *)m; + sxchnl_t *channel = msg->pch; + sxlink_t *link = NULL; + struct sxstream_opened *stream = NULL; + sexp_t *isx; + void *rbuf; + uint64_t sid = 0, offset; + size_t rdlen; + int r = SXE_FAILED, idx; + + if(!channel) goto __fail; + link = channel->link; + + if(!link || !channel->rpc_list) goto __fail; /* paranoid test */ + + /* parse input */ + SEXP_ITERATE_LIST(sx, isx, idx) { + if(isx->ty == SEXP_LIST) { + __badproto: + r = SXE_BADPROTO; + goto __fail; + } + switch(idx) { + case 0: r = SXE_BADPROTO; break; + case 1: sid = strtoul(isx->val, NULL, 0); break; + case 2: offset = strtoul(isx->val, NULL, 0); break; + case 3: rdlen = strtoul(isx->val, NULL, 0); r = SXE_SUCCESS; break; + default: goto __badproto; break; + } + } + if(r != SXE_SUCCESS) goto __fail; + if(!rdlen) { /* in case of reading zero bytes - we will send nilreply error */ + r = SXE_NILREPLY; + goto __fail; + } + if(rdlen >= MAX_RBBUF_LEN) rdlen = MAX_RBBUF_LEN - 1; /* check up for out of buffer reading */ + + /* check out if stream exists */ + if(!(stream = link->streams[(int)sid])) { + r = SXE_NOSUCHSTREAMTYPE; + goto __fail; + } + + /* check up if we have permission to do it */ + if((stream->pin_channelid != channel->cid) || !(stream->flags & SXE_O_READ)) { + r = SXE_EPERM; + goto __fail; + } + + /* check if the givn stream has binary capability */ + if(stream->desc->type != SXE_O_BINARY) { + r = SXE_FAILED; + goto __fail; + } + + rbuf = sxmsg_rapidbuf(msg); + + rdlen = stream->desc->ops->s_bread(stream, rdlen, offset, rbuf); + if(!rdlen) { + r = SXE_EOS; + goto __fail; + } if(rdlen < 0) { /* nil reply */ + r = SXE_NILREPLY; + goto __fail; + } else return sxmsg_rreply(msg, rdlen); + + __fail: + return sxmsg_return(msg, r); +} + +int _builtin_stream_bwrite(void *m, sexp_t *sx) +{ + int r = SXE_FAILED; + sxmsg_t *msg = (sxmsg_t *)m; + + return sxmsg_return(msg, r); +}