/* * Secure X Message Passing Library v2 implementation. * (sxmplv2) it superseed all versions before due to the: * - memory consumption * - new features such as pulse emitting * - performance optimization * * (c) Askele Group 2013-2015 * (c) Alexander Vdolainen 2013-2015,2016 * * libsxmp is free software: you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as published * by the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * libsxmp is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * See the GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with this program. If not, see ."; * */ #ifndef __SXMP_SXMPLV2_H__ #define __SXMP_SXMPLV2_H__ #include #include #include #include #include #include #include #include #include #include #include #include #include #define VERIFY_DEPTH 1 /* FIXME: */ typedef struct __session_context_type { char *login; char *passwd; uint64_t certid; struct in_addr *addr; void *priv; } sxsession_ctx_t; /* 8 byte header */ typedef struct __sxmplv2_head_type { uint16_t msgid; uint16_t payload_length; uint8_t attr; uint8_t opcode; uint16_t reserve; }__attribute__((packed)) sxmplv2_head_t; struct __sxhub_type; struct __sxchannel_t; struct __sxmsg_t; /* flags for the connection link */ #define SXMP_BATCHMODE (1 << 1) #define SXMP_MESSAGINGMODE (1 << 2) #define SXMP_ALIVE (1 << 3) #define SXMP_CLOSED (1 << 4) /* * sxlink - the main abstraction for connection between peers */ typedef struct __sxlink_t { /* General section */ struct __sxhub_type *hub; /* < hub subsystem */ char *uuid; /** < uuid of the link */ /* Channels section */ idx_allocator_t idx_ch; /** < index allocation for channels */ pthread_mutex_t idx_ch_lock; /** < mutex for allocating and deallocating channels */ struct __sxchannel_t **channels; /** < channels O(1) storage */ /* RPC section */ usrtc_t *rpc_list; /** < search tree of possible RPC typed lists */ /* SSL related section */ SSL *ssl; /** < SSL connection */ int ssl_data_index; /** < SSL index for the custom data */ pthread_mutex_t sslinout[2]; /** < SSL related locks for in and out */ /* Security section */ sxsession_ctx_t *pctx; /** < higher layer authentification and session context */ /* Messages section */ struct __sxmsg_t **messages; /** < messages O(1) storage */ idx_allocator_t idx_msg; pthread_mutex_t idx_msg_lock; list_head_t write_pending; /** < list of messages waiting for write */ pthread_mutex_t write_pending_lock; volatile uint8_t pending_messages; /** < pending message count */ /* stream part */ struct sxstream_opened **streams; idx_allocator_t idx_streams; pthread_mutex_t idx_streams_lock; usrtc_t *remote_streams; /** < streams from remote host */ /* Other stuff */ sxmp_proto_version_t cp_version; pthread_t thrd_poll[MAX_SXMPLTHREADS]; volatile uint8_t flags; /** < flags of the connection */ volatile uint8_t usecount; /** < use count for the connection link */ usrtc_node_t csnode; /** < node to store the link within list */ } sxlink_t; #define sxlink_getpctx(c) (c)->pctx #define sxlink_getpriv(c) (c)->pctx->priv #define sxlink_setpriv(c, p) (c)->pctx->priv = (void *)(p) struct __link_rpc_list_type; typedef struct __pp_msg_type { /* postponed message entry */ struct __sxmsg_t *msg; list_node_t node; } sxppmsg_t; typedef struct __sxchannel_t { uint16_t cid; /** < ID of the channel */ sxlink_t *link; /** < pointer to the underlying link */ struct __link_rpc_list_type *rpc_list; /** < rpc functions list */ int flags; /** < flags of the channel */ int type_id; /** < channel type id */ } sxchnl_t; /* message flags */ #define SXMSG_OPEN (1 << 1) #define SXMSG_CLOSED (1 << 2) #define SXMSG_PROTO (1 << 3) #define SXMSG_LINK (1 << 4) #define SXMSG_REPLYREQ (1 << 5) #define SXMSG_PULSE (1 << 6) #define SXMSG_TIMEDOUT (1 << 7) /** * \brief Message used in sxmp message passing * * This structure used to manage a message within a channel * of the sxmp structure stack. */ typedef struct __sxmsg_t { sxchnl_t *pch; /** < channel of the message(if applicable) */ pthread_mutex_t wait; /** < special wait mutex, used for pending list and sync */ sxmplv2_head_t mhead; /** < last actual head of the message */ void *payload; /** < payload */ } sxmsg_t; #define sxmsg_payload(m) (m)->payload #define sxmsg_datalen(m) (m)->mhead.payload_length #define sxmsg_rapidbuf(m) (m)->payload #define sxmsg_retcode(m) (m)->mhead.opcode #define sxmsg_waitlock(m) pthread_mutex_lock(&((m)->wait)) #define sxmsg_waitunlock(m) pthread_mutex_unlock(&((m)->wait)) #ifdef __cplusplus extern "C" { #endif static inline sxlink_t *sxmsg_link(sxmsg_t *msg) { return msg->pch->link; } #ifdef __cplusplus } #endif typedef struct __link_rpc_entry_type { char *name; int (*rpcf)(void *, sexp_t *); usrtc_node_t node; } sxl_rpc_t; typedef struct __link_rpc_list_type { usrtc_t *rpc_tree; /** < search tree for the rpc lookup */ char *opt_version; /** < reserved for future implementations */ } sxl_rpclist_t; struct sxstream_ops; /* * description of the stream * located on the master and client, * client will get it after sync */ struct sxstream_description { uint16_t stid; /** < stream type ID */ uint16_t pcid; /** < pinned channel type ID */ uint16_t type; /** < type: 0||SXE_O_NAMED||SXE_O_BINARY */ uint16_t flags; /** < possible flags */ struct sxstream_ops *ops; /** < operations */ list_head_t *named; /** < named entries order */ usrtc_node_t node; /** < internal node struct */ }; struct sxstream_opened { uint64_t sto_id; struct sxstream_description *desc; union { void *map_data; list_head_t *ent_buf; }; void *priv; uint64_t off; uint16_t flags; uint16_t pin_channelid; }; #define sxstream_opened_setpriv(n, p) (n)->priv = (p) #define sxstream_opened_getpriv(n) (n)->priv #define sxstream_opened_getelist(n) (n)->ent_buf struct sxstream_ops { int (*s_open)(sxlink_t *, struct sxstream_opened *, const char *); int (*s_close)(struct sxstream_opened *); union { size_t (*s_bread)(struct sxstream_opened *, size_t, uint64_t, void *); size_t (*s_eread)(struct sxstream_opened *, size_t, uint64_t, list_head_t **); }; union { size_t (*s_bwrite)(struct sxstream_opened *, size_t, uint64_t, const void *); size_t (*s_ewrite)(struct sxstream_opened *, uint64_t, list_head_t *); }; }; typedef enum { SXCRITICAL_LOG = 0, SXERROR_LOG, SXWARNING_LOG, SXINFO_LOG, SXDEBUG_LOG, } sxlogtype_t; /** * \brief Hub subsystem structure. * * This structure used for management and control a set of a * determined links with the same RPC lists and the same * mode (server, client). * */ typedef struct __sxhub_type { usrtc_t *links; usrtc_t *streams; pthread_rwlock_t rwlock; char *rootca, *certpem, *certkey; /* path name to the certificates */ sxl_rpclist_t *system_rpc; sxl_rpclist_t *stream_rpc; /* special functions pointers */ int (*validate_sslpem)(sxlink_t *); /** < this function used to validate SSL certificate while SSL handshake */ int (*secure_check)(sxlink_t *); /** < this function authorize user to login, * and also should check SSL cert and user, and already made sessions */ usrtc_t* (*get_rpc_typed_list_tree)(sxlink_t *); /** < this function is used to set RPC list of the functions */ int (*set_typed_list_callback)(sxlink_t *, int, char *); /** < this function is a callback * during setting up a typed channel */ void (*on_destroy)(sxlink_t *); /** < callback on connection destroy */ void (*on_pulse)(sxlink_t *, sexp_t *); /** < callback on pulse emit */ int (*log)(const sxlogtype_t, const char *, ...); /** < this function is used to output logs with user way */ SSL_CTX *ctx; /** < SSL context */ void *priv; } sxhub_t; #define sxhub_set_sslvalidate(c, f) (c)->validate_sslpem = (f) #define sxhub_set_authcheck(c, f) (c)->secure_check = (f) #define sxhub_set_rpcvalidator(c, f) (c)->get_rpc_typed_list_tree = (f) #define sxhub_set_channelcall(c, f) (c)->set_typed_list_callback = (f) #define sxhub_set_ondestroy(c, f) (c)->on_destroy = (f) #define sxhub_set_onpulse(c, f) (c)->on_pulse = (f) #define sxhub_set_priv(c, p) (c)->priv = (p) #define sxhub_get_priv(c) (c)->priv #define sxhub_set_logops(c, f) (c)->log = (f) /* this macro is used to make calling sxhub log function easier */ #define sxlink_log(c, type, fmt, ...) \ (c)->hub ? (c)->hub->log((type), (fmt), ## __VA_ARGS__) : \ fprintf(stderr, (fmt), ## __VA_ARGS__); typedef struct __rpc_typed_list_type { int type_id; char *description; sxl_rpclist_t *rpc_list; usrtc_node_t lnode; } rpc_typed_list_t; /* streams */ /* flags for streams */ #define SXE_O_BINARY (1 << 1) #define SXE_O_READ (1 << 2) #define SXE_O_WRITE (1 << 3) #define SXE_O_ASYNC (1 << 4) #define SXE_O_TRUNC (1 << 5) #define SXE_O_NAMED (1 << 6) #define SXE_O_OCHANNELED (1 << 7) /* sxstream_t used to access stream i.e. this is something returned on * open operations. * you should care about closing it, because this ones doesn't tracked. */ typedef struct __sxtream_type { sxlink_t *link; sxchnl_t *channel; int flags; uint64_t sid; union { sexp_t *sx; void *ebuf; }; /* for reading */ union { struct { void *data; uintptr_t cur_offset; }; struct { list_head_t *entries; list_node_t *cur; }; }; list_head_t *named; } sxstream_t; struct _nn_stream_entry_node { char *value; list_node_t node; }; struct _stream_entry_node { char *name; char *value; list_node_t node; }; struct _stream_named_order_node { int order; char *name; list_node_t node; }; struct _stream_list_node { list_head_t *list; list_node_t node; }; #ifdef __cplusplus extern "C" { #endif /* API */ int sxmp_init(void); void sxmp_finalize(void); int sxhub_init(sxhub_t *ssys); sxhub_t *sxhub_create(void); int sxhub_destroy(sxhub_t *ssys); int sxhub_free(sxhub_t *ssys); int sxhub_setsslserts(sxhub_t *ssys, const char *rootca, const char *certpem, const char *certkey); /* create links */ sxlink_t *sxlink_master_accept(sxhub_t *hub, int sck, struct in_addr *addr); sxlink_t *sxlink_connect(sxhub_t *hub, const char *host, int port, const char *SSL_cert, const char *login, const char *passwd); sxlink_t *sxlink_connect_at(sxhub_t *hub, const char *host, int port, const char *SSL_cert, const char *login, const char *passwd, const void *priv); int sxlink_close(sxlink_t *co); /* channels */ sxchnl_t *sxchannel_open(sxlink_t *co, int type); int sxchannel_close(sxchnl_t *channel); /* messages */ /* * creates a message with a payload. * Will return a error code, and, if applicable, pointer to message */ int sxmsg_send(sxchnl_t *channel, const char *data, size_t datalen, sxmsg_t **msg); /* the same - postponed message i.e. will be written to the queue - not to write immendatly */ int sxmsg_send_pp(sxchnl_t *channel, const char *data, size_t datalen, sxmsg_t **msg); /* send a pulse message */ int sxmsg_pulse(sxlink_t *co, const char *data, size_t datalen); int sxmsg_reply(sxmsg_t *msg, const char *data, size_t datalen); int sxmsg_reply_pp(sxmsg_t *msg, const char *data, size_t datalen); int sxmsg_rreply(sxmsg_t *msg, size_t datalen); int sxmsg_return(sxmsg_t *msg, int opcode); int sxmsg_return_pp(sxmsg_t *msg, int opcode); void sxmsg_clean(sxmsg_t *msg); /* streams */ /* * NOTE: sxstream operations (except opening process i.e. few threads * can open different streams) are NOT _thread-safe_, please * care about that in your own application. */ sxstream_t *sxstream_open(sxlink_t *link, const char *opt, int stid, int flags); sxstream_t *sxstream_openwch(sxlink_t *link, sxchnl_t *channel, const char *opt, int stid, int flags); int sxstream_close(sxstream_t *stream); /* binary stream operations */ size_t sxstream_bread(sxstream_t *stream, void *buf, size_t buf_len); size_t sxstream_breadat(sxstream_t *stream, void *buf, size_t buf_len, off_t off); size_t sxstream_bwriteat(sxstream_t *stream, const void *buf, size_t buf_len, off_t off); size_t sxstream_bsyncat(sxstream_t *stream, void *buf, size_t buf_len, off_t off); /* * NOTE: returning value for reading from entry streams * shouldn't be managed, it's done by close operation. * That means you should care about ridden values, since * it might be freed on the next read operation. */ /* entry nonamed stream ops */ list_head_t *sxstream_read(sxstream_t *stream); /* entry named stream ops */ list_head_t *sxstream_readnamed(sxstream_t *stream); /* provider-side functions for streams */ int sxhub_stream_register(sxhub_t *hub, const struct sxstream_description *s_desc); #ifdef __cplusplus } #endif /* RPC List API */ #define SXMP_FILTER_INC 0xa #define SXMP_FILTER_EXC 0xb #define SXMP_FILTER_END -1 #ifdef __cplusplus extern "C" { #endif int sxmp_rpclist_init(usrtc_t *tree); int sxmp_rpclist_add(usrtc_t *tree, int type, const char *description, const char *version); int sxmp_rpclist_add_function(usrtc_t *tree, int type, const char *fu_name, int (*rpcf)(void *, sexp_t *)); int sxmp_rpclist_filter(usrtc_t *source, usrtc_t **dest, int flag, int *filter); /* generic sxstream list functions */ list_head_t *sxstream_generic_slist_alloc(void); void sxstream_generic_slist_free(list_head_t *list); int sxstream_generic_slist_additem(list_head_t *list, const char *value); void sxstream_generic_named_free(list_head_t *list); int sxstream_generic_nmlist_additem(list_head_t *list, const char *name, int order); const char *sxstream_generic_named_lookupname(list_head_t *list, int order); int sxstream_generic_named_lookuporder(list_head_t *list, const char *name); #ifdef __cplusplus } #endif #define sxstream_generic_named_alloc() \ sxstream_generic_slist_alloc() #define blub(txt) fprintf(stderr, "%s:%d in %s > %s\n", __FILE__, __LINE__, __FUNCTION__, txt) #define dumphead(head) fprintf(stderr, "id: %d, opcode: %d, attr: %d, len = %d, reserve: %d\n", (head)->msgid, (head)->opcode, (head)->attr, (head)->payload_length, (head)->reserve) #endif /* __SXMP_SXMPLV2_H__ */