/* * This is a proprietary software. See COPYING for further details. * * (c) 2013 Copyright Askele, inc. * (c) 2013 Copyright Askele Ingria, inc. * (c) 2014 Copyright Confident, inc. (granted permission to use in commercial software) */ #include #include #include #include #include #include #include #include #include #include #include #include #define MAX_QUEUE_SIZE 4096 #define MAX_QUEUE_POOL 256 static long __cmp_uint(const void *a, const void *b) { return (long)(*(unsigned int *)a - *(unsigned int *)b); } static inline pth_msg_t *__get_newmsg(pth_queue_t *queue) { usrtc_t *tree = &queue->msgcache; usrtc_node_t *node; pth_msg_t *tmp; if(usrtc_count(tree)) { node = usrtc_first(tree); tmp = (pth_msg_t *)usrtc_node_getdata(node); usrtc_delete(tree, node); } else { tmp = malloc(sizeof(pth_msg_t)); tree = &queue->qtree; node = &tmp->node; usrtc_node_init(node, tmp); } /* insert it */ tree = &queue->qtree; tmp->qlength = usrtc_count(tree); usrtc_insert(tree, node, (void *)(&tmp->qlength)); return tmp; } static inline void __release_msg(pth_queue_t *queue, pth_msg_t *msg) { usrtc_node_t *node = &msg->node; usrtc_t *tree = &queue->qtree; tree = &queue->qtree; /* remove from queue */ usrtc_delete(tree, node); tree = &queue->msgcache; if(usrtc_count(tree) >= MAX_QUEUE_POOL) free(msg); else { msg->data = NULL; msg->msgtype = NIL_MSG; usrtc_insert(tree, node, (void *)&msg->qlength); } return; } int pth_queue_init(pth_queue_t *queue) { int r = 0; memset(queue, 0, sizeof(pth_queue_t)); if((r = pthread_cond_init(&queue->cond, NULL))) return r; if((r = pthread_mutex_init(&queue->mutex, NULL))) { pthread_cond_destroy(&queue->cond); return r; } usrtc_init(&queue->qtree, USRTC_AVL, MAX_QUEUE_SIZE, __cmp_uint); usrtc_init(&queue->msgcache, USRTC_AVL, MAX_QUEUE_POOL, __cmp_uint); return r; } int pth_queue_add(pth_queue_t *queue, void *data, unsigned int msgtype) { pth_msg_t *newmsg; pthread_mutex_lock(&queue->mutex); newmsg = __get_newmsg(queue); if (newmsg == NULL) { pthread_mutex_unlock(&queue->mutex); return ENOMEM; } newmsg->data = data; newmsg->msgtype = msgtype; if(queue->length == 0) pthread_cond_broadcast(&queue->cond); queue->length++; pthread_mutex_unlock(&queue->mutex); return 0; } int pth_queue_get(pth_queue_t *queue, const struct timespec *timeout, pth_msg_t *msg) { usrtc_t *tree; usrtc_node_t *node = NULL; pth_msg_t *tmp; int r = 0; struct timespec abstimeout; if (queue == NULL || msg == NULL) return EINVAL; else tree = &queue->qtree; if (timeout) { /* setup timeout */ struct timeval now; gettimeofday(&now, NULL); abstimeout.tv_sec = now.tv_sec + timeout->tv_sec; abstimeout.tv_nsec = (now.tv_usec * 1000) + timeout->tv_nsec; if (abstimeout.tv_nsec >= 1000000000) { abstimeout.tv_sec++; abstimeout.tv_nsec -= 1000000000; } } pthread_mutex_lock(&queue->mutex); /* Will wait until awakened by a signal or broadcast */ while ((node = usrtc_first(tree)) == NULL && r != ETIMEDOUT) { /* Need to loop to handle spurious wakeups */ if (timeout) r = pthread_cond_timedwait(&queue->cond, &queue->mutex, &abstimeout); else pthread_cond_wait(&queue->cond, &queue->mutex); } if (r == ETIMEDOUT) { pthread_mutex_unlock(&queue->mutex); return r; } tmp = (pth_msg_t *)usrtc_node_getdata(node); queue->length--; msg->data = tmp->data; msg->msgtype = tmp->msgtype; msg->qlength = tmp->qlength; /* we will hold the msg id instead of size here */ __release_msg(queue, tmp); pthread_mutex_unlock(&queue->mutex); return 0; } int pth_queue_destroy(pth_queue_t *queue, int freedata, void (*free_msg)(void *)) { int r = 0; usrtc_t *tree = &queue->qtree; usrtc_node_t *node = NULL; pth_msg_t *msg; if (queue == NULL) return EINVAL; pthread_mutex_lock(&queue->mutex); for (node = usrtc_first(tree); node != NULL; node = usrtc_first(tree)) { usrtc_delete(tree, node); msg = (pth_msg_t *)usrtc_node_getdata(node); if(freedata) free(msg->data); else if(free_msg) free_msg(msg->data); free(msg); } /* free cache */ tree = &queue->msgcache; for (node = usrtc_first(tree); node != NULL; node = usrtc_first(tree)) { usrtc_delete(tree, node); free(usrtc_node_getdata(node)); } pthread_mutex_unlock(&queue->mutex); r = pthread_mutex_destroy(&queue->mutex); pthread_cond_destroy(&queue->cond); return r; } unsigned int pth_queue_length(pth_queue_t *queue) { unsigned int c; pthread_mutex_lock(&queue->mutex); c = queue->length; pthread_mutex_unlock(&queue->mutex); return c; }