You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
libsxmp/lib/queue.c

214 lines
4.9 KiB
C

/*
* This is a proprietary software. See COPYING for further details.
*
* (c) 2013 Copyright Askele, inc. <http://askele.com>
* (c) 2013 Copyright Askele Ingria, inc. <http://askele-ingria.com>
* (c) 2014 Copyright Confident, inc. (granted permission to use in commercial software)
*/
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <tdata/usrtc.h>
#include <sntl/pth_queue.h>
#define MAX_QUEUE_SIZE 4096
#define MAX_QUEUE_POOL 256
static long __cmp_uint(const void *a, const void *b)
{
return (long)(*(unsigned int *)a - *(unsigned int *)b);
}
static inline pth_msg_t *__get_newmsg(pth_queue_t *queue)
{
usrtc_t *tree = &queue->msgcache;
usrtc_node_t *node;
pth_msg_t *tmp;
if(usrtc_count(tree)) {
node = usrtc_first(tree);
tmp = (pth_msg_t *)usrtc_node_getdata(node);
usrtc_delete(tree, node);
} else {
tmp = malloc(sizeof(pth_msg_t));
tree = &queue->qtree;
node = &tmp->node;
usrtc_node_init(node, tmp);
}
/* insert it */
tree = &queue->qtree;
tmp->qlength = usrtc_count(tree);
usrtc_insert(tree, node, (void *)(&tmp->qlength));
return tmp;
}
static inline void __release_msg(pth_queue_t *queue, pth_msg_t *msg)
{
usrtc_node_t *node = &msg->node;
usrtc_t *tree = &queue->qtree;
tree = &queue->qtree; /* remove from queue */
usrtc_delete(tree, node);
tree = &queue->msgcache;
if(usrtc_count(tree) >= MAX_QUEUE_POOL)
free(msg);
else {
msg->data = NULL;
msg->msgtype = NIL_MSG;
usrtc_insert(tree, node, (void *)&msg->qlength);
}
return;
}
int pth_queue_init(pth_queue_t *queue)
{
int r = 0;
memset(queue, 0, sizeof(pth_queue_t));
if((r = pthread_cond_init(&queue->cond, NULL)))
return r;
if((r = pthread_mutex_init(&queue->mutex, NULL))) {
pthread_cond_destroy(&queue->cond);
return r;
}
usrtc_init(&queue->qtree, USRTC_AVL, MAX_QUEUE_SIZE, __cmp_uint);
usrtc_init(&queue->msgcache, USRTC_AVL, MAX_QUEUE_POOL, __cmp_uint);
return r;
}
int pth_queue_add(pth_queue_t *queue, void *data, unsigned int msgtype)
{
pth_msg_t *newmsg;
pthread_mutex_lock(&queue->mutex);
newmsg = __get_newmsg(queue);
if (newmsg == NULL) {
pthread_mutex_unlock(&queue->mutex);
return ENOMEM;
}
newmsg->data = data;
newmsg->msgtype = msgtype;
if(queue->length == 0)
pthread_cond_broadcast(&queue->cond);
queue->length++;
pthread_mutex_unlock(&queue->mutex);
return 0;
}
int pth_queue_get(pth_queue_t *queue, const struct timespec *timeout, pth_msg_t *msg)
{
usrtc_t *tree;
usrtc_node_t *node = NULL;
pth_msg_t *tmp;
int r = 0;
struct timespec abstimeout;
if (queue == NULL || msg == NULL)
return EINVAL;
else
tree = &queue->qtree;
if (timeout) { /* setup timeout */
struct timeval now;
gettimeofday(&now, NULL);
abstimeout.tv_sec = now.tv_sec + timeout->tv_sec;
abstimeout.tv_nsec = (now.tv_usec * 1000) + timeout->tv_nsec;
if (abstimeout.tv_nsec >= 1000000000) {
abstimeout.tv_sec++;
abstimeout.tv_nsec -= 1000000000;
}
}
pthread_mutex_lock(&queue->mutex);
/* Will wait until awakened by a signal or broadcast */
while ((node = usrtc_first(tree)) == NULL && r != ETIMEDOUT) { /* Need to loop to handle spurious wakeups */
if (timeout)
r = pthread_cond_timedwait(&queue->cond, &queue->mutex, &abstimeout);
else
pthread_cond_wait(&queue->cond, &queue->mutex);
}
if (r == ETIMEDOUT) {
pthread_mutex_unlock(&queue->mutex);
return r;
}
tmp = (pth_msg_t *)usrtc_node_getdata(node);
queue->length--;
msg->data = tmp->data;
msg->msgtype = tmp->msgtype;
msg->qlength = tmp->qlength; /* we will hold the msg id instead of size here */
__release_msg(queue, tmp);
pthread_mutex_unlock(&queue->mutex);
return 0;
}
int pth_queue_destroy(pth_queue_t *queue, int freedata, void (*free_msg)(void *))
{
int r = 0;
usrtc_t *tree = &queue->qtree;
usrtc_node_t *node = NULL;
pth_msg_t *msg;
if (queue == NULL) return EINVAL;
pthread_mutex_lock(&queue->mutex);
for (node = usrtc_first(tree); node != NULL; node = usrtc_first(tree)) {
usrtc_delete(tree, node);
msg = (pth_msg_t *)usrtc_node_getdata(node);
if(freedata) free(msg->data);
else if(free_msg) free_msg(msg->data);
free(msg);
}
/* free cache */
tree = &queue->msgcache;
for (node = usrtc_first(tree); node != NULL; node = usrtc_first(tree)) {
usrtc_delete(tree, node);
free(usrtc_node_getdata(node));
}
pthread_mutex_unlock(&queue->mutex);
r = pthread_mutex_destroy(&queue->mutex);
pthread_cond_destroy(&queue->cond);
return r;
}
unsigned int pth_queue_length(pth_queue_t *queue)
{
unsigned int c;
pthread_mutex_lock(&queue->mutex);
c = queue->length;
pthread_mutex_unlock(&queue->mutex);
return c;
}