implement 'work stealing' mechanism for falling back to synchronous task execution, if all work threads are taken

This commit is contained in:
Ryan Fleury
2024-11-01 16:41:43 -07:00
parent 5a0efe1261
commit 61e7aaadeb
2 changed files with 90 additions and 37 deletions
+81 -37
View File
@@ -37,20 +37,32 @@ async_thread_count(void)
internal B32
async_push_work_(ASYNC_WorkFunctionType *work_function, ASYNC_WorkParams *params)
{
B32 result = 0;
// rjf: build work package
ASYNC_Work work = {0};
work.work_function = work_function;
work.input = params->input;
work.output = params->output;
work.semaphore = params->semaphore;
work.completion_counter = params->completion_counter;
// rjf: loop; try to write into user -> writer ring buffer. if we're on a
// worker thread, determine if we need to execute this task locally on this
// thread, and skip ring buffer if so.
B32 queued_in_ring_buffer = 0;
B32 need_to_execute_on_this_thread = 0;
OS_MutexScope(async_shared->u2w_ring_mutex) for(;;)
{
U64 num_available_work_threads = (async_shared->work_threads_count - ins_atomic_u64_eval(&async_shared->work_threads_live_count));
if(num_available_work_threads == 0 && async_work_thread_depth > 0)
{
need_to_execute_on_this_thread = 1;
break;
}
U64 unconsumed_size = async_shared->u2w_ring_write_pos - async_shared->u2w_ring_read_pos;
U64 available_size = async_shared->u2w_ring_size - unconsumed_size;
if(available_size >= sizeof(work))
{
result = 1;
queued_in_ring_buffer = 1;
if(!os_handle_match(params->semaphore, os_handle_zero()))
{
os_semaphore_take(params->semaphore, max_U64);
@@ -64,10 +76,23 @@ async_push_work_(ASYNC_WorkFunctionType *work_function, ASYNC_WorkParams *params
}
os_condition_variable_wait(async_shared->u2w_ring_cv, async_shared->u2w_ring_mutex, params->endt_us);
}
if(result)
// rjf: broadcast ring buffer cv if we wrote successfully
if(queued_in_ring_buffer)
{
os_condition_variable_broadcast(async_shared->u2w_ring_cv);
}
// rjf: if we did not queue successfully, and we have determined that
// we need to execute this work on the current thread, then execute the
// work before returning
if(need_to_execute_on_this_thread)
{
async_execute_work(work);
}
// rjf: return success signal
B32 result = (queued_in_ring_buffer || need_to_execute_on_this_thread);
return result;
}
@@ -114,6 +139,54 @@ async_task_join(ASYNC_Task *task)
return result;
}
////////////////////////////////
//~ rjf: Work Execution
// rjf: pops the next work package from the u2w ring buffer, blocking on the
// ring buffer's condition variable until a full package is available.
// returns the popped package by value.
internal ASYNC_Work
async_pop_work(void)
{
  ASYNC_Work popped = {0};
  OS_MutexScope(async_shared->u2w_ring_mutex)
  {
    // rjf: wait until at least one whole work package is unread. the
    // predicate is re-evaluated after every wakeup, since the cv is shared
    // by both producers and consumers of the ring.
    while((async_shared->u2w_ring_write_pos - async_shared->u2w_ring_read_pos) < sizeof(popped))
    {
      os_condition_variable_wait(async_shared->u2w_ring_cv, async_shared->u2w_ring_mutex, max_U64);
    }
    
    // rjf: read package & advance read position by the amount consumed
    U64 bytes_read = ring_read_struct(async_shared->u2w_ring_base, async_shared->u2w_ring_size, async_shared->u2w_ring_read_pos, &popped);
    async_shared->u2w_ring_read_pos += bytes_read;
  }
  
  // rjf: ring space was freed; wake anybody waiting on the ring cv
  os_condition_variable_broadcast(async_shared->u2w_ring_cv);
  return popped;
}
// rjf: runs a single work package on the calling thread, then publishes its
// results: stores the work function's return value to `work.output`, drops
// `work.semaphore`, and increments `work.completion_counter` -- each step
// only if the corresponding field is set, and always in that order.
//
// NOTE(review): the semaphore is dropped here whenever it is non-zero; the
// ring buffer path in async_push_work_ takes it before queueing. callers that
// execute work directly presumably need a matching take -- confirm.
internal void
async_execute_work(ASYNC_Work work)
{
  //- rjf: call work function, tracking nesting depth on this thread for the
  // duration of the call
  async_work_thread_depth += 1;
  void *result = work.work_function(async_work_thread_idx, work.input);
  async_work_thread_depth -= 1;
  
  //- rjf: publish result pointer to output slot, if any
  U64 *output_slot = (U64 *)work.output;
  if(output_slot != 0)
  {
    ins_atomic_u64_eval_assign(output_slot, (U64)result);
  }
  
  //- rjf: drop semaphore, if one was supplied
  B32 has_semaphore = !os_handle_match(work.semaphore, os_handle_zero());
  if(has_semaphore)
  {
    os_semaphore_drop(work.semaphore);
  }
  
  //- rjf: bump completion counter, if one was supplied
  if(work.completion_counter != 0)
  {
    ins_atomic_u64_inc_eval(work.completion_counter);
  }
}
////////////////////////////////
//~ rjf: Work Thread Entry Point
@@ -122,41 +195,12 @@ async_work_thread__entry_point(void *p)
{
U64 thread_idx = (U64)p;
ThreadNameF("[async] work thread #%I64u", thread_idx);
async_work_thread_idx = thread_idx;
for(;;)
{
//- rjf: grab next work
ASYNC_Work work = {0};
OS_MutexScope(async_shared->u2w_ring_mutex) for(;;)
{
U64 unconsumed_size = async_shared->u2w_ring_write_pos - async_shared->u2w_ring_read_pos;
if(unconsumed_size >= sizeof(work))
{
async_shared->u2w_ring_read_pos += ring_read_struct(async_shared->u2w_ring_base, async_shared->u2w_ring_size, async_shared->u2w_ring_read_pos, &work);
break;
}
os_condition_variable_wait(async_shared->u2w_ring_cv, async_shared->u2w_ring_mutex, max_U64);
}
os_condition_variable_broadcast(async_shared->u2w_ring_cv);
//- rjf: run work
void *work_out = work.work_function(thread_idx, work.input);
//- rjf: store output
if(work.output != 0)
{
ins_atomic_u64_eval_assign((U64 *)work.output, (U64)work_out);
}
//- rjf: release semaphore
if(!os_handle_match(work.semaphore, os_handle_zero()))
{
os_semaphore_drop(work.semaphore);
}
//- rjf: increment completion counter
if(work.completion_counter != 0)
{
ins_atomic_u64_inc_eval(work.completion_counter);
}
ASYNC_Work work = async_pop_work();
ins_atomic_u64_inc_eval(&async_shared->work_threads_live_count);
async_execute_work(work);
ins_atomic_u64_dec_eval(&async_shared->work_threads_live_count);
}
}
+9
View File
@@ -78,11 +78,14 @@ struct ASYNC_Shared
// rjf: work threads
OS_Handle *work_threads;
U64 work_threads_count;
U64 work_threads_live_count;
};
////////////////////////////////
//~ rjf: Globals
// rjf: per-thread nesting depth of work function execution; incremented
// before and decremented after each work function call in async_execute_work,
// so it is nonzero only while this thread is inside a work function.
// NOTE(review): declared as B32 but used as a counter, not a boolean.
thread_static B32 async_work_thread_depth = 0;
// rjf: per-thread work thread index; assigned by the work thread entry point
// and passed as the first argument to work functions. stays 0 on threads
// which never assign it.
thread_static U64 async_work_thread_idx = 0;
// rjf: pointer to the shared async state (u2w ring buffer, its mutex &
// condition variable, work thread bookkeeping)
global ASYNC_Shared *async_shared = 0;
////////////////////////////////
@@ -110,6 +113,12 @@ internal ASYNC_Task *async_task_launch_(Arena *arena, ASYNC_WorkFunctionType *wo
internal void *async_task_join(ASYNC_Task *task);
#define async_task_join_struct(task, T) (T *)async_task_join(task)
////////////////////////////////
//~ rjf: Work Execution
// rjf: blocks until a full work package is available in the u2w ring buffer,
// then pops & returns it
internal ASYNC_Work async_pop_work(void);
// rjf: runs `work` on the calling thread, then stores its output, drops its
// semaphore, and increments its completion counter (each only if set)
internal void async_execute_work(ASYNC_Work work);
////////////////////////////////
//~ rjf: Work Thread Entry Point