diff --git a/src/async/async.c b/src/async/async.c
index 67420618..9548b577 100644
--- a/src/async/async.c
+++ b/src/async/async.c
@@ -37,20 +37,32 @@ async_thread_count(void)
 internal B32
 async_push_work_(ASYNC_WorkFunctionType *work_function, ASYNC_WorkParams *params)
 {
-  B32 result = 0;
+  // rjf: build work package
   ASYNC_Work work = {0};
   work.work_function = work_function;
   work.input = params->input;
   work.output = params->output;
   work.semaphore = params->semaphore;
   work.completion_counter = params->completion_counter;
+
+  // rjf: loop; try to write into user -> writer ring buffer. if we're on a
+  // worker thread, determine if we need to execute this task locally on this
+  // thread, and skip ring buffer if so.
+  B32 queued_in_ring_buffer = 0;
+  B32 need_to_execute_on_this_thread = 0;
   OS_MutexScope(async_shared->u2w_ring_mutex) for(;;)
   {
+    U64 num_available_work_threads = (async_shared->work_threads_count - ins_atomic_u64_eval(&async_shared->work_threads_live_count));
+    if(num_available_work_threads == 0 && async_work_thread_depth > 0)
+    {
+      need_to_execute_on_this_thread = 1;
+      break;
+    }
     U64 unconsumed_size = async_shared->u2w_ring_write_pos - async_shared->u2w_ring_read_pos;
     U64 available_size = async_shared->u2w_ring_size - unconsumed_size;
     if(available_size >= sizeof(work))
     {
-      result = 1;
+      queued_in_ring_buffer = 1;
       if(!os_handle_match(params->semaphore, os_handle_zero()))
       {
         os_semaphore_take(params->semaphore, max_U64);
@@ -64,10 +76,27 @@ async_push_work_(ASYNC_WorkFunctionType *work_function, ASYNC_WorkParams *params
     }
     os_condition_variable_wait(async_shared->u2w_ring_cv, async_shared->u2w_ring_mutex, params->endt_us);
   }
-  if(result)
+
+  // rjf: broadcast ring buffer cv if we wrote successfully
+  if(queued_in_ring_buffer)
   {
     os_condition_variable_broadcast(async_shared->u2w_ring_cv);
   }
+
+  // rjf: if we did not queue successfully, and we have determined that
+  // we need to execute this work on the current thread, then execute the
+  // work before returning
+  if(need_to_execute_on_this_thread)
+  {
+    // NOTE(review): the semaphore is only taken on the queueing path above,
+    // so clear it from this local copy of the work package; otherwise
+    // async_execute_work would drop a semaphore this call never took.
+    work.semaphore = os_handle_zero();
+    async_execute_work(work);
+  }
+
+  // rjf: return success signal
+  B32 result = (queued_in_ring_buffer || need_to_execute_on_this_thread);
   return result;
 }
 
@@ -114,6 +143,63 @@ async_task_join(ASYNC_Task *task)
   return result;
 }
 
+////////////////////////////////
+//~ rjf: Work Execution
+
+// Waits on the u2w ring's condition variable until a whole ASYNC_Work is
+// unconsumed, reads it out while holding the ring mutex, then broadcasts the
+// condition variable and returns the work by value.
+internal ASYNC_Work
+async_pop_work(void)
+{
+  ASYNC_Work work = {0};
+  OS_MutexScope(async_shared->u2w_ring_mutex) for(;;)
+  {
+    U64 unconsumed_size = async_shared->u2w_ring_write_pos - async_shared->u2w_ring_read_pos;
+    if(unconsumed_size >= sizeof(work))
+    {
+      async_shared->u2w_ring_read_pos += ring_read_struct(async_shared->u2w_ring_base, async_shared->u2w_ring_size, async_shared->u2w_ring_read_pos, &work);
+      break;
+    }
+    os_condition_variable_wait(async_shared->u2w_ring_cv, async_shared->u2w_ring_mutex, max_U64);
+  }
+  os_condition_variable_broadcast(async_shared->u2w_ring_cv);
+  return work;
+}
+
+// Runs one work package on the calling thread, passing async_work_thread_idx
+// and the package's input to its work function, then signals completion in
+// this order: atomic store of the result to `output` (if non-null), drop of
+// `semaphore` (if non-zero), atomic increment of `completion_counter` (if non-null).
+internal void
+async_execute_work(ASYNC_Work work)
+{
+  //- rjf: run work
+  // (the thread-local depth is raised for the duration of the call; that is
+  // how async_push_work_ detects it is being called from a work function)
+  async_work_thread_depth += 1;
+  void *work_out = work.work_function(async_work_thread_idx, work.input);
+  async_work_thread_depth -= 1;
+
+  //- rjf: store output
+  if(work.output != 0)
+  {
+    ins_atomic_u64_eval_assign((U64 *)work.output, (U64)work_out);
+  }
+
+  //- rjf: release semaphore
+  if(!os_handle_match(work.semaphore, os_handle_zero()))
+  {
+    os_semaphore_drop(work.semaphore);
+  }
+
+  //- rjf: increment completion counter
+  if(work.completion_counter != 0)
+  {
+    ins_atomic_u64_inc_eval(work.completion_counter);
+  }
+}
+
 ////////////////////////////////
 //~ rjf: Work Thread Entry Point
 
@@ -122,41 +208,14 @@ async_work_thread__entry_point(void *p)
 {
   U64 thread_idx = (U64)p;
   ThreadNameF("[async] work thread #%I64u", thread_idx);
+  async_work_thread_idx = thread_idx;
   for(;;)
   {
-    //- rjf: grab next work
-    ASYNC_Work work = {0};
-    OS_MutexScope(async_shared->u2w_ring_mutex) for(;;)
-    {
-      U64 unconsumed_size = async_shared->u2w_ring_write_pos - async_shared->u2w_ring_read_pos;
-      if(unconsumed_size >= sizeof(work))
-      {
-        async_shared->u2w_ring_read_pos += ring_read_struct(async_shared->u2w_ring_base, async_shared->u2w_ring_size, async_shared->u2w_ring_read_pos, &work);
-        break;
-      }
-      os_condition_variable_wait(async_shared->u2w_ring_cv, async_shared->u2w_ring_mutex, max_U64);
-    }
-    os_condition_variable_broadcast(async_shared->u2w_ring_cv);
-
-    //- rjf: run work
-    void *work_out = work.work_function(thread_idx, work.input);
-
-    //- rjf: store output
-    if(work.output != 0)
-    {
-      ins_atomic_u64_eval_assign((U64 *)work.output, (U64)work_out);
-    }
-
-    //- rjf: release semaphore
-    if(!os_handle_match(work.semaphore, os_handle_zero()))
-    {
-      os_semaphore_drop(work.semaphore);
-    }
-
-    //- rjf: increment completion counter
-    if(work.completion_counter != 0)
-    {
-      ins_atomic_u64_inc_eval(work.completion_counter);
-    }
+    ASYNC_Work work = async_pop_work();
+    // the live count is raised only while a work package executes;
+    // async_push_work_ subtracts it from work_threads_count
+    ins_atomic_u64_inc_eval(&async_shared->work_threads_live_count);
+    async_execute_work(work);
+    ins_atomic_u64_dec_eval(&async_shared->work_threads_live_count);
   }
 }
diff --git a/src/async/async.h b/src/async/async.h
index baff742e..5f2b76d0 100644
--- a/src/async/async.h
+++ b/src/async/async.h
@@ -78,11 +78,14 @@ struct ASYNC_Shared
   // rjf: work threads
   OS_Handle *work_threads;
   U64 work_threads_count;
+  U64 work_threads_live_count; // atomically inc/dec'd by the work thread loop around async_execute_work
 };
 
 ////////////////////////////////
 //~ rjf: Globals
 
+thread_static B32 async_work_thread_depth = 0; // incremented/decremented around each work function call in async_execute_work
+thread_static U64 async_work_thread_idx = 0; // assigned by each work thread's entry point from its thread index
 global ASYNC_Shared *async_shared = 0;
 
 ////////////////////////////////
@@ -110,6 +113,12 @@ internal ASYNC_Task *async_task_launch_(Arena *arena, ASYNC_WorkFunctionType *wo
 internal void *async_task_join(ASYNC_Task *task);
 #define async_task_join_struct(task, T) (T *)async_task_join(task)
 
+////////////////////////////////
+//~ rjf: Work Execution
+
+internal ASYNC_Work async_pop_work(void);
+internal void async_execute_work(ASYNC_Work work);
+
 ////////////////////////////////
 //~ rjf: Work Thread Entry Point
 