search cache eviction

This commit is contained in:
Ryan Fleury
2024-11-15 14:40:05 -08:00
parent c42e7837ac
commit 6be2792659
2 changed files with 64 additions and 2 deletions
+60 -2
View File
@@ -235,6 +235,7 @@ di_init(void)
di_shared->search_threads[idx].ring_base = push_array_no_zero(arena, U8, di_shared->search_threads[idx].ring_size);
di_shared->search_threads[idx].thread = os_thread_launch(di_search_thread__entry_point, (void *)idx, 0);
}
di_shared->search_evictor_thread = os_thread_launch(di_search_evictor_thread__entry_point, 0, 0);
}
////////////////////////////////
@@ -667,7 +668,16 @@ di_search_items_from_key_params_query(DI_Scope *scope, U128 key, DI_SearchParams
// rjf: no node? -> allocate
if(node == 0)
{
node = push_array(stripe->arena, DI_SearchNode, 1);
node = stripe->free_node;
if(node)
{
SLLStackPop(stripe->free_node);
MemoryZeroStruct(node);
}
else
{
node = push_array(stripe->arena, DI_SearchNode, 1);
}
SLLQueuePush(slot->first, slot->last, node);
node->key = key;
for(U64 idx = 0; idx < ArrayCount(node->buckets); idx += 1)
@@ -677,7 +687,7 @@ di_search_items_from_key_params_query(DI_Scope *scope, U128 key, DI_SearchParams
}
// rjf: record update idx info
ins_atomic_u64_eval_assign(&node->last_update_tick_idx, update_tick_idx());
node->last_update_tick_idx = update_tick_idx();
// rjf: try to grab last valid results for this key/query; determine if stale
B32 stale = 1;
@@ -1347,6 +1357,7 @@ di_search_thread__entry_point(void *p)
if(u128_match(n->key, key))
{
U64 bucket_idx = n->bucket_write_gen%ArrayCount(n->buckets);
n->work_refcount += 1;
arena = n->buckets[bucket_idx].arena;
query = push_str8_copy(scratch.arena, n->buckets[bucket_idx].query);
params = di_search_params_copy(scratch.arena, &n->buckets[bucket_idx].params);
@@ -1453,6 +1464,7 @@ di_search_thread__entry_point(void *p)
if(n->scope_refcount == 0)
{
n->bucket_read_gen += 1;
n->work_refcount -= 1;
if(!cancelled)
{
n->items = items;
@@ -1476,6 +1488,52 @@ di_search_thread__entry_point(void *p)
}
}
internal void
di_search_evictor_thread__entry_point(void *p)
{
ThreadNameF("[di] search evictor thread");
for(;;)
{
for(U64 slot_idx = 0; slot_idx < di_shared->search_slots_count; slot_idx += 1)
{
U64 stripe_idx = slot_idx%di_shared->search_stripes_count;
DI_SearchSlot *slot = &di_shared->search_slots[slot_idx];
DI_SearchStripe *stripe = &di_shared->search_stripes[stripe_idx];
B32 slot_has_work = 0;
OS_MutexScopeR(stripe->rw_mutex)
{
for(DI_SearchNode *n = slot->first; n != 0; n = n->next)
{
if(n->last_update_tick_idx+10 < update_tick_idx() && n->scope_refcount == 0 && n->work_refcount == 0)
{
slot_has_work = 1;
break;
}
}
}
if(slot_has_work) OS_MutexScopeW(stripe->rw_mutex)
{
for(DI_SearchNode *n = slot->first, *next = 0; n != 0; n = next)
{
next = n->next;
if(n->last_update_tick_idx+10 < update_tick_idx() && n->scope_refcount == 0 && n->work_refcount == 0)
{
DLLRemove(slot->first, slot->last, n);
SLLStackPush(stripe->free_node, n);
for EachElement(idx, n->buckets)
{
arena_release(n->buckets[idx].arena);
MemoryZeroStruct(&n->buckets[idx]);
}
MemoryZeroStruct(&n->items);
}
}
}
}
os_sleep_milliseconds(100);
}
}
////////////////////////////////
//~ rjf: Match Store
+4
View File
@@ -185,6 +185,7 @@ struct DI_SearchNode
DI_SearchNode *prev;
U128 key;
U64 scope_refcount;
U64 work_refcount;
U64 last_update_tick_idx;
U64 bucket_read_gen;
U64 bucket_write_gen;
@@ -367,6 +368,7 @@ struct DI_Shared
// rjf: search threads
U64 search_threads_count;
DI_SearchThread *search_threads;
OS_Handle search_evictor_thread;
};
////////////////////////////////
@@ -457,6 +459,8 @@ ASYNC_WORK_DEF(di_search_work);
internal int di_qsort_compare_search_items(DI_SearchItem *a, DI_SearchItem *b);
internal void di_search_thread__entry_point(void *p);
internal void di_search_evictor_thread__entry_point(void *p);
////////////////////////////////
//~ rjf: Match Store