/* * This is a proprietary software. See COPYING for further details. * * (c) 2013 Copyright Askele, inc. * (c) 2013 Copyright Askele Ingria, inc. * (c) 2014 Copyright Confident, inc. (granted permission to use in commercial software) */ /** * @file pth_queue.h * @author Alexander Vdolainen * @date 4 Nov 2013, 20 Dec 2014 (dynamic polls) * @brief queue implementation for threads intercommunication * */ #ifndef __PTH_QUEUE_H__ #define __PTH_QUEUE_H__ #include #include /* possible message types, ones with POLL_ prefix valid on for pth_dqtpoll_* */ #define SYS_MSG 0x0f0affee #define USR_MSG 0x0afeeffe #define POLL_DECREASE 0x0afafafe #define POLL_INCREASE 0x0afaffff #define NIL_MSG 0x0 #define END_MSG 0xdead0000 /* max amount of threads within the poll */ #define MAX_POLL_VALUE 32 typedef struct pth_msg_s { void *data; /** < message payload */ unsigned int msgtype; /** < message type ID */ unsigned int qlength; /** < current queue length (actual on add moment), * it makes no sense with few readers */ usrtc_node_t node; } pth_msg_t; typedef struct pth_queue_s { unsigned int length; /* sync */ pthread_mutex_t mutex; pthread_cond_t cond; /* queue data */ usrtc_t qtree; /* cache */ usrtc_t msgcache; } pth_queue_t; int pth_queue_init(pth_queue_t *queue); int pth_queue_add(pth_queue_t *queue, void *data, unsigned int msgtype); int pth_queue_get(pth_queue_t *queue, const struct timespec *timeout, pth_msg_t *msg); unsigned int pth_queue_length(pth_queue_t *queue); int pth_queue_destroy(pth_queue_t *queue, int freedata, void (*free_msg)(void *)); /* dynamic queue thread poll ... bbrrr .... ok, ok with beer * Dynamic queue thread poll is a queue like pth_queue, * but also it has itäs own mamagement for threads - that's * why dynamic. * Ideally, the model is trying to achieve the following: * 1. one thread in queue while no or very small amount of jobs in the queue * 2. grow until max threads is reached while too many requests * 3. gently slide down volume of threads after job heat * 4. minimal additional drawbacks (i hate something periodically running, * it's bad practice) * The model is quite simple, we should make spurious wakeups equal to zero, * if no - decrease poll value, and, if we don't have thread available - * create it. */ typedef struct pth_dqtpoll_s { pth_queue_t *queue; /** < Job queue */ pthread_t *poll; /** < Thread descriptors */ int (*jobdata_callback)(void *); /** < Callback to have a deal with data */ int flags; /** < Flags */ idx_allocator_t *idx; /** < index allocator for the poll threads */ pthread_rwlock_t stats_lock; /** < rwlock for stats data */ unsigned long spurious_wakeups; /** < amount of spurios wakeups */ int poll_value; /** < value of the poll (totally) */ struct timeval sched_time; int msgop; } pth_dqtpoll_t; /* flags for poll */ #define DQTPOLL_RUNNING (1 << 1) /* poll is running */ #define DQTPOLL_DEADSTAGE (1 << 2) /* poll in the stage of destroy */ /* keep it stupid */ #define DQTPOLL_DELTAMS 500000 #define DQTPOLL_DELTASE 0 /* init poll, structure must be allocated */ int pth_dqtpoll_init(pth_dqtpoll_t*, int (*jobdata_callback)(void *)); /* run poll: poll */ int pth_dqtpoll_run(pth_dqtpoll_t*); /* add the job to the queue: poll, job data, message type */ int pth_dqtpoll_add(pth_dqtpoll_t*, void*, unsigned int); /* destroy the poll: poll, force flag * if force flag is set (!= 0), give up * about jobs, if no, do the job, but don't * accept the new ones, and destroy all poll * with last thread. */ int pth_dqtpoll_destroy(pth_dqtpoll_t*, int); #endif /* __PTH_QUEUE_H__ */