diff --git a/include/sntl/pth_queue.h b/include/sntl/pth_queue.h index e186f7c..521e849 100644 --- a/include/sntl/pth_queue.h +++ b/include/sntl/pth_queue.h @@ -85,13 +85,18 @@ typedef struct pth_dqtpoll_s { pthread_rwlock_t stats_lock; /** < rwlock for stats data */ unsigned long spurious_wakeups; /** < amount of spurios wakeups */ int poll_value; /** < value of the poll (totally) */ - int sleep_value; /** < value of the sleeping threads */ + struct timeval sched_time; + int msgop; } pth_dqtpoll_t; /* flags for poll */ #define DQTPOLL_RUNNING (1 << 1) /* poll is running */ #define DQTPOLL_DEADSTAGE (1 << 2) /* poll in the stage of destroy */ +/* keep it stupid */ +#define DQTPOLL_DELTAMS 500000 +#define DQTPOLL_DELTASE 0 + /* init poll, structure must be allocated */ int pth_dqtpoll_init(pth_dqtpoll_t*, int (*jobdata_callback)(void *)); diff --git a/lib/queue.c b/lib/queue.c index 1085ef3..0762fb9 100644 --- a/lib/queue.c +++ b/lib/queue.c @@ -239,8 +239,12 @@ static void *__poll_thread(void *poll) pth_dqtpoll_t *p = thrdata->pthrd; pth_queue_t *q = p->queue; ulong_t myid = thrdata->myid; + struct timeval now; + int resched = 0; + long tusec, si, mdrate; while(1) { + resched = 0; r = pth_queue_get(q, NULL, &msgbuf); pthread_rwlock_wrlock(&(p->stats_lock)); if(p->flags & DQTPOLL_DEADSTAGE) { /* poll going to be killed */ @@ -250,25 +254,54 @@ static void *__poll_thread(void *poll) return NULL; } + + /* now get the time */ + gettimeofday(&now, NULL); + if((now.tv_sec >= p->sched_time.tv_sec) && + (now.tv_usec >= p->sched_time.tv_usec)) { + resched = 1; + /* set the new schedule time */ + si = 0; + tusec = DQTPOLL_DELTAMS + now.tv_usec; + if(tusec > 1000000) { + tusec -= 1000000; + si++; + } + p->sched_time.tv_sec += si + DQTPOLL_DELTASE; + p->sched_time.tv_usec = tusec; + } + + if(resched) { /* ok now we need to resched and descrease/increase thread poll volume */ + mdrate = ((DQTPOLL_DELTASE*1000000) + DQTPOLL_DELTAMS)/p->msgop; + if((mdrate > p->poll_value) && (p->poll_value < MAX_POLL_VALUE)) { /* increase ! */ + if((npoll = malloc(sizeof(struct __pthrd_data)))) { + npoll->myid = idx_allocate(p->idx); + npoll->pthrd = p; + p->poll_value++; + /* create thread here */ + if(pthread_create(&(p->poll[npoll->myid]), NULL, __poll_thread, npoll)) { + idx_free(p->idx, npoll->myid); + p->poll_value--; + free(npoll); + } + } + } else if((p->poll_value > 2) && (mdrate < p->poll_value)) /* decrease */ { + memset(&sysbuf, 0, sizeof(pth_msg_t)); + pth_queue_add(p->queue, &sysbuf, POLL_DECREASE); + } + + /* init all other stuff */ + p->msgop = 0; + } + if(r) { - p->sleep_value++; pthread_rwlock_unlock(&(p->stats_lock)); continue; - } else p->sleep_value--; + } else p->msgop++; pthread_rwlock_unlock(&(p->stats_lock)); switch(msgbuf.msgtype) { case USR_MSG: - /* schedule new task for poll - increase or decrease */ - pthread_rwlock_rdlock(&(p->stats_lock)); - if((pth_queue_length(p->queue) + 1 /* since we take a job */) >= p->sleep_value) { - memset(&sysbuf, 0, sizeof(pth_msg_t)); - pth_queue_add(p->queue, &sysbuf, POLL_INCREASE); - } else if((pth_queue_length(p->queue) > p->sleep_value) && (p->sleep_value > 1)) { - memset(&sysbuf, 0, sizeof(pth_msg_t)); - pth_queue_add(p->queue, &sysbuf, POLL_DECREASE); - } - pthread_rwlock_unlock(&(p->stats_lock)); /* do the job */ p->jobdata_callback(msgbuf.data); break; @@ -285,28 +318,6 @@ static void *__poll_thread(void *poll) return NULL; } break; - case POLL_INCREASE: - if((npoll = malloc(sizeof(struct __pthrd_data)))) { - pthread_rwlock_rdlock(&(p->stats_lock)); - if(p->poll_value < MAX_POLL_VALUE) r = 1; - pthread_rwlock_unlock(&(p->stats_lock)); - if(r) { - pthread_rwlock_wrlock(&(p->stats_lock)); - if(p->poll_value < MAX_POLL_VALUE) { /* check it again */ - npoll->myid = idx_allocate(p->idx); - npoll->pthrd = p; - p->poll_value++; - /* create thread here */ - if(pthread_create(&(p->poll[npoll->myid]), NULL, __poll_thread, npoll)) { - idx_free(p->idx, npoll->myid); - p->poll_value--; - free(npoll); - } - } - pthread_rwlock_unlock(&(p->stats_lock)); - } - } - break; default: /* TODO: do something ... */ break; @@ -332,7 +343,7 @@ int pth_dqtpoll_init(pth_dqtpoll_t *tpoll, int (*jobdata_callback)(void *)) if(!poll) goto __enomem; /* init all the stuff */ - if(idx_allocator_init(idx, MAX_POLL_VALUE, 0)) { + if(idx_allocator_init(idx, MAX_POLL_VALUE*16, 0)) { __enomem: r = ENOMEM; goto __finish; @@ -346,9 +357,9 @@ int pth_dqtpoll_init(pth_dqtpoll_t *tpoll, int (*jobdata_callback)(void *)) tpoll->idx = idx; tpoll->poll = poll; tpoll->queue = queue; - tpoll->poll_value = 1; - tpoll->sleep_value = 0; + tpoll->poll_value = 2; tpoll->spurious_wakeups = 0; + tpoll->msgop = 0; tpoll->jobdata_callback = jobdata_callback; /* first thread initiation */ @@ -371,6 +382,8 @@ int pth_dqtpoll_init(pth_dqtpoll_t *tpoll, int (*jobdata_callback)(void *)) goto __enomem; } + gettimeofday(&(tpoll->sched_time), NULL); + __finish: if(r) { if(ndata) free(ndata); @@ -393,7 +406,6 @@ int pth_dqtpoll_run(pth_dqtpoll_t *tpoll) if((tpoll->flags & DQTPOLL_RUNNING) || (tpoll->flags & DQTPOLL_DEADSTAGE)) r = EINVAL; else { tpoll->flags |= DQTPOLL_RUNNING; - tpoll->sleep_value = 1; } pthread_rwlock_unlock(&(tpoll->stats_lock));