[core] builtin blob stream read added, stub for write added also;

v0.5.xx
Alexander Vdolainen 9 years ago
parent 350aa0321f
commit e36f738f37

@ -1 +1,2 @@
nobase_include_HEADERS = sxmp/sxmp.h sxmp/errno.h sxmp/limits.h sxmp/version.h nobase_include_HEADERS = sxmp/sxmp.h sxmp/errno.h sxmp/limits.h sxmp/version.h \
sxmp/base64.h

@ -0,0 +1,33 @@
/*
* Base64 encode/decode functions used in sxmp
* openssl stuff wasn't used to avoid additional memleaks and allocation,
* deallocation of BIO.
*
* (c) Alexander Vdolainen 2016 <avdolainen@zoho.com>
*
* libsxmp is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* libsxmp is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.";
*/
#ifndef __SXMP_BASE64_H__
#define __SXMP_BASE64_H__
size_t sx_rawlen2b64len(size_t raw_length);
size_t sx_b64encode_in(const char *data, char *bdata, size_t data_len);
char *sx_b64encode(const char *data, size_t data_len);
size_t sx_b64decode_in(const char *idata, size_t idata_len, char *data, size_t data_len);
#endif /* __SXMP_BASE64_H__ */

@ -15,7 +15,8 @@ lib_LTLIBRARIES = libsxmp.la
libsxmp_la_SOURCES = \ libsxmp_la_SOURCES = \
sxmplv2.c hub.c channel.c message.c rpc.c \ sxmplv2.c hub.c channel.c message.c rpc.c \
uuid.c stream_generic_listops.c stream.c error.c uuid.c stream_generic_listops.c stream.c \
base64.c error.c
libsxmp_la_LDFLAGS = libsxmp_la_LDFLAGS =

@ -0,0 +1,110 @@
/*
* Base64 encode/decode functions used in sxmp
* openssl stuff wasn't used to avoid additional memleaks and allocation,
* deallocation of BIO.
*
* (c) Alexander Vdolainen 2016 <avdolainen@zoho.com>
*
* libsxmp is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* libsxmp is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.";
*/
#include <stdint.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sxmp/base64.h>
static const char b64[]="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
static inline void encodeblock(unsigned char *in, unsigned char *out, int len)
{
out[0] = (unsigned char) b64[ (int)(in[0] >> 2) ];
out[1] = (unsigned char) b64[ (int)(((in[0] & 0x03) << 4) | ((in[1] & 0xf0) >> 4)) ];
out[2] = (unsigned char) (len > 1 ? b64[ (int)(((in[1] & 0x0f) << 2) | ((in[2] & 0xc0) >> 6)) ] : '=');
out[3] = (unsigned char) (len > 2 ? b64[ (int)(in[2] & 0x3f) ] : '=');
}
static inline void decodeblock(unsigned char *in, unsigned char *out)
{
out[0] = (unsigned char) (in[0] << 2 | in[1] >> 4);
out[1] = (unsigned char) (in[1] << 4 | in[2] >> 2);
out[2] = (unsigned char) (((in[2] << 6) & 0xc0) | in[3]);
}
size_t sx_rawlen2b64len(size_t raw_length)
{
return 4 * ((raw_length + 2) / 3);
}
size_t sx_b64encode_in(const char *data, char *bdata, size_t data_len)
{
size_t bdata_len = 0, c = 0, n = 0;
int len = 0, nil = 0, i;
unsigned char ib[4];
if(!bdata) return -1;
if(!data || !data_len) return -1;
else bdata_len = sx_rawlen2b64len(data_len);
while(n != bdata_len) {
if((c + 3) <= data_len) {
len = 3;
encodeblock((unsigned char *)data + c, (unsigned char *)bdata + n, len);
n += 4; c += 3;
} else {
nil = (c + 3) - data_len;
len = 0;
for(i = 0; i < 3; i++) {
if(i < nil) {
ib[i] = *((unsigned char *)data + (c + i));
len++;
} else ib[i] = (unsigned char)0;
}
encodeblock(ib, (unsigned char *)bdata + n, len);
}
}
return bdata_len;
}
char *sx_b64encode(const char *data, size_t data_len)
{
char *bdata = NULL;
size_t bdata_len = 0;
if(!data || !data_len) return NULL;
else bdata_len = sx_rawlen2b64len(data_len);
if(!(bdata = malloc(bdata_len))) return NULL;
sx_b64encode_in(data, bdata, data_len);
return bdata;
}
size_t sx_b64decode_in(const char *idata, size_t idata_len, char *data, size_t data_len)
{
size_t enc = 0, dec = 0;
if(!idata || !idata_len) return -1;
if(!data || !data_len) return -1;
for(enc = 0; enc != idata_len; enc += 4) {
if(dec >= data_len) return dec;
decodeblock((unsigned char *)idata + enc, (unsigned char *)data + dec);
dec += 3;
}
return dec;
}

@ -509,6 +509,8 @@ static int __init_builtinrpc_tree(usrtc_t *rtree)
if(__insert_rpc_function(rtree, _SXSTREAMOPEN_CMD, _builtin_stream_open)) goto __fail; if(__insert_rpc_function(rtree, _SXSTREAMOPEN_CMD, _builtin_stream_open)) goto __fail;
if(__insert_rpc_function(rtree, _SXSTREAMCLOSE_CMD, _builtin_stream_close)) goto __fail; if(__insert_rpc_function(rtree, _SXSTREAMCLOSE_CMD, _builtin_stream_close)) goto __fail;
if(__insert_rpc_function(rtree, _SXSTREAMEREAD_CMD, _builtin_stream_eread)) goto __fail; if(__insert_rpc_function(rtree, _SXSTREAMEREAD_CMD, _builtin_stream_eread)) goto __fail;
if(__insert_rpc_function(rtree, _SXSTREAMBREAD_CMD, _builtin_stream_bread)) goto __fail;
if(__insert_rpc_function(rtree, _SXSTREAMBWRITE_CMD, _builtin_stream_bwrite)) goto __fail;
return 0; return 0;

@ -2,6 +2,7 @@
* *
* (c) Askele Group 2013-2015 <http://askele.com> * (c) Askele Group 2013-2015 <http://askele.com>
* (c) Alexander Vdolainen 2013-2015 <avdolainen@gmail.com> * (c) Alexander Vdolainen 2013-2015 <avdolainen@gmail.com>
* (c) Alexander Vdolainen 2016 <avdolainen@zoho.com>
* *
* libsxmp is free software: you can redistribute it and/or modify it * libsxmp is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published * under the terms of the GNU Lesser General Public License as published
@ -35,6 +36,8 @@
#define _SXSTREAMCLOSE_CMD "!#-s>" #define _SXSTREAMCLOSE_CMD "!#-s>"
#define _SXSTREAMEREAD_CMD "!#ers>" #define _SXSTREAMEREAD_CMD "!#ers>"
#define _SXSTREAMBREAD_CMD "!#brs>"
#define _SXSTREAMBWRITE_CMD "!#bws>"
/* link related */ /* link related */
int _sxmpl_writemsg(sxlink_t *co, sxmsg_t *msg); int _sxmpl_writemsg(sxlink_t *co, sxmsg_t *msg);
@ -51,5 +54,7 @@ void _message_process(sxmsg_t *msg);
int _builtin_stream_open(void *m, sexp_t *sx); int _builtin_stream_open(void *m, sexp_t *sx);
int _builtin_stream_close(void *m, sexp_t *sx); int _builtin_stream_close(void *m, sexp_t *sx);
int _builtin_stream_eread(void *m, sexp_t *sx); int _builtin_stream_eread(void *m, sexp_t *sx);
int _builtin_stream_bread(void *m, sexp_t *sx);
int _builtin_stream_bwrite(void *m, sexp_t *sx);
#endif /* __SXMPL_INTERNAL_H__ */ #endif /* __SXMPL_INTERNAL_H__ */

@ -44,6 +44,7 @@
#include <sxmp/limits.h> #include <sxmp/limits.h>
#include <sxmp/sxmp.h> #include <sxmp/sxmp.h>
#include <sxmp/base64.h>
#include "internal.h" #include "internal.h"
@ -345,27 +346,119 @@ int sxstream_close(sxstream_t *stream)
/* binary stream operations */ /* binary stream operations */
/* /*
* NOTE: it will be done with with a copy paste, * NOTE: it will be done with with a copy paste,
* there are todo - implement it with oilite style. * there are todo - implement it with iolite style.
*/ */
size_t sxstream_bread(sxstream_t *stream, void *buf, size_t buf_len) size_t sxstream_bread(sxstream_t *stream, void *buf, size_t buf_len)
{ {
off_t offset = 0;
size_t rd = 0;
errno = SXE_FAILED;
if(!stream || !buf) goto __fail;
offset = (off_t)stream->cur_offset;
rd = sxstream_breadat(stream, buf, buf_len, offset);
if(rd > 0) stream->cur_offset += rd;
return rd;
__fail:
return -1; return -1;
} }
size_t sxstream_breadat(sxstream_t *stream, void *buf, size_t buf_len, off_t off) size_t sxstream_breadat(sxstream_t *stream, void *buf, size_t buf_len, off_t off)
{ {
return -1; sxmsg_t *msg;
char *mbuf = NULL;
size_t msglen = 0, r = -1;
int n = 0;
errno = SXE_ENOMEM;
msglen = strlen(_SXSTREAMBREAD_CMD) + sizeof(char)*6 + 32*sizeof(char);
if(!(mbuf = malloc(msglen))) return -1;
errno = SXE_FAILED;
if(!stream || !buf) goto __fail;
if(!stream->link || !stream->channel) goto __fail;
if(!buf_len || off < 0) goto __fail;
msglen = snprintf(mbuf, msglen, "(%s %lu %lu %lu)", _SXSTREAMBREAD_CMD, stream->sid, off, buf_len);
n = sxmsg_send(stream->channel, mbuf, msglen, &msg);
if(n == SXE_RAPIDMSG) {
errno = 0;
r = sxmsg_datalen(msg);
memcpy(buf, sxmsg_payload(msg), r);
sxmsg_clean(msg);
} else if(n == SXE_EOS) {
r = 0;
errno = SXE_EOS;
} else {
r = -1;
errno = n;
}
__fail:
if(mbuf) free(mbuf);
return r;
} }
size_t sxstream_bwriteat(sxstream_t *stream, const void *buf, size_t sxstream_bwriteat(sxstream_t *stream, const void *buf,
size_t buf_len, off_t off) size_t buf_len, off_t off)
{ {
return -1; sxmsg_t *msg;
char *mbuf = NULL;
sexp_t *sx = NULL, *isx;
size_t msglen = 0, wr = -1;
int idx, r;
msglen = strlen(_SXSTREAMBWRITE_CMD) + sizeof(char)*8 + 32*sizeof(char);
if(MAX_RBBUF_LEN - msglen < sx_rawlen2b64len(buf_len)) /* cut the data */
buf_len = ((MAX_RBBUF_LEN - msglen)/4)*3 - 2;
msglen += sx_rawlen2b64len(buf_len);
if(!(mbuf = malloc(msglen))) {
errno = SXE_ENOMEM;
goto __fail;
}
msglen = snprintf(mbuf, msglen, "(%s %lu %lu %lu \"", _SXSTREAMBWRITE_CMD, stream->sid,
off, buf_len);
wr = sx_b64encode_in(buf, mbuf + msglen, buf_len);
if(wr < 0) goto __fail;
else msglen += wr;
msglen += snprintf(mbuf + msglen, MAX_RBBUF_LEN - msglen, "\")");
r = sxmsg_send(stream->channel, mbuf, msglen, &msg);
if(r == SXE_RAPIDMSG) {
sx = parse_sexp(sxmsg_rapidbuf(msg), strlen(sxmsg_rapidbuf(msg)));
sxmsg_clean(msg);
if(!sx) {
__badproto:
wr = -1;
errno = SXE_BADPROTO;
goto __fail;
}
SEXP_ITERATE_LIST(sx, isx, idx) {
if(isx->ty == SEXP_LIST) goto __badproto;
if(!idx) wr = strtoul(isx->val, NULL, 0);
else goto __badproto;
}
} else {
wr = -1;
errno = r;
}
__fail:
if(sx) destroy_sexp(sx);
if(mbuf) free(mbuf);
return wr;
} }
size_t sxstream_bsyncat(sxstream_t *stream, void *buf, size_t buf_len, off_t off) size_t sxstream_bsyncat(sxstream_t *stream, void *buf, size_t buf_len, off_t off)
{ {
return -1; /* TODO: for 0.4.4 */
return SXE_IGNORED;
} }
/* /*
@ -1055,3 +1148,82 @@ int _builtin_stream_eread(void *m, sexp_t *sx)
return sxmsg_return(msg, r); return sxmsg_return(msg, r);
} }
int _builtin_stream_bread(void *m, sexp_t *sx)
{
sxmsg_t *msg = (sxmsg_t *)m;
sxchnl_t *channel = msg->pch;
sxlink_t *link = NULL;
struct sxstream_opened *stream = NULL;
sexp_t *isx;
void *rbuf;
uint64_t sid = 0, offset;
size_t rdlen;
int r = SXE_FAILED, idx;
if(!channel) goto __fail;
link = channel->link;
if(!link || !channel->rpc_list) goto __fail; /* paranoid test */
/* parse input */
SEXP_ITERATE_LIST(sx, isx, idx) {
if(isx->ty == SEXP_LIST) {
__badproto:
r = SXE_BADPROTO;
goto __fail;
}
switch(idx) {
case 0: r = SXE_BADPROTO; break;
case 1: sid = strtoul(isx->val, NULL, 0); break;
case 2: offset = strtoul(isx->val, NULL, 0); break;
case 3: rdlen = strtoul(isx->val, NULL, 0); r = SXE_SUCCESS; break;
default: goto __badproto; break;
}
}
if(r != SXE_SUCCESS) goto __fail;
if(!rdlen) { /* in case of reading zero bytes - we will send nilreply error */
r = SXE_NILREPLY;
goto __fail;
}
if(rdlen >= MAX_RBBUF_LEN) rdlen = MAX_RBBUF_LEN - 1; /* check up for out of buffer reading */
/* check out if stream exists */
if(!(stream = link->streams[(int)sid])) {
r = SXE_NOSUCHSTREAMTYPE;
goto __fail;
}
/* check up if we have permission to do it */
if((stream->pin_channelid != channel->cid) || !(stream->flags & SXE_O_READ)) {
r = SXE_EPERM;
goto __fail;
}
/* check if the givn stream has binary capability */
if(stream->desc->type != SXE_O_BINARY) {
r = SXE_FAILED;
goto __fail;
}
rbuf = sxmsg_rapidbuf(msg);
rdlen = stream->desc->ops->s_bread(stream, rdlen, offset, rbuf);
if(!rdlen) {
r = SXE_EOS;
goto __fail;
} if(rdlen < 0) { /* nil reply */
r = SXE_NILREPLY;
goto __fail;
} else return sxmsg_rreply(msg, rdlen);
__fail:
return sxmsg_return(msg, r);
}
int _builtin_stream_bwrite(void *m, sexp_t *sx)
{
int r = SXE_FAILED;
sxmsg_t *msg = (sxmsg_t *)m;
return sxmsg_return(msg, r);
}

Loading…
Cancel
Save