diff --git a/src/linker/thread_pool/thread_pool.c b/src/linker/thread_pool/thread_pool.c index 6399cd78..26637119 100644 --- a/src/linker/thread_pool/thread_pool.c +++ b/src/linker/thread_pool/thread_pool.c @@ -2,25 +2,27 @@ // Licensed under the MIT license (https://opensource.org/license/mit/) internal void -tp_execute_tasks(TP_Worker *worker) +tp_run_tasks(TP_Context *pool, TP_Worker *worker) { - TP_Context *pool = worker->pool; - for (;;) { - // do we have tasks? - S64 task_left_count = ins_atomic_u64_dec_eval(&pool->task_left_count); - if (task_left_count < 0) { + S64 task_left = ins_atomic_u64_dec_eval(&pool->task_left); + + // are there any tasks left to run? + if (task_left < 0) { break; } - - // invoke task - Arena *arena = pool->worker_arena ? pool->worker_arena->v[worker->id] : 0; - U64 task_id = pool->task_count - (task_left_count+1); + + // run task + Arena *arena = pool->task_arena ? pool->task_arena->v[worker->id] : 0; + U64 task_id = pool->task_count - (task_left+1); pool->task_func(arena, worker->id, task_id, pool->task_data); - // before last worker takes semaphore wake up main thread - U64 task_done_count = ins_atomic_u64_inc_eval(&pool->task_done_count); - if (task_done_count == pool->task_count) { + // cache task count so we dont touch pool memory after atomic inc + U64 task_count = pool->task_count; + + // on last task ping main thread + U64 task_done = ins_atomic_u64_inc_eval(&pool->task_done); + if (task_done == task_count) { os_semaphore_drop(pool->main_semaphore); } } @@ -29,16 +31,27 @@ tp_execute_tasks(TP_Worker *worker) internal void tp_worker_main(void *raw_worker) { - TCTX tctx_; tctx_init_and_equip(&tctx_); - + TCTX tctx_; tctx_init_and_equip(&tctx_); TP_Worker *worker = raw_worker; TP_Context *pool = worker->pool; - - while (pool->is_live) { + for (; pool->is_live; ) { if (os_semaphore_take(pool->task_semaphore, max_U64)) { - tp_execute_tasks(worker); - } else { - Assert(!"time out"); + tp_run_tasks(pool, worker); + } + } +} + +internal void +tp_worker_main_shared(void *raw_worker) +{ + TCTX tctx_; tctx_init_and_equip(&tctx_); + TP_Worker *worker = raw_worker; + TP_Context *pool = worker->pool; + for (; pool->is_live; ) { + if (os_semaphore_take(pool->exec_semaphore, max_U64)) { + if (os_semaphore_take(pool->task_semaphore, max_U64)) { + tp_run_tasks(pool, worker); + } } } } @@ -47,14 +60,32 @@ internal TP_Context * tp_alloc(Arena *arena, U32 worker_count, String8 name) { ProfBeginDynamic("Alloc Thread Pool [Worker Count: %u]", worker_count); - Assert(worker_count > 0); - - // init pool - TP_Context *pool = push_array(arena, TP_Context, 1); + AssertAlways(worker_count > 0); + + B32 is_shared = (name.size > 0); + + // alloc semaphores + OS_Handle main_semaphore = {0}; + OS_Handle task_semaphore = {0}; + OS_Handle exec_semaphore = {0}; if (worker_count > 1) { - pool->task_semaphore = os_semaphore_alloc(0, worker_count - 1, name); - pool->main_semaphore = os_semaphore_alloc(0, 1, str8(0,0)); + main_semaphore = os_semaphore_alloc(0, 1, str8_zero()); + if (is_shared) { + task_semaphore = os_semaphore_alloc(0, worker_count, name); + exec_semaphore = os_semaphore_alloc(0, worker_count, str8_zero()); + } else { + task_semaphore = os_semaphore_alloc(0, worker_count, str8_zero()); + } } + + // pick entry point for the workers + void *worker_entry = is_shared ? tp_worker_main_shared : tp_worker_main; + + // init pool + TP_Context *pool = push_array(arena, TP_Context, 1); + pool->exec_semaphore = exec_semaphore; + pool->task_semaphore = task_semaphore; + pool->main_semaphore = main_semaphore; pool->is_live = 1; pool->worker_count = worker_count; pool->worker_arr = push_array(arena, TP_Worker, worker_count); @@ -62,14 +93,14 @@ tp_alloc(Arena *arena, U32 worker_count, String8 name) // init worker data for (U64 i = 0; i < worker_count; i += 1) { TP_Worker *worker = &pool->worker_arr[i]; - worker->id = i; - worker->pool = pool; + worker->id = i; + worker->pool = pool; } // launch worker threads for (U64 i = 1; i < worker_count; i += 1) { TP_Worker *worker = &pool->worker_arr[i]; - worker->handle = os_thread_launch(tp_worker_main, worker, 0); + worker->handle = os_thread_launch(worker_entry, worker, 0); } ProfEnd(); @@ -80,11 +111,25 @@ internal void tp_release(TP_Context *pool) { pool->is_live = 0; - os_semaphore_release(pool->task_semaphore); - os_semaphore_release(pool->main_semaphore); + + B32 is_shared = !os_handle_match(pool->exec_semaphore, os_handle_zero()); + if (is_shared) { + for (U64 i = 0; i < pool->worker_count; ++i) { + os_semaphore_drop(pool->exec_semaphore); + } + } + for (U64 i = 0; i < pool->worker_count; ++i) { + os_semaphore_drop(pool->task_semaphore); + } for (U64 i = 1; i < pool->worker_count; i += 1) { os_thread_detach(pool->worker_arr[i].handle); } + if (is_shared) { + os_semaphore_release(pool->exec_semaphore); + } + os_semaphore_release(pool->task_semaphore); + os_semaphore_release(pool->main_semaphore); + MemoryZeroStruct(pool); } @@ -151,27 +196,33 @@ tp_temp_end(TP_Temp temp) } internal void -tp_for_parallel(TP_Context *pool, TP_Arena *arena, U64 task_count, TP_TaskFunc *task_func, void *task_data) +tp_for_parallel(TP_Context *pool, TP_Arena *task_arena, U64 task_count, TP_TaskFunc *task_func, void *task_data) { if (task_count > 0) { - Assert(!arena || arena->count == pool->worker_count); + // init run + pool->task_arena = task_arena; + pool->task_func = task_func; + pool->task_data = task_data; + pool->task_count = task_count; + pool->task_done = 0; + ins_atomic_u64_eval_assign(&pool->task_left, task_count); - // init context - pool->worker_arena = arena; - pool->task_count = task_count; - pool->task_func = task_func; - pool->task_data = task_data; - pool->task_done_count = 0; - pool->task_left_count = task_count; + U64 drop_count = Min(task_count, pool->worker_count); + + // if we are in shared mode ping local semaphore + if (!os_handle_match(pool->exec_semaphore, os_handle_zero())) { + for (U64 worker_idx = 0; worker_idx < drop_count; worker_idx +=1) { + os_semaphore_drop(pool->exec_semaphore); + } + } - // wake up workers - for (U64 worker_idx = 1; worker_idx < pool->worker_count; worker_idx += 1) { + // ping shared semaphore + for (U64 worker_idx = 0; worker_idx < drop_count; worker_idx += 1) { os_semaphore_drop(pool->task_semaphore); } - // execute tasks on main worker - TP_Worker *main_worker = &pool->worker_arr[0]; - tp_execute_tasks(main_worker); + // run tasks on main worker + tp_run_tasks(pool, &pool->worker_arr[0]); // wait for workers to finish tasks os_semaphore_take(pool->main_semaphore, max_U64); diff --git a/src/linker/thread_pool/thread_pool.h b/src/linker/thread_pool/thread_pool.h index 0c31913b..78aef33b 100644 --- a/src/linker/thread_pool/thread_pool.h +++ b/src/linker/thread_pool/thread_pool.h @@ -27,17 +27,20 @@ typedef struct TP_Worker typedef struct TP_Context { + B32 is_live; + OS_Handle exec_semaphore; OS_Handle task_semaphore; OS_Handle main_semaphore; - B32 is_live; + U32 worker_count; TP_Worker *worker_arr; - TP_Arena *worker_arena; - U64 task_count; + + TP_Arena *task_arena; TP_TaskFunc *task_func; void *task_data; - volatile S64 task_left_count; - volatile U64 task_done_count; + U64 task_count; + U64 task_done; + S64 task_left; } TP_Context; internal TP_Context * tp_alloc(Arena *arena, U32 worker_count, String8 name);