eliminate the bifurcated read/write lock path based on exclusive mode; promote thread operations to the base layer, using the OS layer as the implementation; first pass on moving the file streaming layer to the base layer's async wavefront

This commit is contained in:
Ryan Fleury
2025-09-17 14:47:55 -07:00
parent 99c989a3c3
commit 0d15b8670b
40 changed files with 450 additions and 607 deletions
+38 -38
View File
@@ -1603,7 +1603,7 @@ ctrl_init(void)
ctrl_state->u2csb_ring_mutex = mutex_alloc();
ctrl_state->u2csb_ring_cv = cond_var_alloc();
ctrl_state->ctrl_thread_log = log_alloc();
ctrl_state->ctrl_thread = os_thread_launch(ctrl_thread__entry_point, 0, 0);
ctrl_state->ctrl_thread = thread_launch(ctrl_thread__entry_point, 0);
}
////////////////////////////////
@@ -1637,7 +1637,7 @@ ctrl_key_from_process_vaddr_range(CTRL_Handle process, Rng1U64 vaddr_range, B32
HS_Root root = {0};
{
B32 node_found = 0;
OS_MutexScopeR(process_stripe->rw_mutex)
MutexScopeR(process_stripe->rw_mutex)
{
for(CTRL_ProcessMemoryCacheNode *n = process_slot->first; n != 0; n = n->next)
{
@@ -1649,7 +1649,7 @@ ctrl_key_from_process_vaddr_range(CTRL_Handle process, Rng1U64 vaddr_range, B32
}
}
}
if(!node_found) OS_MutexScopeW(process_stripe->rw_mutex)
if(!node_found) MutexScopeW(process_stripe->rw_mutex)
{
for(CTRL_ProcessMemoryCacheNode *n = process_slot->first; n != 0; n = n->next)
{
@@ -1699,7 +1699,7 @@ ctrl_key_from_process_vaddr_range(CTRL_Handle process, Rng1U64 vaddr_range, B32
B32 id_exists = 0;
B32 id_stale = 0;
B32 id_working = 0;
OS_MutexScopeR(process_stripe->rw_mutex) for(;;)
MutexScopeR(process_stripe->rw_mutex) for(;;)
{
for(CTRL_ProcessMemoryCacheNode *process_n = process_slot->first; process_n != 0; process_n = process_n->next)
{
@@ -1747,7 +1747,7 @@ ctrl_key_from_process_vaddr_range(CTRL_Handle process, Rng1U64 vaddr_range, B32
if(!id_exists || (id_exists && id_stale && !id_working))
{
B32 node_needs_stream = 0;
OS_MutexScopeW(process_stripe->rw_mutex)
MutexScopeW(process_stripe->rw_mutex)
{
for(CTRL_ProcessMemoryCacheNode *process_n = process_slot->first; process_n != 0; process_n = process_n->next)
{
@@ -1796,7 +1796,7 @@ ctrl_key_from_process_vaddr_range(CTRL_Handle process, Rng1U64 vaddr_range, B32
async_push_work(ctrl_mem_stream_work);
requested = 1;
}
else OS_MutexScopeW(process_stripe->rw_mutex)
else MutexScopeW(process_stripe->rw_mutex)
{
for(CTRL_ProcessMemoryCacheNode *process_n = process_slot->first; process_n != 0; process_n = process_n->next)
{
@@ -2041,7 +2041,7 @@ ctrl_process_write(CTRL_Handle process, Rng1U64 range, void *src)
U64 stripe_idx = slot_idx%cache->stripes_count;
CTRL_ProcessMemoryCacheSlot *slot = &cache->slots[slot_idx];
CTRL_ProcessMemoryCacheStripe *stripe = &cache->stripes[stripe_idx];
OS_MutexScopeW(stripe->rw_mutex)
MutexScopeW(stripe->rw_mutex)
{
for(CTRL_ProcessMemoryCacheNode *proc_n = slot->first; proc_n != 0; proc_n = proc_n->next)
{
@@ -2097,7 +2097,7 @@ ctrl_reg_block_from_thread(Arena *arena, CTRL_EntityCtx *ctx, CTRL_Handle handle
CTRL_ThreadRegCacheSlot *slot = &cache->slots[slot_idx];
CTRL_ThreadRegCacheStripe *stripe = &cache->stripes[stripe_idx];
void *result = push_array(arena, U8, reg_block_size);
OS_MutexScopeW(stripe->rw_mutex)
MutexScopeW(stripe->rw_mutex)
{
// rjf: find existing node
CTRL_ThreadRegCacheNode *node = 0;
@@ -2203,7 +2203,7 @@ ctrl_intel_pdata_from_module_voff(Arena *arena, CTRL_Handle module_handle, U64 v
U64 stripe_idx = slot_idx%ctrl_state->module_image_info_cache.stripes_count;
CTRL_ModuleImageInfoCacheSlot *slot = &ctrl_state->module_image_info_cache.slots[slot_idx];
CTRL_ModuleImageInfoCacheStripe *stripe = &ctrl_state->module_image_info_cache.stripes[stripe_idx];
OS_MutexScopeR(stripe->rw_mutex) for(CTRL_ModuleImageInfoCacheNode *n = slot->first; n != 0; n = n->next)
MutexScopeR(stripe->rw_mutex) for(CTRL_ModuleImageInfoCacheNode *n = slot->first; n != 0; n = n->next)
{
if(ctrl_handle_match(n->module, module_handle))
{
@@ -2269,7 +2269,7 @@ ctrl_entry_point_voff_from_module(CTRL_Handle module_handle)
U64 stripe_idx = slot_idx%ctrl_state->module_image_info_cache.stripes_count;
CTRL_ModuleImageInfoCacheSlot *slot = &ctrl_state->module_image_info_cache.slots[slot_idx];
CTRL_ModuleImageInfoCacheStripe *stripe = &ctrl_state->module_image_info_cache.stripes[stripe_idx];
OS_MutexScopeR(stripe->rw_mutex) for(CTRL_ModuleImageInfoCacheNode *n = slot->first; n != 0; n = n->next)
MutexScopeR(stripe->rw_mutex) for(CTRL_ModuleImageInfoCacheNode *n = slot->first; n != 0; n = n->next)
{
if(ctrl_handle_match(n->module, module_handle))
{
@@ -2289,7 +2289,7 @@ ctrl_tls_vaddr_range_from_module(CTRL_Handle module_handle)
U64 stripe_idx = slot_idx%ctrl_state->module_image_info_cache.stripes_count;
CTRL_ModuleImageInfoCacheSlot *slot = &ctrl_state->module_image_info_cache.slots[slot_idx];
CTRL_ModuleImageInfoCacheStripe *stripe = &ctrl_state->module_image_info_cache.stripes[stripe_idx];
OS_MutexScopeR(stripe->rw_mutex) for(CTRL_ModuleImageInfoCacheNode *n = slot->first; n != 0; n = n->next)
MutexScopeR(stripe->rw_mutex) for(CTRL_ModuleImageInfoCacheNode *n = slot->first; n != 0; n = n->next)
{
if(ctrl_handle_match(n->module, module_handle))
{
@@ -2309,7 +2309,7 @@ ctrl_initial_debug_info_path_from_module(Arena *arena, CTRL_Handle module_handle
U64 stripe_idx = slot_idx%ctrl_state->module_image_info_cache.stripes_count;
CTRL_ModuleImageInfoCacheSlot *slot = &ctrl_state->module_image_info_cache.slots[slot_idx];
CTRL_ModuleImageInfoCacheStripe *stripe = &ctrl_state->module_image_info_cache.stripes[stripe_idx];
OS_MutexScopeR(stripe->rw_mutex) for(CTRL_ModuleImageInfoCacheNode *n = slot->first; n != 0; n = n->next)
MutexScopeR(stripe->rw_mutex) for(CTRL_ModuleImageInfoCacheNode *n = slot->first; n != 0; n = n->next)
{
if(ctrl_handle_match(n->module, module_handle))
{
@@ -2329,7 +2329,7 @@ ctrl_raddbg_data_from_module(Arena *arena, CTRL_Handle module_handle)
U64 stripe_idx = slot_idx%ctrl_state->module_image_info_cache.stripes_count;
CTRL_ModuleImageInfoCacheSlot *slot = &ctrl_state->module_image_info_cache.slots[slot_idx];
CTRL_ModuleImageInfoCacheStripe *stripe = &ctrl_state->module_image_info_cache.stripes[stripe_idx];
OS_MutexScopeR(stripe->rw_mutex) for(CTRL_ModuleImageInfoCacheNode *n = slot->first; n != 0; n = n->next)
MutexScopeR(stripe->rw_mutex) for(CTRL_ModuleImageInfoCacheNode *n = slot->first; n != 0; n = n->next)
{
if(ctrl_handle_match(n->module, module_handle))
{
@@ -3460,7 +3460,7 @@ ctrl_call_stack_from_thread(CTRL_Scope *scope, CTRL_Handle thread_handle, B32 hi
B32 node_exists = 0;
B32 node_stale = 1;
B32 node_working = 0;
OS_MutexScopeR(stripe->rw_mutex) for(;;)
MutexScopeR(stripe->rw_mutex) for(;;)
{
CTRL_CallStackCacheNode *node = 0;
for(CTRL_CallStackCacheNode *n = slot->first; n != 0; n = n->next)
@@ -3499,7 +3499,7 @@ ctrl_call_stack_from_thread(CTRL_Scope *scope, CTRL_Handle thread_handle, B32 hi
//- rjf: [write] node does not exist => create; request if new or stale
B32 need_request = (!node_exists || node_stale);
CTRL_CallStackCacheNode *node_to_request = 0;
if(can_request && need_request) OS_MutexScopeW(stripe->rw_mutex)
if(can_request && need_request) MutexScopeW(stripe->rw_mutex)
{
CTRL_CallStackCacheNode *node = 0;
for(CTRL_CallStackCacheNode *n = slot->first; n != 0; n = n->next)
@@ -3530,7 +3530,7 @@ ctrl_call_stack_from_thread(CTRL_Scope *scope, CTRL_Handle thread_handle, B32 hi
{
async_push_work(ctrl_call_stack_build_work, .priority = high_priority ? ASYNC_Priority_High : ASYNC_Priority_Low);
}
else OS_MutexScopeW(stripe->rw_mutex)
else MutexScopeW(stripe->rw_mutex)
{
node_to_request->working_count -= 1;
}
@@ -3551,7 +3551,7 @@ ctrl_call_stack_tree(CTRL_Scope *scope, U64 endt_us)
U64 reg_gen = ctrl_reg_gen();
U64 mem_gen = ctrl_mem_gen();
CTRL_CallStackTreeCache *cache = &ctrl_state->call_stack_tree_cache;
OS_MutexScopeR(cache->rw_mutex) for(;;)
MutexScopeR(cache->rw_mutex) for(;;)
{
// rjf: unpack cache/time state
B32 is_stale = (cache->reg_gen != reg_gen || cache->mem_gen != mem_gen);
@@ -3641,7 +3641,7 @@ ctrl_u2c_push_msgs(CTRL_MsgList *msgs, U64 endt_us)
Temp scratch = scratch_begin(0, 0);
String8 msgs_srlzed_baked = ctrl_serialized_string_from_msg_list(scratch.arena, msgs);
B32 good = 0;
OS_MutexScope(ctrl_state->u2c_ring_mutex) for(;;)
MutexScope(ctrl_state->u2c_ring_mutex) for(;;)
{
U64 unconsumed_size = (ctrl_state->u2c_ring_write_pos-ctrl_state->u2c_ring_read_pos);
U64 available_size = ctrl_state->u2c_ring_size-unconsumed_size;
@@ -3672,7 +3672,7 @@ ctrl_u2c_pop_msgs(Arena *arena)
{
Temp scratch = scratch_begin(&arena, 1);
String8 msgs_srlzed_baked = {0};
OS_MutexScope(ctrl_state->u2c_ring_mutex) for(;;)
MutexScope(ctrl_state->u2c_ring_mutex) for(;;)
{
U64 unconsumed_size = (ctrl_state->u2c_ring_write_pos-ctrl_state->u2c_ring_read_pos);
if(unconsumed_size >= sizeof(U64))
@@ -3699,7 +3699,7 @@ ctrl_c2u_push_events(CTRL_EventList *events)
{
if(events->count != 0) ProfScope("ctrl_c2u_push_events")
{
OS_MutexScopeW(ctrl_state->ctrl_thread_entity_ctx_rw_mutex)
MutexScopeW(ctrl_state->ctrl_thread_entity_ctx_rw_mutex)
{
ctrl_entity_store_apply_events(ctrl_state->ctrl_thread_entity_store, events);
}
@@ -3707,7 +3707,7 @@ ctrl_c2u_push_events(CTRL_EventList *events)
{
Temp scratch = scratch_begin(0, 0);
String8 event_srlzed = ctrl_serialized_string_from_event(scratch.arena, &n->v, ctrl_state->c2u_ring_size-sizeof(U64));
OS_MutexScope(ctrl_state->c2u_ring_mutex) for(;;)
MutexScope(ctrl_state->c2u_ring_mutex) for(;;)
{
U64 unconsumed_size = (ctrl_state->c2u_ring_write_pos-ctrl_state->c2u_ring_read_pos);
U64 available_size = ctrl_state->c2u_ring_size-unconsumed_size;
@@ -3736,7 +3736,7 @@ ctrl_c2u_pop_events(Arena *arena)
ProfBeginFunction();
Temp scratch = scratch_begin(&arena, 1);
CTRL_EventList events = {0};
OS_MutexScope(ctrl_state->c2u_ring_mutex) for(;;)
MutexScope(ctrl_state->c2u_ring_mutex) for(;;)
{
U64 unconsumed_size = (ctrl_state->c2u_ring_write_pos-ctrl_state->c2u_ring_read_pos);
if(unconsumed_size >= sizeof(U64))
@@ -3766,7 +3766,7 @@ ctrl_c2u_pop_events(Arena *arena)
internal void
ctrl_thread__entry_point(void *p)
{
ThreadNameF("[ctrl] thread");
ThreadNameF("ctrl_thread");
ProfBeginFunction();
DMN_CtrlCtx *ctrl_ctx = dmn_ctrl_begin();
log_select(ctrl_state->ctrl_thread_log);
@@ -3896,7 +3896,7 @@ ctrl_thread__entry_point(void *p)
CTRL_Entity *debug_info_path = ctrl_entity_child_from_kind(module, CTRL_EntityKind_DebugInfoPath);
DI_Key old_dbgi_key = {debug_info_path->string, debug_info_path->timestamp};
di_close(&old_dbgi_key);
OS_MutexScopeW(ctrl_state->ctrl_thread_entity_ctx_rw_mutex)
MutexScopeW(ctrl_state->ctrl_thread_entity_ctx_rw_mutex)
{
ctrl_entity_equip_string(ctrl_state->ctrl_thread_entity_store, debug_info_path, path_normalized_from_string(scratch.arena, path));
}
@@ -4386,7 +4386,7 @@ ctrl_thread__module_open(CTRL_Handle process, CTRL_Handle module, Rng1U64 vaddr_
U64 stripe_idx = slot_idx%ctrl_state->module_image_info_cache.stripes_count;
CTRL_ModuleImageInfoCacheSlot *slot = &ctrl_state->module_image_info_cache.slots[slot_idx];
CTRL_ModuleImageInfoCacheStripe *stripe = &ctrl_state->module_image_info_cache.stripes[stripe_idx];
OS_MutexScopeW(stripe->rw_mutex)
MutexScopeW(stripe->rw_mutex)
{
CTRL_ModuleImageInfoCacheNode *node = 0;
for(CTRL_ModuleImageInfoCacheNode *n = slot->first; n != 0; n = n->next)
@@ -4427,7 +4427,7 @@ ctrl_thread__module_close(CTRL_Handle process, CTRL_Handle module, Rng1U64 vaddr
U64 stripe_idx = slot_idx%ctrl_state->module_image_info_cache.stripes_count;
CTRL_ModuleImageInfoCacheSlot *slot = &ctrl_state->module_image_info_cache.slots[slot_idx];
CTRL_ModuleImageInfoCacheStripe *stripe = &ctrl_state->module_image_info_cache.stripes[stripe_idx];
OS_MutexScopeW(stripe->rw_mutex)
MutexScopeW(stripe->rw_mutex)
{
CTRL_ModuleImageInfoCacheNode *node = 0;
for(CTRL_ModuleImageInfoCacheNode *n = slot->first; n != 0; n = n->next)
@@ -5068,7 +5068,7 @@ ctrl_thread__next_dmn_event(Arena *arena, DMN_CtrlCtx *ctrl_ctx, CTRL_Msg *msg,
U64 stripe_idx = slot_idx%cache->stripes_count;
CTRL_ProcessMemoryCacheSlot *slot = &cache->slots[slot_idx];
CTRL_ProcessMemoryCacheStripe *stripe = &cache->stripes[stripe_idx];
OS_MutexScopeW(stripe->rw_mutex)
MutexScopeW(stripe->rw_mutex)
{
for(CTRL_ProcessMemoryCacheNode *n = slot->first, *next = 0; n != 0; n = next)
{
@@ -5431,7 +5431,7 @@ ctrl_thread__launch(DMN_CtrlCtx *ctrl_ctx, CTRL_Msg *msg)
//- rjf: record (id -> entry points), so that we know custom entry points for this PID
CTRL_EntityCtxRWStore *entity_ctx_rw_store = ctrl_state->ctrl_thread_entity_store;
OS_MutexScopeW(ctrl_state->ctrl_thread_entity_ctx_rw_mutex)
MutexScopeW(ctrl_state->ctrl_thread_entity_ctx_rw_mutex)
{
for(String8Node *n = msg->entry_points.first; n != 0; n = n->next)
{
@@ -6869,7 +6869,7 @@ internal B32
ctrl_u2ms_enqueue_req(HS_Key key, CTRL_Handle process, Rng1U64 vaddr_range, B32 zero_terminated, U64 endt_us)
{
B32 good = 0;
OS_MutexScope(ctrl_state->u2ms_ring_mutex) for(;;)
MutexScope(ctrl_state->u2ms_ring_mutex) for(;;)
{
U64 unconsumed_size = ctrl_state->u2ms_ring_write_pos-ctrl_state->u2ms_ring_read_pos;
U64 available_size = ctrl_state->u2ms_ring_size-unconsumed_size;
@@ -6892,7 +6892,7 @@ ctrl_u2ms_enqueue_req(HS_Key key, CTRL_Handle process, Rng1U64 vaddr_range, B32
internal void
ctrl_u2ms_dequeue_req(HS_Key *out_key, CTRL_Handle *out_process, Rng1U64 *out_vaddr_range, B32 *out_zero_terminated)
{
OS_MutexScope(ctrl_state->u2ms_ring_mutex) for(;;)
MutexScope(ctrl_state->u2ms_ring_mutex) for(;;)
{
U64 unconsumed_size = ctrl_state->u2ms_ring_write_pos-ctrl_state->u2ms_ring_read_pos;
if(unconsumed_size >= sizeof(*out_key)+sizeof(*out_process)+sizeof(*out_vaddr_range)+sizeof(*out_zero_terminated))
@@ -7045,7 +7045,7 @@ ASYNC_WORK_DEF(ctrl_mem_stream_work)
}
//- rjf: commit new info to cache
OS_MutexScopeW(process_stripe->rw_mutex)
MutexScopeW(process_stripe->rw_mutex)
{
for(CTRL_ProcessMemoryCacheNode *n = process_slot->first; n != 0; n = n->next)
{
@@ -7093,7 +7093,7 @@ internal B32
ctrl_u2csb_enqueue_req(CTRL_Handle thread, U64 endt_us)
{
B32 good = 0;
OS_MutexScope(ctrl_state->u2csb_ring_mutex) for(;;)
MutexScope(ctrl_state->u2csb_ring_mutex) for(;;)
{
U64 unconsumed_size = ctrl_state->u2csb_ring_write_pos - ctrl_state->u2csb_ring_read_pos;
U64 available_size = ctrl_state->u2csb_ring_size - unconsumed_size;
@@ -7119,7 +7119,7 @@ ctrl_u2csb_enqueue_req(CTRL_Handle thread, U64 endt_us)
internal void
ctrl_u2csb_dequeue_req(CTRL_Handle *out_thread)
{
OS_MutexScope(ctrl_state->u2csb_ring_mutex) for(;;)
MutexScope(ctrl_state->u2csb_ring_mutex) for(;;)
{
U64 unconsumed_size = ctrl_state->u2csb_ring_write_pos - ctrl_state->u2csb_ring_read_pos;
if(unconsumed_size >= sizeof(*out_thread))
@@ -7150,7 +7150,7 @@ ASYNC_WORK_DEF(ctrl_call_stack_build_work)
//- rjf: produce mini entity context for just this process
CTRL_EntityCtx *entity_ctx = push_array(scratch.arena, CTRL_EntityCtx, 1);
OS_MutexScopeR(ctrl_state->ctrl_thread_entity_ctx_rw_mutex)
MutexScopeR(ctrl_state->ctrl_thread_entity_ctx_rw_mutex)
{
CTRL_EntityCtx *src_ctx = &ctrl_state->ctrl_thread_entity_store->ctx;
CTRL_EntityCtx *dst_ctx = entity_ctx;
@@ -7263,7 +7263,7 @@ ASYNC_WORK_DEF(ctrl_call_stack_build_work)
{
B32 found = 0;
B32 committed = 0;
OS_MutexScopeW(stripe->rw_mutex) for(;;)
MutexScopeW(stripe->rw_mutex) for(;;)
{
// rjf: try to find node & commit
for(CTRL_CallStackCacheNode *n = slot->first; n != 0; n = n->next)
@@ -7311,7 +7311,7 @@ ASYNC_WORK_DEF(ctrl_call_stack_build_work)
}
//- rjf: mark work as done
OS_MutexScopeW(stripe->rw_mutex) for(CTRL_CallStackCacheNode *n = slot->first; n != 0; n = n->next)
MutexScopeW(stripe->rw_mutex) for(CTRL_CallStackCacheNode *n = slot->first; n != 0; n = n->next)
{
if(ctrl_handle_match(n->thread, thread_handle))
{
@@ -7345,7 +7345,7 @@ ASYNC_WORK_DEF(ctrl_call_stack_tree_build_work)
CTRL_Handle *threads = 0;
CTRL_Handle *threads_processes = 0;
Arch *threads_arches = 0;
OS_MutexScopeR(ctrl_state->ctrl_thread_entity_ctx_rw_mutex)
MutexScopeR(ctrl_state->ctrl_thread_entity_ctx_rw_mutex)
{
CTRL_EntityCtx *ctx = &ctrl_state->ctrl_thread_entity_store->ctx;
CTRL_EntityArray thread_entities = ctrl_entity_array_from_kind(ctx, CTRL_EntityKind_Thread);
@@ -7434,7 +7434,7 @@ ASYNC_WORK_DEF(ctrl_call_stack_tree_build_work)
CTRL_CallStackTreeCache *cache = &ctrl_state->call_stack_tree_cache;
if(tree.root != &ctrl_call_stack_tree_node_nil)
{
OS_MutexScopeW(cache->rw_mutex) for(;;)
MutexScopeW(cache->rw_mutex) for(;;)
{
if(cache->scope_touch_count == 0)
{