eliminate deadlocking mechanism in dbgi search cache lookup/mutation; replace with simpler but possibly worse mechanism for now

This commit is contained in:
Ryan Fleury
2024-11-07 09:57:25 -08:00
parent d036eb9fc5
commit e204fe28c1
2 changed files with 15 additions and 20 deletions
+14 -18
View File
@@ -205,8 +205,7 @@ di_init(void)
for(U64 idx = 0; idx < di_shared->search_stripes_count; idx += 1)
{
di_shared->search_stripes[idx].arena = arena_alloc();
di_shared->search_stripes[idx].r_mutex = os_rw_mutex_alloc();
di_shared->search_stripes[idx].w_mutex = os_rw_mutex_alloc();
di_shared->search_stripes[idx].rw_mutex = os_rw_mutex_alloc();
di_shared->search_stripes[idx].cv = os_condition_variable_alloc();
}
di_shared->u2p_ring_mutex = os_mutex_alloc();
@@ -637,7 +636,7 @@ di_search_items_from_key_params_query(DI_Scope *scope, U128 key, DI_SearchParams
U64 stripe_idx = slot_idx%di_shared->search_stripes_count;
DI_SearchSlot * slot = &di_shared->search_slots[slot_idx];
DI_SearchStripe * stripe = &di_shared->search_stripes[stripe_idx];
OS_MutexScopeR(stripe->r_mutex) for(;;)
OS_MutexScopeW(stripe->rw_mutex) for(;;)
{
// rjf: map key -> node
DI_SearchNode *node = 0;
@@ -651,7 +650,7 @@ di_search_items_from_key_params_query(DI_Scope *scope, U128 key, DI_SearchParams
}
// rjf: no node? -> allocate
if(node == 0) OS_MutexScopeW(stripe->w_mutex) OS_MutexScopeRWPromote(stripe->r_mutex)
if(node == 0)
{
node = push_array(stripe->arena, DI_SearchNode, 1);
SLLQueuePush(slot->first, slot->last, node);
@@ -680,18 +679,15 @@ di_search_items_from_key_params_query(DI_Scope *scope, U128 key, DI_SearchParams
}
// rjf: if stale -> request again
if(stale) OS_MutexScopeW(stripe->w_mutex) OS_MutexScopeRWPromote(stripe->r_mutex)
if(stale && node->bucket_read_gen <= node->bucket_write_gen && node->bucket_write_gen < node->bucket_read_gen + ArrayCount(node->buckets)-1)
{
if(node->bucket_read_gen <= node->bucket_write_gen && node->bucket_write_gen < node->bucket_read_gen + ArrayCount(node->buckets)-1)
{
node->bucket_write_gen += 1;
U64 new_bucket_idx = node->bucket_write_gen%ArrayCount(node->buckets);
arena_clear(node->buckets[new_bucket_idx].arena);
node->buckets[new_bucket_idx].query = push_str8_copy(node->buckets[new_bucket_idx].arena, query);
node->buckets[new_bucket_idx].params = di_search_params_copy(node->buckets[new_bucket_idx].arena, params);
node->buckets[new_bucket_idx].params_hash = params_hash;
di_u2s_enqueue_req(thread_idx, key, endt_us);
}
node->bucket_write_gen += 1;
U64 new_bucket_idx = node->bucket_write_gen%ArrayCount(node->buckets);
arena_clear(node->buckets[new_bucket_idx].arena);
node->buckets[new_bucket_idx].query = push_str8_copy(node->buckets[new_bucket_idx].arena, query);
node->buckets[new_bucket_idx].params = di_search_params_copy(node->buckets[new_bucket_idx].arena, params);
node->buckets[new_bucket_idx].params_hash = params_hash;
di_u2s_enqueue_req(thread_idx, key, endt_us);
}
// rjf: not stale, or timeout -> break
@@ -701,7 +697,7 @@ di_search_items_from_key_params_query(DI_Scope *scope, U128 key, DI_SearchParams
}
// rjf: no results, but have time to wait -> wait
os_condition_variable_wait_rw_r(stripe->cv, stripe->r_mutex, endt_us);
os_condition_variable_wait_rw_w(stripe->cv, stripe->rw_mutex, endt_us);
}
}
return items;
@@ -1315,7 +1311,7 @@ di_search_thread__entry_point(void *p)
Arena *arena = 0;
String8 query = {0};
DI_SearchParams params = {0};
OS_MutexScopeR(stripe->r_mutex)
OS_MutexScopeR(stripe->rw_mutex)
{
for(DI_SearchNode *n = slot->first; n != 0; n = n->next)
{
@@ -1414,7 +1410,7 @@ di_search_thread__entry_point(void *p)
for(B32 done = 0; !done;)
{
B32 found = 0;
OS_MutexScopeW(stripe->r_mutex) OS_MutexScopeW(stripe->w_mutex) for(DI_SearchNode *n = slot->first; n != 0; n = n->next)
OS_MutexScopeW(stripe->rw_mutex) for(DI_SearchNode *n = slot->first; n != 0; n = n->next)
{
if(u128_match(n->key, key))
{
+1 -2
View File
@@ -206,8 +206,7 @@ struct DI_SearchStripe
{
Arena *arena;
DI_SearchNode *free_node;
OS_Handle r_mutex;
OS_Handle w_mutex;
OS_Handle rw_mutex;
OS_Handle cv;
};