You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
libsxmp/include/sxmp/sxmp.h

480 lines
15 KiB
C

/*
* Secure X Message Passing Library v2 implementation.
* (sxmplv2) it supersedes all previous versions due to:
* - memory consumption
* - new features such as pulse emitting
* - performance optimization
*
* (c) Askele Group 2013-2015 <http://askele.com>
* (c) Alexander Vdolainen 2013-2015,2016 <avdolainen@zoho.com>
*
* libsxmp is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* libsxmp is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
#ifndef __SXMP_SXMPLV2_H__
#define __SXMP_SXMPLV2_H__
#include <stdint.h>
#include <time.h>
#include <sys/types.h>
#include <pthread.h>
#include <openssl/ssl.h>
#include <tdata/usrtc.h>
#include <tdata/idx_allocator.h>
#include <tdata/list.h>
#include <sexpr/sexp.h>
#include <sexpr/faststack.h>
#include <sxmp/errno.h>
#include <sxmp/version.h>
#include <sxmp/limits.h>
#define VERIFY_DEPTH 1 /* FIXME: purpose is not documented in this header; presumably the SSL certificate chain verification depth -- confirm against the users of this macro */
/*
 * Session context of a link; sxlink_t keeps a pointer to it in its
 * pctx member.
 *
 * NOTE(review): the member descriptions below are derived from the
 * member names only, and ownership of the pointed-to memory is not
 * visible in this header -- confirm against the implementation.
 */
typedef struct __session_context_type {
  char           *login;   /* login name */
  char           *passwd;  /* password */
  uint64_t        certid;  /* certificate id -- TODO confirm meaning */
  struct in_addr *addr;    /* peer address -- TODO confirm */
  void           *priv;    /* private data, accessed via sxlink_getpriv()/sxlink_setpriv() */
} sxsession_ctx_t;
/*
 * Message header. The members add up to 8 bytes and the structure is
 * declared packed, so no padding is inserted.
 */
typedef struct __sxmplv2_head_type {
  uint16_t msgid;           /* message id */
  uint16_t payload_length;  /* payload length, read through sxmsg_datalen() */
  uint8_t  attr;            /* attributes */
  uint8_t  opcode;          /* opcode, read through sxmsg_retcode() */
  uint16_t reserve;         /* reserved */
} __attribute__((packed)) sxmplv2_head_t;
/*
 * Forward declarations: these structures are referenced by pointer
 * before their full definitions appear further down in this header.
 */
struct __sxhub_type;
struct __sxchannel_t;
struct __sxmsg_t;
/*
 * flags for the connection link
 *
 * Each flag is a single bit; bit 0 is not used by any flag defined here.
 * NOTE(review): presumably these are stored in the 'flags' member of
 * sxlink_t -- confirm against the implementation.
 */
#define SXMP_BATCHMODE (1 << 1)
#define SXMP_MESSAGINGMODE (1 << 2)
#define SXMP_ALIVE (1 << 3)
#define SXMP_CLOSED (1 << 4)
/*
 * sxlink_t -- the main abstraction for a connection between peers.
 *
 * Member order and types are part of the binary layout and must not
 * be changed.
 */
typedef struct __sxlink_t {
  /* ---- general ---- */
  struct __sxhub_type *hub;              /**< hub subsystem */
  char *uuid;                            /**< uuid of the link */

  /* ---- channels ---- */
  idx_allocator_t idx_ch;                /**< index allocation for channels */
  pthread_mutex_t idx_ch_lock;           /**< mutex for allocating and deallocating channels */
  struct __sxchannel_t **channels;       /**< channels, O(1) storage */

  /* ---- RPC ---- */
  usrtc_t *rpc_list;                     /**< search tree of possible RPC typed lists */

  /* ---- SSL ---- */
  SSL *ssl;                              /**< SSL connection */
  int ssl_data_index;                    /**< SSL index for the custom data */
  pthread_mutex_t sslinout[2];           /**< SSL related locks for in and out */

  /* ---- security ---- */
  sxsession_ctx_t *pctx;                 /**< higher layer authentication and session context */

  /* ---- messages ---- */
  struct __sxmsg_t **messages;           /**< messages, O(1) storage */
  idx_allocator_t idx_msg;               /* index allocator; presumably for message ids -- TODO confirm */
  pthread_mutex_t idx_msg_lock;          /* presumably guards idx_msg -- TODO confirm */
  list_head_t write_pending;             /**< list of messages waiting for write */
  pthread_mutex_t write_pending_lock;    /* presumably guards write_pending -- TODO confirm */
  volatile uint8_t pending_messages;     /**< pending message count */

  /* ---- streams ---- */
  struct sxstream_opened **streams;      /* opened streams; presumably indexed via idx_streams -- TODO confirm */
  idx_allocator_t idx_streams;           /* index allocator; presumably for stream ids -- TODO confirm */
  pthread_mutex_t idx_streams_lock;      /* presumably guards idx_streams -- TODO confirm */
  usrtc_t *remote_streams;               /**< streams from remote host */

  /* ---- other ---- */
  sxmp_proto_version_t cp_version;       /* protocol version value; exact meaning not visible here */
  pthread_t thrd_poll[MAX_SXMPLTHREADS]; /* thread handles, MAX_SXMPLTHREADS entries */
  volatile uint8_t flags;                /**< flags of the connection */
  volatile uint8_t usecount;             /**< use count for the connection link */
  usrtc_node_t csnode;                   /**< node to store the link within list */
} sxlink_t;
/*
 * Accessors for the session context of a link and its private pointer.
 *
 * c - pointer to a link (sxlink_t *); evaluated once per macro.
 * p - pointer value to store; it is converted to void *.
 *
 * Every expansion is fully parenthesized so that it behaves as a single
 * expression in any context; in particular sxlink_setpriv() yields the
 * stored pointer and can safely be compared or negated.
 * sxlink_getpriv()/sxlink_setpriv() dereference c->pctx, which therefore
 * must not be NULL.
 */
#define sxlink_getpctx(c) ((c)->pctx)
#define sxlink_getpriv(c) ((c)->pctx->priv)
#define sxlink_setpriv(c, p) ((c)->pctx->priv = (void *)(p))
/* Defined further down; referenced by pointer before its definition. */
struct __link_rpc_list_type;

/*
 * Postponed message entry: a message pointer together with a list node.
 */
typedef struct __pp_msg_type {
  struct __sxmsg_t *msg;   /* the postponed message */
  list_node_t       node;  /* list linkage */
} sxppmsg_t;
/*
 * Channel opened on top of a link.
 */
typedef struct __sxchannel_t {
  uint16_t                     cid;       /**< ID of the channel */
  sxlink_t                    *link;      /**< pointer to the underlying link */
  struct __link_rpc_list_type *rpc_list;  /**< rpc functions list */
  int                          flags;     /**< flags of the channel */
  int                          type_id;   /**< channel type id */
} sxchnl_t;
/*
 * message flags
 *
 * Each flag is a single bit in the range (1 << 1) .. (1 << 7), so the
 * whole set fits into 8 bits; bit 0 is not used by any flag defined here.
 * NOTE(review): presumably carried in the 'attr' member of
 * sxmplv2_head_t -- confirm against the implementation.
 */
#define SXMSG_OPEN (1 << 1)
#define SXMSG_CLOSED (1 << 2)
#define SXMSG_PROTO (1 << 3)
#define SXMSG_LINK (1 << 4)
#define SXMSG_REPLYREQ (1 << 5)
#define SXMSG_PULSE (1 << 6)
#define SXMSG_TIMEDOUT (1 << 7)
/**
 * \brief Message used in sxmp message passing.
 *
 * This structure is used to manage a message within a channel
 * of the sxmp structure stack.
 */
typedef struct __sxmsg_t {
  sxchnl_t        *pch;      /**< channel of the message (if applicable) */
  pthread_mutex_t  wait;     /**< special wait mutex, used for pending list and sync */
  sxmplv2_head_t   mhead;    /**< last actual head of the message */
  void            *payload;  /**< payload */
} sxmsg_t;
/*
 * Accessors for a message (m is a pointer to sxmsg_t).
 *
 * sxmsg_payload / sxmsg_rapidbuf - the payload pointer (both expand to
 *                                  the same member)
 * sxmsg_datalen                  - payload length stored in the header
 * sxmsg_retcode                  - opcode stored in the header
 * sxmsg_waitlock / sxmsg_waitunlock - lock / unlock the message's wait
 *                                  mutex; the value is the result of the
 *                                  pthread call
 *
 * The member accessors expand to lvalues and may be assigned to.
 */
#define sxmsg_payload(m)    ((m)->payload)
#define sxmsg_datalen(m)    ((m)->mhead.payload_length)
#define sxmsg_rapidbuf(m)   ((m)->payload)
#define sxmsg_retcode(m)    ((m)->mhead.opcode)
#define sxmsg_waitlock(m)   (pthread_mutex_lock(&((m)->wait)))
#define sxmsg_waitunlock(m) (pthread_mutex_unlock(&((m)->wait)))
#ifdef __cplusplus
extern "C" {
#endif
/**
 * \brief Get the link a message belongs to.
 *
 * The link is reached through the channel of the message (msg->pch).
 * That channel pointer is only set "if applicable", so a message may
 * carry no channel at all; in that case, and for a NULL message,
 * NULL is returned instead of dereferencing a null pointer.
 *
 * \param msg message to inspect, may be NULL
 * \return link of the message's channel, or NULL when there is none
 */
static inline sxlink_t *sxmsg_link(sxmsg_t *msg) {
  if (!msg || !msg->pch) {
    return NULL;
  }
  return msg->pch->link;
}
#ifdef __cplusplus
}
#endif
/*
 * One RPC entry: a name, the function implementing it and a tree node.
 */
typedef struct __link_rpc_entry_type {
  char          *name;                    /* name of the RPC function */
  int          (*rpcf)(void *, sexp_t *); /* handler; the meaning of the void * argument is not visible here */
  usrtc_node_t   node;                    /* tree linkage */
} sxl_rpc_t;
/*
 * List of RPC functions, kept as a search tree.
 */
typedef struct __link_rpc_list_type {
  usrtc_t *rpc_tree;     /**< search tree for the rpc lookup */
  char    *opt_version;  /**< reserved for future implementations */
} sxl_rpclist_t;
/* Defined further down; referenced by pointer before its definition. */
struct sxstream_ops;

/*
 * Description of a stream.
 * It is located on the master and on the client;
 * the client gets it after sync.
 */
struct sxstream_description {
  uint16_t             stid;   /**< stream type ID */
  uint16_t             pcid;   /**< pinned channel type ID */
  uint16_t             type;   /**< type: 0, SXE_O_NAMED or SXE_O_BINARY */
  uint16_t             flags;  /**< possible flags */
  struct sxstream_ops *ops;    /**< operations */
  list_head_t         *named;  /**< named entries order */
  usrtc_node_t         node;   /**< internal node struct */
};
/*
 * State of one opened stream. A pointer to it is handed to the
 * callbacks declared in struct sxstream_ops.
 */
struct sxstream_opened {
  uint64_t                     sto_id;         /* id of the opened stream */
  struct sxstream_description *desc;           /* description this stream was opened from */
  union {                                      /* the two members share storage */
    void        *map_data;
    list_head_t *ent_buf;                      /* read via sxstream_opened_getelist() */
  };
  void                        *priv;           /* private data, see sxstream_opened_setpriv()/getpriv() */
  uint64_t                     off;            /* offset -- NOTE(review): presumably the current position, confirm */
  uint16_t                     flags;          /* flags */
  uint16_t                     pin_channelid;  /* pinned channel id */
};
/*
 * Accessors for an opened stream (n is a pointer to struct sxstream_opened).
 *
 * sxstream_opened_setpriv  - store p in the private pointer; the value of
 *                            the expression is the stored pointer
 * sxstream_opened_getpriv  - read the private pointer
 * sxstream_opened_getelist - read the ent_buf member
 *
 * Every expansion is fully parenthesized so it acts as one expression
 * regardless of the surrounding operators.
 */
#define sxstream_opened_setpriv(n, p) ((n)->priv = (p))
#define sxstream_opened_getpriv(n) ((n)->priv)
#define sxstream_opened_getelist(n) ((n)->ent_buf)
/*
 * Callbacks of a stream. Each union holds a single function pointer
 * that can be viewed under two signatures.
 * NOTE(review): presumably the s_b* members are for binary streams and
 * the s_e* members for entry streams -- confirm against the
 * implementation.
 */
struct sxstream_ops {
  int (*s_open)(sxlink_t *, struct sxstream_opened *, const char *);
  int (*s_close)(struct sxstream_opened *);

  /* read callback */
  union {
    size_t (*s_bread)(struct sxstream_opened *, size_t, uint64_t, void *);
    size_t (*s_eread)(struct sxstream_opened *, size_t, uint64_t, list_head_t **);
  };

  /* write callback */
  union {
    size_t (*s_bwrite)(struct sxstream_opened *, size_t, uint64_t, const void *);
    size_t (*s_ewrite)(struct sxstream_opened *, uint64_t, list_head_t *);
  };
};
/*
 * Log message types, passed as first argument to the hub's log callback.
 * Values ascend from 0 (critical) to 4 (debug).
 */
typedef enum {
  SXCRITICAL_LOG = 0,
  SXERROR_LOG    = 1,
  SXWARNING_LOG  = 2,
  SXINFO_LOG     = 3,
  SXDEBUG_LOG    = 4,
} sxlogtype_t;
/**
 * \brief Hub subsystem structure.
 *
 * This structure is used to manage and control a set of
 * determined links with the same RPC lists and the same
 * mode (server, client).
 *
 * Member order and types are part of the binary layout and must not
 * be changed.
 */
typedef struct __sxhub_type {
  usrtc_t          *links;       /* tree of links -- NOTE(review): presumably linked through sxlink_t's csnode, confirm */
  usrtc_t          *streams;     /* tree of streams -- NOTE(review): presumably stream descriptions, confirm */
  pthread_rwlock_t  rwlock;      /* read/write lock; what it guards is not visible here */

  /* path names to the certificates */
  char             *rootca;
  char             *certpem;
  char             *certkey;

  sxl_rpclist_t    *system_rpc;  /* RPC list; presumably for system calls -- TODO confirm */
  sxl_rpclist_t    *stream_rpc;  /* RPC list; presumably for stream calls -- TODO confirm */

  /* special function pointers */

  /** this function is used to validate the SSL certificate during the SSL handshake */
  int (*validate_sslpem)(sxlink_t *);

  /** this function authorizes a user to log in; it should also check
   *  the SSL cert and user, and already made sessions */
  int (*secure_check)(sxlink_t *);

  /** this function is used to set the RPC list of the functions */
  usrtc_t *(*get_rpc_typed_list_tree)(sxlink_t *);

  /** this function is a callback during setting up a typed channel */
  int (*set_typed_list_callback)(sxlink_t *, int, char *);

  /** callback on connection destroy */
  void (*on_destroy)(sxlink_t *);

  /** callback on pulse emit */
  void (*on_pulse)(sxlink_t *, sexp_t *);

  /** this function is used to output logs in a user-defined way */
  int (*log)(const sxlogtype_t, const char *, ...);

  SSL_CTX          *ctx;         /**< SSL context */
  void             *priv;        /* private data, see sxhub_set_priv()/sxhub_get_priv() */
} sxhub_t;
/*
 * Setters and getters for hub members (c is a pointer to sxhub_t).
 *
 * Each setter stores its second argument in the member named on the
 * right-hand side; the value of the expression is the stored value.
 * Every expansion is fully parenthesized so that it acts as a single
 * expression regardless of the surrounding operators.
 */
#define sxhub_set_sslvalidate(c, f) ((c)->validate_sslpem = (f))
#define sxhub_set_authcheck(c, f) ((c)->secure_check = (f))
#define sxhub_set_rpcvalidator(c, f) ((c)->get_rpc_typed_list_tree = (f))
#define sxhub_set_channelcall(c, f) ((c)->set_typed_list_callback = (f))
#define sxhub_set_ondestroy(c, f) ((c)->on_destroy = (f))
#define sxhub_set_onpulse(c, f) ((c)->on_pulse = (f))
#define sxhub_set_priv(c, p) ((c)->priv = (p))
#define sxhub_get_priv(c) ((c)->priv)
#define sxhub_set_logops(c, f) ((c)->log = (f))
/*
 * This macro is used to make calling the sxhub log function easier.
 *
 * c    - pointer to a link (sxlink_t *); evaluated more than once, so it
 *        must not have side effects
 * type - log type (sxlogtype_t)
 * fmt  - printf-style format string, followed by its arguments
 *
 * When the link has a hub with a log callback, that callback is called;
 * otherwise the message is written to stderr with fprintf(). The value
 * of the expression is the int returned by whichever function ran.
 *
 * The expansion is a single parenthesized expression without a trailing
 * semicolon, so it can be used as the body of an unbraced if/else and
 * inside larger expressions. Users must have <stdio.h> included.
 */
#define sxlink_log(c, type, fmt, ...) \
  (((c)->hub && (c)->hub->log) ? \
   (c)->hub->log((type), (fmt), ## __VA_ARGS__) : \
   fprintf(stderr, (fmt), ## __VA_ARGS__))
/*
 * RPC list bound to a type id, with a description and a tree node.
 */
typedef struct __rpc_typed_list_type {
  int            type_id;      /* type id of the list */
  char          *description;  /* description text */
  sxl_rpclist_t *rpc_list;     /* the RPC functions of this type */
  usrtc_node_t   lnode;        /* tree linkage */
} rpc_typed_list_t;
/* streams */
/*
 * flags for streams
 *
 * Each flag is a single bit; bit 0 is not used by any flag defined here.
 * The 'type' member of struct sxstream_description is documented as
 * 0, SXE_O_NAMED or SXE_O_BINARY.
 * NOTE(review): presumably the remaining flags are passed as the 'flags'
 * argument of sxstream_open()/sxstream_openwch() -- confirm.
 */
#define SXE_O_BINARY (1 << 1)
#define SXE_O_READ (1 << 2)
#define SXE_O_WRITE (1 << 3)
#define SXE_O_ASYNC (1 << 4)
#define SXE_O_TRUNC (1 << 5)
#define SXE_O_NAMED (1 << 6)
#define SXE_O_OCHANNELED (1 << 7)
/*
 * sxstream_t is used to access a stream, i.e. this is what the open
 * operations return.
 * You must take care of closing it, because these objects are not
 * tracked.
 */
typedef struct __sxtream_type {
  sxlink_t    *link;      /* link the stream is opened on */
  sxchnl_t    *channel;   /* channel used by the stream */
  int          flags;     /* stream flags */
  uint64_t     sid;       /* stream id */

  union {                 /* the two members share storage */
    sexp_t *sx;
    void   *ebuf;
  };

  /* for reading: two alternative views sharing storage */
  union {
    struct {
      void      *data;
      uintptr_t  cur_offset;
    };
    struct {
      list_head_t *entries;
      list_node_t *cur;
    };
  };

  list_head_t *named;     /* list of names -- NOTE(review): presumably the named entries order, confirm */
} sxstream_t;
/* List node carrying a value only (no name). */
struct _nn_stream_entry_node {
  char        *value;  /* entry value */
  list_node_t  node;   /* list linkage */
};
/* List node carrying a name/value pair. */
struct _stream_entry_node {
  char        *name;   /* entry name */
  char        *value;  /* entry value */
  list_node_t  node;   /* list linkage */
};
/* List node binding an order number to a name. */
struct _stream_named_order_node {
  int          order;  /* order number */
  char        *name;   /* name bound to that order */
  list_node_t  node;   /* list linkage */
};
/* List node whose payload is itself a list. */
struct _stream_list_node {
  list_head_t *list;  /* the nested list */
  list_node_t  node;  /* list linkage */
};
#ifdef __cplusplus
extern "C" {
#endif
/* API */
/*
 * Library-wide setup and teardown.
 * NOTE(review): presumably sxmp_init() has to be called before any other
 * function of this header and sxmp_finalize() after the last one --
 * confirm against the implementation.
 */
int sxmp_init(void);
void sxmp_finalize(void);
/*
 * Hub lifecycle. sxhub_init() works on a hub supplied by the caller,
 * sxhub_create() returns a pointer to a hub.
 * NOTE(review): the difference between sxhub_destroy() and sxhub_free()
 * and the meaning of the int return values are not visible in this
 * header; presumably error codes from <sxmp/errno.h> -- confirm.
 */
int sxhub_init(sxhub_t *ssys);
sxhub_t *sxhub_create(void);
int sxhub_destroy(sxhub_t *ssys);
int sxhub_free(sxhub_t *ssys);
/*
 * Set the certificates of a hub; the argument names match the
 * rootca/certpem/certkey members of sxhub_t, which hold path names.
 */
int sxhub_setsslserts(sxhub_t *ssys, const char *rootca,
const char *certpem, const char *certkey);
/*
 * create links
 *
 * All three creation functions return a pointer to a link.
 * NOTE(review): presumably NULL is returned on failure and 'sck' is an
 * already accepted socket descriptor -- confirm against the
 * implementation.
 * sxlink_connect_at() takes the same arguments as sxlink_connect() plus
 * a 'priv' pointer.
 */
sxlink_t *sxlink_master_accept(sxhub_t *hub, int sck, struct in_addr *addr);
sxlink_t *sxlink_connect(sxhub_t *hub, const char *host,
int port, const char *SSL_cert, const char *login,
const char *passwd);
sxlink_t *sxlink_connect_at(sxhub_t *hub, const char *host,
int port, const char *SSL_cert, const char *login,
const char *passwd, const void *priv);
/* close a link */
int sxlink_close(sxlink_t *co);
/*
 * channels
 *
 * sxchannel_open() opens a channel of the given type on a link and
 * returns a pointer to it; sxchannel_close() closes it.
 * NOTE(review): presumably 'type' is a channel type id (cf. type_id in
 * sxchnl_t) and NULL is returned on failure -- confirm.
 */
sxchnl_t *sxchannel_open(sxlink_t *co, int type);
int sxchannel_close(sxchnl_t *channel);
/* messages */
/*
 * creates a message with a payload.
 * Will return an error code and, if applicable, a pointer to the message
 * (through the 'msg' output argument).
 */
int sxmsg_send(sxchnl_t *channel, const char *data, size_t datalen, sxmsg_t **msg);
/* the same, but as a postponed message, i.e. it will be written to the queue - not written immediately */
int sxmsg_send_pp(sxchnl_t *channel, const char *data, size_t datalen, sxmsg_t **msg);
/* send a pulse message */
int sxmsg_pulse(sxlink_t *co, const char *data, size_t datalen);
/*
 * Reply to / return from a message.
 * NOTE(review): by analogy with sxmsg_send_pp(), the *_pp variants are
 * presumably the postponed (queued) forms; the difference between
 * sxmsg_reply(), sxmsg_rreply() and sxmsg_return() is not visible in
 * this header -- confirm against the implementation.
 */
int sxmsg_reply(sxmsg_t *msg, const char *data, size_t datalen);
int sxmsg_reply_pp(sxmsg_t *msg, const char *data, size_t datalen);
int sxmsg_rreply(sxmsg_t *msg, size_t datalen);
int sxmsg_return(sxmsg_t *msg, int opcode);
int sxmsg_return_pp(sxmsg_t *msg, int opcode);
/* NOTE(review): presumably releases resources held by the message -- confirm */
void sxmsg_clean(sxmsg_t *msg);
/* streams */
/*
 * NOTE: sxstream operations (except the opening process, i.e. several
 * threads can open different streams) are NOT _thread-safe_, please
 * take care of that in your own application.
 */
/*
 * Open a stream on a link; sxstream_openwch() additionally takes the
 * channel to use. Both return a pointer to a stream.
 * NOTE(review): presumably 'stid' is a stream type ID, 'flags' is a
 * combination of SXE_O_* values and NULL is returned on failure --
 * confirm against the implementation.
 */
sxstream_t *sxstream_open(sxlink_t *link, const char *opt, int stid, int flags);
sxstream_t *sxstream_openwch(sxlink_t *link, sxchnl_t *channel, const char *opt, int stid, int flags);
int sxstream_close(sxstream_t *stream);
/*
 * binary stream operations
 * NOTE(review): the meaning of the size_t return values (presumably the
 * number of bytes transferred) and the way errors are reported are not
 * visible in this header -- confirm.
 */
size_t sxstream_bread(sxstream_t *stream, void *buf, size_t buf_len);
size_t sxstream_breadat(sxstream_t *stream, void *buf, size_t buf_len, off_t off);
size_t sxstream_bwriteat(sxstream_t *stream, const void *buf, size_t buf_len, off_t off);
size_t sxstream_bsyncat(sxstream_t *stream, void *buf, size_t buf_len, off_t off);
/*
 * NOTE: the value returned by reading from entry streams
 * must not be managed by the caller; that is done by the close operation.
 * That means you should take care with values already read, since
 * they might be freed on the next read operation.
 */
/* entry nonamed stream ops */
list_head_t *sxstream_read(sxstream_t *stream);
/* entry named stream ops */
list_head_t *sxstream_readnamed(sxstream_t *stream);
/* provider-side functions for streams */
int sxhub_stream_register(sxhub_t *hub, const struct sxstream_description *s_desc);
#ifdef __cplusplus
}
#endif
/*
 * RPC List API
 *
 * Filter constants.
 * NOTE(review): presumably SXMP_FILTER_INC / SXMP_FILTER_EXC are values
 * for the 'flag' argument of sxmp_rpclist_filter() and SXMP_FILTER_END
 * terminates its 'filter' array -- confirm against the implementation.
 */
#define SXMP_FILTER_INC (0xa)
#define SXMP_FILTER_EXC (0xb)
#define SXMP_FILTER_END (-1)
#ifdef __cplusplus
extern "C" {
#endif
/*
 * Build and filter RPC typed lists held in a tree.
 * sxmp_rpclist_add() adds a typed list (type, description, version) and
 * sxmp_rpclist_add_function() adds a named function to the list of the
 * given type.
 * NOTE(review): the int return values are presumably error codes from
 * <sxmp/errno.h>; 'filter' is presumably an array of type ids ended by
 * SXMP_FILTER_END and 'flag' one of SXMP_FILTER_INC / SXMP_FILTER_EXC --
 * confirm against the implementation.
 */
int sxmp_rpclist_init(usrtc_t *tree);
int sxmp_rpclist_add(usrtc_t *tree, int type, const char *description,
const char *version);
int sxmp_rpclist_add_function(usrtc_t *tree, int type, const char *fu_name,
int (*rpcf)(void *, sexp_t *));
int sxmp_rpclist_filter(usrtc_t *source, usrtc_t **dest, int flag, int *filter);
/*
 * generic sxstream list functions
 *
 * The slist functions allocate, free and append a value to a list; the
 * named functions work on lists pairing a name with an order number and
 * can look up one from the other.
 * NOTE(review): whether the strings are copied, and what the lookups
 * return when nothing matches, is not visible in this header -- confirm
 * against the implementation.
 */
list_head_t *sxstream_generic_slist_alloc(void);
void sxstream_generic_slist_free(list_head_t *list);
int sxstream_generic_slist_additem(list_head_t *list, const char *value);
void sxstream_generic_named_free(list_head_t *list);
int sxstream_generic_nmlist_additem(list_head_t *list, const char *name,
int order);
const char *sxstream_generic_named_lookupname(list_head_t *list, int order);
int sxstream_generic_named_lookuporder(list_head_t *list, const char *name);
#ifdef __cplusplus
}
#endif
/* A named list is allocated exactly like a plain one. */
#define sxstream_generic_named_alloc() (sxstream_generic_slist_alloc())

/*
 * Debug helpers writing to stderr; both evaluate to the int returned by
 * fprintf(). Users must have <stdio.h> included.
 *
 * blub(txt)      - print file, line and function of the call site,
 *                  followed by the string txt
 * dumphead(head) - print the members of a message header; head is a
 *                  pointer to sxmplv2_head_t
 */
#define blub(txt) \
  (fprintf(stderr, "%s:%d in %s > %s\n", \
           __FILE__, __LINE__, __FUNCTION__, (txt)))
#define dumphead(head) \
  (fprintf(stderr, "id: %d, opcode: %d, attr: %d, len = %d, reserve: %d\n", \
           (head)->msgid, (head)->opcode, (head)->attr, \
           (head)->payload_length, (head)->reserve))
#endif /* __SXMP_SXMPLV2_H__ */