code cleanup;
parent
ad24dd4391
commit
ebfadad3e9
@ -1 +1 @@
|
||||
nobase_include_HEADERS = sntl/pth_queue.h sntl/connection.h
|
||||
nobase_include_HEADERS = sntl/sntllv2.h sntl/errno.h sntl/limits.h
|
||||
|
@ -1,117 +0,0 @@
|
||||
/*
|
||||
* This is a proprietary software. See COPYING for further details.
|
||||
*
|
||||
* (c) 2013 Copyright Askele, inc. <http://askele.com>
|
||||
* (c) 2013 Copyright Askele Ingria, inc. <http://askele-ingria.com>
|
||||
* (c) 2014 Copyright Confident, inc. (granted permission to use in commercial software)
|
||||
*/
|
||||
|
||||
/**
|
||||
* @file pth_queue.h
|
||||
* @author Alexander Vdolainen
|
||||
* @date 4 Nov 2013, 20 Dec 2014 (dynamic polls)
|
||||
* @brief queue implementation for threads intercommunication
|
||||
*
|
||||
*/
|
||||
|
||||
#ifndef __PTH_QUEUE_H__
|
||||
#define __PTH_QUEUE_H__
|
||||
|
||||
#include <pthread.h>
|
||||
#include <tdata/idx_allocator.h>
|
||||
|
||||
/* possible message types, ones with POLL_ prefix valid on for pth_dqtpoll_* */
|
||||
#define SYS_MSG 0x0f0affee
|
||||
#define USR_MSG 0x0afeeffe
|
||||
#define POLL_DECREASE 0x0afafafe
|
||||
#define POLL_INCREASE 0x0afaffff
|
||||
#define NIL_MSG 0x0
|
||||
#define END_MSG 0xdead0000
|
||||
|
||||
/* max amount of threads within the poll */
|
||||
#define MAX_POLL_VALUE 32
|
||||
|
||||
typedef struct pth_msg_s {
|
||||
void *data; /** < message payload */
|
||||
unsigned int msgtype; /** < message type ID */
|
||||
unsigned int qlength; /** < current queue length (actual on add moment),
|
||||
* it makes no sense with few readers */
|
||||
usrtc_node_t node;
|
||||
} pth_msg_t;
|
||||
|
||||
typedef struct pth_queue_s {
|
||||
unsigned int length;
|
||||
/* sync */
|
||||
pthread_mutex_t mutex;
|
||||
pthread_cond_t cond;
|
||||
/* queue data */
|
||||
usrtc_t qtree;
|
||||
/* cache */
|
||||
usrtc_t msgcache;
|
||||
} pth_queue_t;
|
||||
|
||||
int pth_queue_init(pth_queue_t *queue);
|
||||
|
||||
int pth_queue_add(pth_queue_t *queue, void *data, unsigned int msgtype);
|
||||
|
||||
int pth_queue_get(pth_queue_t *queue, const struct timespec *timeout,
|
||||
pth_msg_t *msg);
|
||||
|
||||
unsigned int pth_queue_length(pth_queue_t *queue);
|
||||
|
||||
int pth_queue_destroy(pth_queue_t *queue, int freedata,
|
||||
void (*free_msg)(void *));
|
||||
|
||||
/* dynamic queue thread poll ... bbrrr .... ok, ok with beer
|
||||
* Dynamic queue thread poll is a queue like pth_queue,
|
||||
* but also it has itäs own mamagement for threads - that's
|
||||
* why dynamic.
|
||||
* Ideally, the model is trying to achieve the following:
|
||||
* 1. one thread in queue while no or very small amount of jobs in the queue
|
||||
* 2. grow until max threads is reached while too many requests
|
||||
* 3. gently slide down volume of threads after job heat
|
||||
* 4. minimal additional drawbacks (i hate something periodically running,
|
||||
* it's bad practice)
|
||||
* The model is quite simple, we should make spurious wakeups equal to zero,
|
||||
* if no - decrease poll value, and, if we don't have thread available -
|
||||
* create it.
|
||||
*/
|
||||
typedef struct pth_dqtpoll_s {
|
||||
pth_queue_t *queue; /** < Job queue */
|
||||
pthread_t *poll; /** < Thread descriptors */
|
||||
int (*jobdata_callback)(void *); /** < Callback to have a deal with data */
|
||||
int flags; /** < Flags */
|
||||
idx_allocator_t *idx; /** < index allocator for the poll threads */
|
||||
pthread_rwlock_t stats_lock; /** < rwlock for stats data */
|
||||
unsigned long spurious_wakeups; /** < amount of spurios wakeups */
|
||||
int poll_value; /** < value of the poll (totally) */
|
||||
struct timeval sched_time;
|
||||
int msgop;
|
||||
} pth_dqtpoll_t;
|
||||
|
||||
/* flags for poll */
|
||||
#define DQTPOLL_RUNNING (1 << 1) /* poll is running */
|
||||
#define DQTPOLL_DEADSTAGE (1 << 2) /* poll in the stage of destroy */
|
||||
|
||||
/* keep it stupid */
|
||||
#define DQTPOLL_DELTAMS 500000
|
||||
#define DQTPOLL_DELTASE 0
|
||||
|
||||
/* init poll, structure must be allocated */
|
||||
int pth_dqtpoll_init(pth_dqtpoll_t*, int (*jobdata_callback)(void *));
|
||||
|
||||
/* run poll: poll */
|
||||
int pth_dqtpoll_run(pth_dqtpoll_t*);
|
||||
|
||||
/* add the job to the queue: poll, job data, message type */
|
||||
int pth_dqtpoll_add(pth_dqtpoll_t*, void*, unsigned int);
|
||||
|
||||
/* destroy the poll: poll, force flag
|
||||
* if force flag is set (!= 0), give up
|
||||
* about jobs, if no, do the job, but don't
|
||||
* accept the new ones, and destroy all poll
|
||||
* with last thread.
|
||||
*/
|
||||
int pth_dqtpoll_destroy(pth_dqtpoll_t*, int);
|
||||
|
||||
#endif /* __PTH_QUEUE_H__ */
|
File diff suppressed because it is too large
Load Diff
@ -1,286 +0,0 @@
|
||||
/*
|
||||
* Secure Network Transport Layer Library implementation.
|
||||
* This is a proprietary software. See COPYING for further details.
|
||||
*
|
||||
* (c) Askele Group 2013-2015 <http://askele.com>
|
||||
*
|
||||
*/
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
#include <errno.h>
|
||||
#include <string.h>
|
||||
#include <pthread.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/time.h>
|
||||
#include <sys/types.h>
|
||||
#include <fcntl.h>
|
||||
|
||||
|
||||
#ifdef WIN32
|
||||
#include <Winsock2.h>
|
||||
#else
|
||||
#include <sys/select.h>
|
||||
#include <netdb.h>
|
||||
#include <unistd.h>
|
||||
#include <uuid/uuid.h>
|
||||
#endif
|
||||
|
||||
#include <openssl/ssl.h>
|
||||
#include <openssl/err.h>
|
||||
|
||||
#include <tdata/usrtc.h>
|
||||
#include <sexpr/sexp.h>
|
||||
|
||||
#include <sntl/connection.h>
|
||||
|
||||
void __destroy_msg(sxmsg_t *msg)
|
||||
{
|
||||
chnl_t *ch = msg->pch;
|
||||
|
||||
if(msg->flags & ESXMSG_USR) {
|
||||
pthread_mutex_lock(&(ch->oplock));
|
||||
idx_free(ch->idx_msg, msg->mid);
|
||||
pthread_mutex_unlock(&(ch->oplock));
|
||||
} else if(msg->flags & ESXMSG_SYS) {
|
||||
//if(msg->uuid) free(msg->uuid);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&(msg->wait));
|
||||
pthread_mutex_destroy(&(msg->wait));
|
||||
free(msg);
|
||||
return;
|
||||
}
|
||||
|
||||
sxmsg_t *__allocate_msg(int *res)
|
||||
{
|
||||
sxmsg_t *msg = malloc(sizeof(sxmsg_t));
|
||||
int r = 0;
|
||||
|
||||
if(!msg) {
|
||||
*res = ENOMEM;
|
||||
return NULL;
|
||||
} else {
|
||||
memset(msg, 0, sizeof(sxmsg_t));
|
||||
if((r = pthread_mutex_init(&(msg->wait), NULL))) {
|
||||
free(msg);
|
||||
*res = r;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
usrtc_node_init(&(msg->pendingq_node), msg);
|
||||
}
|
||||
|
||||
*res = 0;
|
||||
|
||||
return msg;
|
||||
}
|
||||
|
||||
int __create_reg_msg(sxmsg_t **msg, chnl_t *ch)
|
||||
{
|
||||
int r = 0;
|
||||
sxmsg_t *sm = __allocate_msg(&r);
|
||||
|
||||
if(r) return r;
|
||||
else {
|
||||
sm->pch = ch;
|
||||
sm->flags = (ESXMSG_USR | ESXMSG_PENDING);
|
||||
|
||||
/* ok allocate message ID */
|
||||
pthread_mutex_lock(&(ch->oplock));
|
||||
sm->mid = idx_allocate(ch->idx_msg);
|
||||
pthread_mutex_unlock(&(ch->oplock));
|
||||
|
||||
pthread_mutex_lock(&(sm->wait));
|
||||
*msg = sm;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int __create_sys_msg(sxmsg_t **msg, char *uuid, chnl_t *ch, sxpayload_t *data)
|
||||
{
|
||||
int r = 0;
|
||||
sxmsg_t *m = __allocate_msg(&r);
|
||||
|
||||
if(r) return r;
|
||||
else {
|
||||
/* fill values */
|
||||
m->pch = ch;
|
||||
m->uuid = uuid;
|
||||
m->payload = data;
|
||||
/* set the right flags */
|
||||
m->flags = (ESXMSG_SYS | ESXMSG_PENDING);
|
||||
/* we need to lock the wait mutex */
|
||||
pthread_mutex_lock(&(m->wait));
|
||||
|
||||
*msg = m;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* message passing */
|
||||
|
||||
/*
|
||||
* How message sending works:
|
||||
* 1. Create a message structure assigned to the channel,
|
||||
* 2. Put S-expression context to it
|
||||
* 3. Put the message to the queue
|
||||
* 4. expect the result waiting on the lock mutex
|
||||
*/
|
||||
static int __message_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec *tio)
|
||||
{
|
||||
int r = 0;
|
||||
sxmsg_t *m = NULL;
|
||||
conn_t *co = ch->connection;
|
||||
|
||||
if(!(co->flags & CXCONN_ESTABL)) {
|
||||
destroy_sexp(sx);
|
||||
return ESXNOCONNECT;
|
||||
}
|
||||
|
||||
*msg = NULL;
|
||||
|
||||
r = __create_reg_msg(&m, ch);
|
||||
if(r) return r;
|
||||
else {
|
||||
/* put the message to the search tree */
|
||||
pthread_rwlock_wrlock(&(ch->msglock));
|
||||
usrtc_insert(ch->msgs_tree, &(m->pendingq_node), &(m->mid));
|
||||
pthread_rwlock_unlock(&(ch->msglock));
|
||||
|
||||
/* message assign */
|
||||
m->opcode = 0;
|
||||
m->payload = (void *)sx;
|
||||
/* assign initial sx */
|
||||
m->initial_sx = sx;
|
||||
|
||||
/* put the message to the run queue */
|
||||
r = pth_queue_add(co->mqueue, (void *)m, USR_MSG);
|
||||
if(r) return r; /* FIXME: better give up */
|
||||
|
||||
if(m->flags & ESXMSG_PENDING) {
|
||||
if(!tio) pthread_mutex_lock(&(m->wait));
|
||||
else pthread_mutex_timedlock(&(m->wait), tio);
|
||||
}
|
||||
if(tio && (m->flags & ESXMSG_PENDING))
|
||||
return ESXOTIMEDOUT;
|
||||
if(!m->payload) {
|
||||
r = m->opcode;
|
||||
/* first remove the message from tree */
|
||||
pthread_rwlock_wrlock(&(ch->msglock));
|
||||
usrtc_delete(ch->msgs_tree, &(m->pendingq_node));
|
||||
pthread_rwlock_unlock(&(ch->msglock));
|
||||
/* destroy s expression */
|
||||
destroy_sexp(m->initial_sx);
|
||||
/* destroy */
|
||||
__destroy_msg(m);
|
||||
} else {
|
||||
*msg = m;
|
||||
if(m->opcode == ESXNOCONNECT || m->opcode == ESXRAPIDREPLY)
|
||||
r = m->opcode;
|
||||
else r = ESXOREPLYREQ;
|
||||
/* FIXME: remove ugly code */
|
||||
if(m->opcode == ESXRAPIDREPLY) {
|
||||
/* first remove the message from tree */
|
||||
pthread_rwlock_wrlock(&(ch->msglock));
|
||||
usrtc_delete(ch->msgs_tree, &(m->pendingq_node));
|
||||
pthread_rwlock_unlock(&(ch->msglock));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
int msg_send(chnl_t *ch, sexp_t *sx, sxmsg_t **msg)
|
||||
{
|
||||
return __message_send(ch, sx, msg, NULL);
|
||||
}
|
||||
|
||||
int msg_send_timed(chnl_t *ch, sexp_t *sx, sxmsg_t **msg, struct timespec *tio)
|
||||
{
|
||||
return __message_send(ch, sx, msg, tio);
|
||||
}
|
||||
|
||||
static int __msg_reply(sxmsg_t *msg, sexp_t *sx, struct timespec *tio, int opcode,
|
||||
int israpid)
|
||||
{
|
||||
int r = 0;
|
||||
chnl_t *ch = msg->pch;
|
||||
conn_t *co = ch->connection;
|
||||
|
||||
if(!(co->flags & CXCONN_ESTABL)) {
|
||||
destroy_sexp(sx);
|
||||
return ESXNOCONNECT;
|
||||
}
|
||||
|
||||
if(msg->flags & ESXMSG_ISREPLY)
|
||||
destroy_sexp((sexp_t *)msg->payload);
|
||||
|
||||
msg->payload = sx;
|
||||
msg->opcode = opcode;
|
||||
msg->flags |= ESXMSG_PENDING; /* pending */
|
||||
msg->flags |= ESXMSG_ISREPLY; /* this is a reply */
|
||||
if(israpid) msg->flags |= ESXMSG_ISRAPID; /* message is a rapid message */
|
||||
|
||||
if(!sx || israpid) msg->flags &= ~ESXMSG_PENDING;
|
||||
else msg->flags |= ESXMSG_RMONRETR;
|
||||
|
||||
/* put the message to the queue */
|
||||
r = pth_queue_add(co->mqueue, (void *)msg, USR_MSG);
|
||||
if(r) return r; /* FIXME: better give up */
|
||||
if(!sx || israpid) {
|
||||
/* wait for write */
|
||||
//pthread_mutex_lock(&(msg->wait));
|
||||
return 0;
|
||||
}
|
||||
|
||||
if(msg->flags & ESXMSG_PENDING) {
|
||||
if(!tio) pthread_mutex_lock(&(msg->wait));
|
||||
else pthread_mutex_timedlock(&(msg->wait), tio);
|
||||
}
|
||||
|
||||
if(tio && (msg->flags & ESXMSG_PENDING)) {
|
||||
msg->flags &= ~ESXMSG_PENDING; /* we will not wait for it */
|
||||
return ESXOTIMEDOUT;
|
||||
}
|
||||
|
||||
r = msg->opcode;
|
||||
|
||||
if(msg->flags & ESXMSG_CLOSURE) {
|
||||
__destroy_msg(msg);
|
||||
}
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
int msg_return(sxmsg_t *msg, int opcode)
|
||||
{
|
||||
return __msg_reply(msg, NULL, NULL, opcode, 0);
|
||||
}
|
||||
|
||||
int msg_reply(sxmsg_t *msg, sexp_t *sx)
|
||||
{
|
||||
return __msg_reply(msg, sx, NULL, 0, 0);
|
||||
}
|
||||
|
||||
int msg_reply_timed(sxmsg_t *msg, sexp_t *sx, struct timespec *tio)
|
||||
{
|
||||
return __msg_reply(msg, sx, tio, 0, 0);
|
||||
}
|
||||
|
||||
int msg_reply_rapid(sxmsg_t *msg, sexp_t *sx)
|
||||
{
|
||||
return __msg_reply(msg, sx, NULL, 0, 1);
|
||||
}
|
||||
|
||||
int msg_rapid_clean(sxmsg_t *msg)
|
||||
{
|
||||
destroy_sexp(msg->initial_sx);
|
||||
if(msg->payload) destroy_sexp(msg->payload);
|
||||
__destroy_msg(msg);
|
||||
|
||||
return 0;
|
||||
}
|
@ -1,465 +0,0 @@
|
||||
/*
|
||||
* This is a proprietary software. See COPYING for further details.
|
||||
*
|
||||
*
|
||||
*
|
||||
* (c) Askele Group 2013-2015 <http://askele.com>
|
||||
*/
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/time.h>
|
||||
#include <fcntl.h>
|
||||
#include <errno.h>
|
||||
/**/
|
||||
#ifdef WIN32
|
||||
#include <Winsock2.h>
|
||||
#else
|
||||
|
||||
|
||||
#include <unistd.h>
|
||||
|
||||
#endif
|
||||
|
||||
/**/
|
||||
// #include <unistd.h>
|
||||
#include <string.h>
|
||||
#include <pthread.h>
|
||||
|
||||
#include <tdata/usrtc.h>
|
||||
|
||||
#include <sntl/pth_queue.h>
|
||||
|
||||
#define MAX_QUEUE_SIZE 4096
|
||||
#define MAX_QUEUE_POOL 256
|
||||
|
||||
static long __cmp_uint(const void *a, const void *b)
|
||||
{
|
||||
return (long)(*(unsigned int *)a - *(unsigned int *)b);
|
||||
}
|
||||
|
||||
static inline pth_msg_t *__get_newmsg(pth_queue_t *queue)
|
||||
{
|
||||
usrtc_t *tree = &queue->msgcache;
|
||||
usrtc_node_t *node;
|
||||
pth_msg_t *tmp;
|
||||
|
||||
if(usrtc_count(tree)) {
|
||||
node = usrtc_first(tree);
|
||||
tmp = (pth_msg_t *)usrtc_node_getdata(node);
|
||||
usrtc_delete(tree, node);
|
||||
} else {
|
||||
tmp = malloc(sizeof(pth_msg_t));
|
||||
tree = &queue->qtree;
|
||||
node = &tmp->node;
|
||||
usrtc_node_init(node, tmp);
|
||||
}
|
||||
/* insert it */
|
||||
tree = &queue->qtree;
|
||||
tmp->qlength = usrtc_count(tree);
|
||||
usrtc_insert(tree, node, (void *)(&tmp->qlength));
|
||||
|
||||
return tmp;
|
||||
}
|
||||
|
||||
static inline void __release_msg(pth_queue_t *queue, pth_msg_t *msg)
|
||||
{
|
||||
usrtc_node_t *node = &msg->node;
|
||||
usrtc_t *tree = &queue->qtree;
|
||||
|
||||
tree = &queue->qtree; /* remove from queue */
|
||||
usrtc_delete(tree, node);
|
||||
|
||||
tree = &queue->msgcache;
|
||||
|
||||
if(usrtc_count(tree) >= MAX_QUEUE_POOL)
|
||||
free(msg);
|
||||
else {
|
||||
msg->data = NULL;
|
||||
msg->msgtype = NIL_MSG;
|
||||
usrtc_insert(tree, node, (void *)&msg->qlength);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
int pth_queue_init(pth_queue_t *queue)
|
||||
{
|
||||
int r = 0;
|
||||
|
||||
memset(queue, 0, sizeof(pth_queue_t));
|
||||
if((r = pthread_cond_init(&queue->cond, NULL)))
|
||||
return r;
|
||||
|
||||
if((r = pthread_mutex_init(&queue->mutex, NULL))) {
|
||||
pthread_cond_destroy(&queue->cond);
|
||||
return r;
|
||||
}
|
||||
|
||||
usrtc_init(&queue->qtree, USRTC_AVL, MAX_QUEUE_SIZE, __cmp_uint);
|
||||
usrtc_init(&queue->msgcache, USRTC_AVL, MAX_QUEUE_POOL, __cmp_uint);
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
int pth_queue_add(pth_queue_t *queue, void *data, unsigned int msgtype)
|
||||
{
|
||||
pth_msg_t *newmsg;
|
||||
|
||||
pthread_mutex_lock(&queue->mutex);
|
||||
newmsg = __get_newmsg(queue);
|
||||
if (newmsg == NULL) {
|
||||
pthread_mutex_unlock(&queue->mutex);
|
||||
return ENOMEM;
|
||||
}
|
||||
|
||||
newmsg->data = data;
|
||||
newmsg->msgtype = msgtype;
|
||||
|
||||
if(queue->length == 0)
|
||||
pthread_cond_broadcast(&queue->cond);
|
||||
queue->length++;
|
||||
pthread_mutex_unlock(&queue->mutex);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int pth_queue_get(pth_queue_t *queue, const struct timespec *timeout, pth_msg_t *msg)
|
||||
{
|
||||
usrtc_t *tree;
|
||||
usrtc_node_t *node = NULL;
|
||||
pth_msg_t *tmp;
|
||||
int r = 0;
|
||||
struct timespec abstimeout;
|
||||
|
||||
if (queue == NULL || msg == NULL)
|
||||
return EINVAL;
|
||||
else
|
||||
tree = &queue->qtree;
|
||||
|
||||
if (timeout) { /* setup timeout */
|
||||
struct timeval now;
|
||||
|
||||
gettimeofday(&now, NULL);
|
||||
abstimeout.tv_sec = now.tv_sec + timeout->tv_sec;
|
||||
abstimeout.tv_nsec = (now.tv_usec * 1000) + timeout->tv_nsec;
|
||||
if (abstimeout.tv_nsec >= 1000000000) {
|
||||
abstimeout.tv_sec++;
|
||||
abstimeout.tv_nsec -= 1000000000;
|
||||
}
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&queue->mutex);
|
||||
|
||||
/* Will wait until awakened by a signal or broadcast */
|
||||
while ((node = usrtc_first(tree)) == NULL && r != ETIMEDOUT) { /* Need to loop to handle spurious wakeups */
|
||||
if (timeout)
|
||||
r = pthread_cond_timedwait(&queue->cond, &queue->mutex, &abstimeout);
|
||||
else
|
||||
pthread_cond_wait(&queue->cond, &queue->mutex);
|
||||
}
|
||||
if (r == ETIMEDOUT) {
|
||||
pthread_mutex_unlock(&queue->mutex);
|
||||
return r;
|
||||
}
|
||||
|
||||
tmp = (pth_msg_t *)usrtc_node_getdata(node);
|
||||
queue->length--;
|
||||
|
||||
msg->data = tmp->data;
|
||||
msg->msgtype = tmp->msgtype;
|
||||
msg->qlength = tmp->qlength; /* we will hold the msg id instead of size here */
|
||||
|
||||
__release_msg(queue, tmp);
|
||||
pthread_mutex_unlock(&queue->mutex);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int pth_queue_destroy(pth_queue_t *queue, int freedata, void (*free_msg)(void *))
|
||||
{
|
||||
int r = 0;
|
||||
usrtc_t *tree = &queue->qtree;
|
||||
usrtc_node_t *node = NULL;
|
||||
pth_msg_t *msg;
|
||||
|
||||
if (queue == NULL) return EINVAL;
|
||||
|
||||
pthread_mutex_lock(&queue->mutex);
|
||||
|
||||
for (node = usrtc_first(tree); node != NULL; node = usrtc_first(tree)) {
|
||||
usrtc_delete(tree, node);
|
||||
msg = (pth_msg_t *)usrtc_node_getdata(node);
|
||||
|
||||
if(freedata) free(msg->data);
|
||||
else if(free_msg) free_msg(msg->data);
|
||||
|
||||
free(msg);
|
||||
}
|
||||
/* free cache */
|
||||
tree = &queue->msgcache;
|
||||
for (node = usrtc_first(tree); node != NULL; node = usrtc_first(tree)) {
|
||||
usrtc_delete(tree, node);
|
||||
free(usrtc_node_getdata(node));
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&queue->mutex);
|
||||
r = pthread_mutex_destroy(&queue->mutex);
|
||||
pthread_cond_destroy(&queue->cond);
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
unsigned int pth_queue_length(pth_queue_t *queue)
|
||||
{
|
||||
unsigned int c;
|
||||
|
||||
pthread_mutex_lock(&queue->mutex);
|
||||
c = queue->length;
|
||||
pthread_mutex_unlock(&queue->mutex);
|
||||
|
||||
return c;
|
||||
}
|
||||
|
||||
/* dynamic queue thread poll */
|
||||
|
||||
struct __pthrd_data {
|
||||
pth_dqtpoll_t *pthrd;
|
||||
int myid;
|
||||
};
|
||||
|
||||
static void *__poll_thread(void *poll)
|
||||
{
|
||||
int r = 0;
|
||||
struct __pthrd_data *thrdata = (struct __pthrd_data *)poll;
|
||||
struct __pthrd_data *npoll = NULL;
|
||||
pth_msg_t msgbuf, sysbuf;
|
||||
pth_dqtpoll_t *p = thrdata->pthrd;
|
||||
pth_queue_t *q = p->queue;
|
||||
ulong_t myid = thrdata->myid;
|
||||
struct timeval now;
|
||||
int resched = 0;
|
||||
long tusec, si, mdrate;
|
||||
|
||||
while(1) {
|
||||
resched = 0;
|
||||
r = pth_queue_get(q, NULL, &msgbuf);
|
||||
pthread_rwlock_wrlock(&(p->stats_lock));
|
||||
if(p->flags & DQTPOLL_DEADSTAGE) { /* poll going to be killed */
|
||||
pthread_rwlock_unlock(&(p->stats_lock));
|
||||
idx_free(p->idx, myid);
|
||||
p->poll_value--;
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* now get the time */
|
||||
gettimeofday(&now, NULL);
|
||||
if((now.tv_sec >= p->sched_time.tv_sec) &&
|
||||
(now.tv_usec >= p->sched_time.tv_usec)) {
|
||||
resched = 1;
|
||||
/* set the new schedule time */
|
||||
si = 0;
|
||||
tusec = DQTPOLL_DELTAMS + now.tv_usec;
|
||||
if(tusec > 1000000) {
|
||||
tusec -= 1000000;
|
||||
si++;
|
||||
}
|
||||
p->sched_time.tv_sec += si + DQTPOLL_DELTASE;
|
||||
p->sched_time.tv_usec = tusec;
|
||||
}
|
||||
|
||||
if(resched) { /* ok now we need to resched and descrease/increase thread poll volume */
|
||||
if(p->msgop) mdrate = ((DQTPOLL_DELTASE*1000000) + DQTPOLL_DELTAMS)/p->msgop;
|
||||
else mdrate = 0;
|
||||
if((mdrate > p->poll_value) && (p->poll_value < MAX_POLL_VALUE)) { /* increase ! */
|
||||
if((npoll = malloc(sizeof(struct __pthrd_data)))) {
|
||||
npoll->myid = idx_allocate(p->idx);
|
||||
npoll->pthrd = p;
|
||||
p->poll_value++;
|
||||
/* create thread here */
|
||||
if(pthread_create(&(p->poll[npoll->myid]), NULL, __poll_thread, npoll)) {
|
||||
idx_free(p->idx, npoll->myid);
|
||||
p->poll_value--;
|
||||
free(npoll);
|
||||
}
|
||||
}
|
||||
} else if((p->poll_value > 2) && (mdrate < p->poll_value)) /* decrease */ {
|
||||
memset(&sysbuf, 0, sizeof(pth_msg_t));
|
||||
pth_queue_add(p->queue, &sysbuf, POLL_DECREASE);
|
||||
}
|
||||
|
||||
/* init all other stuff */
|
||||
p->msgop = 0;
|
||||
}
|
||||
|
||||
if(r) {
|
||||
pthread_rwlock_unlock(&(p->stats_lock));
|
||||
continue;
|
||||
} else p->msgop++;
|
||||
pthread_rwlock_unlock(&(p->stats_lock));
|
||||
|
||||
switch(msgbuf.msgtype) {
|
||||
case USR_MSG:
|
||||
/* do the job */
|
||||
p->jobdata_callback(msgbuf.data);
|
||||
break;
|
||||
case POLL_DECREASE:
|
||||
pthread_rwlock_rdlock(&(p->stats_lock));
|
||||
if(p->poll_value > 2) r = 1; /* exit now */
|
||||
pthread_rwlock_unlock(&(p->stats_lock));
|
||||
if(r) {
|
||||
pthread_rwlock_wrlock(&(p->stats_lock));
|
||||
idx_free(p->idx, myid);
|
||||
p->poll_value--;
|
||||
pthread_rwlock_unlock(&(p->stats_lock));
|
||||
free(poll);
|
||||
return NULL;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
/* TODO: do something ... */
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* init poll, structure must be allocated */
|
||||
int pth_dqtpoll_init(pth_dqtpoll_t *tpoll, int (*jobdata_callback)(void *))
|
||||
{
|
||||
int r = 0;
|
||||
pth_queue_t *queue = malloc(sizeof(pth_queue_t));
|
||||
pthread_t *poll = malloc(sizeof(pthread_t)*MAX_POLL_VALUE);
|
||||
idx_allocator_t *idx = malloc(sizeof(idx_allocator_t));
|
||||
struct __pthrd_data *ndata = malloc(sizeof(struct __pthrd_data));
|
||||
|
||||
/* check it for allocation */
|
||||
if(!ndata) goto __enomem;
|
||||
if(!idx) goto __enomem;
|
||||
if(!queue) goto __enomem;
|
||||
if(!poll) goto __enomem;
|
||||
|
||||
/* init all the stuff */
|
||||
if(idx_allocator_init(idx, MAX_POLL_VALUE*16, 0)) {
|
||||
__enomem:
|
||||
r = ENOMEM;
|
||||
goto __finish;
|
||||
}
|
||||
if(pth_queue_init(queue)) goto __enomem; /* init queue */
|
||||
if(pthread_rwlock_init(&(tpoll->stats_lock), NULL)) goto __enomem;
|
||||
|
||||
/* set parameters */
|
||||
memset(poll, 0, sizeof(pthread_t)*MAX_POLL_VALUE);
|
||||
tpoll->flags = 0;
|
||||
tpoll->idx = idx;
|
||||
tpoll->poll = poll;
|
||||
tpoll->queue = queue;
|
||||
tpoll->poll_value = 2;
|
||||
tpoll->spurious_wakeups = 0;
|
||||
tpoll->msgop = 0;
|
||||
tpoll->jobdata_callback = jobdata_callback;
|
||||
|
||||
/* first thread initiation */
|
||||
idx_reserve(idx, 0);
|
||||
ndata->myid = 0;
|
||||
ndata->pthrd = tpoll;
|
||||
if(pthread_create(&(poll[0]), NULL, __poll_thread, ndata)) {
|
||||
__eadd:
|
||||
pthread_rwlock_destroy(&(tpoll->stats_lock));
|
||||
goto __enomem;
|
||||
}
|
||||
/* second thread initiation */
|
||||
ndata = malloc(sizeof(struct __pthrd_data));
|
||||
if(!ndata) goto __eadd;
|
||||
idx_reserve(idx, 1);
|
||||
ndata->myid = 1;
|
||||
ndata->pthrd = tpoll;
|
||||
if(pthread_create(&(poll[1]), NULL, __poll_thread, ndata)) {
|
||||
pthread_rwlock_destroy(&(tpoll->stats_lock));
|
||||
goto __enomem;
|
||||
}
|
||||
|
||||
gettimeofday(&(tpoll->sched_time), NULL);
|
||||
|
||||
__finish:
|
||||
if(r) {
|
||||
if(ndata) free(ndata);
|
||||
if(idx) free(idx);
|
||||
if(queue) {
|
||||
pth_queue_destroy(queue, 0, NULL);
|
||||
free(queue);
|
||||
}
|
||||
if(poll) free(poll);
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
||||
/* run poll: poll */
|
||||
int pth_dqtpoll_run(pth_dqtpoll_t *tpoll)
|
||||
{
|
||||
int r = 0;
|
||||
|
||||
pthread_rwlock_wrlock(&(tpoll->stats_lock));
|
||||
if((tpoll->flags & DQTPOLL_RUNNING) || (tpoll->flags & DQTPOLL_DEADSTAGE)) r = EINVAL;
|
||||
else {
|
||||
tpoll->flags |= DQTPOLL_RUNNING;
|
||||
}
|
||||
pthread_rwlock_unlock(&(tpoll->stats_lock));
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
/* add the job to the queue: poll, job data, message type */
|
||||
int pth_dqtpoll_add(pth_dqtpoll_t *tpoll, void *job, unsigned int type)
|
||||
{
|
||||
int r = 0;
|
||||
|
||||
r = pth_queue_add(tpoll->queue, job, type);
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
/* destroy the poll: poll, force flag
|
||||
* if force flag is set (!= 0), give up
|
||||
* about jobs, if no, do the job, but don't
|
||||
* accept the new ones, and destroy all poll
|
||||
* with last thread.
|
||||
*/
|
||||
int pth_dqtpoll_destroy(pth_dqtpoll_t *tpoll, int force)
|
||||
{
|
||||
int r = 0;
|
||||
pth_msg_t tmpmsg;
|
||||
|
||||
pthread_rwlock_wrlock(&(tpoll->stats_lock));
|
||||
tpoll->flags |= DQTPOLL_DEADSTAGE;
|
||||
pthread_rwlock_unlock(&(tpoll->stats_lock));
|
||||
|
||||
/* now we need to wait */
|
||||
while(1) {
|
||||
pthread_rwlock_rdlock(&(tpoll->stats_lock));
|
||||
if(!tpoll->poll_value) {
|
||||
pthread_rwlock_unlock(&(tpoll->stats_lock));
|
||||
break;
|
||||
} else {
|
||||
pthread_rwlock_unlock(&(tpoll->stats_lock));
|
||||
pth_queue_add(tpoll->queue, &tmpmsg, 0); /* spurious */
|
||||
}
|
||||
usleep(100); /* just to sleep and free timeslice to others */
|
||||
}
|
||||
|
||||
/* free all */
|
||||
pth_queue_destroy(tpoll->queue, 0, NULL);
|
||||
idx_allocator_destroy(tpoll->idx);
|
||||
pthread_rwlock_destroy(&(tpoll->stats_lock));
|
||||
|
||||
free(tpoll->poll);
|
||||
free(tpoll->queue);
|
||||
free(tpoll->idx);
|
||||
|
||||
return r;
|
||||
}
|
||||
|
Loading…
Reference in New Issue