From 15459edc75744067de443640715e29a6e63b60a7 Mon Sep 17 00:00:00 2001 From: Ryan Fleury Date: Tue, 20 May 2025 13:03:58 -0700 Subject: [PATCH] adjust wait timeout properties of ctrl process memory cache grab; don't wait forever to submit, don't wait if work was completed even if results bad, etc. --- src/async/async.c | 2 -- src/ctrl/ctrl_core.c | 47 ++++++++++++++++++++++++++++++++++++++------ 2 files changed, 41 insertions(+), 8 deletions(-) diff --git a/src/async/async.c b/src/async/async.c index 2cf0f666..ec84c372 100644 --- a/src/async/async.c +++ b/src/async/async.c @@ -164,7 +164,6 @@ async_task_join(ASYNC_Task *task) internal ASYNC_Work async_pop_work(void) { - ProfBeginFunction(); ASYNC_Work work = {0}; B32 done = 0; ASYNC_Priority taken_priority = ASYNC_Priority_Low; @@ -199,7 +198,6 @@ async_pop_work(void) } os_condition_variable_broadcast(async_shared->ring_cv); os_condition_variable_broadcast(async_shared->rings[taken_priority].ring_cv); - ProfEnd(); return work; } diff --git a/src/ctrl/ctrl_core.c b/src/ctrl/ctrl_core.c index eebc8fb5..e3ac5773 100644 --- a/src/ctrl/ctrl_core.c +++ b/src/ctrl/ctrl_core.c @@ -1652,11 +1652,13 @@ ctrl_key_from_process_vaddr_range(CTRL_Handle process, Rng1U64 vaddr_range, B32 //- rjf: loop: try to look for current results, request if not there, wait if we can, repeat until we can't U64 mem_gen = ctrl_mem_gen(); B32 key_is_stale = 0; + B32 requested = 0; for(;;) { //- rjf: step 1: [read-only] try to look for current results for key's ID B32 id_exists = 0; B32 id_stale = 0; + B32 id_working = 0; OS_MutexScopeR(process_stripe->rw_mutex) { for(CTRL_ProcessMemoryCacheNode *process_n = process_slot->first; process_n != 0; process_n = process_n->next) @@ -1671,6 +1673,7 @@ ctrl_key_from_process_vaddr_range(CTRL_Handle process, Rng1U64 vaddr_range, B32 { id_exists = 1; id_stale = (n->mem_gen < mem_gen); + id_working = (ins_atomic_u64_eval(&n->working_count) != 0); goto end_fast_lookup; } } @@ -1691,7 +1694,7 @@ ctrl_key_from_process_vaddr_range(CTRL_Handle process, Rng1U64 vaddr_range, B32 //- rjf: step 3: if the ID does not exist in the process' cache, then we // need to build a node for it. if that, or if the ID is stale, then also // request that that range is streamed. - if(!id_exists || (id_exists && id_stale)) + if(!id_exists || (id_exists && id_stale && !id_working)) { B32 node_needs_stream = 0; U64 *node_working_count = 0; @@ -1719,12 +1722,16 @@ ctrl_key_from_process_vaddr_range(CTRL_Handle process, Rng1U64 vaddr_range, B32 range_n->vaddr_range = vaddr_range; range_n->zero_terminated = zero_terminated; range_n->id = id; - range_n->working_count += 1; + ins_atomic_u64_inc_eval(&range_n->working_count); node_needs_stream = 1; } else { node_needs_stream = (range_n->mem_gen < mem_gen); + if(node_needs_stream) + { + ins_atomic_u64_inc_eval(&range_n->working_count); + } } node_working_count = &range_n->working_count; break; @@ -1733,17 +1740,45 @@ ctrl_key_from_process_vaddr_range(CTRL_Handle process, Rng1U64 vaddr_range, B32 } if(node_needs_stream) { - ctrl_u2ms_enqueue_req(key, process, vaddr_range, zero_terminated, max_U64); - async_push_work(ctrl_mem_stream_work, .working_counter = node_working_count); + if(ctrl_u2ms_enqueue_req(key, process, vaddr_range, zero_terminated, endt_us)) + { + async_push_work(ctrl_mem_stream_work, .working_counter = node_working_count); + requested = 1; + } + else OS_MutexScopeR(process_stripe->rw_mutex) + { + for(CTRL_ProcessMemoryCacheNode *process_n = process_slot->first; process_n != 0; process_n = process_n->next) + { + if(ctrl_handle_match(process_n->handle, process)) + { + U64 range_slot_idx = range_hash%process_n->range_hash_slots_count; + CTRL_ProcessMemoryRangeHashSlot *range_slot = &process_n->range_hash_slots[range_slot_idx]; + for(CTRL_ProcessMemoryRangeHashNode *n = range_slot->first; n != 0; n = n->next) + { + if(hs_id_match(n->id, id)) + { + ins_atomic_u64_dec_eval(&n->working_count); + goto end_fail_work; + } + } + } + } + end_fail_work:; + } } } - //- rjf: step 4: if we have no time to wait, then abort; otherwise, - // wait on this process' stripe + //- rjf: step 4: if we have no time to wait, then abort; if we submitted a + // request, but the work is done and we have no results, then abort; + // otherwise, wait on this process' stripe if(os_now_microseconds() >= endt_us) { break; } + else if(!id_working && requested) + { + break; + } else OS_MutexScopeR(process_stripe->rw_mutex) { os_condition_variable_wait_rw_r(process_stripe->cv, process_stripe->rw_mutex, endt_us);