You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
396 lines
9.7 KiB
C
396 lines
9.7 KiB
C
/*
|
|
* This is a proprietary software. See COPYING for further details.
|
|
*
|
|
* (c) 2013 Copyright Askele, inc. <http://askele.com>
|
|
* (c) 2013 Copyright Askele Ingria, inc. <http://askele-ingria.com>
|
|
* (c) 2014 Copyright Confident, inc. (granted permission to use in commercial software)
|
|
*/
|
|
|
|
#include <stdlib.h>
|
|
#include <stdio.h>
|
|
#include <sys/types.h>
|
|
#include <sys/stat.h>
|
|
#include <sys/time.h>
|
|
#include <fcntl.h>
|
|
#include <errno.h>
|
|
#include <unistd.h>
|
|
#include <string.h>
|
|
#include <pthread.h>
|
|
|
|
#include <tdata/usrtc.h>
|
|
|
|
#include <sntl/pth_queue.h>
|
|
|
|
/* Value handed to usrtc_init() for the tree of pending messages. */
#define MAX_QUEUE_SIZE 4096
/* Value handed to usrtc_init() for the message cache; also the number of
 * recycled message objects kept before released ones are freed instead. */
#define MAX_QUEUE_POOL 256
|
|
|
|
/*
 * Three-way comparison of two unsigned int keys, used as the ordering
 * callback for the usrtc trees in this file.
 *
 * a, b: pointers to the unsigned int keys to compare.
 * Returns a negative value when *a < *b, zero when they are equal and a
 * positive value when *a > *b.
 */
static long __cmp_uint(const void *a, const void *b)
{
  const unsigned int lhs = *(const unsigned int *)a;
  const unsigned int rhs = *(const unsigned int *)b;

  /* Do not subtract: an unsigned difference wraps around, and once it is
   * widened to a 64-bit long it is never negative, so "less than" would be
   * reported as "greater than". */
  return (long)(lhs > rhs) - (long)(lhs < rhs);
}
|
|
|
|
/*
 * Obtain a message object and link it into the queue's pending tree.
 *
 * A recycled object is taken from queue->msgcache when one is available;
 * otherwise a new one is allocated. The object is then inserted into
 * queue->qtree, keyed by the number of entries the tree held just before
 * the insertion (stored in msg->qlength).
 *
 * pth_queue_add() calls this with queue->mutex held.
 *
 * Returns the linked message, or NULL when a new object was needed and
 * the allocation failed (nothing is inserted in that case).
 *
 * NOTE(review): the key is the current tree size, so after messages have
 * been consumed a new key may equal one that is still in the tree; confirm
 * how usrtc_insert() treats duplicate keys.
 */
static inline pth_msg_t *__get_newmsg(pth_queue_t *queue)
{
  usrtc_t *cache = &queue->msgcache;
  usrtc_t *pending = &queue->qtree;
  usrtc_node_t *node;
  pth_msg_t *msg;

  if (usrtc_count(cache)) {
    /* reuse a cached object */
    node = usrtc_first(cache);
    msg = (pth_msg_t *)usrtc_node_getdata(node);
    usrtc_delete(cache, node);
  } else {
    msg = malloc(sizeof(*msg));
    if (msg == NULL)
      return NULL; /* the caller reports ENOMEM */
    node = &msg->node;
    usrtc_node_init(node, msg);
  }

  /* insert it */
  msg->qlength = usrtc_count(pending);
  usrtc_insert(pending, node, (void *)(&msg->qlength));

  return msg;
}
|
|
|
|
/*
 * Unlink a consumed message from the pending tree and recycle it.
 *
 * The message is removed from queue->qtree. While queue->msgcache holds
 * fewer than MAX_QUEUE_POOL entries the object is reset (data cleared,
 * type set to NIL_MSG) and stored there, keyed by its qlength; otherwise
 * it is freed.
 *
 * pth_queue_get() calls this with queue->mutex held.
 */
static inline void __release_msg(pth_queue_t *queue, pth_msg_t *msg)
{
  usrtc_node_t *link = &msg->node;
  usrtc_t *cache = &queue->msgcache;

  /* remove from queue */
  usrtc_delete(&queue->qtree, link);

  if (usrtc_count(cache) < MAX_QUEUE_POOL) {
    msg->data = NULL;
    msg->msgtype = NIL_MSG;
    usrtc_insert(cache, link, (void *)&msg->qlength);
    return;
  }

  /* cache is full */
  free(msg);
}
|
|
|
|
/*
 * Initialise a caller-allocated queue.
 *
 * The structure is zeroed, its condition variable and mutex are created
 * with default attributes, and both AVL trees (pending messages and the
 * message cache) are set up with __cmp_uint as comparator.
 *
 * Returns 0 on success, or the error code reported by pthread_cond_init()
 * or pthread_mutex_init(). When the mutex cannot be created, the already
 * created condition variable is destroyed before returning.
 */
int pth_queue_init(pth_queue_t *queue)
{
  int rc;

  memset(queue, 0, sizeof(pth_queue_t));

  rc = pthread_cond_init(&queue->cond, NULL);
  if (rc != 0)
    return rc;

  rc = pthread_mutex_init(&queue->mutex, NULL);
  if (rc != 0) {
    pthread_cond_destroy(&queue->cond);
    return rc;
  }

  usrtc_init(&queue->qtree, USRTC_AVL, MAX_QUEUE_SIZE, __cmp_uint);
  usrtc_init(&queue->msgcache, USRTC_AVL, MAX_QUEUE_POOL, __cmp_uint);

  return 0;
}
|
|
|
|
/*
 * Append a message to the queue.
 *
 * queue:   initialised queue.
 * data:    payload pointer stored in the message; it is not copied.
 * msgtype: message type stored alongside the payload.
 *
 * Waiters are woken with a broadcast when the queue goes from empty to
 * non-empty.
 *
 * Returns 0 on success, EINVAL when queue is NULL, ENOMEM when no message
 * object could be obtained.
 */
int pth_queue_add(pth_queue_t *queue, void *data, unsigned int msgtype)
{
  pth_msg_t *newmsg;

  /* same argument check as pth_queue_get() and pth_queue_destroy() */
  if (queue == NULL)
    return EINVAL;

  pthread_mutex_lock(&queue->mutex);

  newmsg = __get_newmsg(queue);
  if (newmsg == NULL) {
    pthread_mutex_unlock(&queue->mutex);
    return ENOMEM;
  }

  newmsg->data = data;
  newmsg->msgtype = msgtype;

  /* only the empty -> non-empty transition can have sleeping readers */
  if (queue->length == 0)
    pthread_cond_broadcast(&queue->cond);
  queue->length++;

  pthread_mutex_unlock(&queue->mutex);

  return 0;
}
|
|
|
|
/*
 * Take the first message out of the queue, waiting for one if necessary.
 *
 * queue:   initialised queue.
 * timeout: relative wait limit (it is added to the current time), or NULL
 *          to wait without a limit.
 * msg:     caller-provided buffer; receives data, msgtype and qlength of
 *          the removed message (qlength carries the message id here, not
 *          a size).
 *
 * Returns 0 when a message was copied into msg, EINVAL when queue or msg
 * is NULL, ETIMEDOUT when the limit expired with the queue still empty,
 * or the error code of the condition wait when it fails while the queue
 * is empty.
 */
int pth_queue_get(pth_queue_t *queue, const struct timespec *timeout, pth_msg_t *msg)
{
  usrtc_t *tree;
  usrtc_node_t *node = NULL;
  pth_msg_t *tmp;
  int r = 0;
  struct timespec abstimeout;

  if (queue == NULL || msg == NULL)
    return EINVAL;

  tree = &queue->qtree;

  if (timeout) { /* turn the relative timeout into an absolute deadline */
    struct timeval now;

    gettimeofday(&now, NULL);
    abstimeout.tv_sec = now.tv_sec + timeout->tv_sec;
    abstimeout.tv_nsec = (now.tv_usec * 1000) + timeout->tv_nsec;
    if (abstimeout.tv_nsec >= 1000000000) {
      abstimeout.tv_sec++;
      abstimeout.tv_nsec -= 1000000000;
    }
  }

  pthread_mutex_lock(&queue->mutex);

  /* Loop to absorb spurious wakeups. The tree is looked at again after
   * every wait, including a failed one, so a message that arrived while
   * the wait was timing out is still delivered. Any wait error ends the
   * loop; looping on an error other than ETIMEDOUT would spin forever
   * with the mutex held. */
  for (;;) {
    node = usrtc_first(tree);
    if (node != NULL || r != 0)
      break;

    if (timeout)
      r = pthread_cond_timedwait(&queue->cond, &queue->mutex, &abstimeout);
    else
      r = pthread_cond_wait(&queue->cond, &queue->mutex);
  }

  if (node == NULL) { /* timed out or wait failed with nothing queued */
    pthread_mutex_unlock(&queue->mutex);
    return r;
  }

  tmp = (pth_msg_t *)usrtc_node_getdata(node);
  queue->length--;

  msg->data = tmp->data;
  msg->msgtype = tmp->msgtype;
  msg->qlength = tmp->qlength; /* we will hold the msg id instead of size here */

  __release_msg(queue, tmp);
  pthread_mutex_unlock(&queue->mutex);

  return 0;
}
|
|
|
|
/*
 * Drop every pending and cached message and destroy the queue's mutex
 * and condition variable. The queue structure itself is not freed.
 *
 * queue:    queue to tear down.
 * freedata: when non-zero, each pending payload is released with free().
 * free_msg: used only when freedata is zero; if not NULL it is called on
 *           each pending payload.
 *
 * Returns EINVAL when queue is NULL, otherwise the result of
 * pthread_mutex_destroy().
 */
int pth_queue_destroy(pth_queue_t *queue, int freedata, void (*free_msg)(void *))
{
  int r = 0;
  usrtc_t *tree;
  usrtc_node_t *node = NULL;
  pth_msg_t *msg;

  if (queue == NULL) return EINVAL;

  /* take member addresses only after the NULL check */
  tree = &queue->qtree;

  pthread_mutex_lock(&queue->mutex);

  /* free pending messages */
  for (node = usrtc_first(tree); node != NULL; node = usrtc_first(tree)) {
    usrtc_delete(tree, node);
    msg = (pth_msg_t *)usrtc_node_getdata(node);

    if (freedata) free(msg->data);
    else if (free_msg) free_msg(msg->data);

    free(msg);
  }

  /* free cache */
  tree = &queue->msgcache;
  for (node = usrtc_first(tree); node != NULL; node = usrtc_first(tree)) {
    usrtc_delete(tree, node);
    free(usrtc_node_getdata(node));
  }

  pthread_mutex_unlock(&queue->mutex);
  r = pthread_mutex_destroy(&queue->mutex);
  pthread_cond_destroy(&queue->cond);

  return r;
}
|
|
|
|
/*
 * Read the queue's message counter under the queue mutex.
 *
 * Returns the value of queue->length at the moment it was sampled.
 */
unsigned int pth_queue_length(pth_queue_t *queue)
{
  unsigned int snapshot;

  pthread_mutex_lock(&queue->mutex);
  snapshot = queue->length;
  pthread_mutex_unlock(&queue->mutex);

  return snapshot;
}
|
|
|
|
/* dynamic queue thread pool (spelled "poll" in the identifiers below) */
|
|
|
|
struct __pthrd_data {
|
|
pth_dqtpoll_t *pthrd;
|
|
int myid;
|
|
};
|
|
|
|
static void *__poll_thread(void *poll)
|
|
{
|
|
int r = 0;
|
|
struct __pthrd_data *thrdata = (struct __pthrd_data *)poll;
|
|
struct __pthrd_data *npoll = NULL;
|
|
pth_msg_t msgbuf, sysbuf;
|
|
pth_dqtpoll_t *p = thrdata->pthrd;
|
|
pth_queue_t *q = p->queue;
|
|
ulong_t myid = thrdata->myid;
|
|
|
|
while(1) {
|
|
r = pth_queue_get(q, NULL, &msgbuf);
|
|
pthread_rwlock_wrlock(&(p->stats_lock));
|
|
if(r) {
|
|
p->sleep_value++;
|
|
pthread_rwlock_unlock(&(p->stats_lock));
|
|
continue;
|
|
} else p->sleep_value--;
|
|
pthread_rwlock_unlock(&(p->stats_lock));
|
|
|
|
switch(msgbuf.msgtype) {
|
|
case USR_MSG:
|
|
/* schedule new task for poll - increase or decrease */
|
|
pthread_rwlock_rdlock(&(p->stats_lock));
|
|
if(pth_queue_length(p->queue) > p->sleep_value) {
|
|
memset(&sysbuf, 0, sizeof(pth_msg_t));
|
|
pth_queue_add(p->queue, &sysbuf, POLL_INCREASE);
|
|
} else if((pth_queue_length(p->queue) > p->sleep_value) && (p->sleep_value > 1)) {
|
|
memset(&sysbuf, 0, sizeof(pth_msg_t));
|
|
pth_queue_add(p->queue, &sysbuf, POLL_DECREASE);
|
|
}
|
|
pthread_rwlock_unlock(&(p->stats_lock));
|
|
/* do the job */
|
|
p->jobdata_callback(msgbuf.data);
|
|
break;
|
|
case POLL_DECREASE:
|
|
pthread_rwlock_rdlock(&(p->stats_lock));
|
|
if(p->poll_value > 1) r = 1; /* exit now */
|
|
pthread_rwlock_unlock(&(p->stats_lock));
|
|
if(r) {
|
|
pthread_rwlock_wrlock(&(p->stats_lock));
|
|
idx_free(p->idx, myid);
|
|
p->poll_value--;
|
|
pthread_rwlock_unlock(&(p->stats_lock));
|
|
free(poll);
|
|
return NULL;
|
|
}
|
|
break;
|
|
case POLL_INCREASE:
|
|
if((npoll = malloc(sizeof(struct __pthrd_data)))) {
|
|
pthread_rwlock_rdlock(&(p->stats_lock));
|
|
if(p->poll_value < MAX_POLL_VALUE) r = 1;
|
|
pthread_rwlock_unlock(&(p->stats_lock));
|
|
if(r) {
|
|
pthread_rwlock_wrlock(&(p->stats_lock));
|
|
if(p->poll_value < MAX_POLL_VALUE) { /* check it again */
|
|
npoll->myid = idx_allocate(p->idx);
|
|
npoll->pthrd = p;
|
|
p->poll_value++;
|
|
/* create thread here */
|
|
if(pthread_create(&(p->poll[npoll->myid]), NULL, __poll_thread, npoll)) {
|
|
idx_free(p->idx, npoll->myid);
|
|
p->poll_value--;
|
|
free(npoll);
|
|
}
|
|
}
|
|
pthread_rwlock_unlock(&(p->stats_lock));
|
|
}
|
|
}
|
|
break;
|
|
default:
|
|
/* TODO: do something ... */
|
|
break;
|
|
}
|
|
}
|
|
|
|
return NULL;
|
|
}
|
|
|
|
/* init poll, structure must be allocated */
|
|
int pth_dqtpoll_init(pth_dqtpoll_t *tpoll, int (*jobdata_callback)(void *))
|
|
{
|
|
int r = 0;
|
|
pth_queue_t *queue = malloc(sizeof(pth_queue_t));
|
|
pthread_t *poll = malloc(sizeof(pthread_t)*MAX_POLL_VALUE);
|
|
idx_allocator_t *idx = malloc(sizeof(idx_allocator_t));
|
|
struct __pthrd_data *ndata = malloc(sizeof(struct __pthrd_data));
|
|
|
|
/* check it for allocation */
|
|
if(!ndata) goto __enomem;
|
|
if(!idx) goto __enomem;
|
|
if(!queue) goto __enomem;
|
|
if(!poll) goto __enomem;
|
|
|
|
/* init all the stuff */
|
|
if(idx_allocator_init(idx, MAX_POLL_VALUE, 0)) {
|
|
__enomem:
|
|
r = ENOMEM;
|
|
goto __finish;
|
|
}
|
|
if(pth_queue_init(queue)) goto __enomem; /* init queue */
|
|
if(pthread_rwlock_init(&(tpoll->stats_lock), NULL)) goto __enomem;
|
|
|
|
/* set parameters */
|
|
memset(poll, 0, sizeof(pthread_t)*MAX_POLL_VALUE);
|
|
tpoll->flags = 0;
|
|
tpoll->idx = idx;
|
|
tpoll->poll = poll;
|
|
tpoll->queue = queue;
|
|
tpoll->poll_value = 1;
|
|
tpoll->sleep_value = 0;
|
|
tpoll->spurious_wakeups = 0;
|
|
tpoll->jobdata_callback = jobdata_callback;
|
|
|
|
/* first thread initiation */
|
|
idx_reserve(idx, 0);
|
|
ndata->myid = 0;
|
|
ndata->pthrd = tpoll;
|
|
if(pthread_create(&(poll[0]), NULL, __poll_thread, ndata)) {
|
|
pthread_rwlock_destroy(&(tpoll->stats_lock));
|
|
goto __enomem;
|
|
}
|
|
|
|
__finish:
|
|
if(r) {
|
|
if(ndata) free(ndata);
|
|
if(idx) free(idx);
|
|
if(queue) {
|
|
pth_queue_destroy(queue, 0, NULL);
|
|
free(queue);
|
|
}
|
|
if(poll) free(poll);
|
|
}
|
|
return r;
|
|
}
|
|
|
|
/* run poll: poll */
|
|
int pth_dqtpoll_run(pth_dqtpoll_t *tpoll)
|
|
{
|
|
int r = 0;
|
|
|
|
pthread_rwlock_wrlock(&(tpoll->stats_lock));
|
|
if((tpoll->flags & DQTPOLL_RUNNING) || (tpoll->flags & DQTPOLL_DEADSTAGE)) r = EINVAL;
|
|
else {
|
|
tpoll->flags |= DQTPOLL_RUNNING;
|
|
tpoll->sleep_value = 1;
|
|
}
|
|
pthread_rwlock_unlock(&(tpoll->stats_lock));
|
|
|
|
return r;
|
|
}
|
|
|
|
/* add the job to the queue: poll, job data, message type */
|
|
int pth_dqtpoll_add(pth_dqtpoll_t *tpoll, void *job, unsigned int type)
|
|
{
|
|
int r = 0;
|
|
|
|
r = pth_queue_add(tpoll->queue, job, type);
|
|
|
|
return r;
|
|
}
|
|
|
|
/* destroy the poll: poll, force flag
|
|
* if force flag is set (!= 0), give up
|
|
* about jobs, if no, do the job, but don't
|
|
* accept the new ones, and destroy all poll
|
|
* with last thread.
|
|
*/
|
|
int pth_dqtpoll_destroy(pth_dqtpoll_t *tpoll, int force)
|
|
{
|
|
int r = 0;
|
|
return r;
|
|
}
|
|
|