|
|
|
/*
 * This is a proprietary software. See COPYING for further details.
 *
 * (c) 2013 Copyright Askele, inc. <http://askele.com>
 * (c) 2013 Copyright Askele Ingria, inc. <http://askele-ingria.com>
 * (c) 2014 Copyright Confident, inc. (granted permission to use in commercial software)
 */
|
|
|
|
|
|
|
|
/**
 * @file pth_queue.h
 * @author Alexander Vdolainen
 * @date 4 Nov 2013, 20 Dec 2014 (dynamic polls)
 * @brief queue implementation for threads intercommunication
 *
 */
|
|
|
|
|
|
|
|
#ifndef __PTH_QUEUE_H__
|
|
|
|
#define __PTH_QUEUE_H__
|
|
|
|
|
|
|
|
#include <pthread.h>
|
|
|
|
#include <tdata/idx_allocator.h>
|
|
|
|
|
|
|
|
/*
 * Possible message types. The ones with the POLL_ prefix are valid only
 * for the pth_dqtpoll_* API.
 */
#define SYS_MSG        0x0f0affee
#define USR_MSG        0x0afeeffe
#define POLL_DECREASE  0x0afafafe
#define POLL_INCREASE  0x0afaffff
#define NIL_MSG        0x0
#define END_MSG        0xdead0000
|
|
|
|
|
|
|
|
/* Maximum amount of threads within the poll. */
#define MAX_POLL_VALUE 32
|
|
|
|
|
|
|
|
typedef struct pth_msg_s {
|
|
|
|
void *data; /** < message payload */
|
|
|
|
unsigned int msgtype; /** < message type ID */
|
|
|
|
unsigned int qlength; /** < current queue length (actual on add moment),
|
|
|
|
* it makes no sense with few readers */
|
|
|
|
usrtc_node_t node;
|
|
|
|
} pth_msg_t;
|
|
|
|
|
|
|
|
typedef struct pth_queue_s {
|
|
|
|
unsigned int length;
|
|
|
|
/* sync */
|
|
|
|
pthread_mutex_t mutex;
|
|
|
|
pthread_cond_t cond;
|
|
|
|
/* queue data */
|
|
|
|
usrtc_t qtree;
|
|
|
|
/* cache */
|
|
|
|
usrtc_t msgcache;
|
|
|
|
} pth_queue_t;
|
|
|
|
|
|
|
|
/**
 * Initialise a queue.
 *
 * @param queue queue structure to initialise
 * @return status code; the exact convention is not visible in this header
 *         (NOTE(review): presumably 0 on success - confirm)
 */
int pth_queue_init(pth_queue_t *queue);
|
|
|
|
|
|
|
|
/**
 * Add a message to the queue.
 *
 * @param queue   queue to add to
 * @param data    message payload
 * @param msgtype message type ID (one of the *_MSG values)
 * @return status code; the exact convention is not visible in this header
 *         (NOTE(review): presumably 0 on success - confirm)
 */
int pth_queue_add(pth_queue_t *queue, void *data, unsigned int msgtype);
|
|
|
|
|
|
|
|
int pth_queue_get(pth_queue_t *queue, const struct timespec *timeout,
|
|
|
|
pth_msg_t *msg);
|
|
|
|
|
|
|
|
/**
 * Query the length of the queue.
 *
 * @param queue queue to inspect
 * @return queue length (NOTE(review): presumably the number of messages
 *         currently queued - confirm)
 */
unsigned int pth_queue_length(pth_queue_t *queue);
|
|
|
|
|
|
|
|
/**
 * Destroy a queue.
 *
 * @param queue    queue to destroy
 * @param freedata NOTE(review): presumably non-zero requests releasing the
 *                 payloads of the remaining messages - confirm
 * @param free_msg callback taking a payload pointer; NOTE(review): presumably
 *                 used to release payloads when freedata is set - confirm
 * @return status code; the exact convention is not visible in this header
 */
int pth_queue_destroy(pth_queue_t *queue, int freedata,
                      void (*free_msg)(void *));
|
|
|
|
|
|
|
|
/* dynamic queue thread poll ... bbrrr .... ok, ok with beer
|
|
|
|
* Dynamic queue thread poll is a queue like pth_queue,
|
|
|
|
* but also it has itäs own mamagement for threads - that's
|
|
|
|
* why dynamic.
|
|
|
|
* Ideally, the model is trying to achieve the following:
|
|
|
|
* 1. one thread in queue while no or very small amount of jobs in the queue
|
|
|
|
* 2. grow until max threads is reached while too many requests
|
|
|
|
* 3. gently slide down volume of threads after job heat
|
|
|
|
* 4. minimal additional drawbacks (i hate something periodically running,
|
|
|
|
* it's bad practice)
|
|
|
|
* The model is quite simple, we should make spurious wakeups equal to zero,
|
|
|
|
* if no - decrease poll value, and, if we don't have thread available -
|
|
|
|
* create it.
|
|
|
|
*/
|
|
|
|
typedef struct pth_dqtpoll_s {
|
|
|
|
pth_queue_t *queue; /** < Job queue */
|
|
|
|
pthread_t *poll; /** < Thread descriptors */
|
|
|
|
int (*jobdata_callback)(void *); /** < Callback to have a deal with data */
|
|
|
|
int flags; /** < Flags */
|
|
|
|
idx_allocator_t *idx; /** < index allocator for the poll threads */
|
|
|
|
pthread_rwlock_t stats_lock; /** < rwlock for stats data */
|
|
|
|
unsigned long spurious_wakeups; /** < amount of spurios wakeups */
|
|
|
|
int poll_value; /** < value of the poll (totally) */
|
|
|
|
int sleep_value; /** < value of the sleeping threads */
|
|
|
|
} pth_dqtpoll_t;
|
|
|
|
|
|
|
|
/* flags for poll (bit masks stored in pth_dqtpoll_t.flags) */
#define DQTPOLL_RUNNING    (1 << 1)  /* poll is running */
#define DQTPOLL_DEADSTAGE  (1 << 2)  /* poll is in the stage of being destroyed */
|
|
|
|
|
|
|
|
/* init poll, structure must be allocated */
|
|
|
|
int pth_dqtpoll_init(pth_dqtpoll_t*, int (*jobdata_callback)(void *));
|
|
|
|
|
|
|
|
/* run poll: poll */
|
|
|
|
int pth_dqtpoll_run(pth_dqtpoll_t*);
|
|
|
|
|
|
|
|
/* add the job to the queue: poll, job data, message type */
|
|
|
|
int pth_dqtpoll_add(pth_dqtpoll_t*, void*, unsigned int);
|
|
|
|
|
|
|
|
/* destroy the poll: poll, force flag
|
|
|
|
* if force flag is set (!= 0), give up
|
|
|
|
* about jobs, if no, do the job, but don't
|
|
|
|
* accept the new ones, and destroy all poll
|
|
|
|
* with last thread.
|
|
|
|
*/
|
|
|
|
int pth_dqtpoll_destroy(pth_dqtpoll_t*, int);
|
|
|
|
|
|
|
|
#endif /* __PTH_QUEUE_H__ */
|