From 31f25d0fe79de5fde3234dd32edca01c98da4f98 Mon Sep 17 00:00:00 2001 From: Ryan Fleury Date: Sun, 10 Nov 2024 11:23:02 -0800 Subject: [PATCH] eliminate attempts at clever read-only synchronization in file stream & ctrl layer, just fall back to simple locking mechanism --- src/ctrl/ctrl_core.c | 9 ++++----- src/ctrl/ctrl_core.h | 3 +-- src/file_stream/file_stream.c | 28 ++++++++++++---------------- src/file_stream/file_stream.h | 3 +-- 4 files changed, 18 insertions(+), 25 deletions(-) diff --git a/src/ctrl/ctrl_core.c b/src/ctrl/ctrl_core.c index e6fce9b5..789bead4 100644 --- a/src/ctrl/ctrl_core.c +++ b/src/ctrl/ctrl_core.c @@ -1253,8 +1253,7 @@ ctrl_init(void) for(U64 idx = 0; idx < ctrl_state->thread_reg_cache.stripes_count; idx += 1) { ctrl_state->thread_reg_cache.stripes[idx].arena = arena_alloc(); - ctrl_state->thread_reg_cache.stripes[idx].r_mutex = os_rw_mutex_alloc(); - ctrl_state->thread_reg_cache.stripes[idx].w_mutex = os_rw_mutex_alloc(); + ctrl_state->thread_reg_cache.stripes[idx].rw_mutex = os_rw_mutex_alloc(); } ctrl_state->module_image_info_cache.slots_count = 1024; ctrl_state->module_image_info_cache.slots = push_array(arena, CTRL_ModuleImageInfoCacheSlot, ctrl_state->module_image_info_cache.slots_count); @@ -1780,7 +1779,7 @@ ctrl_query_cached_reg_block_from_thread(Arena *arena, CTRL_EntityStore *store, C CTRL_ThreadRegCacheSlot *slot = &cache->slots[slot_idx]; CTRL_ThreadRegCacheStripe *stripe = &cache->stripes[stripe_idx]; void *result = push_array(arena, U8, reg_block_size); - OS_MutexScopeR(stripe->r_mutex) + OS_MutexScopeW(stripe->rw_mutex) { // rjf: find existing node CTRL_ThreadRegCacheNode *node = 0; @@ -1794,7 +1793,7 @@ ctrl_query_cached_reg_block_from_thread(Arena *arena, CTRL_EntityStore *store, C } // rjf: allocate existing node - if(!node) OS_MutexScopeW(stripe->w_mutex) OS_MutexScopeRWPromote(stripe->r_mutex) + if(!node) { node = push_array(stripe->arena, CTRL_ThreadRegCacheNode, 1); DLLPushBack(slot->first, slot->last, node); @@ -1810,7 +1809,7 @@ ctrl_query_cached_reg_block_from_thread(Arena *arena, CTRL_EntityStore *store, C B32 need_stale = 1; if(node->reg_gen != current_reg_gen && dmn_thread_read_reg_block(handle.dmn_handle, result)) { - OS_MutexScopeW(stripe->w_mutex) OS_MutexScopeRWPromote(stripe->r_mutex) if(node != 0) + if(node != 0) { need_stale = 0; node->reg_gen = current_reg_gen; diff --git a/src/ctrl/ctrl_core.h b/src/ctrl/ctrl_core.h index 6022256b..47c0078d 100644 --- a/src/ctrl/ctrl_core.h +++ b/src/ctrl/ctrl_core.h @@ -772,8 +772,7 @@ typedef struct CTRL_ThreadRegCacheStripe CTRL_ThreadRegCacheStripe; struct CTRL_ThreadRegCacheStripe { Arena *arena; - OS_Handle r_mutex; - OS_Handle w_mutex; + OS_Handle rw_mutex; }; typedef struct CTRL_ThreadRegCache CTRL_ThreadRegCache; diff --git a/src/file_stream/file_stream.c b/src/file_stream/file_stream.c index bedb6f60..e30cc17a 100644 --- a/src/file_stream/file_stream.c +++ b/src/file_stream/file_stream.c @@ -47,8 +47,7 @@ fs_init(void) { fs_shared->stripes[idx].arena = arena_alloc(); fs_shared->stripes[idx].cv = os_condition_variable_alloc(); - fs_shared->stripes[idx].r_mutex = os_rw_mutex_alloc(); - fs_shared->stripes[idx].w_mutex = os_rw_mutex_alloc(); + fs_shared->stripes[idx].rw_mutex = os_rw_mutex_alloc(); } fs_shared->u2s_ring_size = KB(64); fs_shared->u2s_ring_base = push_array_no_zero(arena, U8, fs_shared->u2s_ring_size); @@ -107,7 +106,7 @@ fs_hash_from_path_range(String8 path, Rng1U64 range, U64 endt_us) FS_Stripe *path_stripe = &fs_shared->stripes[path_stripe_idx]; // rjf: loop: request, check for results, return until we can't - OS_MutexScopeR(path_stripe->r_mutex) for(;;) + OS_MutexScopeW(path_stripe->rw_mutex) for(;;) { // rjf: path -> node FS_Node *node = 0; @@ -121,7 +120,7 @@ fs_hash_from_path_range(String8 path, Rng1U64 range, U64 endt_us) } // rjf: node does not exist? -> create & store - if(node == 0) OS_MutexScopeW(path_stripe->w_mutex) OS_MutexScopeRWPromote(path_stripe->r_mutex) + if(node == 0) { node = push_array(path_stripe->arena, FS_Node, 1); SLLQueuePush(path_slot->first, path_slot->last, node); @@ -145,14 +144,11 @@ fs_hash_from_path_range(String8 path, Rng1U64 range, U64 endt_us) } // rjf: range node does not exist? create & store - if(range_node == 0) OS_MutexScopeW(path_stripe->w_mutex) OS_MutexScopeRWPromote(path_stripe->r_mutex) + if(range_node == 0) { - if(range_node == 0) - { - range_node = push_array(path_stripe->arena, FS_RangeNode, 1); - SLLQueuePush(range_slot->first, range_slot->last, range_node); - range_node->range = range; - } + range_node = push_array(path_stripe->arena, FS_RangeNode, 1); + SLLQueuePush(range_slot->first, range_slot->last, range_node); + range_node->range = range; } // rjf: try to send stream request @@ -169,7 +165,7 @@ fs_hash_from_path_range(String8 path, Rng1U64 range, U64 endt_us) // rjf: have time to wait? -> wait on this stripe; otherwise exit if(u128_match(result, u128_zero()) && os_now_microseconds() <= endt_us) { - os_condition_variable_wait_rw_r(path_stripe->cv, path_stripe->r_mutex, endt_us); + os_condition_variable_wait_rw_w(path_stripe->cv, path_stripe->rw_mutex, endt_us); } else { @@ -205,7 +201,7 @@ fs_timestamp_from_path(String8 path) U64 stripe_idx = slot_idx%fs_shared->stripes_count; FS_Slot *slot = &fs_shared->slots[slot_idx]; FS_Stripe *stripe = &fs_shared->stripes[stripe_idx]; - OS_MutexScopeR(stripe->r_mutex) + OS_MutexScopeR(stripe->rw_mutex) { for(FS_Node *n = slot->first; n != 0; n = n->next) { @@ -231,7 +227,7 @@ fs_size_from_path(String8 path) U64 stripe_idx = slot_idx%fs_shared->stripes_count; FS_Slot *slot = &fs_shared->slots[slot_idx]; FS_Stripe *stripe = &fs_shared->stripes[stripe_idx]; - OS_MutexScopeR(stripe->r_mutex) + OS_MutexScopeR(stripe->rw_mutex) { for(FS_Node *n = slot->first; n != 0; n = n->next) { @@ -357,7 +353,7 @@ ASYNC_WORK_DEF(fs_stream_work) } //- rjf: commit info to cache - ProfScope("commit to cache") OS_MutexScopeW(path_stripe->r_mutex) OS_MutexScopeW(path_stripe->w_mutex) + ProfScope("commit to cache") OS_MutexScopeW(path_stripe->rw_mutex) { FS_Node *node = 0; for(FS_Node *n = path_slot->first; n != 0; n = n->next) @@ -414,7 +410,7 @@ fs_detector_thread__entry_point(void *p) for(U64 stripe_idx = 0; stripe_idx < fs_shared->stripes_count; stripe_idx += 1) { FS_Stripe *stripe = &fs_shared->stripes[stripe_idx]; - OS_MutexScopeR(stripe->r_mutex) for(U64 slot_in_stripe_idx = 0; slot_in_stripe_idx < slots_per_stripe; slot_in_stripe_idx += 1) + OS_MutexScopeR(stripe->rw_mutex) for(U64 slot_in_stripe_idx = 0; slot_in_stripe_idx < slots_per_stripe; slot_in_stripe_idx += 1) { U64 slot_idx = stripe_idx*slots_per_stripe + slot_in_stripe_idx; FS_Slot *slot = &fs_shared->slots[slot_idx]; diff --git a/src/file_stream/file_stream.h b/src/file_stream/file_stream.h index 30178d9e..e948c5a1 100644 --- a/src/file_stream/file_stream.h +++ b/src/file_stream/file_stream.h @@ -50,8 +50,7 @@ struct FS_Stripe { Arena *arena; OS_Handle cv; - OS_Handle r_mutex; - OS_Handle w_mutex; + OS_Handle rw_mutex; }; ////////////////////////////////