added API for dynamic polls;

v0.5.xx
Alexander Vdolainen 10 years ago
parent 9f11941705
commit 58b3968eed

@ -9,7 +9,7 @@
/** /**
* @file pth_queue.h * @file pth_queue.h
* @author Alexander Vdolainen * @author Alexander Vdolainen
* @date 4 Nov 2013 * @date 4 Nov 2013, 20 Dec 2014 (dynamic polls)
* @brief queue implementation for threads intercommunication * @brief queue implementation for threads intercommunication
* *
*/ */
@ -19,9 +19,15 @@
#include <pthread.h> #include <pthread.h>
#define SYS_MSG 0x0f0affee /* possible message types, ones with POLL_ prefix valid on for pth_dqtpoll_* */
#define USR_MSG 0x0afeeffe #define SYS_MSG 0x0f0affee
#define NIL_MSG 0x0 #define USR_MSG 0x0afeeffe
#define POLL_DECREASE 0x0afafafe
#define POLL_INCREASE 0x0afaffff
#define NIL_MSG 0x0
/* max amount of threads within the poll */
#define MAX_POLL_VALUE 32
typedef struct pth_msg_s { typedef struct pth_msg_s {
void *data; /** < message payload */ void *data; /** < message payload */
@ -54,4 +60,50 @@ unsigned int pth_queue_length(pth_queue_t *queue);
int pth_queue_destroy(pth_queue_t *queue, int freedata, int pth_queue_destroy(pth_queue_t *queue, int freedata,
void (*free_msg)(void *)); void (*free_msg)(void *));
/* dynamic queue thread poll ... bbrrr .... ok, ok with beer
* Dynamic queue thread poll is a queue like pth_queue,
* but also it has itäs own mamagement for threads - that's
* why dynamic.
* Ideally, the model is trying to achieve the following:
* 1. one thread in queue while no or very small amount of jobs in the queue
* 2. grow until max threads is reached while too many requests
* 3. gently slide down volume of threads after job heat
* 4. minimal additional drawbacks (i hate something periodically running,
* it's bad practice)
* The model is quite simple, we should make spurious wakeups equal to zero,
* if no - decrease poll value, and, if we don't have thread available -
* create it.
*/
typedef struct pth_dqtpoll_s {
pth_queue_t *queue; /** < Job queue */
pthread_t *poll; /** < Thread descriptors */
int (*jobdata_callback)(void *); /** < Callback to have a deal with data */
int flags; /** < Flags */
pthread_rwlock_t stats_lock; /** < rwlock for stats data */
unsigned long spurious_wakeups; /** < amount of spurios wakeups */
int poll_value; /** < value of the poll (totally) */
int sleep_value; /** < value of the sleeping threads */
} pth_dqtpoll_t;
/* flags for poll */
#define DQTPOLL_RUNNING (1 << 1) /* poll is running */
#define DQTPOLL_DEADSTAGE (1 << 2) /* poll in the stage of destroy */
/* init poll, structure must be allocated */
int pth_dqtpoll_init(pth_dqtpoll_t*, int (*jobdata_callback)(void *));
/* run poll: poll */
int pth_dqtpoll_run(pth_dqtpoll_t*);
/* add the job to the queue: poll, job data, message type */
int pth_dqtpoll_add(pth_dqtpoll_t*, void*, unsigned int);
/* destroy the poll: poll, force flag
* if force flag is set (!= 0), give up
* about jobs, if no, do the job, but don't
* accept the new ones, and destroy all poll
* with last thread.
*/
int pth_dqtpoll_destroy(pth_dqtpoll_t*, int);
#endif /* __PTH_QUEUE_H__ */ #endif /* __PTH_QUEUE_H__ */

Loading…
Cancel
Save