safe commit;
This commit is contained in:
		
							parent
							
								
									ab88f3e078
								
							
						
					
					
						commit
						cdee55108b
					
				@ -22,6 +22,11 @@
 | 
			
		||||
 | 
			
		||||
#include <sntl/pth_queue.h>
 | 
			
		||||
 | 
			
		||||
/* error codes */
 | 
			
		||||
#define SXOREPLYREQ  44  /* protocol require reply with expression,
 | 
			
		||||
                          * or expression return for the request */
 | 
			
		||||
#define SXOTIMEDOUT  45  /* timedout */
 | 
			
		||||
 | 
			
		||||
/* sexp helpers */
 | 
			
		||||
#define SEXP_IS_LIST(sx) \
 | 
			
		||||
        ((sx)->ty == SEXP_LIST) ? 1 : 0
 | 
			
		||||
@ -116,6 +121,7 @@ typedef struct __sexp_payload_t {
 | 
			
		||||
#define ESXMSG_PULSE     (1 << 6)
 | 
			
		||||
#define ESXMSG_NOWAIT    (1 << 7)
 | 
			
		||||
#define ESXMSG_ISREPLY   (1 << 8)
 | 
			
		||||
#define ESXMSG_CLOSURE   (1 << 9)
 | 
			
		||||
 | 
			
		||||
typedef struct __message_t {
 | 
			
		||||
  chnl_t *pch; /** < channel of the message(if applicable) */
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										101
									
								
								lib/connection.c
									
									
									
									
									
								
							
							
						
						
									
										101
									
								
								lib/connection.c
									
									
									
									
									
								
							@ -780,8 +780,21 @@ static void *__msg_queue_thread(void *ctx)
 | 
			
		||||
      /* now we need to complete the request */
 | 
			
		||||
      sx = (sexp_t *)msg->payload; tb = buf;
 | 
			
		||||
      if(!(msg->flags & ESXMSG_PULSE)) { /* %s))) */
 | 
			
		||||
        if(!(msg->flags & ESXMSG_ISREPLY))
 | 
			
		||||
          snprintf(buf, 4096, "(ch-msg (:chid %lu (:msgid %lu ", ch->cid,
 | 
			
		||||
                   msg->mid);
 | 
			
		||||
        else {
 | 
			
		||||
          if(!sx) {
 | 
			
		||||
            snprintf(buf, 4096, "(ch-msg-rete (:chid %lu (:msgid %lu (%d))))", ch->cid,
 | 
			
		||||
                     msg->mid, msg->opcode);
 | 
			
		||||
            /* mark it to close */
 | 
			
		||||
            msg->flags |= ESXMSG_CLOSURE;
 | 
			
		||||
            /* ok, here we will write it and wait, destroying dialog while reply */
 | 
			
		||||
            goto __ssl_write;
 | 
			
		||||
          } else snprintf(buf, 4096, "(ch-msg-repl (:chid %lu (:msgid %lu ", ch->cid,
 | 
			
		||||
                          msg->mid);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        len = strlen(buf);
 | 
			
		||||
        tb += len*sizeof(char);
 | 
			
		||||
        if(print_sexp(tb, 4096 - (len + 4*sizeof(char)), sx)) {
 | 
			
		||||
@ -798,11 +811,31 @@ static void *__msg_queue_thread(void *ctx)
 | 
			
		||||
        }
 | 
			
		||||
        len = strlen(tb);
 | 
			
		||||
        tb += len*sizeof(char);
 | 
			
		||||
        strcat(tb, ")))\0");
 | 
			
		||||
        strcat(tb, ")))");
 | 
			
		||||
      } else { /* pulse messages */
 | 
			
		||||
        tb = tb;
 | 
			
		||||
        /* here we're shouldn't process reply procedure */
 | 
			
		||||
        snprintf(buf, 4096, "(ch-msg-pulse (:chid %lu (:msgid %lu ", ch->cid,
 | 
			
		||||
                 msg->mid);
 | 
			
		||||
        len = strlen(buf); /* FIXME: code double shit ! */
 | 
			
		||||
        tb += len*sizeof(char);
 | 
			
		||||
        if(print_sexp(tb, 4096 - (len + 4*sizeof(char)), sx)) {
 | 
			
		||||
          msg->opcode = ENOMEM;
 | 
			
		||||
          /* we don't need to wake up anybody */
 | 
			
		||||
          if(msg->flags & ESXMSG_TIMEDOUT) {
 | 
			
		||||
            /* clean up all the shit:
 | 
			
		||||
             * 1. remove from message tree
 | 
			
		||||
             * 2. destroy message itself
 | 
			
		||||
             */
 | 
			
		||||
            /* TODO: destroy the message */
 | 
			
		||||
          } else
 | 
			
		||||
            pthread_mutex_unlock(&(msg->wait));
 | 
			
		||||
        }
 | 
			
		||||
        len = strlen(tb);
 | 
			
		||||
        tb += len*sizeof(char);
 | 
			
		||||
        strcat(tb, ")))");
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
    __ssl_write:
 | 
			
		||||
      /* write it */
 | 
			
		||||
      pthread_mutex_lock(&(co->oplock)); /* exclusive write */
 | 
			
		||||
      SSL_write(co->ssl, (void *)buf, strlen(buf) + sizeof(char)); /* TODO: SSL*/
 | 
			
		||||
@ -1557,6 +1590,9 @@ static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec
 | 
			
		||||
{
 | 
			
		||||
  int r = 0;
 | 
			
		||||
  sxmsg_t *m = NULL;
 | 
			
		||||
  conn_t *co = ch->connection;
 | 
			
		||||
 | 
			
		||||
  *msg = NULL;
 | 
			
		||||
 | 
			
		||||
  r = __create_reg_msg(&m, ch);
 | 
			
		||||
  if(r) return r;
 | 
			
		||||
@ -1565,7 +1601,30 @@ static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec
 | 
			
		||||
    pthread_rwlock_wrlock(&(ch->msglock));
 | 
			
		||||
    usrtc_insert(ch->msgs_tree, &(m->pendingq_node), &(m->mid));
 | 
			
		||||
    pthread_rwlock_unlock(&(ch->msglock));
 | 
			
		||||
 | 
			
		||||
    /* message assign */
 | 
			
		||||
    m->opcode = 0;
 | 
			
		||||
    m->payload = (void *)sx;
 | 
			
		||||
 | 
			
		||||
    /* put the message to the run queue */
 | 
			
		||||
    r = pth_queue_add(co->mqueue, (void *)m, USR_MSG);
 | 
			
		||||
    if(r) return r; /* FIXME: better give up */
 | 
			
		||||
 | 
			
		||||
    if(m->flags & ESXMSG_PENDING) {
 | 
			
		||||
      if(!tio) pthread_mutex_lock(&(m->wait));
 | 
			
		||||
      else pthread_mutex_timedlock(&(m->wait), tio);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if(tio && (m->flags & ESXMSG_PENDING))
 | 
			
		||||
      return SXOTIMEDOUT;
 | 
			
		||||
 | 
			
		||||
    if(!m->payload) {
 | 
			
		||||
      /* TODO: destroy the message */
 | 
			
		||||
      r = m->opcode;
 | 
			
		||||
    } else {
 | 
			
		||||
      *msg = m;
 | 
			
		||||
      r = SXOREPLYREQ;
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  return r;
 | 
			
		||||
@ -1581,19 +1640,51 @@ int msg_send_timed(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec *tio)
 | 
			
		||||
  return __message_send(ch, sx, msg, tio);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcode)
 | 
			
		||||
{
 | 
			
		||||
  int r = 0;
 | 
			
		||||
  chnl_t *ch = msg->pch;
 | 
			
		||||
  conn_t *co = ch->connection;
 | 
			
		||||
 | 
			
		||||
  msg->payload = sx;
 | 
			
		||||
  msg->opcode = opcode;
 | 
			
		||||
  msg->flags |= ESXMSG_PENDING; /* pending */
 | 
			
		||||
  msg->flags |= ESXMSG_ISREPLY; /* this is a reply */
 | 
			
		||||
 | 
			
		||||
  /* put the message to the queue */
 | 
			
		||||
  r = pth_queue_add(co->mqueue, (void *)msg, USR_MSG);
 | 
			
		||||
  if(r) return r; /* FIXME: better give up */
 | 
			
		||||
 | 
			
		||||
  if(msg->flags & ESXMSG_PENDING) {
 | 
			
		||||
    if(!tio) pthread_mutex_lock(&(msg->wait));
 | 
			
		||||
    else pthread_mutex_timedlock(&(msg->wait), tio);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  if(tio && (msg->flags & ESXMSG_PENDING))
 | 
			
		||||
    return SXOTIMEDOUT;
 | 
			
		||||
 | 
			
		||||
  r = msg->opcode;
 | 
			
		||||
 | 
			
		||||
  if(msg->flags & ESXMSG_CLOSURE) {
 | 
			
		||||
    /* TODO: destroy message */
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  return r;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int msg_return(sxmsg_t *msg, int opcode)
 | 
			
		||||
{
 | 
			
		||||
  return 0;
 | 
			
		||||
  return __msg_reply(msg, NULL, NULL, opcode);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int msg_reply(sxmsg_t *msg, sexp_t *sx)
 | 
			
		||||
{
 | 
			
		||||
  return 0;
 | 
			
		||||
  return __msg_reply(msg, sx, NULL, 0);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int msg_reply_timed(sxmsg_t *msg, sexp_t *sx, struct timespec *tio)
 | 
			
		||||
{
 | 
			
		||||
  return 0;
 | 
			
		||||
  return __msg_reply(msg, sx, tio, 0);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int msg_send_pulse(chnl_t *ch, sexp_t *sx)
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user