/* * This is a proprietary software. See COPYING for further details. * * (c) 2013 Copyright Askele, inc. * (c) 2013 Copyright Askele Ingria, inc. * (c) 2014 Copyright Confident, inc. (granted permission to use in commercial software) */ #include #include #include #include #include #include #include #include #include #include #include #include #define MAX_QUEUE_SIZE 4096 #define MAX_QUEUE_POOL 256 static long __cmp_uint(const void *a, const void *b) { return (long)(*(unsigned int *)a - *(unsigned int *)b); } static inline pth_msg_t *__get_newmsg(pth_queue_t *queue) { usrtc_t *tree = &queue->msgcache; usrtc_node_t *node; pth_msg_t *tmp; if(usrtc_count(tree)) { node = usrtc_first(tree); tmp = (pth_msg_t *)usrtc_node_getdata(node); usrtc_delete(tree, node); } else { tmp = malloc(sizeof(pth_msg_t)); tree = &queue->qtree; node = &tmp->node; usrtc_node_init(node, tmp); } /* insert it */ tree = &queue->qtree; tmp->qlength = usrtc_count(tree); usrtc_insert(tree, node, (void *)(&tmp->qlength)); return tmp; } static inline void __release_msg(pth_queue_t *queue, pth_msg_t *msg) { usrtc_node_t *node = &msg->node; usrtc_t *tree = &queue->qtree; tree = &queue->qtree; /* remove from queue */ usrtc_delete(tree, node); tree = &queue->msgcache; if(usrtc_count(tree) >= MAX_QUEUE_POOL) free(msg); else { msg->data = NULL; msg->msgtype = NIL_MSG; usrtc_insert(tree, node, (void *)&msg->qlength); } return; } int pth_queue_init(pth_queue_t *queue) { int r = 0; memset(queue, 0, sizeof(pth_queue_t)); if((r = pthread_cond_init(&queue->cond, NULL))) return r; if((r = pthread_mutex_init(&queue->mutex, NULL))) { pthread_cond_destroy(&queue->cond); return r; } usrtc_init(&queue->qtree, USRTC_AVL, MAX_QUEUE_SIZE, __cmp_uint); usrtc_init(&queue->msgcache, USRTC_AVL, MAX_QUEUE_POOL, __cmp_uint); return r; } int pth_queue_add(pth_queue_t *queue, void *data, unsigned int msgtype) { pth_msg_t *newmsg; pthread_mutex_lock(&queue->mutex); newmsg = __get_newmsg(queue); if (newmsg == NULL) { pthread_mutex_unlock(&queue->mutex); return ENOMEM; } newmsg->data = data; newmsg->msgtype = msgtype; if(queue->length == 0) pthread_cond_broadcast(&queue->cond); queue->length++; pthread_mutex_unlock(&queue->mutex); return 0; } int pth_queue_get(pth_queue_t *queue, const struct timespec *timeout, pth_msg_t *msg) { usrtc_t *tree; usrtc_node_t *node = NULL; pth_msg_t *tmp; int r = 0; struct timespec abstimeout; if (queue == NULL || msg == NULL) return EINVAL; else tree = &queue->qtree; if (timeout) { /* setup timeout */ struct timeval now; gettimeofday(&now, NULL); abstimeout.tv_sec = now.tv_sec + timeout->tv_sec; abstimeout.tv_nsec = (now.tv_usec * 1000) + timeout->tv_nsec; if (abstimeout.tv_nsec >= 1000000000) { abstimeout.tv_sec++; abstimeout.tv_nsec -= 1000000000; } } pthread_mutex_lock(&queue->mutex); /* Will wait until awakened by a signal or broadcast */ while ((node = usrtc_first(tree)) == NULL && r != ETIMEDOUT) { /* Need to loop to handle spurious wakeups */ if (timeout) r = pthread_cond_timedwait(&queue->cond, &queue->mutex, &abstimeout); else pthread_cond_wait(&queue->cond, &queue->mutex); } if (r == ETIMEDOUT) { pthread_mutex_unlock(&queue->mutex); return r; } tmp = (pth_msg_t *)usrtc_node_getdata(node); queue->length--; msg->data = tmp->data; msg->msgtype = tmp->msgtype; msg->qlength = tmp->qlength; /* we will hold the msg id instead of size here */ __release_msg(queue, tmp); pthread_mutex_unlock(&queue->mutex); return 0; } int pth_queue_destroy(pth_queue_t *queue, int freedata, void (*free_msg)(void *)) { int r = 0; usrtc_t *tree = &queue->qtree; usrtc_node_t *node = NULL; pth_msg_t *msg; if (queue == NULL) return EINVAL; pthread_mutex_lock(&queue->mutex); for (node = usrtc_first(tree); node != NULL; node = usrtc_first(tree)) { usrtc_delete(tree, node); msg = (pth_msg_t *)usrtc_node_getdata(node); if(freedata) free(msg->data); else if(free_msg) free_msg(msg->data); free(msg); } /* free cache */ tree = &queue->msgcache; for (node = usrtc_first(tree); node != NULL; node = usrtc_first(tree)) { usrtc_delete(tree, node); free(usrtc_node_getdata(node)); } pthread_mutex_unlock(&queue->mutex); r = pthread_mutex_destroy(&queue->mutex); pthread_cond_destroy(&queue->cond); return r; } unsigned int pth_queue_length(pth_queue_t *queue) { unsigned int c; pthread_mutex_lock(&queue->mutex); c = queue->length; pthread_mutex_unlock(&queue->mutex); return c; } /* dynamic queue thread poll */ static void *__poll_thread(void *poll) { int r = 0; pth_msg_t msgbuf; pth_dqtpoll_t *p = (pth_dqtpoll_t *)poll; pth_queue_t *q = p->queue; while(1) { r = pth_queue_get(q, NULL, &msgbuf); if(r) continue; switch(msgbuf.msgtype) { case USR_MSG: p->jobdata_callback(msgbuf.data); break; default: /* TODO: do something ... */ break; } } return NULL; } /* init poll, structure must be allocated */ int pth_dqtpoll_init(pth_dqtpoll_t *tpoll, int (*jobdata_callback)(void *)) { int r = 0; pth_queue_t *queue = malloc(sizeof(pth_queue_t)); pthread_t *poll = malloc(sizeof(pthread_t)*MAX_POLL_VALUE); idx_allocator_t *idx = malloc(sizeof(idx_allocator_t)); /* check it for allocation */ if(!idx) goto __enomem; if(!queue) goto __enomem; if(!poll) goto __enomem; /* init all the stuff */ if(idx_allocator_init(idx, MAX_POLL_VALUE, 0)) { __enomem: r = ENOMEM; goto __finish; } if(pth_queue_init(queue)) goto __enomem; /* init queue */ if(pthread_rwlock_init(&(tpoll->stats_lock), NULL)) goto __enomem; /* set parameters */ memset(poll, 0, sizeof(pthread_t)*MAX_POLL_VALUE); tpoll->flags = 0; tpoll->idx = idx; tpoll->poll = poll; tpoll->queue = queue; tpoll->poll_value = 1; tpoll->sleep_value = 0; tpoll->spurious_wakeups = 0; tpoll->jobdata_callback = jobdata_callback; /* first thread initiation */ idx_reserve(idx, 0); if(pthread_create(&(poll[0]), NULL, __poll_thread, tpoll)) { pthread_rwlock_destroy(&(tpoll->stats_lock)); goto __enomem; } __finish: if(r) { if(idx) free(idx); if(queue) { pth_queue_destroy(queue, 0, NULL); free(queue); } if(poll) free(poll); } return r; } /* run poll: poll */ int pth_dqtpoll_run(pth_dqtpoll_t *tpoll) { int r = 0; pthread_rwlock_wrlock(&(tpoll->stats_lock)); if((tpoll->flags & DQTPOLL_RUNNING) || (tpoll->flags & DQTPOLL_DEADSTAGE)) r = EINVAL; else { tpoll->flags |= DQTPOLL_RUNNING; tpoll->sleep_value = 1; } pthread_rwlock_unlock(&(tpoll->stats_lock)); return r; } /* add the job to the queue: poll, job data, message type */ int pth_dqtpoll_add(pth_dqtpoll_t *tpoll, void *job, unsigned int type) { int r = 0; r = pth_queue_add(tpoll->queue, job, type); return r; } /* destroy the poll: poll, force flag * if force flag is set (!= 0), give up * about jobs, if no, do the job, but don't * accept the new ones, and destroy all poll * with last thread. */ int pth_dqtpoll_destroy(pth_dqtpoll_t *tpoll, int force) { int r = 0; return r; }