added version query with support for the old clients, added stream types sync functions, btw it's not tested;
This commit is contained in:
parent
ac98fb231f
commit
e0644e7c5e
@ -33,6 +33,8 @@
|
|||||||
|
|
||||||
#define MAX_SXMPLTHREADS 8
|
#define MAX_SXMPLTHREADS 8
|
||||||
|
|
||||||
|
#define MAX_STREAMS_TYPES 32767
|
||||||
|
|
||||||
#define MAX_LINKS 32768
|
#define MAX_LINKS 32768
|
||||||
|
|
||||||
#endif /* __SXMP_LIMITS_H__ */
|
#endif /* __SXMP_LIMITS_H__ */
|
||||||
|
@ -99,7 +99,12 @@ typedef struct __sxlink_t {
|
|||||||
list_head_t write_pending; /** < list of messages waiting for write */
|
list_head_t write_pending; /** < list of messages waiting for write */
|
||||||
pthread_mutex_t write_pending_lock;
|
pthread_mutex_t write_pending_lock;
|
||||||
volatile uint8_t pending_messages; /** < pending message count */
|
volatile uint8_t pending_messages; /** < pending message count */
|
||||||
|
/* stream part */
|
||||||
|
pthread_rwlock_t stream_rwlock;
|
||||||
|
usrtc_t *stream_list;
|
||||||
|
usrtc_t *remote_streams;
|
||||||
/* Other stuff */
|
/* Other stuff */
|
||||||
|
sxmp_proto_version_t cp_version;
|
||||||
pthread_t thrd_poll[8];
|
pthread_t thrd_poll[8];
|
||||||
volatile uint8_t flags; /** < flags of the connection */
|
volatile uint8_t flags; /** < flags of the connection */
|
||||||
volatile uint8_t usecount; /** < use count for the connection link */
|
volatile uint8_t usecount; /** < use count for the connection link */
|
||||||
@ -164,6 +169,43 @@ typedef struct __link_rpc_list_type {
|
|||||||
char *opt_version; /** < reserved for future implementations */
|
char *opt_version; /** < reserved for future implementations */
|
||||||
} sxl_rpclist_t;
|
} sxl_rpclist_t;
|
||||||
|
|
||||||
|
struct sxstream_ops;
|
||||||
|
|
||||||
|
struct sxstream_description {
|
||||||
|
uint16_t stid;
|
||||||
|
uint16_t pcid;
|
||||||
|
uint16_t type;
|
||||||
|
uint16_t flags;
|
||||||
|
struct sxstream_ops *ops;
|
||||||
|
usrtc_node_t node;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct sxstream_opened {
|
||||||
|
uint64_t sto_id;
|
||||||
|
struct sxstream_description *desc;
|
||||||
|
union {
|
||||||
|
void *map_data;
|
||||||
|
char *ent_buf;
|
||||||
|
};
|
||||||
|
void *priv;
|
||||||
|
uint64_t off;
|
||||||
|
uint16_t flags;
|
||||||
|
usrtc_node_t node;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct sxstream_ops {
|
||||||
|
int (*s_open)(struct sxstream_opened *, const char *);
|
||||||
|
int (*s_close)(struct sxstream_opened *);
|
||||||
|
union {
|
||||||
|
size_t (*s_bread)(struct sxstream_opened *, size_t, uint64_t, void *);
|
||||||
|
size_t (*s_eread)(struct sxstream_opened *, size_t, uint64_t, char *);
|
||||||
|
};
|
||||||
|
union {
|
||||||
|
size_t (*s_bwrite)(struct sxstream_opened *, size_t, uint64_t, const void *);
|
||||||
|
size_t (*s_ewrite)(struct sxstream_opened *, uint64_t, list_head_t *);
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* \brief Hub subsystem structure.
|
* \brief Hub subsystem structure.
|
||||||
*
|
*
|
||||||
@ -174,6 +216,7 @@ typedef struct __link_rpc_list_type {
|
|||||||
*/
|
*/
|
||||||
typedef struct __sxhub_type {
|
typedef struct __sxhub_type {
|
||||||
usrtc_t *links;
|
usrtc_t *links;
|
||||||
|
usrtc_t *streams;
|
||||||
pthread_rwlock_t rwlock;
|
pthread_rwlock_t rwlock;
|
||||||
char *rootca, *certpem, *certkey; /* path name to the certificates */
|
char *rootca, *certpem, *certkey; /* path name to the certificates */
|
||||||
sxl_rpclist_t *system_rpc;
|
sxl_rpclist_t *system_rpc;
|
||||||
@ -206,6 +249,55 @@ typedef struct __rpc_typed_list_type {
|
|||||||
usrtc_node_t lnode;
|
usrtc_node_t lnode;
|
||||||
} rpc_typed_list_t;
|
} rpc_typed_list_t;
|
||||||
|
|
||||||
|
/* streams */
|
||||||
|
|
||||||
|
/* flags for streams */
|
||||||
|
#define SXE_O_BINARY (1 << 1)
|
||||||
|
#define SXE_O_READ (1 << 2)
|
||||||
|
#define SXE_O_WRITE (1 << 3)
|
||||||
|
#define SXE_O_ASYNC (1 << 4)
|
||||||
|
#define SXE_O_TRUNC (1 << 5)
|
||||||
|
#define SXE_O_NAMED (1 << 6)
|
||||||
|
#define SXE_O_OCHANNELED (1 << 7)
|
||||||
|
|
||||||
|
/* sxstream_t used to access stream i.e. this is something returned on
|
||||||
|
* open operations.
|
||||||
|
* you should care about closing it, because this ones doesn't tracked.
|
||||||
|
*/
|
||||||
|
typedef struct __sxtream_type {
|
||||||
|
sxlink_t *link;
|
||||||
|
sxchnl_t *channel;
|
||||||
|
int flags;
|
||||||
|
uint64_t sid;
|
||||||
|
union {
|
||||||
|
sexp_t *sx;
|
||||||
|
void *ebuf;
|
||||||
|
};
|
||||||
|
/* for reading */
|
||||||
|
union {
|
||||||
|
struct {
|
||||||
|
void *data;
|
||||||
|
uintptr_t cur_offset;
|
||||||
|
};
|
||||||
|
struct {
|
||||||
|
list_head_t *entries;
|
||||||
|
list_node_t *cur;
|
||||||
|
};
|
||||||
|
};
|
||||||
|
list_head_t *named;
|
||||||
|
} sxstream_t;
|
||||||
|
|
||||||
|
struct _nn_steam_entry_node {
|
||||||
|
char *value;
|
||||||
|
list_node_t node;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct _steam_entry_node {
|
||||||
|
char *name;
|
||||||
|
char *value;
|
||||||
|
list_node_t node;
|
||||||
|
};
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
@ -254,6 +346,37 @@ int sxmsg_return(sxmsg_t *msg, int opcode);
|
|||||||
int sxmsg_return_pp(sxmsg_t *msg, int opcode);
|
int sxmsg_return_pp(sxmsg_t *msg, int opcode);
|
||||||
void sxmsg_clean(sxmsg_t *msg);
|
void sxmsg_clean(sxmsg_t *msg);
|
||||||
|
|
||||||
|
/* streams */
|
||||||
|
/*
|
||||||
|
* NOTE: sxstream operations (except opening process i.e. few threads
|
||||||
|
* can open different streams) are NOT _thread-safe_, please
|
||||||
|
* care about that in your own application.
|
||||||
|
*/
|
||||||
|
sxstream_t *sxstream_open(sxlink_t *link, const char *opt, int stid, int flags);
|
||||||
|
sxstream_t *sxstream_openwch(sxlink_t *link, sxchnl_t *channel, const char *opt, int stid, int flags);
|
||||||
|
int sxstream_close(sxstream_t *stream);
|
||||||
|
|
||||||
|
/* binary stream operations */
|
||||||
|
size_t sxstream_bread(sxstream_t *stream, void *buf, size_t buf_len);
|
||||||
|
size_t sxstream_breadat(sxstream_t *stream, void *buf, size_t buf_len, off_t off);
|
||||||
|
size_t sxstream_bwriteat(sxstream_t *stream, const void *buf, size_t buf_len, off_t off);
|
||||||
|
size_t sxstream_bsyncat(sxstream_t *stream, void *buf, size_t buf_len, off_t off);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* NOTE: returning value for reading from entry streams
|
||||||
|
* shouldn't be managed, it's done by close operation.
|
||||||
|
* That means you should care about ridden values, since
|
||||||
|
* it might be freed on the next read operation.
|
||||||
|
*/
|
||||||
|
/* entry nonamed stream ops */
|
||||||
|
list_head_t *sxstream_read(sxstream_t *stream);
|
||||||
|
|
||||||
|
/* entry named stream ops */
|
||||||
|
list_head_t *sxstream_readnamed(sxstream_t *stream);
|
||||||
|
|
||||||
|
/* provider-side functions for streams */
|
||||||
|
int sxhub_stream_register(sxhub_t *hub, const struct sxstream_description *s_desc);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
@ -30,5 +30,11 @@
|
|||||||
#define SXMP_MAJOR 4
|
#define SXMP_MAJOR 4
|
||||||
#define SXMP_MINOR 2
|
#define SXMP_MINOR 2
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
V2 = 0,
|
||||||
|
V2_1,
|
||||||
|
V_UNKNOWN = 200,
|
||||||
|
} sxmp_proto_version_t;
|
||||||
|
|
||||||
#endif /* __SXMP_VERSION_H__ */
|
#endif /* __SXMP_VERSION_H__ */
|
||||||
|
|
||||||
|
323
lib/link.c
323
lib/link.c
@ -137,7 +137,10 @@ static int __get_channels_list(void *cctx, sexp_t *sx)
|
|||||||
size_t maxlen = 65535 - sizeof(sxmplv2_head_t);
|
size_t maxlen = 65535 - sizeof(sxmplv2_head_t);
|
||||||
size_t ulen = 0;
|
size_t ulen = 0;
|
||||||
|
|
||||||
/* we will avoid S-exp scanning here */
|
/* determine how this function was called */
|
||||||
|
if(!strcmp(sx->list->val, "get-channels-list"))
|
||||||
|
co->cp_version = V2;
|
||||||
|
else co->cp_version = V2_1; /* last supported in this version */
|
||||||
|
|
||||||
/* call the function */
|
/* call the function */
|
||||||
if(ssys->get_rpc_typed_list_tree)
|
if(ssys->get_rpc_typed_list_tree)
|
||||||
@ -154,12 +157,235 @@ static int __get_channels_list(void *cctx, sexp_t *sx)
|
|||||||
ulen += snprintf(buf + ulen, maxlen - ulen, ")");
|
ulen += snprintf(buf + ulen, maxlen - ulen, ")");
|
||||||
msg->mhead.payload_length = ulen + 1;
|
msg->mhead.payload_length = ulen + 1;
|
||||||
|
|
||||||
/* we're ready for messaging mode */
|
if(co->cp_version == V2) {
|
||||||
co->flags |= SXMP_MESSAGINGMODE;
|
/* we're ready for messaging mode */
|
||||||
|
co->flags |= SXMP_MESSAGINGMODE;
|
||||||
|
}
|
||||||
|
|
||||||
return SXE_SUCCESS;
|
return SXE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int __get_streams(void *cctx, sexp_t *sx)
|
||||||
|
{
|
||||||
|
sxlink_t *link = (sxlink_t *)cctx;
|
||||||
|
sxhub_t *hub = link->ssys;
|
||||||
|
sxmsg_t *msg = link->messages[0];
|
||||||
|
char *buf = msg->payload;
|
||||||
|
usrtc_node_t *node, *rpc_node;
|
||||||
|
struct sxstream_description *s_desc;
|
||||||
|
size_t maxlen = 65535 - sizeof(sxmplv2_head_t);
|
||||||
|
size_t ulen = 0, sts = 0;
|
||||||
|
int tcid = 0;
|
||||||
|
|
||||||
|
if(link->cp_version == V_UNKNOWN) /* oops, version doesn't a happy one */
|
||||||
|
return SXE_FAILED;
|
||||||
|
|
||||||
|
if(!hub->streams) { /* no streams provided */
|
||||||
|
__return_nil:
|
||||||
|
ulen = snprintf(buf, maxlen, "(!@s< nil)");
|
||||||
|
msg->mhead.payload_length = ulen + 1;
|
||||||
|
/* and now we're ready to exit from batch mode */
|
||||||
|
link->flags |= SXMP_MESSAGINGMODE;
|
||||||
|
|
||||||
|
return SXE_SUCCESS;
|
||||||
|
} else { /* set all streams available */
|
||||||
|
ulen += snprintf(buf + ulen, maxlen - ulen, "(!@s< ");
|
||||||
|
|
||||||
|
for(node = usrtc_first(hub->streams); node != NULL; node = usrtc_next(hub->streams, node)) {
|
||||||
|
s_desc = (struct sxstream_description *)usrtc_node_getdata(node);
|
||||||
|
tcid = s_desc->pcid;
|
||||||
|
if((rpc_node = usrtc_lookup(link->rpc_list, &tcid))) { /* channel allowed */
|
||||||
|
/* ids */
|
||||||
|
ulen += snprintf(buf + ulen, maxlen - ulen, "(:tcid %d :stid %d :t ", s_desc->pcid,
|
||||||
|
s_desc->stid);
|
||||||
|
/* type */
|
||||||
|
switch(s_desc->type) {
|
||||||
|
case 0:
|
||||||
|
ulen += snprintf(buf + ulen, maxlen - ulen, "e ");
|
||||||
|
break;
|
||||||
|
case SXE_O_NAMED:
|
||||||
|
ulen += snprintf(buf + ulen, maxlen - ulen, "n ");
|
||||||
|
break;
|
||||||
|
case SXE_O_BINARY:
|
||||||
|
ulen += snprintf(buf + ulen, maxlen - ulen, "b ");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
/* flags */
|
||||||
|
ulen += snprintf(buf + ulen, maxlen - ulen, ":a ");
|
||||||
|
if(s_desc->flags & SXE_O_READ)
|
||||||
|
ulen += snprintf(buf + ulen, maxlen - ulen, "r");
|
||||||
|
else ulen += snprintf(buf + ulen, maxlen - ulen, "-");
|
||||||
|
if(s_desc->flags & SXE_O_WRITE)
|
||||||
|
ulen += snprintf(buf + ulen, maxlen - ulen, "w");
|
||||||
|
else ulen += snprintf(buf + ulen, maxlen - ulen, "-");
|
||||||
|
if(s_desc->flags & SXE_O_TRUNC)
|
||||||
|
ulen += snprintf(buf + ulen, maxlen - ulen, "t");
|
||||||
|
else ulen += snprintf(buf + ulen, maxlen - ulen, "-");
|
||||||
|
if(s_desc->flags & SXE_O_ASYNC)
|
||||||
|
ulen += snprintf(buf + ulen, maxlen - ulen, "a");
|
||||||
|
else ulen += snprintf(buf + ulen, maxlen - ulen, "-");
|
||||||
|
|
||||||
|
ulen += snprintf(buf + ulen, maxlen - ulen, ")");
|
||||||
|
|
||||||
|
sts++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if(!sts) goto __return_nil;
|
||||||
|
|
||||||
|
ulen += snprintf(buf + ulen, maxlen - ulen, ")");
|
||||||
|
msg->mhead.payload_length = ulen + 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
link->flags |= SXMP_MESSAGINGMODE;
|
||||||
|
|
||||||
|
return SXE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static long __cmp_uint16(const void *a, const void *b)
|
||||||
|
{
|
||||||
|
return (long) *(uint16_t*)a - *(uint16_t*)b;
|
||||||
|
}
|
||||||
|
|
||||||
|
#define _MOD_STID 0xa
|
||||||
|
#define _MOD_TCID 0xb
|
||||||
|
#define _MOD_TYPE 0xc
|
||||||
|
#define _MOD_FLAGS 0xd
|
||||||
|
#define _MOD_UNKWN 0x0
|
||||||
|
|
||||||
|
static int __set_streams(void *cctx, sexp_t *sx)
|
||||||
|
{
|
||||||
|
register int idx, iidx;
|
||||||
|
usrtc_t *tree = NULL;
|
||||||
|
sxlink_t *link = (sxlink_t *)cctx;
|
||||||
|
sexp_t *isx, *iisx;
|
||||||
|
struct sxstream_description *s_desc;
|
||||||
|
usrtc_node_t *node = NULL;
|
||||||
|
int modp = 0, r, value_mod = 0, mod = _MOD_UNKWN;
|
||||||
|
int flags, type, stid, pcid;
|
||||||
|
|
||||||
|
if(!link->remote_streams) {
|
||||||
|
tree = malloc(sizeof(usrtc_t));
|
||||||
|
if(!tree) return SXE_ENOMEM;
|
||||||
|
usrtc_init(tree, USRTC_SPLAY, MAX_STREAMS_TYPES, __cmp_uint16);
|
||||||
|
link->remote_streams = tree;
|
||||||
|
}
|
||||||
|
|
||||||
|
SEXP_ITERATE_LIST(sx, isx, idx) {
|
||||||
|
if(!idx) continue;
|
||||||
|
|
||||||
|
if(isx->ty != SEXP_LIST) {
|
||||||
|
free(tree);
|
||||||
|
link->remote_streams = NULL;
|
||||||
|
if(!strcmp(isx->val, "nil")) goto __fini;
|
||||||
|
else return SXE_BADPROTO;
|
||||||
|
} else {
|
||||||
|
SEXP_ITERATE_LIST(isx, iisx, iidx) {
|
||||||
|
if(iisx->ty == SEXP_LIST) {
|
||||||
|
r = SXE_BADPROTO;
|
||||||
|
__clean_up_on_error:
|
||||||
|
/* clean up all stuff */
|
||||||
|
for(node = usrtc_first(tree); node != NULL; node = usrtc_first(tree)) {
|
||||||
|
s_desc = (struct sxstream_description *)usrtc_node_getdata(node);
|
||||||
|
usrtc_delete(tree, node);
|
||||||
|
free(s_desc);
|
||||||
|
}
|
||||||
|
free(tree);
|
||||||
|
link->remote_streams = NULL;
|
||||||
|
|
||||||
|
return r;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(iisx->val[0] == ':' && value_mod) {
|
||||||
|
r = SXE_BADPROTO;
|
||||||
|
goto __clean_up_on_error;
|
||||||
|
} else if(iisx->val[0] == ':' && !value_mod) {
|
||||||
|
value_mod = 1;
|
||||||
|
if(!strcmp(iisx->val, ":stid")) mod = _MOD_STID;
|
||||||
|
else if(!strcmp(iisx->val, ":tcid")) mod = _MOD_TCID;
|
||||||
|
else if(!strcmp(iisx->val, ":t")) mod = _MOD_TYPE;
|
||||||
|
else if(!strcmp(iisx->val, ":a")) mod = _MOD_FLAGS;
|
||||||
|
else {
|
||||||
|
r = SXE_BADPROTO;
|
||||||
|
goto __clean_up_on_error;
|
||||||
|
}
|
||||||
|
} else if(iisx->val[0] != ':' && value_mod) {
|
||||||
|
switch(mod) {
|
||||||
|
case _MOD_STID:
|
||||||
|
stid = atoi(iisx->val);
|
||||||
|
modp++;
|
||||||
|
break;
|
||||||
|
case _MOD_TCID:
|
||||||
|
pcid = atoi(iisx->val);
|
||||||
|
modp++;
|
||||||
|
break;
|
||||||
|
case _MOD_TYPE:
|
||||||
|
switch(iisx->val[0]) {
|
||||||
|
case 'e': type = 0; break;
|
||||||
|
case 'b': type = SXE_O_BINARY; break;
|
||||||
|
case 'n': type = SXE_O_NAMED; break;
|
||||||
|
default:
|
||||||
|
r = SXE_BADPROTO;
|
||||||
|
goto __clean_up_on_error;
|
||||||
|
}
|
||||||
|
modp++;
|
||||||
|
break;
|
||||||
|
case _MOD_FLAGS:
|
||||||
|
flags = 0;
|
||||||
|
if(iisx->val[0] == 'r') flags |= SXE_O_READ;
|
||||||
|
else if(iisx->val[1] == 'w') flags |= SXE_O_WRITE;
|
||||||
|
else if(iisx->val[2] == 't') flags |= SXE_O_TRUNC;
|
||||||
|
else if(iisx->val[3] == 'a') flags |= SXE_O_ASYNC;
|
||||||
|
else {
|
||||||
|
r = SXE_BADPROTO;
|
||||||
|
goto __clean_up_on_error;
|
||||||
|
}
|
||||||
|
modp++;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
value_mod = 0;
|
||||||
|
} else {
|
||||||
|
r = SXE_BADPROTO;
|
||||||
|
goto __clean_up_on_error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/* here we go */
|
||||||
|
if(modp < 4) {
|
||||||
|
r = SXE_BADPROTO;
|
||||||
|
goto __clean_up_on_error;
|
||||||
|
} else if(!(s_desc = malloc(sizeof(struct sxstream_description)))) {
|
||||||
|
r = SXE_ENOMEM;
|
||||||
|
goto __clean_up_on_error;
|
||||||
|
}
|
||||||
|
|
||||||
|
usrtc_node_init(&s_desc->node, s_desc);
|
||||||
|
s_desc->ops = NULL;
|
||||||
|
|
||||||
|
s_desc->flags = (uint16_t)flags;
|
||||||
|
s_desc->type = (uint16_t)type;
|
||||||
|
s_desc->pcid = (uint16_t)pcid;
|
||||||
|
s_desc->stid = (uint16_t)stid;
|
||||||
|
|
||||||
|
usrtc_insert(tree, &s_desc->node, &s_desc->stid);
|
||||||
|
|
||||||
|
/* finish */
|
||||||
|
modp = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
__fini:
|
||||||
|
link->flags |= SXMP_MESSAGINGMODE;
|
||||||
|
link->flags &= ~SXMP_BATCHMODE;
|
||||||
|
|
||||||
|
return SXE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
#undef _MOD_STID
|
||||||
|
#undef _MOD_TCID
|
||||||
|
#undef _MOD_TYPE
|
||||||
|
#undef _MOD_FLAGS
|
||||||
|
#undef _MOD_UNKWN
|
||||||
|
|
||||||
static int __set_channels_list(void *cctx, sexp_t *sx)
|
static int __set_channels_list(void *cctx, sexp_t *sx)
|
||||||
{
|
{
|
||||||
register int idx;
|
register int idx;
|
||||||
@ -193,9 +419,45 @@ static int __set_channels_list(void *cctx, sexp_t *sx)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* we're ready for messaging mode */
|
if(co->cp_version == V2) {
|
||||||
co->flags |= SXMP_MESSAGINGMODE;
|
/* we're ready for messaging mode */
|
||||||
co->flags &= ~SXMP_BATCHMODE;
|
co->flags |= SXMP_MESSAGINGMODE;
|
||||||
|
co->flags &= ~SXMP_BATCHMODE;
|
||||||
|
}
|
||||||
|
|
||||||
|
return SXE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int __my_version_ack(void *cctx, sexp_t *sx)
|
||||||
|
{
|
||||||
|
register int idx;
|
||||||
|
sxlink_t *link = (sxlink_t *)cctx;
|
||||||
|
sexp_t *isx;
|
||||||
|
|
||||||
|
SEXP_ITERATE_LIST(sx, isx, idx) {
|
||||||
|
if(!idx) continue;
|
||||||
|
if(idx > 2) return SXE_BADPROTO;
|
||||||
|
if(isx->ty == SEXP_LIST) return SXE_BADPROTO;
|
||||||
|
if(!strcmp(isx->val, "v2_1")) link->cp_version = V2_1;
|
||||||
|
else link->cp_version = V_UNKNOWN;
|
||||||
|
}
|
||||||
|
|
||||||
|
return SXE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int __my_version_set(void *cctx, sexp_t *sx)
|
||||||
|
{
|
||||||
|
register int idx;
|
||||||
|
sxlink_t *link = (sxlink_t *)cctx;
|
||||||
|
sexp_t *isx;
|
||||||
|
|
||||||
|
SEXP_ITERATE_LIST(sx, isx, idx) {
|
||||||
|
if(!idx) continue;
|
||||||
|
if(idx > 2) return SXE_BADPROTO;
|
||||||
|
if(isx->ty == SEXP_LIST) return SXE_BADPROTO;
|
||||||
|
if(!strcmp(isx->val, "v2_1")) link->cp_version = V2_1;
|
||||||
|
else return SXE_FAILED; /* failed to set another version */
|
||||||
|
}
|
||||||
|
|
||||||
return SXE_SUCCESS;
|
return SXE_SUCCESS;
|
||||||
}
|
}
|
||||||
@ -204,9 +466,21 @@ static int __init_systemrpc_tree(usrtc_t *rtree)
|
|||||||
{
|
{
|
||||||
/* batch mode negotiation context functions */
|
/* batch mode negotiation context functions */
|
||||||
if(__insert_rpc_function(rtree, "auth-set-credentials", __set_credentials)) goto __fail;
|
if(__insert_rpc_function(rtree, "auth-set-credentials", __set_credentials)) goto __fail;
|
||||||
if(__insert_rpc_function(rtree, "get-channels-list", __get_channels_list)) goto __fail;
|
if(__insert_rpc_function(rtree, "get-channels-list", __get_channels_list)) goto __fail; /* old V2 (v1 also) */
|
||||||
if(__insert_rpc_function(rtree, "set-channels-list", __set_channels_list)) goto __fail;
|
if(__insert_rpc_function(rtree, "set-channels-list", __set_channels_list)) goto __fail;
|
||||||
|
|
||||||
|
/* sync functions */
|
||||||
|
/* channels */
|
||||||
|
if(__insert_rpc_function(rtree, "!@c>", __get_channels_list)) goto __fail;
|
||||||
|
if(__insert_rpc_function(rtree, "!@c<", __set_channels_list)) goto __fail;
|
||||||
|
/* version */
|
||||||
|
if(__insert_rpc_function(rtree, "!@v>", __my_version_ack)) goto __fail;
|
||||||
|
if(__insert_rpc_function(rtree, "!@v<", __my_version_ack)) goto __fail;
|
||||||
|
if(__insert_rpc_function(rtree, "!@V>", __my_version_set)) goto __fail;
|
||||||
|
/* streams */
|
||||||
|
if(__insert_rpc_function(rtree, "!@s>", __get_streams)) goto __fail;
|
||||||
|
if(__insert_rpc_function(rtree, "!@s<", __set_streams)) goto __fail;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
__fail:
|
__fail:
|
||||||
@ -264,9 +538,19 @@ int sxhub_init(sxhub_t *ssys)
|
|||||||
|
|
||||||
int sxhub_free(sxhub_t *ssys)
|
int sxhub_free(sxhub_t *ssys)
|
||||||
{
|
{
|
||||||
|
usrtc_node_t *node;
|
||||||
|
|
||||||
__destroy_rpc_list_tree(ssys->system_rpc->rpc_tree);
|
__destroy_rpc_list_tree(ssys->system_rpc->rpc_tree);
|
||||||
free(ssys->system_rpc->rpc_tree);
|
free(ssys->system_rpc->rpc_tree);
|
||||||
free(ssys->system_rpc);
|
free(ssys->system_rpc);
|
||||||
|
|
||||||
|
/* free streams description tree */
|
||||||
|
if(ssys->streams) {
|
||||||
|
for(node = usrtc_first(ssys->streams); node != NULL; node = usrtc_first(ssys->streams))
|
||||||
|
usrtc_delete(ssys->streams, node);
|
||||||
|
free(ssys->streams);
|
||||||
|
}
|
||||||
|
|
||||||
free(ssys->links);
|
free(ssys->links);
|
||||||
pthread_rwlock_destroy(&(ssys->rwlock));
|
pthread_rwlock_destroy(&(ssys->rwlock));
|
||||||
SSL_CTX_free(ssys->ctx);
|
SSL_CTX_free(ssys->ctx);
|
||||||
@ -323,6 +607,31 @@ int sxhub_setsslserts(sxhub_t *ssys, const char *rootca,
|
|||||||
return r;
|
return r;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int sxhub_stream_register(sxhub_t *hub, const struct sxstream_description *s_desc)
|
||||||
|
{
|
||||||
|
usrtc_t *tree = NULL;
|
||||||
|
usrtc_node_t *node = NULL;
|
||||||
|
|
||||||
|
if(!hub->streams) {
|
||||||
|
tree = malloc(sizeof(usrtc_t));
|
||||||
|
if(!tree) return ENOMEM;
|
||||||
|
usrtc_init(tree, USRTC_SPLAY, MAX_STREAMS_TYPES, __cmp_uint16);
|
||||||
|
hub->streams = tree;
|
||||||
|
} else {
|
||||||
|
tree = hub->streams;
|
||||||
|
node = usrtc_lookup(tree, &s_desc->stid);
|
||||||
|
if(node) return EEXIST;
|
||||||
|
}
|
||||||
|
|
||||||
|
node = (usrtc_node_t *)&s_desc->node;
|
||||||
|
usrtc_node_init(node, (void *)s_desc);
|
||||||
|
usrtc_insert(tree, node, &s_desc->stid);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* errors */
|
||||||
|
|
||||||
struct __scerrcode {
|
struct __scerrcode {
|
||||||
int code;
|
int code;
|
||||||
const char *desc;
|
const char *desc;
|
||||||
|
@ -516,6 +516,8 @@ static int __eval_syssexp(sxlink_t *co, sexp_t *sx)
|
|||||||
rpcf = sx->list->val;
|
rpcf = sx->list->val;
|
||||||
else return SXE_BADPROTO;
|
else return SXE_BADPROTO;
|
||||||
|
|
||||||
|
/* TODO: add builtin functions processing */
|
||||||
|
|
||||||
/* find an appropriate function */
|
/* find an appropriate function */
|
||||||
node = usrtc_lookup(rpc_list->rpc_tree, rpcf);
|
node = usrtc_lookup(rpc_list->rpc_tree, rpcf);
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user