/* * This is a proprietary software. See COPYING for further details. * * (c) 2013 Copyright Askele, inc. * (c) 2013 Copyright Askele Ingria, inc. * (c) 2014 Copyright Confident, inc. (granted permission to use in commercial software) */ #include #include #include #include #include #include #include /**/ #ifdef WIN32 #include #else #include #endif /**/ // #include #include #include #include #include #define MAX_QUEUE_SIZE 4096 #define MAX_QUEUE_POOL 256 static long __cmp_uint(const void *a, const void *b) { return (long)(*(unsigned int *)a - *(unsigned int *)b); } static inline pth_msg_t *__get_newmsg(pth_queue_t *queue) { usrtc_t *tree = &queue->msgcache; usrtc_node_t *node; pth_msg_t *tmp; if(usrtc_count(tree)) { node = usrtc_first(tree); tmp = (pth_msg_t *)usrtc_node_getdata(node); usrtc_delete(tree, node); } else { tmp = malloc(sizeof(pth_msg_t)); tree = &queue->qtree; node = &tmp->node; usrtc_node_init(node, tmp); } /* insert it */ tree = &queue->qtree; tmp->qlength = usrtc_count(tree); usrtc_insert(tree, node, (void *)(&tmp->qlength)); return tmp; } static inline void __release_msg(pth_queue_t *queue, pth_msg_t *msg) { usrtc_node_t *node = &msg->node; usrtc_t *tree = &queue->qtree; tree = &queue->qtree; /* remove from queue */ usrtc_delete(tree, node); tree = &queue->msgcache; if(usrtc_count(tree) >= MAX_QUEUE_POOL) free(msg); else { msg->data = NULL; msg->msgtype = NIL_MSG; usrtc_insert(tree, node, (void *)&msg->qlength); } return; } int pth_queue_init(pth_queue_t *queue) { int r = 0; memset(queue, 0, sizeof(pth_queue_t)); if((r = pthread_cond_init(&queue->cond, NULL))) return r; if((r = pthread_mutex_init(&queue->mutex, NULL))) { pthread_cond_destroy(&queue->cond); return r; } usrtc_init(&queue->qtree, USRTC_AVL, MAX_QUEUE_SIZE, __cmp_uint); usrtc_init(&queue->msgcache, USRTC_AVL, MAX_QUEUE_POOL, __cmp_uint); return r; } int pth_queue_add(pth_queue_t *queue, void *data, unsigned int msgtype) { pth_msg_t *newmsg; pthread_mutex_lock(&queue->mutex); newmsg = __get_newmsg(queue); if (newmsg == NULL) { pthread_mutex_unlock(&queue->mutex); return ENOMEM; } newmsg->data = data; newmsg->msgtype = msgtype; if(queue->length == 0) pthread_cond_broadcast(&queue->cond); queue->length++; pthread_mutex_unlock(&queue->mutex); return 0; } int pth_queue_get(pth_queue_t *queue, const struct timespec *timeout, pth_msg_t *msg) { usrtc_t *tree; usrtc_node_t *node = NULL; pth_msg_t *tmp; int r = 0; struct timespec abstimeout; if (queue == NULL || msg == NULL) return EINVAL; else tree = &queue->qtree; if (timeout) { /* setup timeout */ struct timeval now; gettimeofday(&now, NULL); abstimeout.tv_sec = now.tv_sec + timeout->tv_sec; abstimeout.tv_nsec = (now.tv_usec * 1000) + timeout->tv_nsec; if (abstimeout.tv_nsec >= 1000000000) { abstimeout.tv_sec++; abstimeout.tv_nsec -= 1000000000; } } pthread_mutex_lock(&queue->mutex); /* Will wait until awakened by a signal or broadcast */ while ((node = usrtc_first(tree)) == NULL && r != ETIMEDOUT) { /* Need to loop to handle spurious wakeups */ if (timeout) r = pthread_cond_timedwait(&queue->cond, &queue->mutex, &abstimeout); else pthread_cond_wait(&queue->cond, &queue->mutex); } if (r == ETIMEDOUT) { pthread_mutex_unlock(&queue->mutex); return r; } tmp = (pth_msg_t *)usrtc_node_getdata(node); queue->length--; msg->data = tmp->data; msg->msgtype = tmp->msgtype; msg->qlength = tmp->qlength; /* we will hold the msg id instead of size here */ __release_msg(queue, tmp); pthread_mutex_unlock(&queue->mutex); return 0; } int pth_queue_destroy(pth_queue_t *queue, int freedata, void (*free_msg)(void *)) { int r = 0; usrtc_t *tree = &queue->qtree; usrtc_node_t *node = NULL; pth_msg_t *msg; if (queue == NULL) return EINVAL; pthread_mutex_lock(&queue->mutex); for (node = usrtc_first(tree); node != NULL; node = usrtc_first(tree)) { usrtc_delete(tree, node); msg = (pth_msg_t *)usrtc_node_getdata(node); if(freedata) free(msg->data); else if(free_msg) free_msg(msg->data); free(msg); } /* free cache */ tree = &queue->msgcache; for (node = usrtc_first(tree); node != NULL; node = usrtc_first(tree)) { usrtc_delete(tree, node); free(usrtc_node_getdata(node)); } pthread_mutex_unlock(&queue->mutex); r = pthread_mutex_destroy(&queue->mutex); pthread_cond_destroy(&queue->cond); return r; } unsigned int pth_queue_length(pth_queue_t *queue) { unsigned int c; pthread_mutex_lock(&queue->mutex); c = queue->length; pthread_mutex_unlock(&queue->mutex); return c; } /* dynamic queue thread poll */ struct __pthrd_data { pth_dqtpoll_t *pthrd; int myid; }; static void *__poll_thread(void *poll) { int r = 0; struct __pthrd_data *thrdata = (struct __pthrd_data *)poll; struct __pthrd_data *npoll = NULL; pth_msg_t msgbuf, sysbuf; pth_dqtpoll_t *p = thrdata->pthrd; pth_queue_t *q = p->queue; ulong_t myid = thrdata->myid; struct timeval now; int resched = 0; long tusec, si, mdrate; while(1) { resched = 0; r = pth_queue_get(q, NULL, &msgbuf); pthread_rwlock_wrlock(&(p->stats_lock)); if(p->flags & DQTPOLL_DEADSTAGE) { /* poll going to be killed */ pthread_rwlock_unlock(&(p->stats_lock)); idx_free(p->idx, myid); p->poll_value--; return NULL; } /* now get the time */ gettimeofday(&now, NULL); if((now.tv_sec >= p->sched_time.tv_sec) && (now.tv_usec >= p->sched_time.tv_usec)) { resched = 1; /* set the new schedule time */ si = 0; tusec = DQTPOLL_DELTAMS + now.tv_usec; if(tusec > 1000000) { tusec -= 1000000; si++; } p->sched_time.tv_sec += si + DQTPOLL_DELTASE; p->sched_time.tv_usec = tusec; } if(resched) { /* ok now we need to resched and descrease/increase thread poll volume */ mdrate = ((DQTPOLL_DELTASE*1000000) + DQTPOLL_DELTAMS)/p->msgop; if((mdrate > p->poll_value) && (p->poll_value < MAX_POLL_VALUE)) { /* increase ! */ if((npoll = malloc(sizeof(struct __pthrd_data)))) { npoll->myid = idx_allocate(p->idx); npoll->pthrd = p; p->poll_value++; /* create thread here */ if(pthread_create(&(p->poll[npoll->myid]), NULL, __poll_thread, npoll)) { idx_free(p->idx, npoll->myid); p->poll_value--; free(npoll); } } } else if((p->poll_value > 2) && (mdrate < p->poll_value)) /* decrease */ { memset(&sysbuf, 0, sizeof(pth_msg_t)); pth_queue_add(p->queue, &sysbuf, POLL_DECREASE); } /* init all other stuff */ p->msgop = 0; } if(r) { pthread_rwlock_unlock(&(p->stats_lock)); continue; } else p->msgop++; pthread_rwlock_unlock(&(p->stats_lock)); switch(msgbuf.msgtype) { case USR_MSG: /* do the job */ p->jobdata_callback(msgbuf.data); break; case POLL_DECREASE: pthread_rwlock_rdlock(&(p->stats_lock)); if(p->poll_value > 2) r = 1; /* exit now */ pthread_rwlock_unlock(&(p->stats_lock)); if(r) { pthread_rwlock_wrlock(&(p->stats_lock)); idx_free(p->idx, myid); p->poll_value--; pthread_rwlock_unlock(&(p->stats_lock)); free(poll); return NULL; } break; default: /* TODO: do something ... */ break; } } return NULL; } /* init poll, structure must be allocated */ int pth_dqtpoll_init(pth_dqtpoll_t *tpoll, int (*jobdata_callback)(void *)) { int r = 0; pth_queue_t *queue = malloc(sizeof(pth_queue_t)); pthread_t *poll = malloc(sizeof(pthread_t)*MAX_POLL_VALUE); idx_allocator_t *idx = malloc(sizeof(idx_allocator_t)); struct __pthrd_data *ndata = malloc(sizeof(struct __pthrd_data)); /* check it for allocation */ if(!ndata) goto __enomem; if(!idx) goto __enomem; if(!queue) goto __enomem; if(!poll) goto __enomem; /* init all the stuff */ if(idx_allocator_init(idx, MAX_POLL_VALUE*16, 0)) { __enomem: r = ENOMEM; goto __finish; } if(pth_queue_init(queue)) goto __enomem; /* init queue */ if(pthread_rwlock_init(&(tpoll->stats_lock), NULL)) goto __enomem; /* set parameters */ memset(poll, 0, sizeof(pthread_t)*MAX_POLL_VALUE); tpoll->flags = 0; tpoll->idx = idx; tpoll->poll = poll; tpoll->queue = queue; tpoll->poll_value = 2; tpoll->spurious_wakeups = 0; tpoll->msgop = 0; tpoll->jobdata_callback = jobdata_callback; /* first thread initiation */ idx_reserve(idx, 0); ndata->myid = 0; ndata->pthrd = tpoll; if(pthread_create(&(poll[0]), NULL, __poll_thread, ndata)) { __eadd: pthread_rwlock_destroy(&(tpoll->stats_lock)); goto __enomem; } /* second thread initiation */ ndata = malloc(sizeof(struct __pthrd_data)); if(!ndata) goto __eadd; idx_reserve(idx, 1); ndata->myid = 1; ndata->pthrd = tpoll; if(pthread_create(&(poll[1]), NULL, __poll_thread, ndata)) { pthread_rwlock_destroy(&(tpoll->stats_lock)); goto __enomem; } gettimeofday(&(tpoll->sched_time), NULL); __finish: if(r) { if(ndata) free(ndata); if(idx) free(idx); if(queue) { pth_queue_destroy(queue, 0, NULL); free(queue); } if(poll) free(poll); } return r; } /* run poll: poll */ int pth_dqtpoll_run(pth_dqtpoll_t *tpoll) { int r = 0; pthread_rwlock_wrlock(&(tpoll->stats_lock)); if((tpoll->flags & DQTPOLL_RUNNING) || (tpoll->flags & DQTPOLL_DEADSTAGE)) r = EINVAL; else { tpoll->flags |= DQTPOLL_RUNNING; } pthread_rwlock_unlock(&(tpoll->stats_lock)); return r; } /* add the job to the queue: poll, job data, message type */ int pth_dqtpoll_add(pth_dqtpoll_t *tpoll, void *job, unsigned int type) { int r = 0; r = pth_queue_add(tpoll->queue, job, type); return r; } /* destroy the poll: poll, force flag * if force flag is set (!= 0), give up * about jobs, if no, do the job, but don't * accept the new ones, and destroy all poll * with last thread. */ int pth_dqtpoll_destroy(pth_dqtpoll_t *tpoll, int force) { int r = 0; pth_msg_t tmpmsg; pthread_rwlock_wrlock(&(tpoll->stats_lock)); tpoll->flags |= DQTPOLL_DEADSTAGE; pthread_rwlock_unlock(&(tpoll->stats_lock)); /* now we need to wait */ while(1) { pthread_rwlock_rdlock(&(tpoll->stats_lock)); if(!tpoll->poll_value) { pthread_rwlock_unlock(&(tpoll->stats_lock)); break; } else { pthread_rwlock_unlock(&(tpoll->stats_lock)); pth_queue_add(tpoll->queue, &tmpmsg, 0); /* spurious */ } usleep(100); /* just to sleep and free timeslice to others */ } /* free all */ pth_queue_destroy(tpoll->queue, 0, NULL); idx_allocator_destroy(tpoll->idx); pthread_rwlock_destroy(&(tpoll->stats_lock)); free(tpoll->poll); free(tpoll->queue); free(tpoll->idx); return r; }