added thread outbone caller;
This commit is contained in:
		
							parent
							
								
									5ea003dfad
								
							
						
					
					
						commit
						f737e500d1
					
				@ -1026,6 +1026,59 @@ static void *__cxmaster_thread_listener(void *wctx)
 | 
				
			|||||||
  return NULL;
 | 
					  return NULL;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static void *__rmsg_queue_thread(void *ctx)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  conn_t *co = (conn_t *)ctx;
 | 
				
			||||||
 | 
					  pth_msg_t *tmp = malloc(sizeof(pth_msg_t));
 | 
				
			||||||
 | 
					  sxmsg_t *msg;
 | 
				
			||||||
 | 
					  chnl_t *ch;
 | 
				
			||||||
 | 
					  int r = 0;
 | 
				
			||||||
 | 
					  char *rpcf;
 | 
				
			||||||
 | 
					  sexp_t *sx;
 | 
				
			||||||
 | 
					  usrtc_node_t *node = NULL;
 | 
				
			||||||
 | 
					  cx_rpc_t *rpccall;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  if(!tmp) return NULL;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  while(1) {
 | 
				
			||||||
 | 
					    r = pth_queue_get(co->mqueue, NULL, tmp);
 | 
				
			||||||
 | 
					    if(r) {
 | 
				
			||||||
 | 
					      free(tmp);
 | 
				
			||||||
 | 
					      return NULL;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    msg = tmp->data;
 | 
				
			||||||
 | 
					    if(!msg) continue; /* spurious !! */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    /* check to right job */
 | 
				
			||||||
 | 
					    if(!(msg->flags & ESXMSG_USR)) { /* not a regular message */
 | 
				
			||||||
 | 
					      msg->flags |= ESXMSG_NOWAY; /* mark it's as undeliverable */
 | 
				
			||||||
 | 
					      msg->flags &= ~ESXMSG_PENDING;
 | 
				
			||||||
 | 
					      __DBGLINE;
 | 
				
			||||||
 | 
					      pthread_mutex_unlock(&(msg->wait)); /* wake up the waitee */
 | 
				
			||||||
 | 
					      continue;
 | 
				
			||||||
 | 
					    } else {
 | 
				
			||||||
 | 
					      /* now we're need to have a deal with the rpc calling, other - we don't care */
 | 
				
			||||||
 | 
					      ch = msg->pch;
 | 
				
			||||||
 | 
					      sx = (sexp_t *)msg->payload;
 | 
				
			||||||
 | 
					      /* get the function name */
 | 
				
			||||||
 | 
					      if(sx->ty == SEXP_LIST) rpcf = sx->list->val;
 | 
				
			||||||
 | 
					      else rpcf = sx->val;
 | 
				
			||||||
 | 
					      printf("Inbound queue RPC call = '%s'\n", rpcf);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      node = usrtc_lookup(ch->rpc_list->rpc_tree, rpcf);
 | 
				
			||||||
 | 
					      if(!node) {
 | 
				
			||||||
 | 
					        printf("RPC call illegal!\n");
 | 
				
			||||||
 | 
					        /* TODO: correct reply with an error code */
 | 
				
			||||||
 | 
					      } else rpccall = (cx_rpc_t *)usrtc_node_getdata(node);
 | 
				
			||||||
 | 
					      /* call this ! */
 | 
				
			||||||
 | 
					      rpccall->rpcf((void *)msg, sx);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  return NULL;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static void *__msg_queue_thread(void *ctx)
 | 
					static void *__msg_queue_thread(void *ctx)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  conn_t *co = (conn_t *)ctx;
 | 
					  conn_t *co = (conn_t *)ctx;
 | 
				
			||||||
@ -1547,6 +1600,7 @@ int connection_create(conn_t *co, int sck)
 | 
				
			|||||||
  char *buf = NULL;
 | 
					  char *buf = NULL;
 | 
				
			||||||
  usrtc_t *ch_tree, *rpc_tree;
 | 
					  usrtc_t *ch_tree, *rpc_tree;
 | 
				
			||||||
  pth_queue_t *rqueue = malloc(sizeof(pth_queue_t));
 | 
					  pth_queue_t *rqueue = malloc(sizeof(pth_queue_t));
 | 
				
			||||||
 | 
					  pth_queue_t *mqueue = malloc(sizeof(pth_queue_t));
 | 
				
			||||||
  idx_allocator_t *idx_ch = malloc(sizeof(idx_allocator_t));
 | 
					  idx_allocator_t *idx_ch = malloc(sizeof(idx_allocator_t));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  if(!co) return EINVAL;
 | 
					  if(!co) return EINVAL;
 | 
				
			||||||
@ -1572,10 +1626,13 @@ int connection_create(conn_t *co, int sck)
 | 
				
			|||||||
  usrtc_init(ch_tree, USRTC_REDBLACK, MAX_CHANNELS, __cmp_ulong);
 | 
					  usrtc_init(ch_tree, USRTC_REDBLACK, MAX_CHANNELS, __cmp_ulong);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  co->idx_ch = idx_ch;
 | 
					  co->idx_ch = idx_ch;
 | 
				
			||||||
  
 | 
					
 | 
				
			||||||
  /* assign message queue */
 | 
					  /* assign message queue */
 | 
				
			||||||
  pth_queue_init(rqueue); /* TODO: check for initialization */
 | 
					  pth_queue_init(rqueue); /* TODO: check for initialization */
 | 
				
			||||||
  co->rqueue = rqueue;
 | 
					  co->rqueue = rqueue;
 | 
				
			||||||
 | 
					  /* assigned outbone message queue master also has this one */
 | 
				
			||||||
 | 
					  pth_queue_init(mqueue); /* TODO: check for initialization */
 | 
				
			||||||
 | 
					  co->mqueue = mqueue;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  /* init SSL certificates and context */
 | 
					  /* init SSL certificates and context */
 | 
				
			||||||
  co->ctx = SSL_CTX_new(SSLv3_server_method());
 | 
					  co->ctx = SSL_CTX_new(SSLv3_server_method());
 | 
				
			||||||
@ -1665,6 +1722,9 @@ int connection_create(conn_t *co, int sck)
 | 
				
			|||||||
    pthread_rwlock_wrlock(&conn_sys->rwlock);
 | 
					    pthread_rwlock_wrlock(&conn_sys->rwlock);
 | 
				
			||||||
    usrtc_insert(conn_sys->connections, &co->csnode, (void *)co->uuid);
 | 
					    usrtc_insert(conn_sys->connections, &co->csnode, (void *)co->uuid);
 | 
				
			||||||
    pthread_rwlock_unlock(&conn_sys->rwlock);
 | 
					    pthread_rwlock_unlock(&conn_sys->rwlock);
 | 
				
			||||||
 | 
					    /* threads poll --- */
 | 
				
			||||||
 | 
					    r = pthread_create(&co->msgthread, NULL, __msg_queue_thread, (void *)co); /* TODO: check for thread creation */
 | 
				
			||||||
 | 
					    r = pthread_create(&co->rmsgthread, NULL, __rmsg_queue_thread, (void *)co); /* TODO: check for thread creation */
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  return r;
 | 
					  return r;
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user