/*
 * Secure Network Transport Layer Library v2 implementation.
 * (sntllv2) it supersedes all previous versions due to:
 * - memory consumption
 * - new features such as pulse emitting
 * - performance optimization
 *
 * This is a proprietary software. See COPYING for further details.
 *
 * (c) Askele Group 2013-2015 <http://askele.com>
 *
 */

#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <fcntl.h>

#include <tdata/usrtc.h>
#include <tdata/list.h>
#include <sexpr/sexp.h>

#include <sntl/sntllv2.h>

#include "internal.h"

/*
 * Parse the payload of an incoming message as an S-expression and hand it
 * to the RPC function registered in the channel's RPC tree.
 *
 * The first element of the parsed list is used as the lookup key; it must
 * not be a list itself and must be a basic atom. On success the registered
 * function is called with the message and the parsed expression, and the
 * expression is destroyed afterwards.
 *
 * On any failure the message is closed through sxmsg_return() with:
 *   SNE_BADPROTO - payload cannot be parsed or its head is malformed
 *   SNE_ENORPC   - no RPC entry is found for the head value
 *
 * msg must be non-NULL and msg->pch must point to a channel with an RPC list.
 */
void _message_process(sxmsg_t *msg)
{
  chnl_t *chan = msg->pch;
  sexp_t *sx, *isx = NULL; /* isx stays NULL if sexp_list_car() does not set it */
  usrtc_t *listrpc = chan->rpc_list->rpc_tree;
  usrtc_node_t *node;
  cx_rpc_t *rpcc;
  int r;

  sx = parse_sexp(msg->payload, msg->mhead.payload_length);
  if(!sx) {
    /* nothing was parsed: answer once and stop, there is nothing to destroy */
    sxmsg_return(msg, SNE_BADPROTO);
    return;
  }

  sexp_list_car(sx, &isx);
  if(!isx) {    r = SNE_BADPROTO;    goto __return_err;  }
  if(isx->ty == SEXP_LIST) {    r = SNE_BADPROTO;    goto __return_err;  }
  if(isx->aty != SEXP_BASIC) {    r = SNE_BADPROTO;    goto __return_err;  }

  node = usrtc_lookup(listrpc, (void *)isx->val);
  if(!node) {    r = SNE_ENORPC;    goto __return_err;  }

  rpcc = (cx_rpc_t *)usrtc_node_getdata(node);

  /* NOTE(review): the RPC function is presumably responsible for answering msg */
  rpcc->rpcf((void *)msg, sx);

  destroy_sexp(sx);

  return;

 __return_err:
  destroy_sexp(sx);
  sxmsg_return(msg, r);
  return;
}

/*
 * Create a new message on the given channel, send it and block until the
 * message's wait mutex is released.
 *
 * channel - channel to send on; it must be attached to a connection
 * data    - payload, referenced (not copied) by the message
 * datalen - payload length, must be non-zero
 * omsg    - receives the message when the answer carries a payload;
 *           it is dereferenced in that case, so it must not be NULL
 * pp      - zero: write immediately with _sntll_writemsg();
 *           non-zero: append to the connection's write_pending list
 *
 * Returns the opcode found in the message head after wake-up, or
 *   SNE_FAILED    - NULL channel/connection, NULL data or zero length
 *   SNE_ENOMEM    - allocation failed
 *   SNE_MMESSAGES - no free message index on the connection
 * or the error returned by _sntll_writemsg().
 *
 * When the head reports a non-zero payload length after wake-up, the message
 * is passed to the caller via *omsg with its wait mutex still locked and its
 * index still allocated here; otherwise the index is released and the
 * message is freed before returning.
 */
static inline int __sxmsg_send(chnl_t *channel, const char *data, size_t datalen,
                               sxmsg_t **omsg, int pp)
{
  conn_t *co;
  sxmsg_t *msg;
  sntllv2_head_t *head;
  ppmsg_t *ppm;
  int msgidx, r;

  if(!channel) return SNE_FAILED;
  if(!data || !datalen) return SNE_FAILED;
  /* validate before allocating, as the reply/return paths do */
  if(!(co = channel->connection)) return SNE_FAILED;

  if(!(msg = calloc(1, sizeof(*msg)))) return SNE_ENOMEM;

  head = &msg->mhead;
  /* form a head */
  head->attr = SXMSG_OPEN | SXMSG_REPLYREQ;
  head->reserve = channel->cid;
  head->payload_length = datalen;
  /* init message itself; the mutex is taken now so the second lock below blocks */
  pthread_mutex_init(&msg->wait, NULL);
  pthread_mutex_lock(&msg->wait);
  msg->pch = channel;
  msg->payload = (void *)data;

  /* register the message in the connection's table under a fresh index */
  pthread_mutex_lock(&co->idx_msg_lock);
  msgidx = idx_allocate(&co->idx_msg);
  if(msgidx != IDX_INVAL) co->messages[msgidx] = msg;
  pthread_mutex_unlock(&co->idx_msg_lock);

  if(msgidx == IDX_INVAL) {    r = SNE_MMESSAGES;    goto __freemsg;  }
  else head->msgid = (uint16_t)msgidx;

  /* ready to send it */
  if(!pp) {
    r = _sntll_writemsg(co, msg);
    if(r != SNE_SUCCESS) goto __closemsg;
  } else { /* postponed */
    if(!(ppm = malloc(sizeof(ppmsg_t)))) { r = SNE_ENOMEM; goto __closemsg; }
    list_init_node(&ppm->node);
    ppm->msg = msg;

    /* under locking here */
    pthread_mutex_lock(&co->write_pending_lock);
    co->pending_messages++;
    list_add2tail(&co->write_pending, &ppm->node); /* push it to the FIFO */
    pthread_mutex_unlock(&co->write_pending_lock);
  }

  /* we will sleep here; NOTE(review): presumably released by the receiving side */
  pthread_mutex_lock(&msg->wait);

  if(head->payload_length) {
    /* answer carries data: hand the message over to the caller */
    *omsg = msg;
    return head->opcode;
  } else r = head->opcode;

 __closemsg:
  pthread_mutex_lock(&co->idx_msg_lock);
  idx_free(&co->idx_msg, msgidx);
  co->messages[msgidx] = NULL;
  pthread_mutex_unlock(&co->idx_msg_lock);
 __freemsg:
  /* free resources for message */
  pthread_mutex_unlock(&msg->wait);
  pthread_mutex_destroy(&msg->wait);
  free(msg);

  return r;
}

/*
 * Send a message on the channel and wait for the answer; the message is
 * written immediately (pp = 0). See __sxmsg_send() for arguments and
 * return values.
 */
int sxmsg_send(chnl_t *channel, const char *data, size_t datalen, sxmsg_t **omsg)
{
  const int postponed = 0; /* write right away, do not queue */

  return __sxmsg_send(channel, data, datalen, omsg, postponed);
}

/*
 * Same as sxmsg_send(), but postponed: the message is put on the
 * connection's pending write queue (pp = 1) instead of being written
 * immediately.
 */
int sxmsg_send_pp(chnl_t *channel, const char *data, size_t datalen, sxmsg_t **omsg)
{
  const int postponed = 1; /* enqueue for a later write */

  return __sxmsg_send(channel, data, datalen, omsg, postponed);
}

/*
 * Send a pulse message over the connection.
 *
 * A temporary zeroed message is built with the SXMSG_PULSE | SXMSG_LINK
 * attributes and the SNE_RAPIDMSG opcode, pointed at the caller's data,
 * written with _sntll_writemsg() and freed again. The data buffer itself
 * is never freed here.
 *
 * Returns SNE_ENOMEM when the message cannot be allocated, otherwise the
 * result of _sntll_writemsg().
 */
int sxmsg_pulse(conn_t *co, const char *data, size_t datalen)
{
  sxmsg_t *pulse = calloc(1, sizeof(sxmsg_t));
  int rc;

  if(pulse == NULL) return SNE_ENOMEM;

  /* fill in the head and attach the payload */
  pulse->mhead.attr = SXMSG_PULSE | SXMSG_LINK;
  pulse->mhead.opcode = SNE_RAPIDMSG;
  pulse->mhead.payload_length = datalen;
  pulse->payload = (void *)data;

  rc = _sntll_writemsg(co, pulse);
  free(pulse);

  return rc;
}

/*
 * Answer an open message with data and wait for the next step of the dialog.
 *
 * msg     - message being answered; its head and payload pointer are rewritten
 * data    - payload, referenced (not copied) by the message
 * datalen - payload length
 * pp      - zero: write immediately with _sntll_writemsg();
 *           non-zero: append to the connection's write_pending list
 *
 * Returns the opcode found in the head after wake-up, or
 *   SNE_FAILED     - msg, its channel or the channel's connection is NULL
 *   SNE_WOULDBLOCK - the caller is one of the connection's thrd_poll threads
 *   SNE_ENOMEM     - the queue entry cannot be allocated
 * or the error returned by _sntll_writemsg().
 *
 * If the head is marked SXMSG_CLOSED and has no payload after wake-up, the
 * message is destroyed here and must not be used by the caller afterwards.
 */
static inline int __sxmsg_reply(sxmsg_t *msg, const char *data,
                                size_t datalen, int pp)
{
  chnl_t *chan;
  conn_t *conn;
  sntllv2_head_t *hdr;
  ppmsg_t *queued;
  pthread_t caller = pthread_self();
  int rc, slot;

  /* sanity checks on the whole msg -> channel -> connection chain */
  if(msg == NULL) return SNE_FAILED;
  chan = msg->pch;
  if(chan == NULL) return SNE_FAILED;
  conn = chan->connection;
  if(conn == NULL) return SNE_FAILED;

  /* refuse to wait from one of the connection's own polling threads */
  for(slot = 0; slot < 8; slot++) {
    if(pthread_equal(caller, conn->thrd_poll[slot])) return SNE_WOULDBLOCK;
  }

  /* rewrite the head for the answer */
  hdr = &msg->mhead;
  hdr->attr = SXMSG_REPLYREQ;
  hdr->opcode = SNE_REPLYREQ;
  hdr->payload_length = datalen;
  msg->payload = (void *)data;

  if(pp) {
    queued = malloc(sizeof(ppmsg_t));
    if(queued == NULL) return SNE_ENOMEM;
    list_init_node(&queued->node);
    queued->msg = msg;

    /* append to the FIFO under its lock */
    pthread_mutex_lock(&conn->write_pending_lock);
    conn->pending_messages++;
    list_add2tail(&conn->write_pending, &queued->node);
    pthread_mutex_unlock(&conn->write_pending_lock);
  } else {
    rc = _sntll_writemsg(conn, msg);
    if(rc != SNE_SUCCESS) return rc;
  }

  /* block until the wait mutex is released */
  pthread_mutex_lock(&msg->wait);

  rc = hdr->opcode; /* read before the message may be freed below */

  /* dialog closed and nothing left to read: the message can be destroyed */
  if((hdr->attr & SXMSG_CLOSED) && hdr->payload_length == 0) {
    pthread_mutex_unlock(&msg->wait);
    pthread_mutex_destroy(&msg->wait);
    free(msg);
  }

  return rc;
}

/*
 * Answer a message with data, writing it immediately (pp = 0).
 * See __sxmsg_reply() for arguments and return values.
 */
int sxmsg_reply(sxmsg_t *msg, const char *data, size_t datalen)
{
  const int postponed = 0; /* write right away, do not queue */

  return __sxmsg_reply(msg, data, datalen, postponed);
}

/*
 * Answer a message with data through the connection's pending write
 * queue (pp = 1). See __sxmsg_reply() for arguments and return values.
 */
int sxmsg_reply_pp(sxmsg_t *msg, const char *data, size_t datalen)
{
  const int postponed = 1; /* enqueue for a later write */

  return __sxmsg_reply(msg, data, datalen, postponed);
}

/*
 * Close a message with a final answer of datalen bytes and destroy it.
 *
 * Only the head is rewritten (SXMSG_CLOSED, SNE_RAPIDMSG, payload length);
 * the payload pointer already stored in the message is left untouched.
 * The message index is released on the connection before the write, and
 * the message is always freed, whatever the write returns.
 *
 * Returns SNE_FAILED when msg, its channel or the channel's connection is
 * NULL, otherwise the result of _sntll_writemsg(). msg must not be used
 * after this call unless SNE_FAILED was returned by the initial checks.
 */
int sxmsg_rreply(sxmsg_t *msg, size_t datalen)
{
  chnl_t *chan;
  conn_t *conn;
  sntllv2_head_t *hdr;
  int rc, slot;

  /* sanity checks on the whole msg -> channel -> connection chain */
  if(msg == NULL) return SNE_FAILED;
  chan = msg->pch;
  if(chan == NULL) return SNE_FAILED;
  conn = chan->connection;
  if(conn == NULL) return SNE_FAILED;

  /* rewrite the head for the closing answer */
  hdr = &msg->mhead;
  hdr->attr = SXMSG_CLOSED;
  hdr->opcode = SNE_RAPIDMSG;
  hdr->payload_length = datalen;

  /* drop the message from the connection's table */
  slot = hdr->msgid;
  pthread_mutex_lock(&conn->idx_msg_lock);
  idx_free(&conn->idx_msg, slot);
  conn->messages[slot] = NULL;
  pthread_mutex_unlock(&conn->idx_msg_lock);

  rc = _sntll_writemsg(conn, msg);

  /* the message is finished: release its mutex and memory */
  pthread_mutex_unlock(&msg->wait);
  pthread_mutex_destroy(&msg->wait);
  free(msg);

  return rc;
}

/*
 * Close a message with a bare opcode and no payload.
 *
 * msg    - message being closed; its head is rewritten
 * opcode - value placed into the head's opcode field
 * pp     - zero: write immediately with _sntll_writemsg();
 *          non-zero: append to the connection's write_pending list
 *
 * The message index is released on the connection in both modes; in the
 * postponed mode this happens only after the queue entry was allocated.
 * The message itself is not freed here.
 *
 * Returns
 *   SNE_FAILED  - msg, its channel or the channel's connection is NULL
 *   SNE_ENOMEM  - postponed mode, the queue entry cannot be allocated
 *   SNE_SUCCESS - postponed mode, the message was queued
 * or, in immediate mode, the result of _sntll_writemsg().
 */
static inline int __sxmsg_return(sxmsg_t *msg, int opcode, int pp)
{
  chnl_t *chan;
  conn_t *conn;
  sntllv2_head_t *hdr;
  ppmsg_t *queued = NULL;
  int slot;

  /* sanity checks on the whole msg -> channel -> connection chain */
  if(msg == NULL) return SNE_FAILED;
  chan = msg->pch;
  if(chan == NULL) return SNE_FAILED;
  conn = chan->connection;
  if(conn == NULL) return SNE_FAILED;

  /* rewrite the head: closed, given opcode, no payload */
  hdr = &msg->mhead;
  hdr->attr = SXMSG_CLOSED;
  hdr->opcode = opcode;
  hdr->payload_length = 0;
  slot = hdr->msgid;

  /* postponed mode needs a queue entry; get it before touching the index */
  if(pp) {
    queued = malloc(sizeof(ppmsg_t));
    if(queued == NULL) return SNE_ENOMEM;
  }

  /* drop the message from the connection's table */
  pthread_mutex_lock(&conn->idx_msg_lock);
  idx_free(&conn->idx_msg, slot);
  conn->messages[slot] = NULL;
  pthread_mutex_unlock(&conn->idx_msg_lock);

  if(!pp) return _sntll_writemsg(conn, msg);

  list_init_node(&queued->node);
  queued->msg = msg;

  /* append to the FIFO under its lock */
  pthread_mutex_lock(&conn->write_pending_lock);
  conn->pending_messages++;
  list_add2tail(&conn->write_pending, &queued->node);
  pthread_mutex_unlock(&conn->write_pending_lock);

  return SNE_SUCCESS;
}

/*
 * Close a message with the given opcode, writing it immediately (pp = 0).
 * See __sxmsg_return() for arguments and return values.
 */
int sxmsg_return(sxmsg_t *msg, int opcode)
{
  const int postponed = 0; /* write right away, do not queue */

  return __sxmsg_return(msg, opcode, postponed);
}

/*
 * Close a message with the given opcode through the connection's pending
 * write queue (pp = 1). See __sxmsg_return() for arguments and return values.
 */
int sxmsg_return_pp(sxmsg_t *msg, int opcode)
{
  const int postponed = 1; /* enqueue for a later write */

  return __sxmsg_return(msg, opcode, postponed);
}

/*
 * Release a message together with its payload buffer.
 *
 * msg->payload is passed to free(), so it must be NULL or a pointer
 * obtained from the allocator. The message's wait mutex is not unlocked
 * or destroyed here. A NULL msg is accepted and ignored, like free(NULL).
 */
void sxmsg_clean(sxmsg_t *msg)
{
  if(!msg) return;

  free(msg->payload);
  free(msg);
}