@ -47,6 +47,111 @@
# include "internal.h"
static void __free_listentry ( struct _stream_list_node * listentry ) ;
static int __flag_check ( int s_flag , int d_flag )
{
int i ;
for ( i = 1 ; i < 7 ; i + + )
if ( ( d_flag & ( 1 < < i ) ) & & ! ( s_flag & ( 1 < < i ) ) ) return 1 ;
return 0 ;
}
static sxstream_t * __stream_open ( sxlink_t * link , sxchnl_t * channel , const char * opt ,
struct sxstream_description * desc , int stid , int flags )
{
char * msgbuf = NULL ;
size_t mlen = 0 ;
sxstream_t * ss = NULL ;
sxmsg_t * msg = NULL ;
char * buf = NULL ;
sexp_t * sx , * isx ;
uint64_t sid = 0 ;
int r = 0 , idx ;
/* check flags */
if ( __flag_check ( desc - > flags , flags ) ) {
errno = SXE_EPERM ;
goto __fini ;
}
/* time to alloc all stuff */
mlen = strlen ( _SXSTREAMOPEN_CMD ) + strlen ( _SXSTREAMTYPE_PREFIX ) + strlen ( _SXSTREAMFLAGS_PREFIX ) ;
if ( ! opt )
mlen + = ( _SXSTREAMOPEN_ADDMINLEN + sizeof ( char ) ) ;
else mlen + = ( _SXSTREAMOPEN_ADDMAXLEN + sizeof ( char ) ) + strlen ( opt ) ;
if ( ! ( msgbuf = malloc ( mlen * sizeof ( char ) ) ) ) {
errno = SXE_ENOMEM ;
goto __fini ;
}
if ( ! opt )
mlen = snprintf ( msgbuf , mlen , " (%s (%s %d)(%s %d)) " , _SXSTREAMOPEN_CMD ,
_SXSTREAMTYPE_PREFIX , stid , _SXSTREAMFLAGS_PREFIX , flags ) ;
else
mlen = snprintf ( msgbuf , mlen , " (%s (%s %d)(%s %d)(%s \" %s \" )) " , _SXSTREAMOPEN_CMD ,
_SXSTREAMTYPE_PREFIX , stid , _SXSTREAMFLAGS_PREFIX , flags ,
_SXSTREAMOPT_PREFIX , opt ) ;
r = sxmsg_send ( channel , msgbuf , mlen + sizeof ( char ) , & msg ) ;
if ( r ! = SXE_RAPIDMSG ) {
errno = r ;
goto __fini ;
}
/* time to get the special id, put it to the link and clean up all */
buf = sxmsg_rapidbuf ( msg ) ;
if ( ! ( sx = parse_sexp ( buf , strlen ( buf ) ) ) ) {
errno = SXE_ENOMEM ;
sxmsg_clean ( msg ) ;
goto __fini ;
} else sxmsg_clean ( msg ) ;
SEXP_ITERATE_LIST ( sx , isx , idx ) {
if ( isx - > ty = = SEXP_LIST ) r = SXE_BADPROTO ;
if ( ! idx & & strcmp ( isx - > val , _SXSTREAMOPEN_CMD_R ) ) r = SXE_BADPROTO ;
if ( idx & & idx < 2 ) sid = strtoul ( isx - > val , NULL , 0 ) ;
if ( idx > = 2 ) r = SXE_BADPROTO ;
}
destroy_sexp ( sx ) ;
if ( ! r ) {
if ( ! ( ss = malloc ( sizeof ( sxstream_t ) ) ) ) {
errno = SXE_ENOMEM ;
goto __fini ;
}
memset ( ss , 0 , sizeof ( sxstream_t ) ) ;
ss - > flags = flags ;
ss - > sid = sid ;
ss - > link = link ;
ss - > channel = channel ;
} else errno = r ;
__fini :
if ( msgbuf ) free ( msgbuf ) ;
return ss ;
}
static struct sxstream_description * __get_stream_description ( sxlink_t * link , int stid )
{
usrtc_node_t * node = NULL ;
struct sxstream_description * s_desc = NULL ;
if ( ! link - > remote_streams ) return NULL ;
else node = usrtc_lookup ( link - > remote_streams , & stid ) ;
if ( node ) s_desc = ( struct sxstream_description * ) usrtc_node_getdata ( node ) ;
return s_desc ;
}
/*
* NOTE : sxstream operations ( except opening process i . e . few threads
* can open different streams ) are NOT _thread - safe_ , please
@ -54,18 +159,107 @@
*/
sxstream_t * sxstream_open ( sxlink_t * link , const char * opt , int stid , int flags )
{
return NULL ;
sxchnl_t * channel = NULL ;
struct sxstream_description * s_desc ;
sxstream_t * ss = NULL ;
int r = 0 ;
if ( ! link ) {
r = SXE_FAILED ;
goto __fini ;
}
if ( ! link - > remote_streams ) {
r = SXE_NOSUCHSTREAMTYPE ;
goto __fini ;
}
/* ok, let's a deal with channel */
if ( ! ( s_desc = __get_stream_description ( link , stid ) ) ) {
r = SXE_NOSUCHSTREAMTYPE ;
goto __fini ;
} else channel = sxchannel_open ( link , s_desc - > pcid ) ;
if ( ! channel ) {
r = errno ;
goto __fini ;
}
flags | = SXE_O_OCHANNELED ;
ss = __stream_open ( link , channel , opt , s_desc , stid , flags ) ;
__fini :
errno = r ;
if ( ! ss & & channel ) sxchannel_close ( channel ) ;
return ss ;
}
sxstream_t * sxstream_openwch ( sxlink_t * link , sxchnl_t * channel , const char * opt ,
int stid , int flags )
{
return NULL ;
struct sxstream_description * s_desc = NULL ;
sxstream_t * ss = NULL ;
int r = 0 ;
if ( ! link | | ! channel ) {
r = SXE_FAILED ;
goto __fini ;
}
if ( ! link - > remote_streams ) {
r = SXE_NOSUCHSTREAMTYPE ;
goto __fini ;
}
/* get description */
if ( ! ( s_desc = __get_stream_description ( link , stid ) ) ) {
r = SXE_NOSUCHSTREAMTYPE ;
goto __fini ;
}
ss = __stream_open ( link , channel , opt , s_desc , stid , flags ) ;
__fini :
errno = r ;
return ss ;
}
int sxstream_close ( sxstream_t * stream )
{
return SXE_SUCCESS ;
sxmsg_t * msg = NULL ;
char * mbuf ;
list_node_t * iter , * s_iter ;
struct _stream_list_node * listentry ;
size_t mbuf_len = strlen ( _SXSTREAMCLOSE_CMD ) + _SXSTREAMCLOSE_ADDLEN ;
int r = 0 ;
if ( ! stream ) return SXE_FAILED ;
if ( ! stream - > link | | ! stream - > channel ) return SXE_FAILED ;
if ( ! ( mbuf = malloc ( mbuf_len ) ) ) return SXE_ENOMEM ;
else mbuf_len = snprintf ( mbuf , mbuf_len , " (%s %lu) " , _SXSTREAMCLOSE_CMD , stream - > sid ) ;
r = sxmsg_send ( stream - > channel , mbuf , mbuf_len + sizeof ( char ) , & msg ) ;
if ( r = = SXE_RAPIDMSG ) sxmsg_clean ( msg ) ;
if ( r = = SXE_SUCCESS ) {
/* TODO: free all caches, buffers etc */
if ( ! ( stream - > flags & SXE_O_BINARY ) ) {
if ( ! ( stream - > flags & SXE_O_NAMED ) & & stream - > entries ) {
list_for_each_safe ( stream - > entries , iter , s_iter ) {
listentry = list_entry ( iter , struct _stream_list_node , node ) ;
list_del ( iter ) ;
__free_listentry ( listentry ) ;
}
}
}
if ( stream - > flags & SXE_O_OCHANNELED )
r = sxchannel_close ( stream - > channel ) ;
free ( stream ) ;
}
free ( mbuf ) ;
return r ;
}
/* binary stream operations */
@ -100,9 +294,180 @@ size_t sxstream_bsyncat(sxstream_t *stream, void *buf, size_t buf_len, off_t off
* That means you should care about ridden values , since
* it might be freed on the next read operation .
*/
static struct _nn_stream_entry_node * __alloc_entry ( const char * value )
{
struct _nn_stream_entry_node * o = malloc ( sizeof ( struct _nn_stream_entry_node ) ) ;
if ( ! o ) return NULL ;
if ( ! ( o - > value = strdup ( value ) ) ) {
free ( o ) ;
return NULL ;
}
list_init_node ( & o - > node ) ;
return o ;
}
static void __free_entry ( struct _nn_stream_entry_node * entry )
{
free ( entry - > value ) ;
free ( entry ) ;
return ;
}
static struct _stream_list_node * __alloc_listentry ( void )
{
struct _stream_list_node * o = malloc ( sizeof ( struct _stream_list_node ) ) ;
if ( ! o ) return NULL ;
if ( ! ( o - > list = malloc ( sizeof ( list_head_t ) ) ) ) {
free ( o ) ;
return NULL ;
}
list_init_head ( o - > list ) ;
list_init_node ( & o - > node ) ;
return o ;
}
static void __free_listentry ( struct _stream_list_node * listentry )
{
struct _nn_stream_entry_node * entry ;
list_node_t * iter , * s_iter ;
list_for_each_safe ( listentry - > list , iter , s_iter ) {
entry = list_entry ( iter , struct _nn_stream_entry_node , node ) ;
list_del ( iter ) ;
__free_entry ( entry ) ;
}
free ( listentry - > list ) ;
free ( listentry ) ;
return ;
}
/* entry nonamed stream ops */
list_head_t * sxstream_read ( sxstream_t * stream )
{
sxmsg_t * msg ;
size_t mbuf_len ;
char * mbuf = NULL ;
list_node_t * cur = NULL , * iter , * s_iter ;
struct _stream_list_node * listentry = NULL ;
struct _nn_stream_entry_node * entry = NULL ;
sexp_t * sx , * isx , * iisx ;
int r = 0 , idx , iidx ;
errno = SXE_FAILED ;
if ( ! stream ) goto __fail ;
if ( ! stream - > link | | ! stream - > channel ) goto __fail ;
if ( stream - > entries ) {
if ( stream - > cur = = list_node_last ( stream - > entries ) ) {
cur = stream - > cur ;
stream - > cur = NULL ; /* we need to read from remote again on the next operation */
} else if ( stream - > cur & & stream - > cur ! = list_node_last ( stream - > entries ) ) {
cur = stream - > cur ;
if ( ! list_node_next_isbound ( cur ) )
stream - > cur = stream - > cur - > next ;
else stream - > cur = NULL ;
} else goto __read_seq ;
__success :
if ( cur ) {
errno = 0 ;
listentry = list_entry ( cur , struct _stream_list_node , node ) ;
return listentry - > list ;
} else return NULL ;
}
__read_seq :
errno = SXE_ENOMEM ;
if ( stream - > entries ) { /* ok, clean up all stuff */
list_for_each_safe ( stream - > entries , iter , s_iter ) {
listentry = list_entry ( iter , struct _stream_list_node , node ) ;
list_del ( iter ) ;
__free_listentry ( listentry ) ;
}
} else { /* allocate it first time */
if ( ! ( stream - > entries = malloc ( sizeof ( list_head_t ) ) ) ) goto __fail ;
list_init_head ( stream - > entries ) ;
}
/* prepare a message */
mbuf_len = strlen ( _SXSTREAMEREAD_CMD ) + _SXSTREAMEREAD_ADDLEN ;
if ( ! ( mbuf = malloc ( mbuf_len ) ) ) goto __fail ;
mbuf_len = snprintf ( mbuf , mbuf_len , " (%s) " , _SXSTREAMEREAD_CMD ) ;
r = sxmsg_send ( stream - > channel , mbuf , mbuf_len , & msg ) ;
switch ( r ) {
case SXE_RAPIDMSG : /* get our data */
sx = parse_sexp ( sxmsg_payload ( msg ) , strlen ( sxmsg_payload ( msg ) ) ) ;
sxmsg_clean ( msg ) ;
if ( ! sx ) {
__badproto :
if ( sx ) destroy_sexp ( sx ) ;
errno = SXE_BADPROTO ;
goto __fail ;
}
SEXP_ITERATE_LIST ( sx , isx , idx ) {
if ( isx - > ty ! = SEXP_LIST ) goto __badproto ;
/* allocate list entry and add it */
if ( ! ( listentry = __alloc_listentry ( ) ) ) {
__enomem :
errno = SXE_ENOMEM ;
destroy_sexp ( sx ) ;
goto __fail ;
} else list_add2tail ( stream - > entries , & listentry - > node ) ;
SEXP_ITERATE_LIST ( isx , iisx , iidx ) {
if ( isx - > ty = = SEXP_LIST ) goto __badproto ;
if ( ! ( entry = __alloc_entry ( ( const char * ) iisx - > val ) ) ) goto __enomem ;
else list_add2tail ( listentry - > list , & entry - > node ) ;
}
}
/* ok let's see what we got */
if ( idx ) {
stream - > cur = list_node_first ( stream - > entries ) ;
cur = stream - > cur ;
/* point to the next one if exists */
if ( ! list_node_next_isbound ( cur ) ) stream - > cur = stream - > cur - > next ;
else stream - > cur = NULL ;
} else {
errno = SXE_NILREPLY ;
stream - > cur = NULL ;
cur = NULL ;
}
destroy_sexp ( sx ) ;
break ;
case SXE_EOS : /* stream is over */
errno = SXE_EOS ;
cur = NULL ;
break ;
default :
errno = r ;
goto __fail ;
break ;
}
free ( mbuf ) ;
goto __success ;
__fail :
if ( mbuf ) free ( mbuf ) ;
return NULL ;
}
@ -112,3 +477,25 @@ list_head_t *sxstream_readnamed(sxstream_t *stream)
return NULL ;
}
/* builtin functions for streams */
int _builtin_stream_open ( void * m , sexp_t * sx )
{
sxmsg_t * msg = ( sxmsg_t * ) m ;
return sxmsg_return ( msg , SXE_FAILED ) ;
}
int _builtin_stream_close ( void * m , sexp_t * sx )
{
sxmsg_t * msg = ( sxmsg_t * ) m ;
return sxmsg_return ( msg , SXE_FAILED ) ;
}
int _builtin_stream_eread ( void * m , sexp_t * sx )
{
sxmsg_t * msg = ( sxmsg_t * ) m ;
return sxmsg_return ( msg , SXE_FAILED ) ;
}