You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
libsxmp/ydaemon/cache.c

466 lines
13 KiB
C

/*
* Yet another daemon library especially designed to be used
* with libsxmp based daemons.
*
* (c) Alexander Vdolainen 2016 <avdolainen@zoho.com>
*
* libsxmp is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* libsxmp is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.";
*
*/
#include <sys/mman.h>
#include <unistd.h>
#include <time.h>
#include <ydaemon/dataobject.h>
#include <ydaemon/cache.h>
static long __cmp_oids(const void *a, const void *b)
{
return (long)(*(oid_t *)a - *(oid_t *)b);
}
static ydata_cache_item_t *__alloc_ydcitem(uint32_t idx)
{
ydata_cache_item_t *out = malloc(sizeof(ydata_cache_item_t));
if(out) {
memset(out, 0, sizeof(ydata_cache_item_t));
out->idx = idx;
out->attr |= YDC_UNUSED;
list_init_node(&out->listnode);
usrtc_node_init(&out->idxnode, out);
}
return out;
}
static void *__sync_thread(void *p);
int yd_cache_start_thread(domx_t *domx)
{
ydata_cache_t *cc;
if(!domx || !(cc = (ydata_cache_t *)domx->cache)) return EINVAL;
return pthread_create(&cc->syncthread, NULL, __sync_thread, (void *)domx);
}
static void *__sync_thread(void *p)
{
domx_t *domx = (domx_t *)p;
ydata_cache_t *cc;
domx_dsbe_t *be;
list_node_t *iter, *siter;
ydata_cache_item_t *citem;
struct timespec tio;
int r, c, o;
/* check up, we shouldn't falls to segmentation */
if(!p || !(cc = (ydata_cache_t *)domx->cache)) goto __fini;
if(!(be = domx->be)) goto __fini;
/* everyday life is here */
while(1) {
/* get our tio, yes it's not so green ... but it works */
tio.tv_sec = time(NULL) + YDC_SYNCDELTASEC;
/* lock on mutex for a time being or will wake up on request */
r = pthread_mutex_timedlock(&cc->dirtlock, &tio);
__retrywolock:
yd_cache_rdlock(cc);
if(!cc->dirties) {
yd_cache_unlock(cc);
goto __again;
}
yd_cache_unlock(cc);
yd_cache_wrlock(cc);
c = 0;
list_for_each_safe(&cc->dirty_poll, iter, siter) {
citem = container_of(iter, ydata_cache_item_t, listnode);
list_del(&citem->listnode);
if(citem->attr & YDC_INVALIDATE) { /* invalidated item, just pullback it */
citem->attr = 0;
citem->attr |= YDC_UNUSED;
list_add2tail(&cc->free_poll, &citem->listnode);
usrtc_delete(&cc->idx_tree, &citem->idxnode);
} else {
o = be->f->set(be, citem->oid, yd_cache_ptrbyidx(cc, citem->idx));
if(o != 0) { /* invalidate */
citem->attr = 0;
citem->attr |= YDC_UNUSED;
list_add2tail(&cc->free_poll, &citem->listnode);
usrtc_delete(&cc->idx_tree, &citem->idxnode);
} else citem->attr &= ~YDC_DIRTY; /* just remove this flag */
}
cc->dirties--;
c++;
if(c >= YDC_SYNCTHRESHOLD) break;
}
yd_cache_unlock(cc);
if(c >= YDC_SYNCTHRESHOLD) {
usleep(100); /* looks like a huge load, let's wait and retry without lock */
goto __retrywolock;
}
__again:
if(r != ETIMEDOUT) pthread_mutex_lock(&cc->dirtlock); /* acquire lock again */
}
__fini:
return NULL;
}
ydata_cache_t *yd_cache_init(dataobject_t *object, size_t cache_size)
{
ydata_cache_t *cache = NULL;
char *iter;
ydata_cache_item_t *zcitem;
list_node_t *it, *sit;
int r = 0;
size_t i;
if(!object || !cache_size) {
errno = EINVAL;
goto __fail;
}
/* allocate and zero structure */
if(!(cache = malloc(sizeof(ydata_cache_t)))) {
errno = ENOMEM;
goto __fail;
} else memset(cache, 0, sizeof(ydata_cache_t));
/* map a memory buffer */
cache->cache = mmap(NULL, cache_size, PROT_READ | PROT_WRITE,
MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
if(cache->cache == MAP_FAILED) {
errno = ENOMEM;
goto __fail;
}
/* allocate all the locks */
if((r = pthread_rwlock_init(&cache->rwlock, NULL))) {
errno = r;
goto __fail;
}
if((r = pthread_mutex_init(&cache->dirtlock, NULL))) {
pthread_rwlock_destroy(&cache->rwlock);
errno = r;
goto __fail;
}
/* init other values */
cache->cache_size = cache_size;
cache->object_size = dtolen(object);
/* some paranoic check up */
if(!cache->object_size) {
pthread_rwlock_destroy(&cache->rwlock);
pthread_mutex_destroy(&cache->dirtlock);
errno = EINVAL;
goto __fail;
}
/* well, able to continue ... */
cache->objects_amount = cache->cache_size/(cache->object_size + sizeof(uint32_t));
/* init structures */
usrtc_init(&cache->idx_tree, USRTC_SPLAY, cache->objects_amount, __cmp_oids); /* indexing one */
list_init_head(&cache->free_poll);
list_init_head(&cache->dirty_poll);
list_init_head(&cache->pending_poll);
/* mark i.e. format our memory buffer, also it will populate the pages */
for(i = 0, iter = (char *)cache->cache; i < cache->objects_amount;
i++, iter += cache->object_size + sizeof(uint32_t)) {
/* mark with spacer */
*(uint32_t *)iter = YD_CACHE_SPACER_MAGIC;
zcitem = __alloc_ydcitem(i);
if(!zcitem) { errno = ENOMEM; goto __failall; }
list_add2tail(&cache->free_poll, &zcitem->listnode);
}
/* lock the mutex */
pthread_mutex_lock(&cache->dirtlock);
return cache;
__failall:
pthread_rwlock_destroy(&cache->rwlock);
pthread_mutex_destroy(&cache->dirtlock);
__fail:
/* failcase freeing */
if(cache) {
/* memory buffer */
if(cache->cache != MAP_FAILED || cache->cache != NULL) {
/* if we have a buffer, we might have some items already allocated ... */
list_for_each_safe(&cache->free_poll, it, sit) {
zcitem = container_of(it, ydata_cache_item_t, listnode);
list_del(&zcitem->listnode);
free(zcitem);
}
munmap(cache->cache, cache_size);
}
free(cache);
}
return NULL; /* sadly ... */
}
/* allocating is quite simple:
* we're trying to get a free item, if it
* exists, we're moving this the the pending poll,
* otherwise return nil.
*/
void *yd_cache_alloc_item(ydata_cache_t *cache, ydata_cache_item_t **item)
{
char *data = NULL;
list_node_t *listnode;
ydata_cache_item_t *zcitem;
yd_cache_rdlock(cache);
listnode = list_node_first(&cache->free_poll);
if(listnode) {
yd_cache_unlock(cache); /* retake locks */
yd_cache_wrlock(cache);
zcitem = container_of(listnode, ydata_cache_item_t, listnode);
list_del(&zcitem->listnode); /* delete from free poll */
list_add2tail(&cache->pending_poll, &zcitem->listnode); /* add to the pending poll */
/* do the magic */
data = yd_cache_ptrbyidx(cache, zcitem->idx);
/* mark it invalidate - since we don't know what will be in future */
zcitem->attr |= YDC_INVALIDATE;
*item = zcitem;
}
yd_cache_unlock(cache);
/* ok. we'll do some automagically check for the near items safety */
if(data && *(uint32_t *)data != YD_CACHE_SPACER_MAGIC ) { /* currupted - mark near elements invalidated ! */
/* TODO: do it */
}
if(data) {
data += sizeof(uint32_t); /* bypass magic spacer */
return (void *)data;
}
return NULL;
}
void yd_cache_discard_alloc_item(ydata_cache_t *cache, ydata_cache_item_t *item)
{
if(!(item->attr & YDC_INVALIDATE)) return; /* some shit happens */
yd_cache_wrlock(cache);
list_del(&item->listnode); /* delete from panding poll - guess, u can use API ugly and dirty */
list_add2tail(&cache->free_poll, &item->listnode); /* add to the pending poll */
item->attr &= ~YDC_INVALIDATE; /* clear the invalidate bit */
yd_cache_unlock(cache);
return;
}
void yd_cache_confirm_alloc_item(ydata_cache_t *cache, ydata_cache_item_t *item, oid_t oid)
{
if(!(item->attr & YDC_INVALIDATE)) return; /* some shit happens */
yd_cache_wrlock(cache);
list_del(&item->listnode); /* delete from pending poll - guess, u can use API ugly and dirty */
item->attr &= ~YDC_INVALIDATE; /* clear the invalidate bit */
item->attr &= ~YDC_UNUSED; /* clear the use bit */
yd_item_setoid(item, oid);
usrtc_insert(&cache->idx_tree, &item->idxnode, (void *)&item->oid); /* now it will be in the splay tree of the active cached data */
yd_cache_unlock(cache);
return;
}
void *yd_cache_lookup(ydata_cache_t *cache, oid_t oid)
{
usrtc_node_t *node;
ydata_cache_item_t *item;
void *ret = NULL;
yd_cache_rdlock(cache);
node = usrtc_lookup(&cache->idx_tree, &oid);
if(node) {
item = (ydata_cache_item_t *)usrtc_node_getdata(node);
if(!(item->attr & YDC_INVALIDATE)) ret = yd_cache_ptrbyidx(cache, yd_item_idx(item));
else if(item->attr & YDC_INVALIDATE) { /* invalidated item */
yd_cache_unlock(cache); /* retake lock */
yd_cache_wrlock(cache);
usrtc_delete(&cache->idx_tree, &item->idxnode);
if(item->attr & YDC_DIRTY) { list_del(&item->listnode); /* it was dirty - but invalidated, remove it */
cache->dirties--;
}
item->attr = 0; /* clear attributes */
item->attr |= YDC_UNUSED;
list_add2tail(&cache->free_poll, &item->listnode);
ret = NULL;
}
}
yd_cache_unlock(cache);
if(ret && *(uint32_t *)ret != YD_CACHE_SPACER_MAGIC ) { /* currupted - mark near elements invalidated ! */
/* TODO: do it */
}
if(ret) ret += sizeof(uint32_t);
return ret;
}
static inline ydata_qitem_t *__quickitem_alloc(ydata_cache_item_t *zitem)
{
ydata_qitem_t *itm = malloc(sizeof(ydata_qitem_t));
if(itm) {
list_init_node(&itm->node);
itm->item = zitem;
}
return itm;
}
list_head_t *yd_cache_getrslist(ydata_cache_t *cache, uint8_t amount)
{
list_head_t *head = malloc(sizeof(list_head_t));
usrtc_node_t *node;
list_node_t *_i, *_si;
ydata_qitem_t *itm;
ydata_cache_item_t *zitem;
int i;
if(!head) return NULL;
if(!cache || !amount) goto __failcase; /* little bit of paranoic check */
yd_cache_wrlock(cache);
for(i = 0, node = usrtc_last(&cache->idx_tree);
i < amount;
i++, node = usrtc_last(&cache->idx_tree)) {
if(!node) break; /* pfff */
zitem = (ydata_cache_item_t *)usrtc_node_getdata(node);
/* create an quick entry */
if(!(itm = __quickitem_alloc(zitem))) goto __failcase_hasent;
/* delete from the index tree */
usrtc_delete(&cache->idx_tree, &zitem->idxnode);
list_add2tail(head, &itm->node);
}
yd_cache_unlock(cache);
return head;
__failcase_hasent:
if(head) {
list_for_each_safe(head, _i, _si) {
itm = container_of(_i, ydata_qitem_t, node);
zitem = itm->item;
if(zitem->attr & YDC_INVALIDATE) { /* oops, invalidated item found ! */
if(zitem->attr & YDC_DIRTY) {
list_del(&zitem->listnode);
cache->dirties--;
}
list_add2tail(&cache->free_poll, &zitem->listnode);
zitem->attr = 0;
zitem->attr |= YDC_UNUSED;
} else /* return back */
usrtc_insert(&cache->idx_tree, &zitem->idxnode, (void *)&zitem->oid);
list_del(&itm->node);
free(itm);
}
yd_cache_unlock(cache);
}
__failcase:
if(head) free(head);
return NULL;
}
void yd_cache_qitems_pullback(ydata_cache_t *cache, list_head_t *list)
{
list_node_t *iter, *siter;
ydata_qitem_t *itm;
ydata_cache_item_t *zitem;
yd_cache_wrlock(cache);
list_for_each_safe(list, iter, siter) {
itm = container_of(iter, ydata_qitem_t, node);
zitem = itm->item;
if(zitem->attr & YDC_DIRTY) {
list_del(&zitem->listnode);
cache->dirties--;
}
zitem->attr = 0; /* reset flags */
zitem->attr |= YDC_UNUSED;
list_add2tail(&cache->free_poll, &zitem->listnode);
/* free it at all */
list_del(&itm->node);
free(itm);
}
yd_cache_unlock(cache);
free(list);
return;
}
void yd_cache_item_dirtyoid(ydata_cache_t *cache, oid_t oid)
{
usrtc_node_t *node;
ydata_cache_item_t *zitem;
yd_cache_wrlock(cache);
node = usrtc_lookup(&cache->idx_tree, &oid);
if(node) {
zitem = (ydata_cache_item_t *)usrtc_node_getdata(node);
if(!(zitem->attr & YDC_DIRTY)) {
list_add2tail(&cache->dirty_poll, &zitem->listnode);
zitem->attr |= YDC_DIRTY;
cache->dirties++;
/* schedule sync thread, since threshold of dirty items occured */
if(cache->dirties >= YDC_SYNCTHRESHOLD) pthread_mutex_unlock(&cache->dirtlock);
}
}
yd_cache_unlock(cache);
return;
}
void yd_cache_item_invalidateoid(ydata_cache_t *cache, oid_t oid)
{
usrtc_node_t *node;
ydata_cache_item_t *zitem;
yd_cache_wrlock(cache);
node = usrtc_lookup(&cache->idx_tree, &oid);
if(node) {
zitem = (ydata_cache_item_t *)usrtc_node_getdata(node);
if(!(zitem->attr & YDC_INVALIDATE)) zitem->attr |= YDC_INVALIDATE;
}
yd_cache_unlock(cache);
return;
}