From 58b3968eedc33ddde1413edfdac991197b14e939 Mon Sep 17 00:00:00 2001 From: Alexander Vdolainen Date: Sat, 20 Dec 2014 02:39:11 +0200 Subject: [PATCH] added API for dynamic polls; --- include/sntl/pth_queue.h | 60 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 56 insertions(+), 4 deletions(-) diff --git a/include/sntl/pth_queue.h b/include/sntl/pth_queue.h index d29e7b3..e73bf84 100644 --- a/include/sntl/pth_queue.h +++ b/include/sntl/pth_queue.h @@ -9,7 +9,7 @@ /** * @file pth_queue.h * @author Alexander Vdolainen - * @date 4 Nov 2013 + * @date 4 Nov 2013, 20 Dec 2014 (dynamic polls) * @brief queue implementation for threads intercommunication * */ @@ -19,9 +19,15 @@ #include -#define SYS_MSG 0x0f0affee -#define USR_MSG 0x0afeeffe -#define NIL_MSG 0x0 +/* possible message types, ones with POLL_ prefix valid on for pth_dqtpoll_* */ +#define SYS_MSG 0x0f0affee +#define USR_MSG 0x0afeeffe +#define POLL_DECREASE 0x0afafafe +#define POLL_INCREASE 0x0afaffff +#define NIL_MSG 0x0 + +/* max amount of threads within the poll */ +#define MAX_POLL_VALUE 32 typedef struct pth_msg_s { void *data; /** < message payload */ @@ -54,4 +60,50 @@ unsigned int pth_queue_length(pth_queue_t *queue); int pth_queue_destroy(pth_queue_t *queue, int freedata, void (*free_msg)(void *)); +/* dynamic queue thread poll ... bbrrr .... ok, ok with beer + * Dynamic queue thread poll is a queue like pth_queue, + * but also it has itäs own mamagement for threads - that's + * why dynamic. + * Ideally, the model is trying to achieve the following: + * 1. one thread in queue while no or very small amount of jobs in the queue + * 2. grow until max threads is reached while too many requests + * 3. gently slide down volume of threads after job heat + * 4. minimal additional drawbacks (i hate something periodically running, + * it's bad practice) + * The model is quite simple, we should make spurious wakeups equal to zero, + * if no - decrease poll value, and, if we don't have thread available - + * create it. + */ +typedef struct pth_dqtpoll_s { + pth_queue_t *queue; /** < Job queue */ + pthread_t *poll; /** < Thread descriptors */ + int (*jobdata_callback)(void *); /** < Callback to have a deal with data */ + int flags; /** < Flags */ + pthread_rwlock_t stats_lock; /** < rwlock for stats data */ + unsigned long spurious_wakeups; /** < amount of spurios wakeups */ + int poll_value; /** < value of the poll (totally) */ + int sleep_value; /** < value of the sleeping threads */ +} pth_dqtpoll_t; + +/* flags for poll */ +#define DQTPOLL_RUNNING (1 << 1) /* poll is running */ +#define DQTPOLL_DEADSTAGE (1 << 2) /* poll in the stage of destroy */ + +/* init poll, structure must be allocated */ +int pth_dqtpoll_init(pth_dqtpoll_t*, int (*jobdata_callback)(void *)); + +/* run poll: poll */ +int pth_dqtpoll_run(pth_dqtpoll_t*); + +/* add the job to the queue: poll, job data, message type */ +int pth_dqtpoll_add(pth_dqtpoll_t*, void*, unsigned int); + +/* destroy the poll: poll, force flag + * if force flag is set (!= 0), give up + * about jobs, if no, do the job, but don't + * accept the new ones, and destroy all poll + * with last thread. + */ +int pth_dqtpoll_destroy(pth_dqtpoll_t*, int); + #endif /* __PTH_QUEUE_H__ */