/*
 * Secure X Message Passing Library v2 implementation.
 * (sxmplv2) It supersedes all earlier versions thanks to:
 * - memory consumption
 * - new features such as pulse emitting
 * - performance optimization
 *
 * (c) Askele Group 2013-2015 <http://askele.com>
 * (c) Alexander Vdolainen 2013-2015,2016 <avdolainen@zoho.com>
 *
 * libsxmp is free software: you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * libsxmp is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 * See the GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 */

#ifndef __SXMP_SXMPLV2_H__
#define	__SXMP_SXMPLV2_H__

#include <stdint.h>
#include <stdio.h>
#include <time.h>
#include <sys/types.h>
#include <pthread.h>

#include <openssl/ssl.h>

#include <tdata/usrtc.h>
#include <tdata/idx_allocator.h>
#include <tdata/list.h>
#include <sexpr/sexp.h>
#include <sexpr/faststack.h>

#include <sxmp/errno.h>
#include <sxmp/version.h>
#include <sxmp/limits.h>

/* NOTE(review): presumably the certificate chain verification depth; the
 * code that consumes this constant is not in this header - TODO confirm. */
#define VERIFY_DEPTH  1  /* FIXME: hard-coded value */

/*
 * Session context attached to a link: sxlink_t keeps a pointer to one of
 * these in its pctx member.
 */
typedef struct __session_context_type {
  char *login; /* NOTE(review): presumably the login name supplied on connect - confirm */
  char *passwd; /* NOTE(review): presumably the matching password - confirm */
  uint64_t certid; /* NOTE(review): looks like a certificate identifier - confirm */
  struct in_addr *addr; /* NOTE(review): presumably the peer address - confirm */
  void *priv; /* opaque user pointer, accessed via sxlink_getpriv()/sxlink_setpriv() */
} sxsession_ctx_t;

/*
 * Message header: five fixed-width fields totalling 8 bytes
 * (2 + 2 + 1 + 1 + 2). The packed attribute keeps the compiler from
 * adding any padding between or after the fields.
 */
typedef struct __attribute__((packed)) __sxmplv2_head_type {
  uint16_t msgid;           /* message ID */
  uint16_t payload_length;  /* payload length, read by sxmsg_datalen() */
  uint8_t  attr;            /* attributes */
  uint8_t  opcode;          /* operation code, read by sxmsg_retcode() */
  uint16_t reserve;         /* reserved */
} sxmplv2_head_t;

/* Forward declarations: sxlink_t below refers to these types through
 * pointers before their full definitions appear. */
struct __sxhub_type;
struct __sxchannel_t;
struct __sxmsg_t;

/* Connection link flag bits (bit 0 is not assigned) */
#define SXMP_BATCHMODE      0x02
#define SXMP_MESSAGINGMODE  0x04
#define SXMP_ALIVE          0x08
#define SXMP_CLOSED         0x10

/*
 * sxlink - the main abstraction for a connection between peers.
 *
 * NOTE(review): several members are declared volatile uint8_t; volatile
 * alone does not make concurrent updates atomic, and an 8-bit counter
 * wraps after 255 - confirm how the implementation serialises access.
 */
typedef struct __sxlink_t {
  /* General section */
  struct __sxhub_type *hub; /* < hub subsystem */
  char *uuid; /** < uuid of the link */
  /* Channels section */
  idx_allocator_t idx_ch; /** < index allocation for channels */
  pthread_mutex_t idx_ch_lock; /** < mutex for allocating and deallocating channels */
  struct __sxchannel_t **channels; /** < channels O(1) storage */
  /* RPC section */
  usrtc_t *rpc_list; /** < search tree of possible RPC typed lists */
  /* SSL related section */
  SSL *ssl; /** < SSL connection */
  int ssl_data_index; /** < SSL index for the custom data */
  pthread_mutex_t sslinout[2]; /** < SSL related locks for in and out */
  /* Security section */
  sxsession_ctx_t *pctx; /** < higher layer authentication and session context */
  /* Messages section */
  struct __sxmsg_t **messages; /** < messages O(1) storage */
  idx_allocator_t idx_msg; /** < NOTE(review): presumably index allocation for messages, mirroring idx_ch - confirm */
  pthread_mutex_t idx_msg_lock; /** < NOTE(review): presumably guards idx_msg - confirm */
  list_head_t write_pending;  /** < list of messages waiting for write */
  pthread_mutex_t write_pending_lock; /** < NOTE(review): presumably guards write_pending - confirm */
  volatile uint8_t pending_messages; /** < pending message count */
  /* stream part */
  struct sxstream_opened **streams; /** < NOTE(review): presumably opened streams storage, mirroring channels - confirm */
  idx_allocator_t idx_streams; /** < NOTE(review): presumably index allocation for streams - confirm */
  pthread_mutex_t idx_streams_lock; /** < NOTE(review): presumably guards idx_streams - confirm */
  usrtc_t *remote_streams; /** < streams from remote host */
  /* Other stuff */
  sxmp_proto_version_t cp_version; /** < protocol version value (type declared outside this header) */
  pthread_t thrd_poll[MAX_SXMPLTHREADS]; /** < thread handles, MAX_SXMPLTHREADS of them */
  volatile uint8_t flags; /** < flags of the connection */
  volatile uint8_t usecount; /** < use count for the connection link */
  usrtc_node_t csnode; /** < node to store the link within list */
} sxlink_t;

/*
 * Accessors for a link's session context and the private pointer stored
 * inside it.
 *
 * sxlink_getpriv()/sxlink_setpriv() dereference (c)->pctx, so the link must
 * have a session context attached. Each expansion is fully parenthesised so
 * that it behaves as a single operand inside a larger expression; the setter
 * yields the stored value, like any C assignment.
 */
#define sxlink_getpctx(c)     ((c)->pctx)
#define sxlink_getpriv(c)     ((c)->pctx->priv)
#define sxlink_setpriv(c, p)  ((c)->pctx->priv = (void *)(p))

/* Forward declaration: sxchnl_t below points to this type, which is fully
 * defined further down as sxl_rpclist_t. */
struct __link_rpc_list_type;

/* Postponed message entry: a message pointer plus the list node that links
 * the entry into a list. */
typedef struct __pp_msg_type { /* postponed message entry */
  struct __sxmsg_t *msg; /* the postponed message */
  list_node_t node; /* list linkage; NOTE(review): presumably for sxlink_t.write_pending - confirm */
} sxppmsg_t;

/* A channel opened on top of a link. */
typedef struct __sxchannel_t {
  uint16_t cid; /** < ID of the channel */
  sxlink_t *link; /** < pointer to the underlying link */
  struct __link_rpc_list_type *rpc_list; /** < rpc functions list */
  int flags; /** < flags of the channel */
  int type_id; /** < channel type id */
} sxchnl_t;

/* Message flag bits (bit 0 is not assigned) */
#define SXMSG_OPEN      0x02
#define SXMSG_CLOSED    0x04
#define SXMSG_PROTO     0x08
#define SXMSG_LINK      0x10
#define SXMSG_REPLYREQ  0x20
#define SXMSG_PULSE     0x40
#define SXMSG_TIMEDOUT  0x80

/**
 * \brief Message used in sxmp message passing
 *
 * This structure is used to manage a message within a channel
 * of the sxmp structure stack. The sxmsg_*() accessor macros
 * read its payload and header fields.
 */
typedef struct __sxmsg_t {
  sxchnl_t *pch; /** < channel of the message (if applicable) */
  pthread_mutex_t wait; /** < special wait mutex, used for pending list and sync */
  sxmplv2_head_t mhead; /** < last actual head of the message */
  void *payload; /** < payload */
} sxmsg_t;

/*
 * Convenience accessors for sxmsg_t: payload pointer, payload length and
 * opcode from the embedded header, and lock/unlock of the wait mutex.
 * sxmsg_rapidbuf() is another name for the payload pointer.
 */
#define sxmsg_payload(m)     ((m)->payload)
#define sxmsg_datalen(m)     ((m)->mhead.payload_length)
#define sxmsg_rapidbuf(m)    ((m)->payload)
#define sxmsg_retcode(m)     ((m)->mhead.opcode)
#define sxmsg_waitlock(m)    (pthread_mutex_lock(&(m)->wait))
#define sxmsg_waitunlock(m)  (pthread_mutex_unlock(&(m)->wait))

#ifdef __cplusplus
extern "C" {
#endif
  /**
   * \brief Get the link a message belongs to.
   *
   * The link is reached through the message's channel. A message does not
   * always carry a channel (pch is only set "if applicable"), so both the
   * message and its channel pointer are checked before being dereferenced.
   *
   * \param msg message to inspect, may be NULL
   * \return the link of the message's channel, or NULL when msg is NULL
   *         or no channel is attached to it
   */
  static inline sxlink_t *sxmsg_link(sxmsg_t *msg) {
    if (!msg || !msg->pch) {
      return NULL;
    }
    return msg->pch->link;
  }
#ifdef __cplusplus
}
#endif

/* One RPC entry: a name, the function pointer registered under it and the
 * usrtc node that links the entry into a usrtc container. */
typedef struct __link_rpc_entry_type {
  char *name; /* name of the RPC function */
  int (*rpcf)(void *, sexp_t *); /* handler; same signature as the rpcf argument of sxmp_rpclist_add_function() */
  usrtc_node_t node; /* usrtc linkage; NOTE(review): presumably into sxl_rpclist_t.rpc_tree - confirm */
} sxl_rpc_t;

/* A list of RPC functions, kept as a search tree. */
typedef struct __link_rpc_list_type {
  usrtc_t *rpc_tree; /** < search tree for the rpc lookup */
  char *opt_version; /** < reserved for future implementations */
} sxl_rpclist_t;

/* Forward declaration: struct sxstream_description below points to the
 * operations table, which is defined after struct sxstream_opened. */
struct sxstream_ops;

/*
 * Description of a stream type.
 * It is located on both the master and the client;
 * the client gets it after sync.
 */
struct sxstream_description {
  uint16_t stid; /** < stream type ID */
  uint16_t pcid; /** < pinned channel type ID */
  uint16_t type; /** < type: 0||SXE_O_NAMED||SXE_O_BINARY */
  uint16_t flags; /** < possible flags */
  struct sxstream_ops *ops; /** < operations */
  list_head_t *named; /** < named entries order */
  usrtc_node_t node; /** < internal node struct */
};

/*
 * State of one opened stream. This is the object handed to every callback
 * in struct sxstream_ops.
 */
struct sxstream_opened {
  uint64_t sto_id; /* NOTE(review): presumably the ID of this opened stream - confirm */
  struct sxstream_description *desc; /* description of the stream type this instance belongs to */
  /* map_data and ent_buf share storage (anonymous union); ent_buf is the
   * member returned by sxstream_opened_getelist() */
  union {
    void *map_data;
    list_head_t *ent_buf;
  };
  void *priv; /* opaque pointer, accessed via sxstream_opened_setpriv()/sxstream_opened_getpriv() */
  uint64_t off; /* NOTE(review): presumably the current offset - confirm */
  uint16_t flags; /* NOTE(review): presumably SXE_O_* bits - confirm */
  uint16_t pin_channelid; /* NOTE(review): presumably the ID of the channel the stream is pinned to - confirm */
};

/*
 * Accessors for struct sxstream_opened: store/load the private pointer and
 * fetch the entry list buffer. Each expansion is fully parenthesised so it
 * acts as a single operand inside a larger expression; the setter yields
 * the stored value, like any C assignment.
 */
#define sxstream_opened_setpriv(n, p)   ((n)->priv = (p))
#define sxstream_opened_getpriv(n)      ((n)->priv)
#define sxstream_opened_getelist(n)     ((n)->ent_buf)

/*
 * Operations table of a stream type, referenced from
 * struct sxstream_description.
 *
 * The read slot and the write slot are anonymous unions: s_bread/s_eread
 * share one pointer and s_bwrite/s_ewrite share another, so only one member
 * of each pair can be set. NOTE(review): presumably the b* members serve
 * binary streams and the e* members entry streams - confirm. The meaning of
 * the size_t/uint64_t arguments is not visible in this header.
 */
struct sxstream_ops {
  int (*s_open)(sxlink_t *, struct sxstream_opened *, const char *);
  int (*s_close)(struct sxstream_opened *);
  union {
    size_t (*s_bread)(struct sxstream_opened *, size_t, uint64_t, void *);
    size_t (*s_eread)(struct sxstream_opened *, size_t, uint64_t, list_head_t **);
  };
  union {
    size_t (*s_bwrite)(struct sxstream_opened *, size_t, uint64_t, const void *);
    size_t (*s_ewrite)(struct sxstream_opened *, uint64_t, list_head_t *);
  };
};

/* Log message types; this is the type of the first argument of the hub's
 * log callback. */
typedef enum {
  SXCRITICAL_LOG = 0,
  SXERROR_LOG    = 1,
  SXWARNING_LOG  = 2,
  SXINFO_LOG     = 3,
  SXDEBUG_LOG    = 4
} sxlogtype_t;

/**
 * \brief Hub subsystem structure.
 *
 * This structure is used to manage and control a set of
 * links that share the same RPC lists and the same
 * mode (server, client).
 *
 * The callback members are normally installed with the sxhub_set_*()
 * macros.
 */
typedef struct __sxhub_type {
  usrtc_t *links; /* NOTE(review): presumably the links owned by this hub (see sxlink_t.csnode) - confirm */
  usrtc_t *streams; /* NOTE(review): presumably registered stream descriptions (see sxhub_stream_register()) - confirm */
  pthread_rwlock_t rwlock; /* NOTE(review): what this lock protects is not visible in this header - confirm */
  char *rootca, *certpem, *certkey; /* path name to the certificates */
  sxl_rpclist_t *system_rpc; /* NOTE(review): presumably the built-in system RPC list - confirm */
  sxl_rpclist_t *stream_rpc; /* NOTE(review): presumably the RPC list serving streams - confirm */
  /* special functions pointers */
  int (*validate_sslpem)(sxlink_t *); /** < this function is used to validate the SSL certificate during the SSL handshake */
  int (*secure_check)(sxlink_t *); /** < this function authorizes the user to log in,
                                  * and should also check the SSL cert, the user, and already made sessions */
  usrtc_t* (*get_rpc_typed_list_tree)(sxlink_t *); /** < this function is used to set RPC list of the functions */
  int (*set_typed_list_callback)(sxlink_t *, int, char *); /** < this function is a callback
                                                          * during setting up a typed channel */
  void (*on_destroy)(sxlink_t *); /** < callback on connection destroy  */
  void (*on_pulse)(sxlink_t *, sexp_t *);  /** < callback on pulse emit */
  int (*log)(const sxlogtype_t, const char *, ...); /** < this function is used to output logs in a user-defined way */
  SSL_CTX *ctx; /** < SSL context */
  void *priv; /* opaque user pointer, accessed via sxhub_set_priv()/sxhub_get_priv() */
} sxhub_t;

/*
 * Setters for the hub callbacks and accessors for the hub's private
 * pointer. Every macro stores into (or reads) exactly one sxhub_t member:
 *
 *   sxhub_set_sslvalidate  -> validate_sslpem
 *   sxhub_set_authcheck    -> secure_check
 *   sxhub_set_rpcvalidator -> get_rpc_typed_list_tree
 *   sxhub_set_channelcall  -> set_typed_list_callback
 *   sxhub_set_ondestroy    -> on_destroy
 *   sxhub_set_onpulse      -> on_pulse
 *   sxhub_set_priv/get_priv-> priv
 *   sxhub_set_logops       -> log
 *
 * Each expansion is fully parenthesised so it acts as a single operand
 * inside a larger expression; a setter yields the stored value, like any
 * C assignment.
 */
#define sxhub_set_sslvalidate(c, f)   ((c)->validate_sslpem = (f))
#define sxhub_set_authcheck(c, f)     ((c)->secure_check = (f))
#define sxhub_set_rpcvalidator(c, f)  ((c)->get_rpc_typed_list_tree = (f))
#define sxhub_set_channelcall(c, f)   ((c)->set_typed_list_callback = (f))
#define sxhub_set_ondestroy(c, f)     ((c)->on_destroy = (f))
#define sxhub_set_onpulse(c, f)       ((c)->on_pulse = (f))
#define sxhub_set_priv(c, p)          ((c)->priv = (p))
#define sxhub_get_priv(c)             ((c)->priv)
#define sxhub_set_logops(c, f)        ((c)->log = (f))

/*
 * Convenience wrapper around the hub's log callback.
 *
 * When link `c` has a hub with a log callback installed, the message is
 * passed to that callback. Otherwise (no hub, or no callback set) it is
 * written to stderr with fprintf() and `type` is ignored.
 *
 * The expansion is one parenthesised int expression without a trailing
 * semicolon, so the macro can be the body of an if/else or part of a larger
 * expression; the caller writes the terminating ';'.
 *
 * NOTE: `c` is evaluated more than once - do not pass an expression with
 * side effects. The fallback uses fprintf()/stderr from <stdio.h>.
 */
#define sxlink_log(c, type, fmt, ...)                                          \
  (((c)->hub && (c)->hub->log) ?                                               \
   (c)->hub->log((type), (fmt), ## __VA_ARGS__) :                              \
   fprintf(stderr, (fmt), ## __VA_ARGS__))

/* One typed RPC list: a type ID, a description string, the RPC list itself
 * and the usrtc node that links the entry into a usrtc container. */
typedef struct __rpc_typed_list_type {
  int type_id; /* type ID of the list; NOTE(review): presumably matched against sxchnl_t.type_id - confirm */
  char *description; /* description string */
  sxl_rpclist_t *rpc_list; /* the RPC functions of this type */
  usrtc_node_t lnode; /* usrtc linkage */
} rpc_typed_list_t;

/* streams */

/* Stream flag bits (bit 0 is not assigned) */
#define SXE_O_BINARY         0x02
#define SXE_O_READ           0x04
#define SXE_O_WRITE          0x08
#define SXE_O_ASYNC          0x10
#define SXE_O_TRUNC          0x20
#define SXE_O_NAMED          0x40
#define SXE_O_OCHANNELED     0x80

/* sxstream_t is the handle used to access a stream, i.e. it is what the
 * open operations return.
 * You must take care of closing it yourself, because these handles are
 * not tracked.
 */
typedef struct __sxtream_type {
  sxlink_t *link; /* link the stream was opened on */
  sxchnl_t *channel; /* channel used by the stream */
  int flags; /* NOTE(review): presumably SXE_O_* bits - confirm */
  uint64_t sid; /* NOTE(review): presumably the stream ID - confirm */
  /* sx and ebuf share storage (anonymous union) */
  union {
    sexp_t *sx;
    void *ebuf;
  };
  /* for reading: the two anonymous structs below share storage, so either
   * data/cur_offset or entries/cur is in use.
   * NOTE(review): presumably the first pair serves binary streams and the
   * second entry streams - confirm */
  union {
    struct {
      void *data;
      uintptr_t cur_offset;
    };
    struct {
      list_head_t *entries;
      list_node_t *cur;
    };
  };
  list_head_t *named; /* NOTE(review): presumably the named entries order, as in sxstream_description.named - confirm */
} sxstream_t;

/* List node carrying a single value string.
 * NOTE(review): the "_nn_" prefix presumably stands for a non-named stream
 * entry - confirm. */
struct _nn_stream_entry_node {
  char *value; /* the value string */
  list_node_t node; /* list linkage */
};

/* List node carrying a name/value pair of strings. */
struct _stream_entry_node {
  char *name; /* the name string */
  char *value; /* the value string */
  list_node_t node; /* list linkage */
};

/* List node pairing a name with an integer order.
 * NOTE(review): presumably the element type behind the
 * sxstream_generic_named_lookupname()/lookuporder() helpers - confirm. */
struct _stream_named_order_node {
  int order; /* order value */
  char *name; /* the name string */
  list_node_t node; /* list linkage */
};

/* List node whose payload is itself a list (a list of lists). */
struct _stream_list_node {
  list_head_t *list; /* the nested list */
  list_node_t node; /* list linkage */
};

#ifdef __cplusplus
extern "C" {
#endif

/* API */
/* NOTE(review): the implementations are not part of this header; the
 * one-line summaries below are inferred from names and signatures only -
 * confirm against the sources. */

/* Library-wide initialisation. */
int sxmp_init(void);

/* Library-wide teardown, counterpart of sxmp_init(). */
void sxmp_finalize(void);

/* Initialise a hub structure provided by the caller. */
int sxhub_init(sxhub_t *ssys);

/* Create a new hub and return a pointer to it. */
sxhub_t *sxhub_create(void);

/* Destroy a hub; presumably the counterpart of sxhub_init(). */
int sxhub_destroy(sxhub_t *ssys);

/* Free a hub; presumably the counterpart of sxhub_create(). */
int sxhub_free(sxhub_t *ssys);

/* Set the certificate path names of the hub (compare the rootca, certpem
 * and certkey members of sxhub_t). */
int sxhub_setsslserts(sxhub_t *ssys, const char *rootca,
                      const char *certpem, const char *certkey);

/* create links */
/* NOTE(review): summaries are inferred from names and signatures only -
 * confirm against the sources. */

/* Master side: create a link on hub for socket sck with peer address addr. */
sxlink_t *sxlink_master_accept(sxhub_t *hub, int sck, struct in_addr *addr);
/* Client side: connect to host:port using the given certificate, login and
 * password. */
sxlink_t *sxlink_connect(sxhub_t *hub, const char *host,
                         int port, const char *SSL_cert, const char *login,
                         const char *passwd);
/* Same as sxlink_connect(), with an additional private pointer. */
sxlink_t *sxlink_connect_at(sxhub_t *hub, const char *host,
                            int port, const char *SSL_cert, const char *login,
                            const char *passwd, const void *priv);

/* Close a link. */
int sxlink_close(sxlink_t *co);

/* channels */
/* NOTE(review): summaries are inferred from names and signatures only -
 * confirm against the sources. */
/* Open a channel of the given type on link co. */
sxchnl_t *sxchannel_open(sxlink_t *co, int type);
/* Close a channel. */
int sxchannel_close(sxchnl_t *channel);

/* messages */
/* NOTE(review): apart from the two comments carried over from the original
 * header, the summaries below are inferred from names and signatures only -
 * confirm against the sources. */
/*
 * Creates a message with a payload.
 * Returns an error code and, if applicable, a pointer to the message
 * through msg.
 */
int sxmsg_send(sxchnl_t *channel, const char *data, size_t datalen, sxmsg_t **msg);
/* the same, but postponed: the message is put on the write queue instead of being written immediately */
int sxmsg_send_pp(sxchnl_t *channel, const char *data, size_t datalen, sxmsg_t **msg);
/* send a pulse message */
int sxmsg_pulse(sxlink_t *co, const char *data, size_t datalen);
/* reply to a message with a payload; the _pp variant presumably postpones the write */
int sxmsg_reply(sxmsg_t *msg, const char *data, size_t datalen);
int sxmsg_reply_pp(sxmsg_t *msg, const char *data, size_t datalen);
/* reply with datalen bytes; presumably taken from the buffer returned by sxmsg_rapidbuf() */
int sxmsg_rreply(sxmsg_t *msg, size_t datalen);
/* finish a message with the given opcode; the _pp variant presumably postpones the write */
int sxmsg_return(sxmsg_t *msg, int opcode);
int sxmsg_return_pp(sxmsg_t *msg, int opcode);
/* clean a message */
void sxmsg_clean(sxmsg_t *msg);

/* streams */
  /*
   * NOTE: sxstream operations (except the opening process, i.e. several
   *       threads can open different streams) are NOT _thread-safe_;
   *       please take care of that in your own application.
   */
  /* NOTE(review): the per-function summaries below are inferred from names
   * and signatures only - confirm against the sources. */
  /* open a stream of type stid on a link; the wch variant takes the channel to use */
  sxstream_t *sxstream_open(sxlink_t *link, const char *opt, int stid, int flags);
  sxstream_t *sxstream_openwch(sxlink_t *link, sxchnl_t *channel, const char *opt, int stid, int flags);
  /* close a stream returned by one of the open functions */
  int sxstream_close(sxstream_t *stream);

  /* binary stream operations */
  size_t sxstream_bread(sxstream_t *stream, void *buf, size_t buf_len);
  size_t sxstream_breadat(sxstream_t *stream, void *buf, size_t buf_len, off_t off);
  size_t sxstream_bwriteat(sxstream_t *stream, const void *buf, size_t buf_len, off_t off);
  size_t sxstream_bsyncat(sxstream_t *stream, void *buf, size_t buf_len, off_t off);

  /*
   * NOTE: the value returned when reading from entry streams
   *       must not be managed by the caller; that is done by the close
   *       operation. This means you should take care of the values you
   *       have read, since they might be freed on the next read operation.
   */
  /* entry nonamed stream ops */
  list_head_t *sxstream_read(sxstream_t *stream);

  /* entry named stream ops */
  list_head_t *sxstream_readnamed(sxstream_t *stream);

  /* provider-side functions for streams */
  int sxhub_stream_register(sxhub_t *hub, const struct sxstream_description *s_desc);

#ifdef __cplusplus
}
#endif

/* RPC List API */
/* NOTE(review): presumably INC/EXC are values for the flag argument of
 * sxmp_rpclist_filter() and END terminates its filter array - confirm. */
#define SXMP_FILTER_INC  (0xa)
#define SXMP_FILTER_EXC  (0xb)
#define SXMP_FILTER_END  (-1)

#ifdef __cplusplus
extern "C" {
#endif

/* NOTE(review): summaries are inferred from names and signatures only -
 * confirm against the sources. */

/* Initialise a tree that will hold typed RPC lists. */
int sxmp_rpclist_init(usrtc_t *tree);

/* Add a typed RPC list with the given type, description and version. */
int sxmp_rpclist_add(usrtc_t *tree, int type, const char *description,
                     const char *version);

/* Add function rpcf under the name fu_name to the list of the given type. */
int sxmp_rpclist_add_function(usrtc_t *tree, int type, const char *fu_name,
                              int (*rpcf)(void *, sexp_t *));

/* Build dest from source according to flag and filter (see the
 * SXMP_FILTER_* constants). */
int sxmp_rpclist_filter(usrtc_t *source, usrtc_t **dest, int flag, int *filter);

  /* generic sxstream list functions */
  /* NOTE(review): summaries are inferred from names and signatures only -
   * confirm against the sources. */
  /* allocate / free a list of string values, and append a value to it */
  list_head_t *sxstream_generic_slist_alloc(void);
  void sxstream_generic_slist_free(list_head_t *list);
  int sxstream_generic_slist_additem(list_head_t *list, const char *value);

  /* free a named list, add a name with its order, and look up one by the other */
  void sxstream_generic_named_free(list_head_t *list);
  int sxstream_generic_nmlist_additem(list_head_t *list, const char *name,
                                      int order);
  const char *sxstream_generic_named_lookupname(list_head_t *list, int order);
  int sxstream_generic_named_lookuporder(list_head_t *list, const char *name);

#ifdef __cplusplus
}
#endif

/* Allocating a named list is the same call as allocating a plain string
 * list: this is an alias for sxstream_generic_slist_alloc(). */
#define sxstream_generic_named_alloc()  (sxstream_generic_slist_alloc())

/* Debug trace helper: writes "<file>:<line> in <function> > <txt>" followed
 * by a newline to stderr and yields fprintf()'s return value. */
#define blub(txt)                                                              \
  fprintf(stderr, "%s:%d in %s > %s\n",                                        \
          __FILE__, __LINE__, __FUNCTION__, (txt))

/* Debug helper: prints the five fields of a message header (given by
 * pointer) on one line to stderr and yields fprintf()'s return value. */
#define dumphead(head)                                                         \
  fprintf(stderr,                                                              \
          "id: %d, opcode: %d, attr: %d, len = %d, reserve: %d\n",             \
          (head)->msgid, (head)->opcode, (head)->attr,                         \
          (head)->payload_length, (head)->reserve)

#endif /* __SXMP_SXMPLV2_H__ */