/*
 * Secure X Message Passing Library v2 implementation.
 * (sxmplv2) it superseed all versions before due to the:
 * - memory consumption
 * - new features such as pulse emitting
 * - performance optimization
 *
 * This is a proprietary software. See COPYING for further details.
 *
 * (c) Askele Group 2013-2015 <http://askele.com>
 *
 */

#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/mman.h>
#include <fcntl.h>

#ifdef WIN32
#include <Winsock2.h>
#define EBADE 1
#define NETDB_SUCCESS 0
#else
#include <sys/select.h>
#include <netdb.h>
#include <unistd.h>
#include <uuid/uuid.h>
#endif

#include <openssl/ssl.h>
#include <openssl/err.h>
#include <openssl/engine.h>

#include <tdata/usrtc.h>
#include <tdata/list.h>
#include <sexpr/sexp.h>

#include <sxmp/limits.h>
#include <sxmp/sxmp.h>

#include "internal.h"

typedef struct __sxmpl_bundle_type {
  void *buf;
  sxlink_t *conn;
} sxmplv2_bundle_t;

/* networking helpers */
#ifndef WIN32
int __resolvehost(const char *hostname, char *buf, int buf_len,
                  struct hostent **rhp)
{
  struct hostent *hostbuf ;
  struct hostent *hp = *rhp = NULL;
  int herr = 0, hres = 0;

  hostbuf = malloc(sizeof(struct hostent));
  if(!hostbuf) return NO_ADDRESS;
  hres = gethostbyname_r(hostname, hostbuf,
                         buf, buf_len, &hp, &herr);

  if(hres) return NO_ADDRESS;
  *rhp = hp;

  return NETDB_SUCCESS;
}
#endif

static int __conn_read(sxlink_t *co, void *buf, size_t buf_len)
{
  int rfd = SSL_get_fd(co->ssl), r;
  fd_set readset, writeset;
  int ofcmode, read_blocked = 0, read_blocked_on_write = 0;

  /* First we make the socket nonblocking */
#ifndef WIN32
  ofcmode = fcntl(rfd, F_GETFL,0);
  ofcmode |= O_NDELAY;
  if(fcntl(rfd, F_SETFL, ofcmode))
    fprintf(stderr, "[sxmplv2] (RD)Couldn't make socket nonblocking");
#endif

 __retry:

  do {
  __try_again:
    if(co->flags & SXMP_CLOSED) {
      return -1;
    }
    r = SSL_read(co->ssl, buf, (int)buf_len);
    switch(SSL_get_error (co->ssl, r)) {
    case SSL_ERROR_NONE:
      return r;
      break;
    case SSL_ERROR_WANT_READ:
      /* get prepare to select */
      read_blocked = 1;
      break;
    case SSL_ERROR_WANT_WRITE: /* here we blocked on write */
      read_blocked_on_write = 1;
      break;
    case SSL_ERROR_SYSCALL:
      if(errno == EAGAIN || errno == EINTR) goto __try_again;
      else {
        fprintf(stderr, "[sxmplv2] (RD)SSL syscall error.\n");
        goto __close_conn;
      }
      break;
    case SSL_ERROR_WANT_CONNECT:
    case SSL_ERROR_WANT_ACCEPT:
      fprintf(stderr, "[sxmplv2] (RD)SSL negotiation required. Trying again.\n");
      goto __try_again;
      break;
    case SSL_ERROR_SSL:
      fprintf(stderr, "[sxmplv2] (RD)SSL error occured. Connection will be closed.\n");
      goto __close_conn;
      break;
    case SSL_ERROR_ZERO_RETURN:
      fprintf(stderr, "[sxmplv2] (RD)SSL connection is cleary closed.\n");
    default:
    __close_conn:
      ERR_free_strings();
      co->flags |= SXMP_CLOSED;
      fprintf(stderr, "[sxmplv2] (RD)Unknown error on %s (errno = %d)\n", co->uuid, errno);
      ERR_remove_state(0);
      return -1;
    }
  } while(SSL_pending(co->ssl) && !read_blocked);

 __select_retry:

  if(read_blocked) {
    FD_ZERO(&readset);
    FD_SET(rfd, &readset);
    /* waits until something will be ready to read */
    r = select(rfd + 1, &readset, NULL, NULL, NULL);
    if(r < 0) {
      if(errno == EINTR || errno == EAGAIN) goto __select_retry;
      fprintf(stderr, "[sxmplv2] (RD)Select (%d) on %s\n", errno, co->uuid);
      ERR_remove_state(0);
      return -1;
    }
    if(!r) {
      fprintf(stderr, "[sxmplv2] (RD)Nothing to wait for\n");
      ERR_remove_state(0);
      return 0;
    }
    read_blocked = 0;
    if(r && FD_ISSET(rfd, &readset)) goto __retry; /* try to read again */
  }
  if(read_blocked_on_write) { /* we was blocked on write */
    FD_ZERO(&readset);
    FD_ZERO(&writeset);
    FD_SET(rfd, &readset);
    FD_SET(rfd, &writeset);

    r = select(rfd + 1, &readset, &writeset, NULL, NULL);
    read_blocked_on_write = 0;
    if(r && FD_ISSET(rfd, &writeset)) goto __retry;
  }

  return r;
}

static int __conn_write(sxlink_t *co, void *buf, size_t buf_len)
{
  int r, rfd = SSL_get_fd(co->ssl);
  fd_set writeset;

 __retry:
  if(co->flags & SXMP_CLOSED) {
    return -1;
  }
  r = SSL_write(co->ssl, buf, (int)buf_len);
  switch(SSL_get_error(co->ssl, r)) {
  case SSL_ERROR_WANT_READ:
  case SSL_ERROR_WANT_WRITE:
    /* here we should block */
    FD_ZERO(&writeset);
    FD_SET(rfd, &writeset);
    r = select(rfd + 1, NULL, &writeset, NULL, NULL);
    if(r && FD_ISSET(rfd, &writeset)) goto __retry;
    break;
  case SSL_ERROR_SYSCALL:
    if(errno == EAGAIN || errno == EINTR) goto __retry;
    else goto __close_conn;
    break;
  default:
  __close_conn:
    if(r < 0) {
      /* set closed flag */
      ERR_free_strings();
      co->flags |= SXMP_CLOSED;
      fprintf(stderr, "[sxmplv2] (WR)Unknown error on %s (%d)\n", co->uuid, r);
      ERR_remove_state(0);
      return -1;
    } else return r;
  }

  return r;
}

int _sxmpl_writemsg(sxlink_t *co, sxmsg_t *msg)
{
  sxmplv2_head_t *head;
  size_t rd;
  int r;
  char *buf = NULL;

  if(!co || !msg) return SXE_FAILED;

  /* check message for validity */
  head = &msg->mhead;
  if(head->payload_length && !msg->payload) return SXE_FAILED;

  if(head->payload_length) {
    buf = malloc(sizeof(sxmplv2_head_t) + head->payload_length);
    memcpy(buf, head, sizeof(sxmplv2_head_t));
    memcpy(buf + sizeof(sxmplv2_head_t), msg->payload, head->payload_length);
  }

  /* write the head and payload if applicable */
  pthread_mutex_lock(&co->sslinout[1]);
  if(!buf)
    rd = __conn_write(co, head, sizeof(sxmplv2_head_t));
  else rd = __conn_write(co, buf, sizeof(sxmplv2_head_t) + head->payload_length);
  if(rd < 0) {
    co->flags |= SXMP_CLOSED;
    r = SXE_ESSL;
  }
  pthread_mutex_unlock(&co->sslinout[1]);

  if(!(co->flags & SXMP_CLOSED)) r = SXE_SUCCESS;
  if(buf) free(buf);
  return r;
}

static sxmplv2_bundle_t *__sxmpl_bundle_create(sxlink_t *co)
{
  sxmplv2_bundle_t *n = malloc(sizeof(sxmplv2_bundle_t));

  if(!n) return NULL;
  else memset(n, 0, sizeof(sxmplv2_bundle_t));

  n->buf = mmap(NULL, 65536, PROT_READ|PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
  if(n->buf == MAP_FAILED) {
    free(n);
    return NULL;
  }

  n->conn = co;

  return n;
}

static void __sxmpl_bundle_destroy(sxmplv2_bundle_t *n)
{
  munmap(n->buf, 65536);
  free(n);
  return;
}

static int ex_ssldata_index; /** < index used to work with additional data
                              * provided to the special call during SSL handshake */

/* this function is an ugly implementation to get C string with uuid */
extern char *__generate_uuid(void);

/* this is a callback to perform a custom SSL certs chain validation,
 * as I promised here the comments, a lot of ...
 * The first shit: 0 means validation failed, 1 otherwise
 * The second shit: X509 API, I guess u will love it ;-)
 * openssl calls this function for each certificate in chain,
 * since our case is a simple (depth of chain is one, since we're
 * don't care for public certificates lists or I cannot find any reasons to
 * do it ...), amount of calls reduced, and in this case we're interested
 * only in top of chain i.e. actual certificate used on client side,
 * the validity of signing for other certificates within chain is
 * guaranteed by the ssl itself.
 * u know, we need to lookup in database, or elsewhere... some information
 * about client certificate, and decide - is it valid, or not?, if so
 * yep I mean it's valid, we can assign it's long fucking number to
 * security context, to use in ongoing full scaled connection handshaking.
 */
static int __verify_certcall(int preverify_ok, X509_STORE_CTX *ctx)
{
  //  X509 *cert = X509_STORE_CTX_get_current_cert(ctx);
  int err = X509_STORE_CTX_get_error(ctx), depth = X509_STORE_CTX_get_error_depth(ctx);
  SSL *ssl = X509_STORE_CTX_get_ex_data(ctx, SSL_get_ex_data_X509_STORE_CTX_idx());
  sxlink_t *co = SSL_get_ex_data(ssl, ex_ssldata_index); /* this is a custom data we're set before */
  sxhub_t *ssys = co->ssys;

  /* now we need to check for certificates with a long chain,
   * so since we have a short one, reject long ones */
  if(depth > VERIFY_DEPTH) { /* longer than we expect */
    preverify_ok = 0; /* yep, 0 means error for those function callback in openssl, fucking set */
    err = X509_V_ERR_CERT_CHAIN_TOO_LONG;
    X509_STORE_CTX_set_error(ctx, err);
  }

  if(!preverify_ok) return 0;

  /* ok, now we're on top of SSL (depth == 0) certs chain,
   * and we can validate client certificate */
  if(!depth) {
    co->pctx->certid =
      ASN1_INTEGER_get((const ASN1_INTEGER *)X509_get_serialNumber(ctx->current_cert));
    //X509_STORE_CTX_free(ctx);
    //X509_free(ctx->current_cert);
    /* now we're need to check the ssl cert */
    if(ssys->validate_sslpem) {
      if(ssys->validate_sslpem(co)) return 0;
      else return 1;
    } else return 0;
  }

  return preverify_ok;
}

/* dummy just to check the server side */
static int __verify_certcall_dummy(int preverify_ok, X509_STORE_CTX *ctx)
{
  return preverify_ok;
}

static pthread_mutex_t *lock_cs;
static long *lock_count;

static void pthreads_locking_callback(int mode, int type, const char *file, int line)
{
  if (mode & CRYPTO_LOCK) {
    pthread_mutex_lock(&(lock_cs[type]));
    lock_count[type]++;
  } else {
    pthread_mutex_unlock(&(lock_cs[type]));
  }
}

static void pthreads_thread_id(CRYPTO_THREADID *tid)
{
#ifdef WIN32
  CRYPTO_THREADID_set_numeric(tid, (unsigned long)GetCurrentThreadId());
#else
  CRYPTO_THREADID_set_numeric(tid, (unsigned long)pthread_self());
#endif
}

int sxmp_init(void)
{
  int i;

  /* init SSL library */
  SSL_library_init();

  OpenSSL_add_all_algorithms();
  SSL_load_error_strings();

  ex_ssldata_index = SSL_get_ex_new_index(0, "__ssldata index", NULL, NULL, NULL);

  /* here we go - init all */
  lock_cs = OPENSSL_malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t));
  lock_count = OPENSSL_malloc(CRYPTO_num_locks() * sizeof(long));
  for (i = 0; i < CRYPTO_num_locks(); i++) {
    lock_count[i] = 0;
    pthread_mutex_init(&(lock_cs[i]), NULL);
  }

  CRYPTO_THREADID_set_callback(pthreads_thread_id);
  CRYPTO_set_locking_callback(pthreads_locking_callback);

  return 0;
}

void sxmp_finalize(void)
{
  int i;

  CRYPTO_set_locking_callback(NULL);

  for (i = 0; i < CRYPTO_num_locks(); i++) {
    pthread_mutex_destroy(&(lock_cs[i]));
  }
  OPENSSL_free(lock_cs);
  OPENSSL_free(lock_count);

  ERR_free_strings();
  ENGINE_cleanup();
  CRYPTO_cleanup_all_ex_data();
  EVP_cleanup();

  return;
}

sxlink_t *__link_minimal_alloc(struct in_addr *addr)
{
  sxlink_t *co = malloc(sizeof(sxlink_t));
  int r;

  if(!co) { r = ENOMEM; goto __fail; }
  else memset(co, 0, sizeof(sxlink_t));

  if(!(co->messages = malloc(sizeof(uintptr_t)*1024))) { r = ENOMEM; goto __fail; }
  else memset(co->messages, 0, sizeof(uintptr_t)*1024);

  if(!(co->pctx = malloc(sizeof(sxsession_ctx_t)))) { r = ENOMEM; goto __fail; }
  else memset(co->pctx, 0, sizeof(sxsession_ctx_t));
  if(addr) {
    if(!(co->pctx->addr = malloc(sizeof(struct in_addr)))) { r = ENOMEM; goto __fail; }

    memcpy(co->pctx->addr, addr, sizeof(struct in_addr));
  }

  if(!(co->uuid = __generate_uuid())) { r = ENOMEM; goto __fail; }

  return co;

 __fail:
  if(co) {
    if(co->pctx) {
      if(co->pctx->addr) free(co->pctx->addr);
      free(co->pctx);
    }
    if(co->messages) free(co->messages);
    free(co);
  }
  errno = r;
  return NULL;
}

static int __link_second_alloc(sxlink_t *co)
{
  usrtc_node_init(&co->csnode, co);

  memset(&co->idx_ch, 0, sizeof(idx_allocator_t));
  memset(&co->idx_msg, 0, sizeof(idx_allocator_t));
  if((idx_allocator_init(&co->idx_ch, 512, 0))) goto __fail;
  if((idx_allocator_init(&co->idx_msg, 1024, 0))) goto __fail;

  if(!(co->channels = malloc(sizeof(uintptr_t)*512))) goto __fail;
  else memset(co->channels, 0, sizeof(uintptr_t)*512);

  /* init mutexes */
  pthread_mutex_init(&co->idx_ch_lock, NULL);
  pthread_mutex_init(&co->idx_msg_lock, NULL);
  pthread_mutex_init(&co->write_pending_lock, NULL);
  pthread_mutex_init(&co->sslinout[0], NULL);
  pthread_mutex_init(&co->sslinout[1], NULL);

  /* init list */
  list_init_head(&co->write_pending);

  return SXE_SUCCESS;

 __fail:
  idx_allocator_destroy(&co->idx_msg);
  idx_allocator_destroy(&co->idx_ch);
  return SXE_ENOMEM;
}

static void __link_second_free(sxlink_t *co)
{
  if(co->channels) free(co->channels);
  idx_allocator_destroy(&co->idx_msg);
  idx_allocator_destroy(&co->idx_ch);

  pthread_mutex_destroy(&co->idx_ch_lock);
  pthread_mutex_destroy(&co->idx_msg_lock);
  pthread_mutex_destroy(&co->write_pending_lock);
  pthread_mutex_destroy(&co->sslinout[0]);
  pthread_mutex_destroy(&co->sslinout[1]);

  return;
}

static void __link_minimal_free(sxlink_t *co)
{
  if(co) {
    if(co->pctx) {
      if(co->pctx->addr) free(co->pctx->addr);
      free(co->pctx);
    }
    if(co->messages) free(co->messages);
    free(co->uuid);
    free(co);
  }

  return;
}

static int __eval_syssexp(sxlink_t *co, sexp_t *sx)
{
  sxl_rpclist_t *rpc_list = co->ssys->system_rpc;
  usrtc_node_t *node;
  sxl_rpc_t *rentry;
  char *rpcf;

  if(sx->ty == SEXP_LIST)
    rpcf = sx->list->val;
  else return SXE_BADPROTO;

  /* find an appropriate function */
  node = usrtc_lookup(rpc_list->rpc_tree, rpcf);

  if(!node)    return SXE_ENORPC;
  else rentry = (sxl_rpc_t *)usrtc_node_getdata(node);

  /* call it */
  return rentry->rpcf((void *)co, sx);
}

#ifdef _NO_SXMPMP
#define _CONN_INUSE(co)  (co)->usecount++;
#define _CONN_NOTINUSE(co)  (co)->usecount--;
#define _CONN_UCOUNT(co)  (co)->usecount
#else
static inline void _CONN_INUSE(sxlink_t *co) {
  pthread_rwlock_wrlock(&co->ssys->rwlock);
  co->usecount++;
  pthread_rwlock_unlock(&co->ssys->rwlock);
}

static inline void _CONN_NOTINUSE(sxlink_t *co) {
  pthread_rwlock_wrlock(&co->ssys->rwlock);
  co->usecount--;
  pthread_rwlock_unlock(&co->ssys->rwlock);
}

static inline int _CONN_UCOUNT(sxlink_t *co) {
  int r;
  pthread_rwlock_rdlock(&co->ssys->rwlock);
  r = co->usecount;
  pthread_rwlock_unlock(&co->ssys->rwlock);
  return r;
}
#endif

static void __link_destroy(sxlink_t *co)
{
  int i = 0, fd;
  sxmsg_t *msg, *omsg;
  sxppmsg_t *ppm;
  list_node_t *iter, *siter;
  sxchnl_t *chan;
  sxmplv2_head_t *head;
  sxhub_t *ssys = co->ssys;

  /* first we will unpin all messages and mark it as errors on */
  if(co->pending_messages) {
    pthread_mutex_lock(&co->write_pending_lock);
    list_for_each_safe(&co->write_pending, iter, siter) {
      ppm = container_of(iter, sxppmsg_t, node);
      omsg = ppm->msg;

      /* ok, now we're able to remove it from list */
      list_del(&ppm->node);
      if(omsg->mhead.attr & SXMSG_CLOSED) { /* message is closed - destroy it */
        pthread_mutex_unlock(&omsg->wait);
        pthread_mutex_destroy(&omsg->wait);
        free(omsg);
      } else { /* wake up */
        omsg->mhead.opcode = SXE_LINKERROR;
        pthread_mutex_unlock(&omsg->wait);
      }
      free(ppm);
      co->pending_messages--;
    }
    pthread_mutex_unlock(&co->write_pending_lock);
  }

  /* free queue */
  ERR_remove_state(0);
  ERR_remove_thread_state(0);
  ERR_free_strings();

  /* update use count */
  _CONN_NOTINUSE(co);

  /* ok, let's free other if we can */
  if(!_CONN_UCOUNT(co)) {
    /* go thru messages */
    pthread_mutex_lock(&co->idx_msg_lock);
    for(i = 0; i < 1024; i++) {
      msg = co->messages[i];
      if(!msg) continue;
      else head = &msg->mhead;
      head->opcode = SXE_LINKERROR;
      pthread_mutex_unlock(&msg->wait);
      pthread_mutex_destroy(&msg->wait);
      free(msg);
      co->messages[i] = NULL;
      idx_free(&co->idx_msg, i);
    }
    pthread_mutex_unlock(&co->idx_msg_lock);

    /* ok now we will free the channels */
    pthread_mutex_lock(&co->idx_ch_lock);
    for(i = 0; i < 512; i++) {
      chan = co->channels[i];
      if(!chan) continue;
      idx_free(&co->idx_ch, i);
      free(chan);
    }
    pthread_mutex_unlock(&co->idx_ch_lock);

    if(ssys->on_destroy) ssys->on_destroy(co);
    if(co->pctx->login) free(co->pctx->login);
    if(co->pctx->passwd) free(co->pctx->passwd);

    SSL_set_shutdown(co->ssl, SSL_RECEIVED_SHUTDOWN | SSL_SENT_SHUTDOWN);

    fd = SSL_get_fd(co->ssl);

    SSL_free(co->ssl);
    co->ssl = NULL;

    ERR_remove_thread_state(0);
    ERR_remove_state(0);


    ERR_free_strings();

    close(fd);
    __link_second_free(co);
    __link_minimal_free(co);
  }

  return;
}

static void *__sxmpl_thread(void *b)
{
  sxmplv2_bundle_t *bun = (sxmplv2_bundle_t *)b;
  sxlink_t *co = bun->conn;
  void *buf = bun->buf;
  char *bbuf = (char*)buf;
  sxmplv2_head_t *mhead = (sxmplv2_head_t *)buf;
  sxmsg_t *msg, *omsg;
  sexp_t *sx;
  sxchnl_t *channel;
  list_node_t *iter, *siter;
  sxppmsg_t *ppm;
  pthread_t self = pthread_self();
  struct timespec wtick;
  int dispatch = 0, e;
  size_t rd, wr;
  ulong_t mid;
#ifdef _PERFPROFILE
  struct timeval beg, end;
#endif
  /* byte buffer is following head */
  bbuf += sizeof(sxmplv2_head_t);

 __wait_alive:
  /* flag test - FIXME: make it atomic (it will works atomically on x86, btw on others not) */
  if(!(co->flags & SXMP_ALIVE)) {
    if(co->flags & SXMP_CLOSED) goto __finish;
    else {
      usleep(20);
      goto __wait_alive;
    }
  }

  /* check up a thread */
  if(pthread_equal(self, co->thrd_poll[7])) /* dispatcher */
    dispatch = 1;

  /* update use count */
  _CONN_INUSE(co);

  /* the following logic : (except dispatcher)
   * 1. check up pending write -> if exists write one and start again., otherwise go next
   * 2. read from ssl connection (we will sleep if other already acquire the lock)
   */
  while(1) {
  __again:
    if(co->flags & SXMP_CLOSED) goto __finish; /* go away if required asap */
    /* works with pending messages */
    if(co->pending_messages && !(co->flags & SXMP_CLOSED)) {
      pthread_mutex_lock(&co->write_pending_lock);
      list_for_each_safe(&co->write_pending, iter, siter) {
        ppm = container_of(iter, sxppmsg_t, node);
        omsg = ppm->msg;
        if(_sxmpl_writemsg(co, omsg) != SXE_SUCCESS) {
          pthread_mutex_unlock(&co->write_pending_lock);
          goto __finish; /* write failed - finishing ... */
        }

        /* ok, now we're able to remove it from list */
        list_del(&ppm->node);
        if(omsg->mhead.attr & SXMSG_CLOSED) { /* message is closed - destroy it */
          pthread_mutex_unlock(&omsg->wait);
          pthread_mutex_destroy(&omsg->wait);
          free(omsg);
        }
        free(ppm);
        co->pending_messages--;
      }
      pthread_mutex_unlock(&co->write_pending_lock);
    }

    if(!dispatch) pthread_mutex_lock(&(co->sslinout[0]));
    else { /* dispatch thread ticking every ət */
      wtick.tv_sec = time(NULL) + 1;
      e = pthread_mutex_timedlock(&(co->sslinout[0]), &wtick);
      if(e == ETIMEDOUT) goto __again;
    }
    if(co->flags & SXMP_CLOSED) {
      pthread_mutex_unlock(&(co->sslinout[0]));
      goto __finish;
    }
#ifdef _PERFPROFILE
    gettimeofday(&beg, NULL);
#endif
    rd = __conn_read(co, mhead, sizeof(sxmplv2_head_t));
#ifdef _PERFPROFILE
    gettimeofday(&end, NULL);

    if((end.tv_sec - beg.tv_sec) > 0) {
      printf("connread(head) Seconds: %ld ", end.tv_sec - beg.tv_sec);
      printf("µS: %ld\n", end.tv_usec + (1000000 - beg.tv_usec));
    } else printf("connread(head) µS: %ld\n", end.tv_usec - beg.tv_usec);
#endif

    if(co->flags & SXMP_CLOSED) {
      pthread_mutex_unlock(&(co->sslinout[0]));
      goto __finish; /* go away if required asap */
    }

#ifdef _VERBOSE_DEBUG
    dumphead(mhead);
#endif
    if(rd < 0) {
    __sslproto_error:
      co->flags |= SXMP_CLOSED;
      pthread_mutex_unlock(&(co->sslinout[0]));
      goto __finish;
    } else {
      /* check up if we can read or not */
      if(mhead->payload_length) {
#ifdef _PERFPROFILE
        gettimeofday(&beg, NULL);
#endif
        rd = __conn_read(co, bbuf, mhead->payload_length);
#ifdef _PERFPROFILE
        gettimeofday(&end, NULL);
        if((end.tv_sec - beg.tv_sec) > 0) {
          printf("connread(payload) Seconds: %ld ", end.tv_sec - beg.tv_sec);
          printf("µS: %ld\n", end.tv_usec + (1000000 - beg.tv_usec));
        } else printf("connread(payload) µS: %ld\n", end.tv_usec - beg.tv_usec);
#endif

        if(rd == -1) goto __sslproto_error;
        else pthread_mutex_unlock(&(co->sslinout[0]));

        if(rd != mhead->payload_length) {
          mid = mhead->msgid;
          /* if we're need to do something */
          if(mhead->msgid >= 1024) {
            mhead->opcode = SXE_INVALINDEX;
            goto __return_error;
          } else {
            //         pthread_mutex_lock(&co->idx_msg_lock);
            msg = co->messages[mid];
            //thread_mutex_unlock(&co->idx_msg_lock);
          }
          if(!msg) {
            if(mhead->attr & SXMSG_OPEN) mhead->opcode = SXE_BADPROTO;
            else {
              if((mhead->attr & SXMSG_PROTO) || (mhead->attr & SXMSG_LINK))
                mhead->opcode = SXE_BADPROTO;
              else mhead->opcode = SXE_NOSUCHMSG;
            }
          }
        __return_error:
          mhead->attr |= SXMSG_CLOSED;
          mhead->payload_length = 0;
          pthread_mutex_lock(&(co->sslinout[1]));
          wr = __conn_write(co, mhead, sizeof(sxmplv2_head_t));
          pthread_mutex_unlock(&(co->sslinout[1]));
          if(wr == -1) goto __finish;
          else goto __again;
        }
      } else pthread_mutex_unlock(&(co->sslinout[0]));
      /* take a message */
      if(mhead->attr & SXMSG_PROTO) { /* protocol message i.e. channel open/close */
        /* ok, check up the side */
        if(mhead->attr & SXMSG_REPLYREQ) { /* means we're not initiators and we don't need to allocate a message */
          if(mhead->attr & SXMSG_OPEN)
            mhead->opcode = _channel_open(co, &mhead->reserve);
          else mhead->opcode = _channel_close(co, mhead->reserve);

          /* set flags */
          mhead->payload_length = 0;
          mhead->attr &= ~SXMSG_REPLYREQ;
          pthread_mutex_lock(&(co->sslinout[1]));
          wr = __conn_write(co, mhead, sizeof(sxmplv2_head_t));
          pthread_mutex_unlock(&(co->sslinout[1]));
          if(wr < 0) goto __finish;
        } else { /* it's came back */
          /* reply came ... */
          if(mhead->msgid >= 1024) {
          __inval_idx_nor:
            fprintf(stderr, "[sxmplv2] Invalid index of the message.\n");
            goto __again;
          }
          mid = mhead->msgid;
          //hread_mutex_lock(&co->idx_msg_lock);
          msg = co->messages[mid];
          //hread_mutex_unlock(&co->idx_msg_lock);
          if(!msg) goto __inval_idx_nor;

          /* ok now we'are copy data and unlock wait mutex */
          memcpy(&msg->mhead, mhead, sizeof(sxmplv2_head_t));
          pthread_mutex_unlock(&msg->wait);
        }
      } else if(mhead->attr & SXMSG_LINK) { /* link layer messages */
        if(mhead->attr & SXMSG_CLOSED) goto __finish; /* close the link */
        if(mhead->attr & SXMSG_PULSE) { /* it's a link pulse messages */
          /* TODO: syncronization and so on */
          if(mhead->opcode == SXE_RAPIDMSG) { /* custom pulse */
            sx = parse_sexp(bbuf, mhead->payload_length);
            if(sx && co->ssys->on_pulse) co->ssys->on_pulse(co, sx);
            if(sx) destroy_sexp(sx);
          }
        }
      } else { /* regular messages */
        if((mhead->attr & SXMSG_OPEN) && (mhead->attr & SXMSG_REPLYREQ)) { /* dialog initiation */
          channel = co->channels[mhead->reserve];
          if(!channel) { /* ok, we'are failed */
            mhead->opcode = SXE_NOSUCHCHAN;
          __ret_regerr:
            mhead->payload_length = 0;
            mhead->attr &= ~SXMSG_REPLYREQ;
            mhead->attr &= ~SXMSG_OPEN;
            mhead->attr |= SXMSG_CLOSED;
            pthread_mutex_lock(&(co->sslinout[1]));
            wr = __conn_write(co, mhead, sizeof(sxmplv2_head_t));
            pthread_mutex_unlock(&(co->sslinout[1]));
            if(wr < 0) goto __finish;
            else goto __again;
          }
          /* if message is busy - fails */
          mid = mhead->msgid;
          msg = co->messages[mid];
          if(msg) { mhead->opcode = SXE_EBUSY; goto __ret_regerr; }

          /* now we will take a deal */
          if(!(msg = malloc(sizeof(sxmsg_t)))) {
            mhead->opcode = SXE_ENOMEM; goto __ret_regerr;
          } else {
            /* set mutex and channel */
            pthread_mutex_init(&msg->wait, NULL);
            pthread_mutex_lock(&msg->wait);
            msg->pch = channel;
            /* copy header only */
            memcpy(&msg->mhead, mhead, sizeof(sxmplv2_head_t));
            if(mhead->payload_length) msg->payload = bbuf;
          }

          pthread_mutex_lock(&co->idx_msg_lock);
          idx_reserve(&co->idx_msg, mid);
          co->messages[mid] = msg;
          pthread_mutex_unlock(&co->idx_msg_lock);

          /* now we are able to process the message */
          _message_process(msg);
        } else if(mhead->attr & SXMSG_CLOSED) {
          /* check for the message */
          if(mhead->msgid >= 1024) goto __inval_idx_nor;
          mid = mhead->msgid;
          pthread_mutex_lock(&co->idx_msg_lock);
          msg = co->messages[mid];
          if(!msg) {
            pthread_mutex_unlock(&co->idx_msg_lock); goto __inval_idx_nor; }

          /* message dialog is closed - remove this right now */

          idx_free(&co->idx_msg, mid);
          co->messages[mid] = NULL;
          pthread_mutex_unlock(&co->idx_msg_lock);

          if(msg->mhead.attr & SXMSG_TIMEDOUT) { /* nobody wait for it */
            /* now just free it */
            pthread_mutex_unlock(&msg->wait);
            pthread_mutex_destroy(&msg->wait);
            free(msg);
          } else {
            memcpy(&msg->mhead, mhead, sizeof(sxmplv2_head_t));
            if(mhead->payload_length) {
              msg->payload = malloc(mhead->payload_length);
              if(msg->payload) memcpy(msg->payload, bbuf, mhead->payload_length);
              else msg->mhead.opcode = SXE_ENOMEM;
            }

            pthread_mutex_unlock(&msg->wait); /* wake up thread waiting for */
          }
        } else if((!(mhead->attr & SXMSG_CLOSED) && !(mhead->attr & SXMSG_OPEN)) &&
                  (mhead->attr & SXMSG_REPLYREQ)) { /* ongoing dialog */
          /* check for the message */
          if(mhead->msgid >= 1024) goto __inval_idx_nor;
          mid = mhead->msgid;
          msg = co->messages[mid];
          if(!msg) goto __inval_idx_nor;

          if(msg->mhead.attr & SXMSG_TIMEDOUT) { /* nobody wait for it */
            pthread_mutex_lock(&co->idx_msg_lock);
            idx_free(&co->idx_msg, mid);
            co->messages[mid] = NULL;
            pthread_mutex_unlock(&co->idx_msg_lock);

            /* now just free it */
            pthread_mutex_destroy(&msg->wait);
            free(msg);

            /* we must reply */
            mhead->opcode = SXE_ETIMEDOUT;
            goto __ret_regerr;
          } else {
            memcpy(&msg->mhead, mhead, sizeof(sxmplv2_head_t));
            if(mhead->payload_length) {
              msg->payload = malloc(mhead->payload_length);
              if(msg->payload) memcpy(msg->payload, bbuf, mhead->payload_length);
              else {
                mhead->opcode = msg->mhead.opcode = SXE_ENOMEM; /* we will return it to waitee */
                msg->mhead.attr &= ~SXMSG_REPLYREQ; /* doesn't need to reply */
                /* reply here now */
                mhead->payload_length = 0;
                mhead->attr &= ~SXMSG_REPLYREQ;
                mhead->attr &= ~SXMSG_OPEN;
                mhead->attr |= SXMSG_CLOSED;
                pthread_mutex_lock(&(co->sslinout[1]));
                wr = __conn_write(co, mhead, sizeof(sxmplv2_head_t));
                pthread_mutex_unlock(&(co->sslinout[1]));
                if(wr < 0) goto __finish;
              }
            }
            pthread_mutex_unlock(&msg->wait); /* wake up thread waiting for */
          }
        } else { mhead->opcode = SXE_BADPROTO; goto __ret_regerr; }
      }
    }
  }

 __finish:

  co->flags |= SXMP_CLOSED;
  __link_destroy(co);
  __sxmpl_bundle_destroy(b); /* destroy bundle */
  return NULL;
}

sxlink_t *sxlink_master_accept(sxhub_t *ssys, int sck, struct in_addr *addr)
{
  void *buf = NULL;
  char *bbuf;
  sxlink_t *co = __link_minimal_alloc(addr);
  sxmsg_t *msg = NULL;
  sxmplv2_head_t *head;
  sxmplv2_bundle_t *bundle;
  sexp_t *sx;
  size_t rd;
  int r = SXE_FAILED, i;

  if(!co) {
    errno = SXE_ENOMEM;
    return NULL;
  }

  /* ok, now we need to init ssl stuff */
  co->ssys = ssys;

  /* check up - do we need to initialize SSL context? */
  if(!ssys->ctx) {
    /* init SSL certificates and context */
    ssys->ctx = SSL_CTX_new(TLSv1_2_server_method());
    if(!ssys->ctx) { r = SXE_ENOMEM; goto __fail; }
    else {
      /* set verify context */
      SSL_CTX_set_verify(ssys->ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT,
                         __verify_certcall);
      /* set verify depth */
      SSL_CTX_set_verify_depth(ssys->ctx, VERIFY_DEPTH);

      /* set cache policy */
      SSL_CTX_set_session_cache_mode(ssys->ctx, SSL_SESS_CACHE_OFF);
      SSL_CTX_set_mode(ssys->ctx, SSL_MODE_RELEASE_BUFFERS);
    }

    /* load certificates */
    SSL_CTX_load_verify_locations(ssys->ctx, ssys->rootca, NULL);
    /* set the local certificate from CertFile */
    if(SSL_CTX_use_certificate_file(ssys->ctx, ssys->certpem,
                                    SSL_FILETYPE_PEM)<=0) {
      r = SXE_ESSL;
      goto __fail;
    }
    /* set the private key from KeyFile (may be the same as CertFile) */
    if(SSL_CTX_use_PrivateKey_file(ssys->ctx, ssys->certkey,
                                   SSL_FILETYPE_PEM)<=0) {
      r = SXE_ESSL;
      goto __fail;
    }
    /* verify private key */
    if (!SSL_CTX_check_private_key(ssys->ctx)) {
      r = SXE_ESSL;
      goto __fail;
    }

  }

  /* now we will create an SSL connection */
  co->ssl = SSL_new(ssys->ctx);
  if(!co->ssl) { r = SXE_ENOMEM; goto __fail; }
  else SSL_set_fd(co->ssl, sck); /* attach connected socket */

  /* set the context to verify ssl connection */
  SSL_set_ex_data(co->ssl, ex_ssldata_index, (void *)co);
  SSL_set_accept_state(co->ssl);
  if(SSL_accept(co->ssl) == -1) { r = SXE_EPERM; goto __fail; } /* leak here ? */
  //  SSL_do_handshake(co->ssl);

  /* ok, now we are able to allocate and so on */
  /* set connection to the batch mode */
  co->flags |= SXMP_BATCHMODE;
  /* allocate our first buffer */
  buf = mmap(NULL, 65536, PROT_READ|PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
  if(buf == MAP_FAILED) { r = SXE_ENOMEM; goto __fail2; }
  /* allocate first message */
  if(!(msg = malloc(sizeof(sxmsg_t)))) { r = SXE_ENOMEM; goto __fail2; }
  else {
    memset(msg, 0, sizeof(sxmsg_t));
    co->messages[0] = msg;
  }
  bbuf = (char *)buf;
  bbuf += sizeof(sxmplv2_head_t);

  while(co->flags & SXMP_BATCHMODE) {
    rd = __conn_read(co, buf, sizeof(sxmplv2_head_t));
    if(rd == sizeof(sxmplv2_head_t)) {
      head = (sxmplv2_head_t *)buf;

      /* check for returns */
      if(head->opcode != SXE_SUCCESS) { r = head->opcode; goto __fail3; }
      else { /* opcode is fine */
        /* if we're ready for messaging mode, turn off batch mode */
        if(co->flags & SXMP_MESSAGINGMODE) {
          co->flags &= ~SXMP_BATCHMODE;
          break;
        }
      }

      if(!head->payload_length) continue; /* pass the following check up */

      rd = __conn_read(co, bbuf, head->payload_length);
      if(rd == -1) { r = SXE_LINKERROR; goto __fail3; }
      if(rd != head->payload_length) { r = SXE_LINKERROR; goto __fail3; }
      bbuf[rd] = '\0';
      sx = parse_sexp(bbuf, rd);
      if(!sx) goto __fail3;

      /* initialize message */
      msg->payload = bbuf;
      msg->mhead.payload_length = 0;
      /* deal with it */
      r = __eval_syssexp(co, sx);
      memcpy(head, &msg->mhead, sizeof(sxmplv2_head_t));
      head->opcode = r;
      if(r != SXE_SUCCESS) { /* we finish */
        head->payload_length = 0;
        __conn_write(co, head, sizeof(sxmplv2_head_t));
        destroy_sexp(sx);
        goto __fail3;
      }
      rd = __conn_write(co, buf, sizeof(sxmplv2_head_t) + msg->mhead.payload_length);
      if(rd == -1) { r = SXE_LINKERROR; goto __fail3; }
      if(rd != sizeof(sxmplv2_head_t) + msg->mhead.payload_length) {
        destroy_sexp(sx);
        goto __fail3;
      }

      destroy_sexp(sx);
    } else { r = SXE_LINKERROR; goto __fail3; }
  }

  /* if we're there - negotiation is done, going to init messaging mode */
  r = __link_second_alloc(co);
  if(r != SXE_SUCCESS)  goto __fail3;

  /* free message */
  co->messages[0] = NULL;
  free(msg);

  /* and now we're need to create a thread poll */
  if(!(bundle = malloc(sizeof(sxmplv2_bundle_t)))) { r = SXE_ENOMEM; goto __fail4; }
  else {
    bundle->buf = buf;
    bundle->conn = co;
  }

  for(i = 0; i < MAX_SXMPLTHREADS; i++) {
    if(bundle == (void *)0xdead) bundle = __sxmpl_bundle_create(co);
    if(!bundle) goto __fail5;
    r = pthread_create(&co->thrd_poll[i], NULL, __sxmpl_thread, bundle); /* and here, alloc tls */
    if(r) goto __fail5;
    else {
      bundle = (void *)0xdead;
      pthread_detach(co->thrd_poll[i]);
    }
  }

  /* all is done, connection now ready */
  co->flags |= SXMP_ALIVE;

  r = SXE_SUCCESS;
  errno = r;

  /* free context for this thread */
  ERR_remove_state(0);

  return co;

 __fail5:
  r = SXE_ENOMEM;
  /* bundles will be freed by the threads when SSL_read will fails. */
 __fail4:
  __link_second_free(co);
 __fail3:
  if(ssys->on_destroy) ssys->on_destroy(co);
 __fail2:
  if(msg) free(msg);
  if(buf != MAP_FAILED) munmap(buf, 65536);
  SSL_shutdown(co->ssl);
 __fail:
  if(co) {
    if(co->ssl) {
      ERR_remove_thread_state(0);
      ERR_remove_state(0);
      ERR_free_strings();

      SSL_free(co->ssl);
    }
    __link_minimal_free(co);
  }
  close(sck);
  errno = r;

  return NULL;
}

sxlink_t *sxlink_connect(sxhub_t *ssys, const char *host,
                         int port, const char *SSL_cert, const char *login,
                         const char *passwd)
{
  sxlink_t *co = __link_minimal_alloc(NULL);
  struct hostent *host_;
  struct sockaddr_in addr;
  int r = SXE_SUCCESS, sck;
#ifdef WIN32
  WSADATA wsaData;
#endif
  char hostbuf[2048];
  void *buf = NULL;
  char *bbuf;
  sxmplv2_head_t *head;
  sxmplv2_bundle_t *bundle;
  sxmsg_t *msg;
  size_t rd, wr;
  int i;

  r = SXE_IGNORED;
  if(!host || !SSL_cert) goto __fail;
  if(!co) { r = SXE_ENOMEM; goto __fail; }

#ifdef WIN32
  WSAStartup(MAKEWORD(2, 2), &wsaData);
#endif

  /* ok, now we need to init ssl stuff */
  co->ssys = ssys;

  /* check up ssl context */
  if(!ssys->ctx) {
    /* init SSL certificates and context */
    ssys->ctx = SSL_CTX_new(TLSv1_2_client_method());
    if(!ssys->ctx) { r = SXE_ENOMEM; goto __fail; }
    else {
      /* set verify context */
      SSL_CTX_set_verify(ssys->ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT,
                         __verify_certcall_dummy);
      /* set verify depth */
      SSL_CTX_set_verify_depth(ssys->ctx, VERIFY_DEPTH);
    }

    /* load certificates */
    SSL_CTX_load_verify_locations(ssys->ctx, ssys->rootca, NULL);
    /* set the local certificate from CertFile */
    if(SSL_CTX_use_certificate_file(ssys->ctx, SSL_cert,
                                    SSL_FILETYPE_PEM)<=0) {
      r = SXE_ESSL;
      goto __fail;
    }
    /* set the private key from KeyFile (may be the same as CertFile) */
    if(SSL_CTX_use_PrivateKey_file(ssys->ctx, SSL_cert,
                                   SSL_FILETYPE_PEM)<=0) {
      r = SXE_ESSL;
      goto __fail;
    }
    /* verify private key */
    if (!SSL_CTX_check_private_key(ssys->ctx)) {
      r = SXE_ESSL;
      goto __fail;
    }
  } else {
    /* set the local certificate from CertFile */
    if(SSL_CTX_use_certificate_file(ssys->ctx, SSL_cert,
                                    SSL_FILETYPE_PEM)<=0) {
      r = SXE_ESSL;
      goto __fail;
    }
    /* set the private key from KeyFile (may be the same as CertFile) */
    if(SSL_CTX_use_PrivateKey_file(ssys->ctx, SSL_cert,
                                   SSL_FILETYPE_PEM)<=0) {
      r = SXE_ESSL;
      goto __fail;
    }
    /* verify private key */
    if (!SSL_CTX_check_private_key(ssys->ctx)) {
      r = SXE_ESSL;
      goto __fail;
    }
  }

  /* resolve host */
#ifdef WIN32
  host_ = gethostbyname(host);
  if(!host_) {
    r = SXE_FAILED;
    goto __fail;
  }
#else
  r = __resolvehost(host, hostbuf, 2048, &host_);
  if(r) {
    r = SXE_FAILED;
    goto __fail;
  }
#endif

  /* create a socket */
  sck = socket(PF_INET, SOCK_STREAM, 0);
  memset(&addr, 0, sizeof(addr));

  /* try to connect it */
  addr.sin_family = AF_INET;
  addr.sin_port = htons(port);
  addr.sin_addr.s_addr = *(uint32_t*)(host_->h_addr);
  r = connect(sck, (struct sockaddr*)&addr, sizeof(addr));
  if(r) {
    close(sck);
    r = SXE_FAILED; /* couldn't connect to the desired host */
    goto __fail;
  }

  /* SSL handshake */
  co->ssl = SSL_new(ssys->ctx); /* TODO: checkout for it */
  SSL_set_fd(co->ssl, sck); /* attach connected socket */
  SSL_set_connect_state(co->ssl);
  if(SSL_connect(co->ssl) == -1) {
    r = SXE_EPERM;
    /* shutdown connection */
    goto __fail;
  } /* if success we're ready to use established SSL channel */

  /* set connection to the batch mode */
  co->flags |= SXMP_BATCHMODE;

  /* allocate our first buffer */
  buf = mmap(NULL, 65536, PROT_READ|PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
  if(buf == MAP_FAILED) { r = SXE_ENOMEM; goto __fail2; }
  /* allocate first message */
  if(!(msg = malloc(sizeof(sxmsg_t)))) { r = SXE_ENOMEM; goto __fail2; }
  else {
    memset(msg, 0, sizeof(sxmsg_t));
    co->messages[0] = msg;
  }
  bbuf = (char *)buf;
  bbuf += sizeof(sxmplv2_head_t);
  head = (sxmplv2_head_t *)buf;

  sexp_t *sx;
  size_t ln;
  while(co->flags & SXMP_BATCHMODE) {
    /* form a message -- credentials */
    ln = snprintf(bbuf, 65535 - sizeof(sxmplv2_head_t), "(auth-set-credentials \"%s\" \"%s\")",
                  login ? login : "nil", passwd ? passwd : "nil");

    head->opcode = SXE_SUCCESS;
    head->payload_length = ln;
    wr = __conn_write(co, buf, ln + sizeof(sxmplv2_head_t));
    if(wr < 0) goto __fail2;

    rd = __conn_read(co, head, sizeof(sxmplv2_head_t));
    if(rd < 0) goto __fail2;
    if(head->opcode != SXE_SUCCESS) {
      r = head->opcode;
      goto __fail2;
    }

    /* ok, get available channels */
    head->opcode = SXE_SUCCESS;
    ln = snprintf(bbuf, 65535 - sizeof(sxmplv2_head_t), "(get-channels-list)");
    head->payload_length = ln;
    wr = __conn_write(co, buf, ln + sizeof(sxmplv2_head_t));
    if(wr < 0) goto __fail2;

    rd = __conn_read(co, head, sizeof(sxmplv2_head_t));
    if(rd < 0) goto __fail2;
    if(head->opcode != SXE_SUCCESS) goto __fail2;
    if(!head->payload_length) goto __fail2;
    rd = __conn_read(co, bbuf, head->payload_length);
    if(rd < 0) goto __fail2;

    /* perform a parsing of the desired message */
    bbuf[rd] = '\0';
    sx = parse_sexp(bbuf, rd);
    if(!sx) { r = SXE_BADPROTO; goto __fail2; }
    r = __eval_syssexp(co, sx);
    if(!r) r = SXE_SUCCESS;
    destroy_sexp(sx);

    /* write back */
    head->opcode = r;
    head->payload_length = 0;
    wr = __conn_write(co, head, sizeof(sxmplv2_head_t));
    if(wr < 0) {
      r = SXE_LINKERROR; goto __fail2;}
    if(r != SXE_SUCCESS) { r = SXE_LINKERROR; goto __fail2;}
  }

  /* if we're there - negotiation is done, going to init messaging mode */
  r = __link_second_alloc(co);
  if(r != SXE_SUCCESS)  goto __fail3;

  /* free message */
  co->messages[0] = NULL;
  free(msg);

  /* and now we're need to create a thread poll */
  if(!(bundle = malloc(sizeof(sxmplv2_bundle_t)))) { r = SXE_ENOMEM; goto __fail4; }
  else {
    bundle->buf = buf;
    bundle->conn = co;
  }
  for(i = 0; i < MAX_SXMPLTHREADS; i++) {
    if(bundle == (void *)0xdead) bundle = __sxmpl_bundle_create(co);
    if(!bundle) goto __fail5;
    r = pthread_create(&co->thrd_poll[i], NULL, __sxmpl_thread, bundle);
    if(r) goto __fail5;
    else {
      pthread_detach(co->thrd_poll[i]);
      bundle = (void *)0xdead;
    }
  }

  /* all is done, connection now ready */
  co->flags |= SXMP_ALIVE;

  return co;

 __fail5:
  r = SXE_ENOMEM;
  /* bundles will be freed by the threads when SSL_read will fails. */
 __fail4:
  __link_second_free(co);
 __fail3:
  if(ssys->on_destroy) ssys->on_destroy(co);
 __fail2:
  if(buf != MAP_FAILED) munmap(buf, 65536);
  SSL_shutdown(co->ssl);
  ERR_remove_thread_state(0);
  ERR_remove_state(0);

  close(sck);
 __fail:
  if(co) {
    if(co->ssl) SSL_free(co->ssl);
    __link_minimal_free(co);
  }
  errno = r;
  return NULL;
}

int sxlink_close(sxlink_t *co)
{
  sxmplv2_head_t mhead;

  memset(&mhead, 0, sizeof(sxmplv2_head_t));
  /* setup header */
  mhead.attr = SXMSG_LINK | SXMSG_CLOSED;

  pthread_mutex_lock(&(co->sslinout[1]));
  __conn_write(co, &mhead, sizeof(sxmplv2_head_t));
  pthread_mutex_unlock(&(co->sslinout[1]));

  /* we will not wait anything */
  co->flags |= SXMP_CLOSED;

  /* TODO: wait until all threads will finish */
  usleep(20000);

  return SXE_SUCCESS;
}